PySpark Read.Parquet()

Pyspark Read Parquet



A PySparkban a write.parquet() függvény írja a DataFrame-et a parkettafájlba, a read.parquet() pedig beolvassa a parkettafájlt a PySpark DataFrame-be vagy bármely más adatforrásba. Az Apache Spark oszlopainak gyors és hatékony feldolgozásához tömörítenünk kell az adatokat. Az adattömörítés megtakarítja a memóriánkat, és az összes oszlop lapos szintre konvertálódik. Ez azt jelenti, hogy létezik lapos oszlop szintű tároló. Az ezeket tároló fájl PARQUET fájlként ismert.

Ebben az útmutatóban elsősorban a parkettafájl PySpark DataFrame/SQL-be ​​történő beolvasására/betöltésére összpontosítunk a read.parquet() függvény használatával, amely elérhető a pyspark.sql.DataFrameReader osztályban.

Tartalom témája:







Szerezd meg a parkettareszelőt



Olvassa be a parkettafájlt a PySpark DataFrame-be



Olvassa be a Parquet fájlt a PySpark SQL-be





Pyspark.sql.DataFrameReader.parquet()

Ez a funkció a parkettafájl beolvasására és a PySpark DataFrame-be való betöltésére szolgál. Felveszi a parketta fájl elérési útját/fájlnevét. Egyszerűen használhatjuk a read.parquet() függvényt, mivel ez az általános függvény.

Szintaxis:



Lássuk a read.parquet() szintaxisát:

spark_app.read.parquet(file_name.parquet/path)

Először telepítse a PySpark modult a pip paranccsal:

pip install pyspark

Szerezd meg a parkettareszelőt

Egy parkettafájl olvasásához szükség van azokra az adatokra, amelyekben a parkettafájl ezekből az adatokból generálódik. Ebben a részben látni fogjuk, hogyan lehet parkettafájlt generálni a PySpark DataFrame-ből.

Hozzunk létre egy PySpark DataFrame-et 5 rekordból, és írjuk ezt az “industry_parquet” parketta fájlba.

import pyspark

a pyspark.sql-ből importálhatja a SparkSession,Row-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# hozzon létre egy adatkeretet, amely az iparág adatait tárolja

industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Mezőgazdaság' ,Terület= 'EGYESÜLT ÁLLAMOK' ,
Értékelés= 'Forró' ,Összes_alkalmazott= 100 ),

Row(Típus= 'Mezőgazdaság' ,Terület= 'India' ,Értékelés= 'Forró' ,Összes_alkalmazott= 200 ),

Row(Típus= 'Fejlesztés' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Meleg' ,Összes_alkalmazott= 100 ),

Row(Típus= 'Oktatás' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Menő' ,Összes_alkalmazott= 400 ),

Row(Típus= 'Oktatás' ,Terület= 'EGYESÜLT ÁLLAMOK' ,Értékelés= 'Meleg' ,Összes_alkalmazott= húsz )

])

# Aktuális DataFrame

Industry_df.show()

# Írja be az ipar_df-et a parketta fájlba

Industry_df.coalesce( 1 ).write.parquet( 'ipar_parketta' )

Kimenet:

Ez a DataFrame, amely 5 rekordot tartalmaz.

Létrejön egy parketta fájl az előző DataFrame-hez. Itt a fájlnév kiterjesztéssel „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet”. Ezt a fájlt használjuk a teljes oktatóanyagban.

Olvassa be a parkettafájlt a PySpark DataFrame-be

Megvan a parkettareszelő. Olvassuk el ezt a fájlt a read.parquet() függvény segítségével, és töltsük be a PySpark DataFrame-be.

import pyspark

a pyspark.sql-ből importálhatja a SparkSession,Row-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be a parketta fájlt a dataframe_from_parquet objektumba.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# A dataframe_from_parquet-DataFrame megjelenítése

dataframe_from_parquet.show()

Kimenet:

A DataFrame-et a parketta fájlból létrehozott show() metódussal jelenítjük meg.

SQL lekérdezések parketta fájllal

