PySpark Pandas_Udf()

Pyspark Pandas Udf



Трансформирането на PySpark DataFrame е възможно с помощта на функцията pandas_udf(). Това е дефинирана от потребителя функция, която се прилага върху PySpark DataFrame със стрелка. Можем да изпълняваме векторизираните операции с помощта на pandas_udf(). Може да се реализира чрез предаване на тази функция като декоратор. Нека се потопим в това ръководство, за да научим синтаксиса, параметрите и различните примери.

Тема на съдържанието:

Ако искате да знаете за инсталацията на PySpark DataFrame и модула, прегледайте това статия .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () е наличен в модула sql.functions в PySpark, който може да бъде импортиран чрез ключовата дума „from“. Използва се за извършване на векторизирани операции на нашия PySpark DataFrame. Тази функция се реализира като декоратор чрез предаване на три параметъра. След това можем да създадем дефинирана от потребителя функция, която връща данните във векторен формат (както използваме series/NumPy за това) с помощта на стрелка. В рамките на тази функция можем да върнем резултата.



Структура и синтаксис:



Първо, нека да разгледаме структурата и синтаксиса на тази функция:

@pandas_udf(тип данни)
def име_на_функция(операция) -> конвертиране_формат:
изявление за връщане

Тук function_name е името на нашата дефинирана функция. Типът данни указва типа данни, който се връща от тази функция. Можем да върнем резултата с ключовата дума „return“. Всички операции се извършват вътре във функцията със задаване на стрелка.





Pandas_udf (функция и ReturnType)

  1. Първият параметър е дефинираната от потребителя функция, която му се предава.
  2. Вторият параметър се използва за указване на типа данни за връщане от функцията.

Данни:

В цялото това ръководство използваме само един PySpark DataFrame за демонстрация. Всички дефинирани от потребителя функции, които дефинираме, се прилагат върху този PySpark DataFrame. Уверете се, че сте създали този DataFrame във вашата среда първо след инсталирането на PySpark.



импортиране на pyspark

от pyspark.sql импортирайте SparkSession

linuxhint_spark_app = SparkSession.builder.appName( „Linux Hint“ ).getOrCreate()

от pyspark.sql.functions импортирайте pandas_udf

от pyspark.sql.types import *

импортирайте панди като панда

# детайли за зеленчуци

зеленчук =[{ 'Тип' : 'зеленчук' , име : 'домат' , 'locate_country' : 'САЩ' , 'количество' : 800 },

{ 'Тип' : 'плод' , име : 'банан' , 'locate_country' : 'КИТАЙ' , 'количество' : двадесет },

{ 'Тип' : 'зеленчук' , име : 'домат' , 'locate_country' : 'САЩ' , 'количество' : 800 },

{ 'Тип' : 'зеленчук' , име : 'манго' , 'locate_country' : 'ЯПОНИЯ' , 'количество' : 0 },

{ 'Тип' : 'плод' , име : 'лимон' , 'locate_country' : 'ИНДИЯ' , 'количество' : 1700 },

{ 'Тип' : 'зеленчук' , име : 'домат' , 'locate_country' : 'САЩ' , 'количество' : 1200 },

{ 'Тип' : 'зеленчук' , име : 'манго' , 'locate_country' : 'ЯПОНИЯ' , 'количество' : 0 },

{ 'Тип' : 'плод' , име : 'лимон' , 'locate_country' : 'ИНДИЯ' , 'количество' : 0 }

]

# създайте пазарна рамка от данни от горните данни

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Изход:

Тук създаваме тази DataFrame с 4 колони и 8 реда. Сега използваме pandas_udf(), за да създадем дефинираните от потребителя функции и да ги приложим към тези колони.

Pandas_udf() с различни типове данни

В този сценарий създаваме някои дефинирани от потребителя функции с pandas_udf() и ги прилагаме върху колони и показваме резултатите с помощта на метода select(). Във всеки случай ние използваме pandas.Series, докато извършваме векторизираните операции. Това разглежда стойностите на колоната като едномерен масив и операцията се прилага върху колоната. В самия декоратор ние указваме типа на връщане на функцията.

Пример 1: Pandas_udf() с тип низ

Тук създаваме две дефинирани от потребителя функции с тип връщане на низ, за ​​да преобразуваме стойностите на колоната тип низ в главни и малки букви. И накрая, ние прилагаме тези функции върху колоните „type“ и „locate_country“.

# Преобразувайте колона тип в главни букви с pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

връщане i.str.upper()

# Конвертирайте колоната locate_country в малки букви с pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

връщане i.str.lower()

# Показване на колоните чрез select()

