PySpark DataFrame konvertálása JSON-ba

Pyspark Dataframe Konvertalasa Json Ba



Strukturált adatok továbbítása JSON használatával lehetséges, és kevés memóriát is fogyaszt. A PySpark RDD-vel vagy PySpark DataFrame-mel összehasonlítva a JSON kevés memóriát és szerializálást fogyaszt, ami a JSON segítségével lehetséges. A PySpark DataFrame-et a pyspark.sql.DataFrameWriter.json() metódussal tudjuk JSON-ba konvertálni. Ezen kívül két másik módja is van a DataFrame JSON-ba konvertálásának.

Tartalom témája:

Tekintsünk egy egyszerű PySpark DataFrame-et az összes példában, és alakítsuk át JSON-ba az említett funkciók segítségével.







Szükséges modul:

Telepítse a PySpark könyvtárat a környezetébe, ha még nincs telepítve. A telepítéshez használja a következő parancsot:



pip install pyspark

PySpark DataFrame JSON-ba a To_json() és ToPandas() használatával

A to_json() metódus elérhető a Pandas modulban, amely a Pandas DataFrame-et JSON-ba konvertálja. Ezt a módszert akkor használhatjuk, ha PySpark DataFrame-ünket Pandas DataFrame-re konvertáljuk. A PySpark DataFrame Pandas DataFrame-re konvertálásához a toPandas() metódust használjuk. Lássuk a to_json() szintaxisát a paramétereivel együtt.



Szintaxis:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Az Orient az átalakított JSON kívánt formátumban történő megjelenítésére szolgál. „Rekordokat”, „táblázatot”, „értékeket”, „oszlopokat”, „indexet”, „felosztást” igényel.
  2. Az index az index felvételére/eltávolítására szolgál az átalakított JSON-karakterláncból. Ha „True”-ra van állítva, az indexek megjelennek. Ellenkező esetben az indexek nem jelennek meg, ha az orientáció „osztott” vagy „tábla”.

1. példa: Tájékozódás „rekordként”

Hozzon létre egy „skills_df” PySpark DataFrame-et 3 sorból és 4 oszlopból. Alakítsa át ezt a DataFrame-et JSON-ba az orient paraméter „rekordként” megadásával.

import pyspark

import pandákat

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# készségadatok 3 sorral és 4 oszloppal

készségek =[{ 'azonosító' : 123 , 'személy' : 'Édesem' , 'készség' : 'festmény' , 'díj' : 25000 },

{ 'azonosító' : 112 , 'személy' : 'Mouni' , 'készség' : 'tánc' , 'díj' : 2000 },

{ 'azonosító' : 153 , 'személy' : 'Tulasi' , 'készség' : 'olvasás' , 'díj' : 1200 }

]

# hozza létre a készségek adatkeretét a fenti adatokból

skillek_df = linuxhint_spark_app.createDataFrame(skills)

# A tényleges készségek adatai

skillek_df.show()

# Konvertálás JSON-ba a to_json() használatával, orientációval mint 'rekordok'

json_skills_data = készségek_df.toPandas().to_json(orient= 'rekordok' )

print(json_skills_data)

Kimenet:



+---+------+-----+--------+

| id|személy|díj| készség|

+---+------+-----+--------+

| 123 | Drágám| 25000 |festés|

| 112 | Mouni| 2000 | tánc|

| 153 |Tulasi| 1200 | olvasás|

+---+------+-----+--------+

[{ 'azonosító' : 123 , 'személy' : 'Édesem' , 'díj' : 25000 , 'készség' : 'festmény' },{ 'azonosító' : 112 , 'személy' : 'Mouni' , 'díj' : 2000 , 'készség' : 'tánc' },{ 'azonosító' : 153 , 'személy' : 'Tulasi' , 'díj' : 1200 , 'készség' : 'olvasás' }]

Láthatjuk, hogy a PySpark DataFrame JSON-tömbbé konvertálva egy értékszótárral. Itt a kulcsok az oszlop nevét, az érték pedig a PySpark DataFrame sor/cella értékét jelöli.

2. példa: Orientálás „Split”-ként

