PySpark olvasása JSON()

Pyspark Olvasasa Json



A PySpark DataFrame-ekkel való munka közben azt a PySpark DataFrame-ben kell tárolni, ha feldolgozni szeretné a JSON-adatokat. A DataFrame-ben való tárolást követően az adatokon alkalmazhatjuk a különböző műveleteket és módszereket. Emellett számos előnnyel jár, ha a JSON-t PySpark DataFrame-re konvertáljuk, mivel az egyszerű, és egyszerűbb módon tudjuk átalakítani/particionálni az adatokat.

Tartalom témája:

JSON beolvasása a PySpark DataFrame-be a Pandas.read_json() használatával







JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával



JSON olvasása PySpark DataFrame-be a PySpark SQL használatával



Ebben az oktatóanyagban megvizsgáljuk, hogyan lehet JSON-t beolvasni a PySpark DataFrame-be a pandas.read_json(), a spark.read.json() és a spark.sql használatával. Minden forgatókönyvben megvizsgáljuk a különböző példákat a különböző JSON-formátumok figyelembevételével.





A következő példák megvalósítása előtt telepítse a PySpark könyvtárat.

pip install pyspark

A sikeres telepítés után a következőképpen láthatja a kimenetet:



JSON beolvasása a PySpark DataFrame-be a Pandas.read_json() használatával

A PySparkban a createDataFrame() metódus a DataFrame közvetlen létrehozására szolgál. Itt csak a JSON-fájlt/útvonalat kell átadnunk a JSON-fájlnak a pandas.read_json() metóduson keresztül. Ez a read_json() metódus a Pandas modulban elérhető fájlnevet/útvonalat veszi fel. Ezért szükséges a Pandas modul importálása és használata.

Szintaxis:

spark_app.createDataFrame(pandas.read_json( 'file_name.json' ))

Példa:

Hozzon létre egy „student_skill.json” nevű JSON-fájlt, amely 2 rekordot tartalmaz. Itt a billentyűk/oszlopok a „Student 1” és a „Student 2” (diák 2). A sorok a név, az életkor, a skill1 és a skill2.

import pyspark

import pandákat

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# A pandas.read_json() használata

kandidaat_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( 'student_skill.json' ))

jelölt_skills.show()

Kimenet:

Láthatjuk, hogy a JSON-adatok meghatározott oszlopokkal és sorokkal PySpark DataFrame-be lettek konvertálva.

2. JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával

A read.json() egy olyan metódus, amely hasonló a Pandas read_json()-hoz. Itt a read.json() elérési utat választ a JSON-hoz vagy közvetlenül a JSON-fájlhoz, és közvetlenül betölti azt a PySpark DataFrame-be. Ebben a forgatókönyvben nincs szükség a createDataFrame() metódus használatára. Ha egyszerre több JSON-fájlt szeretne olvasni, a JSON-fájlnevek listáját egy vesszővel elválasztott listán kell átadnunk. Az összes JSON-rekord egyetlen DataFrame-ben van tárolva.

Szintaxis:

Egyetlen fájl - spark_app.read.json( 'file_name.json' )

Több fájl - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

1. forgatókönyv: Olvassa el az egysoros JSON-t

Ha a JSON-fájl rekord1, rekord2, rekord3… (egysoros) formátumú, akkor egysoros JSON-nak hívhatjuk. A Spark feldolgozza ezeket a rekordokat, és sorokként tárolja a PySpark DataFrame-ben. Minden rekord egy sor a PySpark DataFrame-ben.

Hozzon létre egy „candidate_skills.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be akandidaat_skills.json fájlt a PySpark DataFrame-be

kandidaat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

jelölt_skills.show()

Kimenet:

Láthatjuk, hogy a JSON-adatok PySpark DataFrame-be konvertálva meghatározott rekordokkal és oszlopnevekkel.

2. forgatókönyv: Olvassa el a több sorral rendelkező JSON-t

Ha a JSON-fájl több sorból áll, akkor a read.option().json() metódust kell használnia a többsoros paraméter átadásához, amelyet igazra kell állítani. Ez lehetővé teszi számunkra, hogy több sorral rendelkező JSON-t töltsünk be a PySpark DataFrame-be.

read.option( 'többsoros' , 'igaz' ).json( 'file_name.json' )

Hozzon létre egy „multi.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be a multi.json fájlt (több sorral) a PySpark DataFrame-be

kandidaat_skills = linuxhint_spark_app.read.option( 'többsoros' , 'igaz' ).json( 'multi.json' )

jelölt_skills.show()

Kimenet:

3. forgatókönyv: Olvasson több JSON-t

Már tárgyaltuk ennek az oktatóanyagnak a kezdő szakaszában a több JSON-fájlról. Ha egyszerre több JSON-fájlt szeretne olvasni, és egyetlen PySpark DataFrame-ben szeretné tárolni, akkor át kell adnunk a fájlnevek listáját a read.json() metódusnak.

Hozzon létre két JSON-fájlt „candidate_skills.json” és „candidate_skills2.json” néven, és töltse be őket a PySpark DataFrame-be.

A „candidate_skills.json” fájl három rekordot tartalmaz.

