PySpark olvasás CSV()

Pyspark Olvasas Csv



A PySpark DataFrame létrehozása a CSV-adatokból lehetséges a PySparkban a read.csv() függvény használatával. Bizonyos esetekben, ha a külső adatokat a PySpark DataFrame-be szeretné betölteni, a PySpark számos formátumot támogat, például JSON-t, CSV-t stb. Ebben az oktatóanyagban látni fogjuk, hogyan kell beolvasni a CSV-adatokat és betölteni a PySpark DataFrame-be. Azt is megvitatjuk, hogyan lehet egyszerre több CSV-fájlt betölteni egyetlen DataFrame-be, példákkal.

Pyspark.sql.DataFrameReader.csv()

Ez a módszer az adatok beolvasására szolgál a CSV-fájl(ok)ból, és tárolja azokat a PySpark DataFrame-ben. A beállításokat a CSV DataFrame-be történő beolvasása közben veszi át. A különböző lehetőségeket példákkal részletesen tárgyaljuk. Ha egynél több CSV-fájlt ad át, fontos, hogy a fájlneveket kiterjesztéssel adjuk át egy listában, amelyet vesszővel választunk el. Ha csak egy CSV-fájlt olvas, akkor nem kell megadnia a fájl nevét egy listában.

Szintaxis:







Egyetlen fájl - spark_app.read.csv('file.csv', lehetőségek …)

Több fájl – spark_app.read.csv(['file1.csv','file2.csv',…],options…)



Lehetőség van az opciók és a fájlnevek elkülönítésére is.



Egyetlen fájl – spark_app.read.options(options…).csv('file.csv')





Több fájl – spark_app.read.options(options…).csv(['file1.csv','file2.csv',…])

A következő példák megvalósítása előtt telepítse a PySpark könyvtárat.



pip install pyspark

A sikeres telepítés után a következőképpen láthatja a kimenetet:

1. forgatókönyv: A CSV-fájl fejlécének olvasása

Hozzon létre egy „person_skill.csv” nevű CSV-fájlt 5 rekorddal, amely a következőkben látható, és töltse be a PySpark DataFrame-be:

A fejléc paraméter az oszlopnevek megadására szolgál a PySpark DataFrame-ben. Logikai érték kell hozzá. Ha „True”, akkor a CSV-fájlban létező tényleges oszlopnevek a DataFrame-ben vannak megadva. Ellenkező esetben a c0, c1, c2… meg van adva, és a tényleges oszlopnevek sorok lesznek. A legjobb gyakorlat, ha a fejléc paraméterét igazra állítja.

1. példa: Fejléc = igaz

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Töltse be a - person_skill.csv nevű csv-t a fejléccel ellátott oszlopcímkékkel ellátott készségekbe

skillek = linuxhint_spark_app.read.csv( 'person_skill.csv' , fejléc = igaz)

# Jelenítse meg a DataFrame-et

skillek.show()

Kimenet:

Magyarázat:

Láthatjuk, hogy a PySpark DataFrame a CSV-fájlból jön létre meghatározott oszlopokkal és sorokkal.

Az oszlopok ellenőrzéséhez használja a következő parancsot:

készségek.oszlopok

2. példa: Fejléc = False

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Töltse be a - person_skill.csv nevű csv-t a fejléc nélküli oszlopcímkékkel ellátott készségekbe

skillek = linuxhint_spark_app.read.csv( 'person_skill.csv' , fejléc =Hamis)

# Jelenítse meg a DataFrame-et

skillek.show()

Kimenet:

Magyarázat:

Láthatjuk, hogy a PySpark DataFrame a CSV-fájlból jön létre, meglévő oszlopok nélkül.

Ezenkívül a meglévő oszlopok sorokként vannak tárolva a PySpark DataFrame-ben.

készségek.oszlopok

A Read.options.csv()

Most a read.options.csv() metódussal olvassuk be a CSV-fájlt. Itt a csv()-ben argumentumként és fájlnévként kell átadnunk az olyan opciókat, mint a határoló, fejléc stb. Adjuk át a fejléc paraméterét „Igaz” értékre állítva.

1. forgatókönyv:

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# A read.options.csv() használata

skillek = linuxhint_spark_app.read. lehetőségek ( fejléc =Igaz).csv( 'person_skill.csv' )

# Jelenítse meg a DataFrame-et

skillek.show()

Kimenet:

2. forgatókönyv: A CSV-fájlhatároló olvasása

A határoló paraméter azt a karaktert veszi fel, amely az egyes mezők elválasztására szolgál. Alapértelmezés szerint vessző (,) szükséges. Használjuk ugyanazt a CSV-fájlt, mint az első forgatókönyvben, és adjuk meg a vesszőt (',') határolóként.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# A read.options.csv() használata határolóval és fejléccel

skillek = linuxhint_spark_app.read. lehetőségek ( fejléc =Igaz,határoló= ',' .csv( 'person_skill.csv' )

# Jelenítse meg a DataFrame-et

skillek.show()

Kimenet:

Több fájl olvasása

Eddig egyetlen CSV-fájlt olvastunk. Nézzük meg, hogyan lehet egynél több CSV-fájlt olvasni. Ebben a forgatókönyvben a több fájlban lévő sorok egyetlen PySpark DataFrame-hez vannak hozzáfűzve. Csak át kell adnunk a fájlneveket egy listában a metóduson belül.

Példa:

Legyen a következő „person_skill.csv” és „person_skill2.csv” CSV-fájlok a következő adatokkal:


Olvassa el ezt a két CSV-fájlt, és tárolja őket egyetlen PySpark DataFrame-ben.

import pyspark

a pyspark.sql-ből importálja a SparkSession-t

linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()

# Töltsön be 2 csv-fájlt (person_skill.csv és person_skill2.csv) a képességekbe fejléccel ellátott oszlopcímkékkel

skillek = linuxhint_spark_app.read.csv([ 'person_skill.csv' , 'person_skill2.csv' ],szep= ',' , fejléc = igaz)

skillek.show()

Kimenet:

Magyarázat:

Az első CSV 6, a második CSV 3 rekordot tartalmaz. Láthatjuk, hogy először az első CSV kerül betöltésre a DataFrame-be. Ezután a második CSV betöltődik. Végül a PySpark DataFrame 9 rekordot tartalmaz.

Következtetés

A CSV beolvasása a PySpark DataFrame-be meglehetősen egyszerű a pyspark.sql.DataFrameReader.csv() metódussal. Ennek a módszernek a fejléc és a határoló paraméterek átadása lehetséges az oszlopok és a formátum megadása érdekében. A PySpark támogatja több CSV-fájl egyidejű olvasását is a megadott metódusokkal és azok lehetőségeivel. Ebben a cikkben különböző lehetőségeket mérlegelve láttuk a példákat. Ezen kívül két módot láttunk arra, hogy a lehetőségeket átadjuk a metódusnak.