PySpark Pandas_Udf()

Pyspark Pandas Udf



A PySpark DataFrame átalakítása a pandas_udf() függvény használatával lehetséges. Ez egy felhasználó által definiált függvény, amelyet a PySpark DataFrame nyíllal alkalmaznak. A vektorizált műveleteket a pandas_udf() segítségével tudjuk végrehajtani. Ezt a funkciót dekoratőrként való átadásával lehet megvalósítani. Merüljön el ebben az útmutatóban, hogy megismerje a szintaxist, a paramétereket és a különböző példákat.

Tartalom témája:

Ha szeretne tudni a PySpark DataFrame és a modul telepítéséről, kövesse ezt cikk .







Pyspark.sql.functions.pandas_udf()

A pandas_udf () a PySpark sql.functions moduljában érhető el, amely a „from” kulcsszóval importálható. A vektorizált műveletek végrehajtására szolgál a PySpark DataFrame-ünkön. Ez a funkció dekorátorként valósul meg három paraméter átadásával. Ezt követően létrehozhatunk egy felhasználó által definiált függvényt, amely vektoros formátumban adja vissza az adatokat (ahogyan ehhez a sorozatot/NumPy-t használjuk) egy nyíl segítségével. Ezen a függvényen belül vissza tudjuk adni az eredményt.



Szerkezet és szintaxis:



Először nézzük meg ennek a függvénynek a szerkezetét és szintaxisát:

@pandas_udf(adattípus)
def függvény_neve(művelet) -> convert_format:
visszáru nyilatkozat

Itt a függvény_neve a definiált függvény neve. Az adattípus határozza meg a függvény által visszaadott adattípust. Az eredményt a „return” kulcsszó használatával adhatjuk vissza. Az összes műveletet a függvényen belül hajtják végre a nyíl hozzárendeléssel.





Pandas_udf (függvény és visszatérési típus)

  1. Az első paraméter a felhasználó által definiált függvény, amelyet átadunk neki.
  2. A második paraméter a függvény visszatérési adattípusának megadására szolgál.

Adat:

Ebben a teljes útmutatóban csak egy PySpark DataFrame-et használunk a demonstrációhoz. Az általunk meghatározott összes felhasználó által definiált függvényt alkalmazzuk ezen a PySpark DataFrame-en. Győződjön meg arról, hogy először a PySpark telepítése után hozza létre ezt a DataFrame-et a környezetben.



import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

a pyspark.sql.functions fájlból importálja a pandas_udf fájlt

a pyspark.sql.types importból *

import pandákat pandaként

# zöldségrészletek

zöldség =[{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 800 },

{ 'típus' : 'gyümölcs' , 'név' : 'banán' , 'locate_country' : 'KÍNA' , 'Mennyiség' : húsz },

{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 800 },

{ 'típus' : 'növényi' , 'név' : 'Mangó' , 'locate_country' : 'JAPÁN' , 'Mennyiség' : 0 },

{ 'típus' : 'gyümölcs' , 'név' : 'citrom' , 'locate_country' : 'INDIA' , 'Mennyiség' : 1700 },

{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 1200 },

{ 'típus' : 'növényi' , 'név' : 'Mangó' , 'locate_country' : 'JAPÁN' , 'Mennyiség' : 0 },

{ 'típus' : 'gyümölcs' , 'név' : 'citrom' , 'locate_country' : 'INDIA' , 'Mennyiség' : 0 }

]

# hozza létre a piaci adatkeretet a fenti adatokból

market_df = linuxhint_spark_app.createDataFrame(vegetable)

market_df.show()

Kimenet:

Itt hozzuk létre ezt a DataFrame-et 4 oszlopból és 8 sorból. Most a pandas_udf() segítségével hozzuk létre a felhasználó által definiált függvényeket, és alkalmazzuk őket ezekre az oszlopokra.

Pandas_udf() különböző adattípusokkal

Ebben a forgatókönyvben létrehozunk néhány felhasználó által definiált függvényt a pandas_udf() paranccsal, és alkalmazzuk őket az oszlopokra, és megjelenítjük az eredményeket a select() metódussal. A vektorizált műveletek végrehajtásához minden esetben a pandas.Series-t használjuk. Ez az oszlopértékeket egydimenziós tömbnek tekinti, és a művelet az oszlopra kerül. Magában a dekorátorban megadjuk a függvény visszatérési típusát.

1. példa: Pandas_udf() karakterlánctípussal

Itt két felhasználó által definiált függvényt hozunk létre a karakterlánc visszatérési típusával, hogy a karakterlánc típusú oszlopértékeket nagy- és kisbetűssé alakítsuk. Végül ezeket a függvényeket a „type” és a „locate_country” oszlopokra alkalmazzuk.

# Konvertálja a típusoszlopot nagybetűssé a pandas_udf segítségével

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

return i.str.upper()

# Konvertálja a locate_country oszlopot kisbetűsre a pandas_udf segítségével

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Jelenítse meg az oszlopokat a select() segítségével

market_df.select( 'típus' ,type_upper_case( 'típus' ), 'locate_country' ,
ország_kisbetű( 'locate_country' )).előadás()

Kimenet:

Magyarázat:

