Ebben az útmutatóban elsősorban a parkettafájl PySpark DataFrame/SQL-be történő beolvasására/betöltésére összpontosítunk a read.parquet() függvény használatával, amely elérhető a pyspark.sql.DataFrameReader osztályban.
Tartalom témája:
Szerezd meg a parkettareszelőt
Olvassa be a parkettafájlt a PySpark DataFrame-be
Olvassa be a Parquet fájlt a PySpark SQL-be
Pyspark.sql.DataFrameReader.parquet()
Ez a funkció a parkettafájl beolvasására és a PySpark DataFrame-be való betöltésére szolgál. Felveszi a parketta fájl elérési útját/fájlnevét. Egyszerűen használhatjuk a read.parquet() függvényt, mivel ez az általános függvény.
Szintaxis:
Lássuk a read.parquet() szintaxisát:
spark_app.read.parquet(file_name.parquet/path)Először telepítse a PySpark modult a pip paranccsal:
pip install pyspark
Szerezd meg a parkettareszelőt
Egy parkettafájl olvasásához szükség van azokra az adatokra, amelyekben a parkettafájl ezekből az adatokból generálódik. Ebben a részben látni fogjuk, hogyan lehet parkettafájlt generálni a PySpark DataFrame-ből.
Hozzunk létre egy PySpark DataFrame-et 5 rekordból, és írjuk ezt az “industry_parquet” parketta fájlba.
import pysparka pyspark.sql-ből importálhatja a SparkSession,Row-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# hozzon létre egy adatkeretet, amely az iparág adatait tárolja
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Mezőgazdaság' ,Terület= 'EGYESÜLT ÁLLAMOK' ,
Értékelés= 'Forró' ,Összes_alkalmazott= 100 ),
Row(Típus= 'Mezőgazdaság' ,Terület= 'India' ,Értékelés= 'Forró' ,Összes_alkalmazott= 200 ),
Row(Típus= 'Fejlesztés' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Meleg' ,Összes_alkalmazott= 100 ),
Row(Típus= 'Oktatás' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Menő' ,Összes_alkalmazott= 400 ),
Row(Típus= 'Oktatás' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Meleg' ,Összes_alkalmazott= húsz )
])
# Aktuális DataFrame
Industry_df.show()
# Írja be az ipar_df-et a parketta fájlba
Industry_df.coalesce( 1 ).write.parquet( 'ipar_parketta' )
Kimenet:
Ez a DataFrame, amely 5 rekordot tartalmaz.
Létrejön egy parketta fájl az előző DataFrame-hez. Itt a fájlnév kiterjesztéssel „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Ezt a fájlt használjuk a teljes oktatóanyagban.
Olvassa be a parkettafájlt a PySpark DataFrame-be
Megvan a parkettareszelő. Olvassuk el ezt a fájlt a read.parquet() függvény segítségével, és töltsük be a PySpark DataFrame-be.
import pysparka pyspark.sql-ből importálhatja a SparkSession,Row-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be a parketta fájlt a dataframe_from_parquet objektumba.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# A dataframe_from_parquet-DataFrame megjelenítése
dataframe_from_parquet.show()
Kimenet:
A DataFrame-et a parketta fájlból létrehozott show() metódussal jelenítjük meg.
SQL lekérdezések parketta fájllal
A DataFrame-be való betöltés után lehetőség nyílik az SQL táblák létrehozására és a DataFrame-ben lévő adatok megjelenítésére. Létre kell hoznunk egy IDEIGLENES NÉZETet, és az SQL parancsokkal vissza kell adni a rekordokat a parketta fájlból létrehozott DataFrame-ből.
1. példa:
Hozzon létre egy ideiglenes nézetet „Szektorok” néven, és használja a SELECT parancsot a DataFrame rekordjainak megjelenítéséhez. Erre hivatkozhat oktatóanyag amely elmagyarázza, hogyan hozhat létre NÉZETet a Sparkban – SQL.
import pysparka pyspark.sql-ből importálhatja a SparkSession,Row-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be a parketta fájlt a dataframe_from_parquet objektumba.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )
# Hozzon létre nézetet a fenti parketta fájlból - 'Szektorok'
dataframe_from_parquet.createOrReplaceTempView( 'Szektorok' )
# Lekérdezés a szektorok összes rekordjának megjelenítéséhez
linuxhint_spark_app.sql( 'válasszon * a szektorokból' ).előadás()
Kimenet:
2. példa:
Az előző VIEW segítségével írja be az SQL lekérdezést:
- Az „Indához” tartozó szektorok összes rekordjának megjelenítése.
- A 100-nál nagyobb alkalmazottal rendelkező szektorok összes rekordjának megjelenítése.
linuxhint_spark_app.sql( 'válasszon * olyan szektorokból, ahol Terület='India'' ).előadás()
# Lekérdezés a 100-nál nagyobb alkalmazottal rendelkező szektorok összes rekordjának megjelenítéséhez
linuxhint_spark_app.sql( 'válasszon *-ot azokból az ágazatokból, ahol az összes alkalmazott >100' ).előadás()
Kimenet:
Csak egy rekord van „India” területtel, és két olyan rekord van, ahol az alkalmazottak száma meghaladja a 100-at.
Olvassa be a Parquet fájlt a PySpark SQL-be
Először is létre kell hoznunk egy VIEW-t a CREATE paranccsal. Az SQL lekérdezésben a „path” kulcsszó használatával beolvashatjuk a parquet fájlt a Spark SQL-be. Az elérési út után meg kell adnunk a fájl nevét/helyét.
Szintaxis:
spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA nézet_neve parkettalehetőségek HASZNÁLATÁVAL (útvonal ' file_name.parquet ')' )1. példa:
Hozzon létre egy ideiglenes nézetet „Szektor2” néven, és olvassa be a parkettafájlt. Az sql() függvénnyel írja be a kiválasztási lekérdezést a nézetben lévő összes rekord megjelenítéséhez.
import pysparka pyspark.sql-ből importálhatja a SparkSession,Row-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be a parketta fájlt a SparkSQL-be
linuxhint_spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA 2. szektorban parkettalehetőségek HASZNÁLATÁVAL (útvonal' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )
# Lekérdezés a Szektor2 összes rekordjának megjelenítéséhez
linuxhint_spark_app.sql( 'select * from Sector2' ).előadás()
Kimenet:
2. példa:
Használja az előző NÉZETet, és írja be a lekérdezést, hogy megjelenítse az összes rekordot, amelynek besorolása „Meleg” vagy „Cool”.
# Lekérdezés a Sector2 összes rekordjának megjelenítéséhez, értékeléssel – Hot vagy Cool.linuxhint_spark_app.sql( 'válasszon *-ot a 2. szektorból, ahol Rating='Meleg' VAGY Értékelés='Cool'' ).előadás()
Kimenet:
Három rekord van a „Hot” vagy a „Cool” minősítéssel.
Következtetés
A PySparkban a write.parquet() függvény a DataFrame-et a parkettafájlba írja. A read.parquet() függvény beolvassa a parkettafájlt a PySpark DataFrame-be vagy bármely más adatforrásba. Megtanultuk, hogyan kell beolvasni a parketta fájlt a PySpark DataFrame-be és a PySpark táblába. Ennek az oktatóanyagnak a részeként azt is megvitattuk, hogyan hozhatunk létre táblákat a PySpark DataFrame-ből, és hogyan szűrhetjük az adatokat a WHERE záradék használatával.