market_df.select( 'Тип' ,type_uppercase( 'Тип' ), 'locate_country' ,
държава_малък_списък( 'locate_country' )).покажи()

Изход:

Обяснение:

Функцията StringType() е достъпна в модула pyspark.sql.types. Вече импортирахме този модул, докато създавахме PySpark DataFrame.

  1. Първо, UDF (дефинирана от потребителя функция) връща низовете в главни букви с помощта на функцията str.upper(). Функцията str.upper() е налична в структурата на серийните данни (тъй като преобразуваме в серия със стрелка във функцията), която преобразува дадения низ в главни букви. И накрая, тази функция се прилага към колоната „тип“, която е посочена в метода select(). Преди това всички низове в колоната тип са с малки букви. Сега те са променени на главни букви.
  2. Второ, UDF връща низовете в главни букви с помощта на функцията str.lower(). Функцията str.lower() е налична в структурата на серийните данни, която преобразува дадения низ в малки букви. И накрая, тази функция се прилага към колоната „тип“, която е посочена в метода select(). Преди това всички низове в колоната тип бяха с главни букви. Сега те са променени на малки букви.

Пример 2: Pandas_udf() с тип Integer

Нека създадем UDF, който преобразува целочислената колона на PySpark DataFrame в серията Pandas и да добавим 100 към всяка стойност. Предайте колоната „количество“ на тази функция в метода select().

# Добавете 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

връщане i+ 100

# Прехвърлете колоната за количество към горната функция и дисплей.

market_df.select( 'количество' ,добавете_100( 'количество' )).покажи()

Изход:

Обяснение:

Вътре в UDF ние итерираме всички стойности и ги преобразуваме в серии. След това добавяме 100 към всяка стойност в серията. Накрая предаваме колоната „количество“ на тази функция и можем да видим, че 100 се добавя към всички стойности.

Pandas_udf() с различни типове данни с помощта на Groupby() & Agg()

Нека да разгледаме примерите за предаване на UDF към обобщените колони. Тук стойностите на колоните първо се групират с помощта на функцията groupby(), а агрегирането се извършва с помощта на функцията agg(). Предаваме нашия UDF вътре в тази агрегатна функция.

Синтаксис:

pyspark_dataframe_object.groupby( 'групиране_колона' ).agg(UDF
(pyspark_dataframe_object[ 'колона' ]))

Тук първо се групират стойностите в колоната за групиране. След това агрегирането се извършва на всеки групирани данни по отношение на нашия UDF.

Пример 1: Pandas_udf() с Aggregate Mean()

Тук създаваме дефинирана от потребителя функция с връщащ тип float. Вътре във функцията изчисляваме средната стойност с помощта на функцията mean(). Този UDF се предава на колоната „количество“, за да се получи средното количество за всеки тип.

# върне средната/средната стойност

@pandas_udf( 'плавам' )

def average_function(i: panda.Series) -> float:

връщане i.mean()

# Предавайте колоната за количество към функцията, като групирате колоната за тип.

market_df.groupby( 'Тип' ).agg(средна_функция(market_df[ 'количество' ])).покажи()

Изход:

Групираме въз основа на елементи в колоната „тип“. Оформят се две групи – „плодове” и „зеленчуци”. За всяка група се изчислява и връща средната стойност.

Пример 2: Pandas_udf() с Aggregate Max() и Min()

Тук създаваме две дефинирани от потребителя функции с целочислен (int) тип връщане. Първият UDF връща минималната стойност, а вторият UDF връща максималната стойност.

# pandas_udf, който връща минималната стойност

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

връщане i.min()

# pandas_udf, които връщат максималната стойност

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

връщане i.max()

# Предайте колоната за количество към min_ pandas_udf чрез групиране на locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'количество' ])).покажи()

# Предавайте колоната за количество към max_ pandas_udf чрез групиране на locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'количество' ])).покажи()

Изход:

За да върнем минимални и максимални стойности, ние използваме функциите min() и max() в връщания тип на UDF. Сега групираме данните в колоната „locate_country“. Оформят се четири групи (“КИТАЙ”, “ИНДИЯ”, “ЯПОНИЯ”, “САЩ”). За всяка група връщаме максималното количество. По същия начин връщаме минималното количество.

Заключение

По принцип pandas_udf () се използва за извършване на векторизираните операции на нашия PySpark DataFrame. Видяхме как да създадем pandas_udf() и да го приложим към PySpark DataFrame. За по-добро разбиране обсъдихме различните примери, като разгледахме всички типове данни (низ, плаващо число и цяло число). Възможно е да използвате pandas_udf() с groupby() чрез функцията agg().