A StringType() függvény a pyspark.sql.types modulban érhető el. Ezt a modult már importáltuk a PySpark DataFrame létrehozásakor.

  1. Először is, az UDF (felhasználó által definiált függvény) az str.upper() függvény segítségével nagybetűvel adja vissza a karakterláncokat. Az str.upper() a Series Data Structure-ban érhető el (mivel a függvényen belüli nyíllal sorra konvertálunk), amely az adott karakterláncot nagybetűssé alakítja. Végül ezt a függvényt a „type” oszlopra alkalmazzuk, amely a select() metóduson belül van megadva. Korábban a típus oszlopban lévő összes karakterlánc kisbetűvel íródott. Most nagybetűre változtatták őket.
  2. Másodszor, az UDF az str.lower() függvény segítségével nagybetűvel adja vissza a karakterláncokat. Az str.lower() elérhető a Series Data Structure-ban, amely az adott karakterláncot kisbetűssé alakítja. Végül ezt a függvényt a „type” oszlopra alkalmazzuk, amely a select() metóduson belül van megadva. Korábban a típus oszlopban lévő összes karakterlánc nagybetűvel íródott. Most kisbetűre változtatták.

2. példa: Pandas_udf() egész szám típussal

Hozzon létre egy UDF-et, amely a PySpark DataFrame egész oszlopát Pandas sorozattá alakítja, és adjunk hozzá 100-at minden értékhez. Adja át a „mennyiség” oszlopot ennek a függvénynek a select() metóduson belül.

# Adjunk hozzá 100-at

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

return i+ 100

# Adja át a mennyiség oszlopot a fenti függvénynek és jelenítse meg.

market_df.select( 'Mennyiség' ,add_100( 'Mennyiség' )).előadás()

Kimenet:

Magyarázat:

Az UDF-en belül az összes értéket iteráljuk, és sorozattá konvertáljuk. Ezt követően a sorozat minden értékéhez hozzáadunk 100-at. Végül a „mennyiség” oszlopot átadjuk ennek a függvénynek, és láthatjuk, hogy az összes értékhez hozzáadódik a 100.

Pandas_udf() különböző adattípusokkal a Groupby() és Agg() használatával

Nézzük meg a példákat az UDF átadására az összesített oszlopoknak. Itt először az oszlopértékeket csoportosítjuk a groupby() függvénnyel, az összesítést pedig az agg() függvénnyel. Az UDF-ünket ezen az összesített függvényen belül adjuk át.

Szintaxis:

pyspark_dataframe_object.groupby( 'csoportosítás_oszlop' ).agg(UDF
(pyspark_dataframe_object[ 'oszlop' ]))

Itt először a csoportosítás oszlopban lévő értékek vannak csoportosítva. Ezután minden egyes csoportosított adaton megtörténik az összesítés az UDF-ünkhöz képest.

1. példa: Pandas_udf() az összesített átlaggal()

Itt létrehozunk egy felhasználó által definiált függvényt return típusú floattal. A függvényen belül az átlagot az átlag() függvény segítségével számítjuk ki. Ez az UDF átkerül a „mennyiség” oszlopba, hogy megkapja az egyes típusok átlagos mennyiségét.

# adja vissza az átlagot/átlagot

@pandas_udf( 'úszó' )

def average_function(i: panda.Series) -> float:

return i.mean()

# Adja át a mennyiség oszlopot a függvénynek a típusoszlop csoportosításával.

market_df.groupby( 'típus' ).agg(average_function(market_df[ 'Mennyiség' ])).előadás()

Kimenet:

A „típus” oszlopban szereplő elemek alapján csoportosítunk. Két csoport alakul ki - „gyümölcs” és „zöldség”. Minden csoportra kiszámolják az átlagot és visszaadják.

2. példa: Pandas_udf() az Aggregate Max() és Min() értékekkel

Itt két felhasználó által definiált függvényt hozunk létre egész (int) visszatérési típussal. Az első UDF a minimális, a második UDF a maximális értéket adja vissza.

# pandas_udf, amely a minimális értéket adja vissza

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

return i.min()

# pandas_udf, amely a maximális értéket adja vissza

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

return i.max()

# Adja át a mennyiség oszlopot a min_ pandas_udf-nek a locate_country csoportosításával.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'Mennyiség' ])).előadás()

# Adja át a mennyiség oszlopot a max_ pandas_udf-nek a locate_country csoportosításával.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'Mennyiség' ])).előadás()

Kimenet:

A minimális és maximális értékek visszaadásához a min() és max() függvényeket használjuk az UDF-ek visszatérési típusában. Most csoportosítjuk az adatokat a „locate_country” oszlopban. Négy csoport jön létre („KÍNA”, „INDIA”, „JAPÁN”, „USA”). Csoportonként a maximális mennyiséget adjuk vissza. Hasonlóképpen visszaküldjük a minimális mennyiséget.

Következtetés

Alapvetően a pandas_udf () a vektorizált műveletek végrehajtására szolgál PySpark DataFrame-ünkön. Láttuk, hogyan hozható létre a pandas_udf() és hogyan alkalmazhatja a PySpark DataFrame-re. A jobb megértés érdekében a különböző példákat az összes adattípus (karakterlánc, float és egész) figyelembevételével tárgyaltuk. A pandas_udf() használata a groupby()-vel az agg() függvényen keresztül lehetséges.