A „felosztott” orientáció által visszaadott JSON-formátum tartalmazza az oszlopok listáját, az indexlistát és az adatlistát tartalmazó oszlopneveket. A következő az „osztott” orientáció formátuma.

# Konvertáljon JSON-ba a to_json() használatával, az orientációt 'split'-ként

json_skills_data = készségek_df.toPandas().to_json(orient= 'hasított' )

print(json_skills_data)

Kimenet:

{ 'oszlopok' :[ 'azonosító' , 'személy' , 'díj' , 'készség' ], 'index' :[ 0 , 1 , 2 ], 'adat' :[[ 123 , 'Édesem' , 25000 , 'festmény' ],[ 112 , 'Mouni' , 2000 , 'tánc' ],[ 153 , 'Tulasi' , 1200 , 'olvasás' ]]}

3. példa: Tájolás „indexként”

Itt a PySpark DataFrame minden egyes sora megszűnik egy szótár formájában, amelynek oszlop neve a kulcs. Minden szótárnál az index pozíciója kulcsként van megadva.

# Konvertáljon JSON-ba a to_json() használatával, az orientációval mint 'index'

json_skills_data = készségek_df.toPandas().to_json(orient= 'index' )

print(json_skills_data)

Kimenet:

{ '0' :{ 'azonosító' : 123 , 'személy' : 'Édesem' , 'díj' : 25000 , 'készség' : 'festmény' }, '1' :{ 'azonosító' : 112 , 'személy' : 'Mouni' , 'díj' : 2000 , 'készség' : 'tánc' }, '2' :{ 'azonosító' : 153 , 'személy' : 'Tulasi' , 'díj' : 1200 , 'készség' : 'olvasás' }}

4. példa: Tájolás „oszlopokként”

Az oszlopok jelentik az egyes rekordok kulcsát. Minden oszlop tartalmaz egy szótárat, amely az oszlopértékeket indexszámokkal veszi fel.

# Konvertálás JSON-ba a to_json() használatával, az orientációval 'oszlopként'

json_skills_data = készségek_df.toPandas().to_json(orient= 'oszlopok' )

print(json_skills_data)

Kimenet:

