Tartalom témája:
JSON beolvasása a PySpark DataFrame-be a Pandas.read_json() használatával
JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával
JSON olvasása PySpark DataFrame-be a PySpark SQL használatával
Ebben az oktatóanyagban megvizsgáljuk, hogyan lehet JSON-t beolvasni a PySpark DataFrame-be a pandas.read_json(), a spark.read.json() és a spark.sql használatával. Minden forgatókönyvben megvizsgáljuk a különböző példákat a különböző JSON-formátumok figyelembevételével.
A következő példák megvalósítása előtt telepítse a PySpark könyvtárat.
pip install pysparkA sikeres telepítés után a következőképpen láthatja a kimenetet:
JSON beolvasása a PySpark DataFrame-be a Pandas.read_json() használatával
A PySparkban a createDataFrame() metódus a DataFrame közvetlen létrehozására szolgál. Itt csak a JSON-fájlt/útvonalat kell átadnunk a JSON-fájlnak a pandas.read_json() metóduson keresztül. Ez a read_json() metódus a Pandas modulban elérhető fájlnevet/útvonalat veszi fel. Ezért szükséges a Pandas modul importálása és használata.
Szintaxis:
spark_app.createDataFrame(pandas.read_json( 'file_name.json' ))Példa:
Hozzon létre egy „student_skill.json” nevű JSON-fájlt, amely 2 rekordot tartalmaz. Itt a billentyűk/oszlopok a „Student 1” és a „Student 2” (diák 2). A sorok a név, az életkor, a skill1 és a skill2.
import pyspark
import pandákat
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# A pandas.read_json() használata
kandidaat_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( 'student_skill.json' ))
jelölt_skills.show()
Kimenet:
Láthatjuk, hogy a JSON-adatok meghatározott oszlopokkal és sorokkal PySpark DataFrame-be lettek konvertálva.
2. JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával
A read.json() egy olyan metódus, amely hasonló a Pandas read_json()-hoz. Itt a read.json() elérési utat választ a JSON-hoz vagy közvetlenül a JSON-fájlhoz, és közvetlenül betölti azt a PySpark DataFrame-be. Ebben a forgatókönyvben nincs szükség a createDataFrame() metódus használatára. Ha egyszerre több JSON-fájlt szeretne olvasni, a JSON-fájlnevek listáját egy vesszővel elválasztott listán kell átadnunk. Az összes JSON-rekord egyetlen DataFrame-ben van tárolva.
Szintaxis:
Egyetlen fájl - spark_app.read.json( 'file_name.json' )Több fájl - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])
1. forgatókönyv: Olvassa el az egysoros JSON-t
Ha a JSON-fájl rekord1, rekord2, rekord3… (egysoros) formátumú, akkor egysoros JSON-nak hívhatjuk. A Spark feldolgozza ezeket a rekordokat, és sorokként tárolja a PySpark DataFrame-ben. Minden rekord egy sor a PySpark DataFrame-ben.
Hozzon létre egy „candidate_skills.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.
import pyspark
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be akandidaat_skills.json fájlt a PySpark DataFrame-be
kandidaat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )
jelölt_skills.show()
Kimenet:
Láthatjuk, hogy a JSON-adatok PySpark DataFrame-be konvertálva meghatározott rekordokkal és oszlopnevekkel.
2. forgatókönyv: Olvassa el a több sorral rendelkező JSON-t
Ha a JSON-fájl több sorból áll, akkor a read.option().json() metódust kell használnia a többsoros paraméter átadásához, amelyet igazra kell állítani. Ez lehetővé teszi számunkra, hogy több sorral rendelkező JSON-t töltsünk be a PySpark DataFrame-be.
read.option( 'többsoros' , 'igaz' ).json( 'file_name.json' )Hozzon létre egy „multi.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.
import pyspark
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be a multi.json fájlt (több sorral) a PySpark DataFrame-be
kandidaat_skills = linuxhint_spark_app.read.option( 'többsoros' , 'igaz' ).json( 'multi.json' )
jelölt_skills.show()
Kimenet:
3. forgatókönyv: Olvasson több JSON-t
Már tárgyaltuk ennek az oktatóanyagnak a kezdő szakaszában a több JSON-fájlról. Ha egyszerre több JSON-fájlt szeretne olvasni, és egyetlen PySpark DataFrame-ben szeretné tárolni, akkor át kell adnunk a fájlnevek listáját a read.json() metódusnak.
Hozzon létre két JSON-fájlt „candidate_skills.json” és „candidate_skills2.json” néven, és töltse be őket a PySpark DataFrame-be.
A „candidate_skills.json” fájl három rekordot tartalmaz.
A „candidate_skill2.json” fájl csak egyetlen rekordot tartalmaz.
import pyspark
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be a jelölt_skills és jelölt_skills2 fájlokat egyszerre a PySpark DataFrame-be
kandidaat_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])
jelölt_készségek.show()
Kimenet:
Végül a DataFrame négy rekordot tartalmaz. Az első három rekord az első JSON-hoz, az utolsó rekordok pedig a második JSON-hoz tartoznak.
JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával
A read.json() egy olyan metódus, amely hasonló a Pandas read_json()-hoz. Itt a read.json() elérési utat választ a JSON-hoz vagy közvetlenül a JSON-fájlhoz, és közvetlenül betölti a PySpark DataFrame-be. Ebben a forgatókönyvben nincs szükség a createDataFrame() metódus használatára. Ha egyszerre több JSON-fájlt szeretne olvasni, a JSON-fájlnevek listáját egy vesszővel elválasztott listán kell átadnunk. Az összes JSON-rekord egyetlen DataFrame-ben van tárolva.
Szintaxis:
Egyetlen fájl - spark_app.read.json( 'file_name.json' )Több fájl - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])
1. forgatókönyv: Olvassa el az egysoros JSON-t
Ha a JSON-fájl rekord1, rekord2, rekord3… (egysoros) formátumú, akkor egysoros JSON-nak hívhatjuk. A Spark feldolgozza ezeket a rekordokat, és sorokként tárolja a PySpark DataFrame-ben. Minden rekord egy sor a PySpark DataFrame-ben.
Hozzon létre egy „candidate_skills.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.
import pyspark
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Olvassa be akandidaat_skills.json fájlt a PySpark DataFrame-be
kandidaat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )
jelölt_készségek.show()
Kimenet:
Láthatjuk, hogy a JSON-adatok PySpark DataFrame-be konvertálva meghatározott rekordokkal és oszlopnevekkel.
JSON olvasása PySpark DataFrame-be a PySpark SQL használatával
A PySpark SQL használatával létrehozható egy ideiglenes nézet a JSON-adatainkról. Közvetlenül megadhatjuk a JSON-t az ideiglenes nézet létrehozásakor. Nézze meg a következő szintaxist. Ezt követően a SELECT paranccsal megjeleníthetjük a PySpark DataFrame-et.
Szintaxis:
spark_app.sql( 'ÁTMENETI NÉZET LÉTREHOZÁSA VIEW_NAME AZ json OPTIONS HASZNÁLATÁVAL (elérési út: 'file_name.json')' )Itt a „VIEW_NAME” a JSON-adatok nézete, a „file_name” pedig a JSON-fájl neve.
1. példa:
Tekintsük az előző példákban használt JSON-fájlt – „candidate_skills.json”. Válassza ki az összes sort a DataFrame-ből a SELECT gombbal a „*” operátorral. Itt a * kiválasztja a PySpark DataFrame összes oszlopát.
import pysparkimport pandákat
a pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# A spark.sql használata VIEW létrehozásához a JSON-ból
kandidat_skills = linuxhint_spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA Candidate_data AZ json OPCIÓK HASZNÁLATÁVAL ('candidate_skills.json' elérési út)' )
# Használja a SELECT lekérdezést a Candidate_data összes rekordjának kiválasztásához.
linuxhint_spark_app.sql( 'SELECT * from Candidate_data' ).előadás()
Kimenet:
A PySpark DataFrame (JSON-ból olvasva) rekordjainak teljes száma 3.
2. példa:
Most szűrje le a PySpark DataFrame rekordjait az életkor oszlopa alapján. Használja a „nagyobb, mint” operátort az életkorhoz, hogy megkapja azokat a sorokat, amelyek életkora 22-nél nagyobb.
# Használja a SELECT lekérdezést a 22 évesnél idősebb rekordok kiválasztásához.linuxhint_spark_app.sql( 'SELECT * from Candidate_data, ahol életkor >22' ).előadás()
Kimenet:
Csak egy olyan rekord van a PySpark DataFrame-ben, amelynek életkora meghaladja a 22 évet.
Következtetés
Megtanultuk a JSON beolvasásának három különböző módját a PySpark DataFrame-be. Először is megtanultuk, hogyan kell használni a Pandas modulban elérhető read_json() metódust a JSON olvasásához a PySpark DataFrame-be. Ezután megtanultuk, hogyan kell beolvasni az egysoros/többsoros JSON-fájlokat a spark.read.json() metódussal az option() segítségével. Egyszerre több JSON-fájl olvasásához a fájlnevek listáját kell átadnunk ennek a módszernek. PySpark SQL használatával a JSON-fájlt a rendszer beolvassa az ideiglenes nézetbe, és a DataFrame a SELECT lekérdezéssel jelenik meg.