A DataFrame-be való betöltés után lehetőség nyílik az SQL táblák létrehozására és a DataFrame-ben lévő adatok megjelenítésére. Létre kell hoznunk egy IDEIGLENES NÉZETet, és az SQL parancsokkal vissza kell adni a rekordokat a parketta fájlból létrehozott DataFrame-ből.

1. példa:

Hozzon létre egy ideiglenes nézetet „Szektorok” néven, és használja a SELECT parancsot a DataFrame rekordjainak megjelenítéséhez. Erre hivatkozhat oktatóanyag amely elmagyarázza, hogyan hozhat létre NÉZETet a Sparkban – SQL.

import pyspark

a pyspark.sql-ből importálhatja a SparkSession,Row-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be a parketta fájlt a dataframe_from_parquet objektumba.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Hozzon létre nézetet a fenti parketta fájlból - 'Szektorok'

dataframe_from_parquet.createOrReplaceTempView( 'Szektorok' )

# Lekérdezés a szektorok összes rekordjának megjelenítéséhez

linuxhint_spark_app.sql( 'válasszon * a szektorokból' ).előadás()

Kimenet:

2. példa:

Az előző VIEW segítségével írja be az SQL lekérdezést:

  1. Az „Indához” tartozó szektorok összes rekordjának megjelenítése.
  2. A 100-nál nagyobb alkalmazottal rendelkező szektorok összes rekordjának megjelenítése.
# Lekérdezés az 'India'-hoz tartozó szektorok összes rekordjának megjelenítéséhez.

linuxhint_spark_app.sql( 'válasszon * olyan szektorokból, ahol Terület='India'' ).előadás()

# Lekérdezés a 100-nál nagyobb alkalmazottal rendelkező szektorok összes rekordjának megjelenítéséhez

linuxhint_spark_app.sql( 'válasszon *-ot azokból az ágazatokból, ahol az összes alkalmazott >100' ).előadás()

Kimenet:

Csak egy rekord van „India” területtel, és két olyan rekord van, ahol az alkalmazottak száma meghaladja a 100-at.

Olvassa be a Parquet fájlt a PySpark SQL-be

Először is létre kell hoznunk egy VIEW-t a CREATE paranccsal. Az SQL lekérdezésben a „path” kulcsszó használatával beolvashatjuk a parquet fájlt a Spark SQL-be. Az elérési út után meg kell adnunk a fájl nevét/helyét.

Szintaxis:

spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA nézet_neve parkettalehetőségek HASZNÁLATÁVAL (útvonal ' file_name.parquet ')' )

1. példa:

Hozzon létre egy ideiglenes nézetet „Szektor2” néven, és olvassa be a parkettafájlt. Az sql() függvénnyel írja be a kiválasztási lekérdezést a nézetben lévő összes rekord megjelenítéséhez.

import pyspark

a pyspark.sql-ből importálhatja a SparkSession,Row-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be a parketta fájlt a SparkSQL-be

linuxhint_spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA 2. szektorban parkettalehetőségek HASZNÁLATÁVAL (útvonal' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Lekérdezés a Szektor2 összes rekordjának megjelenítéséhez

linuxhint_spark_app.sql( 'select * from Sector2' ).előadás()

Kimenet:

2. példa:

Használja az előző NÉZETet, és írja be a lekérdezést, hogy megjelenítse az összes rekordot, amelynek besorolása „Meleg” vagy „Cool”.

# Lekérdezés a Sector2 összes rekordjának megjelenítéséhez, értékeléssel – Hot vagy Cool.

linuxhint_spark_app.sql( 'válasszon *-ot a 2. szektorból, ahol Rating='Meleg' VAGY Értékelés='Cool'' ).előadás()

Kimenet:

Három rekord van a „Hot” vagy a „Cool” minősítéssel.

Következtetés

A PySparkban a write.parquet() függvény a DataFrame-et a parkettafájlba írja. A read.parquet() függvény beolvassa a parkettafájlt a PySpark DataFrame-be vagy bármely más adatforrásba. Megtanultuk, hogyan kell beolvasni a parketta fájlt a PySpark DataFrame-be és a PySpark táblába. Ennek az oktatóanyagnak a részeként azt is megvitattuk, hogyan hozhatunk létre táblákat a PySpark DataFrame-ből, és hogyan szűrhetjük az adatokat a WHERE záradék használatával.