{ 'azonosító' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'személy' :{ '0' : 'Édesem' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'díj' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'készség' :{ '0' : 'festmény' , '1' : 'tánc' , '2' : 'olvasás' }}

5. példa: Tájolás „értékként”

Ha csak a JSON-beli értékekre van szüksége, választhatja az „értékek” orientációt. Minden sort megjelenít egy listában. Végül az összes listát egy listában tároljuk. Ez a JSON beágyazott lista típusú.

# Konvertálása JSON-ba a to_json() használatával, orientációval mint 'értékek'

json_skills_data = készségek_df.toPandas().to_json(orient= 'értékek' )

print(json_skills_data)

Kimenet:

[[ 123 , 'Édesem' , 25000 , 'festmény' ],[ 112 , 'Mouni' , 2000 , 'tánc' ],[ 153 , 'Tulasi' , 1200 , 'olvasás' ]]

6. példa: Tájolás „táblázatként”

A „tábla” orientáció a JSON-t adja vissza, amely tartalmazza a sémát mezőnevekkel, valamint az oszlop adattípusait, az indexet elsődleges kulcsként és a Pandas verziót. Az oszlopnevek értékekkel „adatként” jelennek meg.

# Konvertálása JSON-ba a to_json() használatával, orientációval mint 'tábla'

json_skills_data = készségek_df.toPandas().to_json(orient= 'asztal' )

print(json_skills_data)

Kimenet:

{ 'séma' :{ 'mezők' :[{ 'név' : 'index' , 'típus' : 'egész szám' },{ 'név' : 'azonosító' , 'típus' : 'egész szám' },{ 'név' : 'személy' , 'típus' : 'húr' },{ 'név' : 'díj' , 'típus' : 'egész szám' },{ 'név' : 'készség' , 'típus' : 'húr' }], 'elsődleges kulcs' :[ 'index' ], 'pandas_version' : '1.4.0' }, 'adat' :[{ 'index' : 0 , 'azonosító' : 123 , 'személy' : 'Édesem' , 'díj' : 25000 , 'készség' : 'festmény' },{ 'index' : 1 , 'azonosító' : 112 , 'személy' : 'Mouni' , 'díj' : 2000 , 'készség' : 'tánc' },{ 'index' : 2 , 'azonosító' : 153 , 'személy' : 'Tulasi' , 'díj' : 1200 , 'készség' : 'olvasás' }]}

7. példa: Index paraméterrel

Először az indexparamétert adjuk át „Igaz” értékre állítva. Minden oszlopértéknél látni fogja, hogy az indexpozíció kulcsként kerül visszaadásra a szótárban.

A második kimenetben csak az oszlopnevek („oszlopok”) és a rekordok („adatok”) kerülnek visszaadásra az indexpozíciók nélkül, mivel az index értéke „False”.

# Konvertálja JSON-ba a to_json() használatával az index=True beállítással

json_skills_data = készségek_df.toPandas().to_json(index=True)

print(json_skills_data, ' \n ' )

# Konvertálja JSON-ba a to_json() használatával, ahol index=False

json_skills_data= skill_df.toPandas().to_json(index=False,orient= 'hasított' )

print(json_skills_data)

Kimenet:

{ 'azonosító' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'személy' :{ '0' : 'Édesem' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'díj' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'készség' :{ '0' : 'festmény' , '1' : 'tánc' , '2' : 'olvasás' }}

{ 'oszlopok' :[ 'azonosító' , 'személy' , 'díj' , 'készség' ], 'adat' :[[ 123 , 'Édesem' , 25000 , 'festmény' ],[ 112 , 'Mouni' , 2000 , 'tánc' ],[ 153 , 'Tulasi' , 1200 , 'olvasás' ]]

PySpark DataFrame JSON-ba a ToJSON() használatával

A toJSON() metódus a PySpark DataFrame JSON-objektummá konvertálására szolgál. Alapvetően egy JSON-karakterláncot ad vissza, amelyet egy lista vesz körül. A ['{oszlop:érték,…}',…. ] ez a függvény által visszaadott formátum. Itt a PySpark DataFrame minden sora szótárként jelenik meg, kulcsként az oszlopnévvel.

Szintaxis:

dataframe_object.toJSON()

Lehetőség van olyan paraméterek átadására, mint az index, az oszlopcímkék és az adattípus.

Példa:

Hozzon létre egy „skills_df” PySpark DataFrame-et 5 sorból és 4 oszlopból. Konvertálja ezt a DataFrame-et JSON-ba a toJSON() metódussal.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# készségadatok 5 sorból és 4 oszlopból

készségek =[{ 'azonosító' : 123 , 'személy' : 'Édesem' , 'készség' : 'festmény' , 'díj' : 25000 },

{ 'azonosító' : 112 , 'személy' : 'Mouni' , 'készség' : 'zene/tánc' , 'díj' : 2000 },

{ 'azonosító' : 153 , 'személy' : 'Tulasi' , 'készség' : 'olvasás' , 'díj' : 1200 },

{ 'azonosító' : 173 , 'személy' : 'futott' , 'készség' : 'zene' , 'díj' : 2000 },

{ 'azonosító' : 43 , 'személy' : 'Kamala' , 'készség' : 'olvasás' , 'díj' : 10000 }

]

# hozza létre a készségek adatkeretét a fenti adatokból

skillek_df = linuxhint_spark_app.createDataFrame(skills)

# A tényleges készségek adatai

skillek_df.show()

# Konvertálás JSON tömbbe

json_skills_data = készségek_df.toJSON().collect()

print(json_skills_data)

Kimenet:

+---+------+-----+-----------+

| id|személy|díj| készség|

+---+------+-----+-----------+

| 123 | Drágám| 25000 | festés|

| 112 | Mouni| 2000 |zene/tánc|

| 153 |Tulasi| 1200 | olvasás|

| 173 | Futott| 2000 | zene|

| 43 |Kamala| 10000 | olvasás|

+---+------+-----+-----------+

[ '{'id':123,'person':'Méz','díj':25000,'skill':'festmény'}' , '{'id':112,'person':'Mouni','díj':2000,'skill':'zene/tánc'}' , '{'id':153,'person':'Tulasi','díj':1200,'készség':'olvasás'}' , '{'id':173,'person':'Ran','prize':2000,'skill':'zene'}' , '{'id':43,'person':'Kamala','díj':10000,'készség':'olvasás'}' ]

A PySpark DataFrame-ben 5 sor található. Ez az 5 sor vesszővel elválasztott karakterláncokból álló szótárként jelenik meg.

PySpark DataFrame JSON-ba a Write.json() használatával

A write.json() metódus elérhető a PySparkban, amely a PySpark DataFrame-et JSON-fájlba írja/menti. Paraméterként a fájlnevet/útvonalat veszi fel. Alapvetően több fájlban (particionált fájlban) adja vissza a JSON-t. Ha ezeket egyetlen fájlba szeretnénk egyesíteni, használhatjuk a coalesce() metódust.

Szintaxis:

dataframe_object.coalesce( 1 ).write.json('fájl_név')
  1. Hozzáfűzési mód – dataframe_object.write.mode('append').json('fájlnév')
  2. Felülírási mód – dataframe_object.write.mode(‘overwrite’).json(‘fájlnév’)

Lehetséges a meglévő JSON hozzáfűzése/felülírása. A write.mode() segítségével hozzáfűzhetjük az adatokat az „append” átadásával, vagy felülírhatjuk a meglévő JSON-adatokat a „overwrite” átadásával ennek a függvénynek.

1. példa:

Hozzon létre egy „skills_df” PySpark DataFrame-et 3 sorból és 4 oszlopból. Írja ezt a DataFrame-et JSON-ba.

import pyspark

import pandákat

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# készségadatok 3 sorral és 4 oszloppal

készségek =[{ 'azonosító' : 123 , 'személy' : 'Édesem' , 'készség' : 'festmény' , 'díj' : 25000 },

{ 'azonosító' : 112 , 'személy' : 'Mouni' , 'készség' : 'tánc' , 'díj' : 2000 },

{ 'azonosító' : 153 , 'személy' : 'Tulasi' , 'készség' : 'olvasás' , 'díj' : 1200 }

]

# hozza létre a készségek adatkeretét a fenti adatokból

skillek_df = linuxhint_spark_app.createDataFrame(skills)

# write.json()

skillek_df.coalesce( 1 ).write.json( 'skills_data' )

JSON fájl:

Láthatjuk, hogy a skill_data mappa tartalmazza a particionált JSON-adatokat.

Nyissuk meg a JSON fájlt. Láthatjuk, hogy a PySpark DataFrame összes sora JSON-ba konvertálva.

A PySpark DataFrame-ben 5 sor található. Ez az 5 sor vesszővel elválasztott karakterláncokból álló szótárként jelenik meg.

2. példa:

Hozzon létre egy „skills2_df” PySpark DataFrame-et egy sorral. Adjon hozzá egy sort az előző JSON-fájlhoz úgy, hogy a módot „hozzáfűzés”-ként adja meg.

import pyspark

import pandákat

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

készségek2 =[{ 'azonosító' : 78 , 'személy' : 'Mária' , 'készség' : 'lovaglás' , 'díj' : 8960 }

]

# hozza létre a készségek adatkeretét a fenti adatokból

skill2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() hozzáfűzési móddal.

skill2_df.write.mode( 'mellékel' ).json( 'skills_data' )

JSON fájl:

Láthatjuk a particionált JSON fájlokat. Az első fájl az első DataFrame rekordokat, a második pedig a második DataFrame rekordot tartalmazza.

Következtetés

A PySpark DataFrame három különböző módon konvertálható JSON-ba. Először a to_json() metódust tárgyaltuk, amely a PySpark DataFrame-nek a Pandas DataFrame-re való konvertálásával konvertál JSON-ba, különböző példákkal, különböző paraméterek figyelembevételével. Ezután a toJSON() metódust használtuk. Végül megtanultuk, hogyan kell a write.json() függvényt használni a PySpark DataFrame JSON-ba írásához. Hozzáfűzés és felülírás lehetséges ezzel a funkcióval.