A „candidate_skill2.json” fájl csak egyetlen rekordot tartalmaz.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be a jelölt_skills és jelölt_skills2 fájlokat egyszerre a PySpark DataFrame-be

kandidaat_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])

jelölt_készségek.show()

Kimenet:

Végül a DataFrame négy rekordot tartalmaz. Az első három rekord az első JSON-hoz, az utolsó rekordok pedig a második JSON-hoz tartoznak.

JSON olvasása PySpark DataFrame-be a Spark.read.json() használatával

A read.json() egy olyan metódus, amely hasonló a Pandas read_json()-hoz. Itt a read.json() elérési utat választ a JSON-hoz vagy közvetlenül a JSON-fájlhoz, és közvetlenül betölti a PySpark DataFrame-be. Ebben a forgatókönyvben nincs szükség a createDataFrame() metódus használatára. Ha egyszerre több JSON-fájlt szeretne olvasni, a JSON-fájlnevek listáját egy vesszővel elválasztott listán kell átadnunk. Az összes JSON-rekord egyetlen DataFrame-ben van tárolva.

Szintaxis:

Egyetlen fájl - spark_app.read.json( 'file_name.json' )

Több fájl - spark_app.read.json([ 'file1.json' , 'file2.json' ,...])

1. forgatókönyv: Olvassa el az egysoros JSON-t

Ha a JSON-fájl rekord1, rekord2, rekord3… (egysoros) formátumú, akkor egysoros JSON-nak hívhatjuk. A Spark feldolgozza ezeket a rekordokat, és sorokként tárolja a PySpark DataFrame-ben. Minden rekord egy sor a PySpark DataFrame-ben.

Hozzon létre egy „candidate_skills.json” nevű JSON-fájlt, amely 3 rekordot tartalmaz. Olvassa be ezt a JSON-t a PySpark DataFrame-be.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Olvassa be akandidaat_skills.json fájlt a PySpark DataFrame-be

kandidaat_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

jelölt_készségek.show()

Kimenet:

Láthatjuk, hogy a JSON-adatok PySpark DataFrame-be konvertálva meghatározott rekordokkal és oszlopnevekkel.

JSON olvasása PySpark DataFrame-be a PySpark SQL használatával

A PySpark SQL használatával létrehozható egy ideiglenes nézet a JSON-adatainkról. Közvetlenül megadhatjuk a JSON-t az ideiglenes nézet létrehozásakor. Nézze meg a következő szintaxist. Ezt követően a SELECT paranccsal megjeleníthetjük a PySpark DataFrame-et.

Szintaxis:

spark_app.sql( 'ÁTMENETI NÉZET LÉTREHOZÁSA VIEW_NAME AZ json OPTIONS HASZNÁLATÁVAL (elérési út: 'file_name.json')' )

Itt a „VIEW_NAME” a JSON-adatok nézete, a „file_name” pedig a JSON-fájl neve.

1. példa:

Tekintsük az előző példákban használt JSON-fájlt – „candidate_skills.json”. Válassza ki az összes sort a DataFrame-ből a SELECT gombbal a „*” operátorral. Itt a * kiválasztja a PySpark DataFrame összes oszlopát.

import pyspark

import pandákat

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# A spark.sql használata VIEW létrehozásához a JSON-ból

kandidat_skills = linuxhint_spark_app.sql( 'IDEIGLENES NÉZET LÉTREHOZÁSA Candidate_data AZ json OPCIÓK HASZNÁLATÁVAL ('candidate_skills.json' elérési út)' )

# Használja a SELECT lekérdezést a Candidate_data összes rekordjának kiválasztásához.

linuxhint_spark_app.sql( 'SELECT * from Candidate_data' ).előadás()

Kimenet:

A PySpark DataFrame (JSON-ból olvasva) rekordjainak teljes száma 3.

2. példa:

Most szűrje le a PySpark DataFrame rekordjait az életkor oszlopa alapján. Használja a „nagyobb, mint” operátort az életkorhoz, hogy megkapja azokat a sorokat, amelyek életkora 22-nél nagyobb.

# Használja a SELECT lekérdezést a 22 évesnél idősebb rekordok kiválasztásához.

linuxhint_spark_app.sql( 'SELECT * from Candidate_data, ahol életkor >22' ).előadás()

Kimenet:

Csak egy olyan rekord van a PySpark DataFrame-ben, amelynek életkora meghaladja a 22 évet.

Következtetés

Megtanultuk a JSON beolvasásának három különböző módját a PySpark DataFrame-be. Először is megtanultuk, hogyan kell használni a Pandas modulban elérhető read_json() metódust a JSON olvasásához a PySpark DataFrame-be. Ezután megtanultuk, hogyan kell beolvasni az egysoros/többsoros JSON-fájlokat a spark.read.json() metódussal az option() segítségével. Egyszerre több JSON-fájl olvasásához a fájlnevek listáját kell átadnunk ennek a módszernek. PySpark SQL használatával a JSON-fájlt a rendszer beolvassa az ideiglenes nézetbe, és a DataFrame a SELECT lekérdezéssel jelenik meg.