Tartalom témája:
- Pandas_udf() különböző adattípusokkal
- Pandas_udf() különböző adattípusokkal a Groupby() és Agg() használatával
Ha szeretne tudni a PySpark DataFrame és a modul telepítéséről, kövesse ezt cikk .
Pyspark.sql.functions.pandas_udf()
A pandas_udf () a PySpark sql.functions moduljában érhető el, amely a „from” kulcsszóval importálható. A vektorizált műveletek végrehajtására szolgál a PySpark DataFrame-ünkön. Ez a funkció dekorátorként valósul meg három paraméter átadásával. Ezt követően létrehozhatunk egy felhasználó által definiált függvényt, amely vektoros formátumban adja vissza az adatokat (ahogyan ehhez a sorozatot/NumPy-t használjuk) egy nyíl segítségével. Ezen a függvényen belül vissza tudjuk adni az eredményt.
Szerkezet és szintaxis:
Először nézzük meg ennek a függvénynek a szerkezetét és szintaxisát:
@pandas_udf(adattípus)def függvény_neve(művelet) -> convert_format:
visszáru nyilatkozat
Itt a függvény_neve a definiált függvény neve. Az adattípus határozza meg a függvény által visszaadott adattípust. Az eredményt a „return” kulcsszó használatával adhatjuk vissza. Az összes műveletet a függvényen belül hajtják végre a nyíl hozzárendeléssel.
Pandas_udf (függvény és visszatérési típus)
- Az első paraméter a felhasználó által definiált függvény, amelyet átadunk neki.
- A második paraméter a függvény visszatérési adattípusának megadására szolgál.
Adat:
Ebben a teljes útmutatóban csak egy PySpark DataFrame-et használunk a demonstrációhoz. Az általunk meghatározott összes felhasználó által definiált függvényt alkalmazzuk ezen a PySpark DataFrame-en. Győződjön meg arról, hogy először a PySpark telepítése után hozza létre ezt a DataFrame-et a környezetben.
import pyspark
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
a pyspark.sql.functions fájlból importálja a pandas_udf fájlt
a pyspark.sql.types importból *
import pandákat pandaként
# zöldségrészletek
zöldség =[{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 800 },
{ 'típus' : 'gyümölcs' , 'név' : 'banán' , 'locate_country' : 'KÍNA' , 'Mennyiség' : húsz },
{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 800 },
{ 'típus' : 'növényi' , 'név' : 'Mangó' , 'locate_country' : 'JAPÁN' , 'Mennyiség' : 0 },
{ 'típus' : 'gyümölcs' , 'név' : 'citrom' , 'locate_country' : 'INDIA' , 'Mennyiség' : 1700 },
{ 'típus' : 'növényi' , 'név' : 'paradicsom' , 'locate_country' : 'EGYESÜLT ÁLLAMOK' , 'Mennyiség' : 1200 },
{ 'típus' : 'növényi' , 'név' : 'Mangó' , 'locate_country' : 'JAPÁN' , 'Mennyiség' : 0 },
{ 'típus' : 'gyümölcs' , 'név' : 'citrom' , 'locate_country' : 'INDIA' , 'Mennyiség' : 0 }
]
# hozza létre a piaci adatkeretet a fenti adatokból
market_df = linuxhint_spark_app.createDataFrame(vegetable)
market_df.show()
Kimenet:
Itt hozzuk létre ezt a DataFrame-et 4 oszlopból és 8 sorból. Most a pandas_udf() segítségével hozzuk létre a felhasználó által definiált függvényeket, és alkalmazzuk őket ezekre az oszlopokra.
Pandas_udf() különböző adattípusokkal
Ebben a forgatókönyvben létrehozunk néhány felhasználó által definiált függvényt a pandas_udf() paranccsal, és alkalmazzuk őket az oszlopokra, és megjelenítjük az eredményeket a select() metódussal. A vektorizált műveletek végrehajtásához minden esetben a pandas.Series-t használjuk. Ez az oszlopértékeket egydimenziós tömbnek tekinti, és a művelet az oszlopra kerül. Magában a dekorátorban megadjuk a függvény visszatérési típusát.
1. példa: Pandas_udf() karakterlánctípussal
Itt két felhasználó által definiált függvényt hozunk létre a karakterlánc visszatérési típusával, hogy a karakterlánc típusú oszlopértékeket nagy- és kisbetűssé alakítsuk. Végül ezeket a függvényeket a „type” és a „locate_country” oszlopokra alkalmazzuk.
# Konvertálja a típusoszlopot nagybetűssé a pandas_udf segítségével@pandas_udf(StringType())
def type_upper_case(i: panda.Series) -> panda.Series:
return i.str.upper()
# Konvertálja a locate_country oszlopot kisbetűsre a pandas_udf segítségével
@pandas_udf(StringType())
def country_lower_case(i: panda.Series) -> panda.Series:
return i.str.lower()
# Jelenítse meg az oszlopokat a select() segítségével
market_df.select( 'típus' ,type_upper_case( 'típus' ), 'locate_country' ,
ország_kisbetű( 'locate_country' )).előadás()
Kimenet:
Magyarázat:
A StringType() függvény a pyspark.sql.types modulban érhető el. Ezt a modult már importáltuk a PySpark DataFrame létrehozásakor.
- Először is, az UDF (felhasználó által definiált függvény) az str.upper() függvény segítségével nagybetűvel adja vissza a karakterláncokat. Az str.upper() a Series Data Structure-ban érhető el (mivel a függvényen belüli nyíllal sorra konvertálunk), amely az adott karakterláncot nagybetűssé alakítja. Végül ezt a függvényt a „type” oszlopra alkalmazzuk, amely a select() metóduson belül van megadva. Korábban a típus oszlopban lévő összes karakterlánc kisbetűvel íródott. Most nagybetűre változtatták őket.
- Másodszor, az UDF az str.lower() függvény segítségével nagybetűvel adja vissza a karakterláncokat. Az str.lower() elérhető a Series Data Structure-ban, amely az adott karakterláncot kisbetűssé alakítja. Végül ezt a függvényt a „type” oszlopra alkalmazzuk, amely a select() metóduson belül van megadva. Korábban a típus oszlopban lévő összes karakterlánc nagybetűvel íródott. Most kisbetűre változtatták.
2. példa: Pandas_udf() egész szám típussal
Hozzon létre egy UDF-et, amely a PySpark DataFrame egész oszlopát Pandas sorozattá alakítja, és adjunk hozzá 100-at minden értékhez. Adja át a „mennyiség” oszlopot ennek a függvénynek a select() metóduson belül.
# Adjunk hozzá 100-at@pandas_udf(IntegerType())
def add_100(i: panda.Series) -> panda.Series:
return i+ 100
# Adja át a mennyiség oszlopot a fenti függvénynek és jelenítse meg.
market_df.select( 'Mennyiség' ,add_100( 'Mennyiség' )).előadás()
Kimenet:
Magyarázat:
Az UDF-en belül az összes értéket iteráljuk, és sorozattá konvertáljuk. Ezt követően a sorozat minden értékéhez hozzáadunk 100-at. Végül a „mennyiség” oszlopot átadjuk ennek a függvénynek, és láthatjuk, hogy az összes értékhez hozzáadódik a 100.
Pandas_udf() különböző adattípusokkal a Groupby() és Agg() használatával
Nézzük meg a példákat az UDF átadására az összesített oszlopoknak. Itt először az oszlopértékeket csoportosítjuk a groupby() függvénnyel, az összesítést pedig az agg() függvénnyel. Az UDF-ünket ezen az összesített függvényen belül adjuk át.
Szintaxis:
pyspark_dataframe_object.groupby( 'csoportosítás_oszlop' ).agg(UDF(pyspark_dataframe_object[ 'oszlop' ]))
Itt először a csoportosítás oszlopban lévő értékek vannak csoportosítva. Ezután minden egyes csoportosított adaton megtörténik az összesítés az UDF-ünkhöz képest.
1. példa: Pandas_udf() az összesített átlaggal()
Itt létrehozunk egy felhasználó által definiált függvényt return típusú floattal. A függvényen belül az átlagot az átlag() függvény segítségével számítjuk ki. Ez az UDF átkerül a „mennyiség” oszlopba, hogy megkapja az egyes típusok átlagos mennyiségét.
# adja vissza az átlagot/átlagot@pandas_udf( 'úszó' )
def average_function(i: panda.Series) -> float:
return i.mean()
# Adja át a mennyiség oszlopot a függvénynek a típusoszlop csoportosításával.
market_df.groupby( 'típus' ).agg(average_function(market_df[ 'Mennyiség' ])).előadás()
Kimenet:
A „típus” oszlopban szereplő elemek alapján csoportosítunk. Két csoport alakul ki - „gyümölcs” és „zöldség”. Minden csoportra kiszámolják az átlagot és visszaadják.
2. példa: Pandas_udf() az Aggregate Max() és Min() értékekkel
Itt két felhasználó által definiált függvényt hozunk létre egész (int) visszatérési típussal. Az első UDF a minimális, a második UDF a maximális értéket adja vissza.
# pandas_udf, amely a minimális értéket adja vissza@pandas_udf( 'int' )
def min_(i: panda.Series) -> int:
return i.min()
# pandas_udf, amely a maximális értéket adja vissza
@pandas_udf( 'int' )
def max_(i: panda.Series) -> int:
return i.max()
# Adja át a mennyiség oszlopot a min_ pandas_udf-nek a locate_country csoportosításával.
market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'Mennyiség' ])).előadás()
# Adja át a mennyiség oszlopot a max_ pandas_udf-nek a locate_country csoportosításával.
market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'Mennyiség' ])).előadás()
Kimenet:
A minimális és maximális értékek visszaadásához a min() és max() függvényeket használjuk az UDF-ek visszatérési típusában. Most csoportosítjuk az adatokat a „locate_country” oszlopban. Négy csoport jön létre („KÍNA”, „INDIA”, „JAPÁN”, „USA”). Csoportonként a maximális mennyiséget adjuk vissza. Hasonlóképpen visszaküldjük a minimális mennyiséget.
Következtetés
Alapvetően a pandas_udf () a vektorizált műveletek végrehajtására szolgál PySpark DataFrame-ünkön. Láttuk, hogyan hozható létre a pandas_udf() és hogyan alkalmazhatja a PySpark DataFrame-re. A jobb megértés érdekében a különböző példákat az összes adattípus (karakterlánc, float és egész) figyelembevételével tárgyaltuk. A pandas_udf() használata a groupby()-vel az agg() függvényen keresztül lehetséges.