Pyspark.sql.DataFrameReader.csv()
Ez a módszer az adatok beolvasására szolgál a CSV-fájl(ok)ból, és tárolja azokat a PySpark DataFrame-ben. A beállításokat a CSV DataFrame-be történő beolvasása közben veszi át. A különböző lehetőségeket példákkal részletesen tárgyaljuk. Ha egynél több CSV-fájlt ad át, fontos, hogy a fájlneveket kiterjesztéssel adjuk át egy listában, amelyet vesszővel választunk el. Ha csak egy CSV-fájlt olvas, akkor nem kell megadnia a fájl nevét egy listában.
Szintaxis:
Egyetlen fájl - spark_app.read.csv('file.csv', lehetőségek …)
Több fájl – spark_app.read.csv(['file1.csv','file2.csv',…],options…)
Lehetőség van az opciók és a fájlnevek elkülönítésére is.
Egyetlen fájl – spark_app.read.options(options…).csv('file.csv')
Több fájl – spark_app.read.options(options…).csv(['file1.csv','file2.csv',…])
A következő példák megvalósítása előtt telepítse a PySpark könyvtárat.
pip install pyspark
A sikeres telepítés után a következőképpen láthatja a kimenetet:
1. forgatókönyv: A CSV-fájl fejlécének olvasása
Hozzon létre egy „person_skill.csv” nevű CSV-fájlt 5 rekorddal, amely a következőkben látható, és töltse be a PySpark DataFrame-be:
A fejléc paraméter az oszlopnevek megadására szolgál a PySpark DataFrame-ben. Logikai érték kell hozzá. Ha „True”, akkor a CSV-fájlban létező tényleges oszlopnevek a DataFrame-ben vannak megadva. Ellenkező esetben a c0, c1, c2… meg van adva, és a tényleges oszlopnevek sorok lesznek. A legjobb gyakorlat, ha a fejléc paraméterét igazra állítja.
1. példa: Fejléc = igaz
import pysparka pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Töltse be a - person_skill.csv nevű csv-t a fejléccel ellátott oszlopcímkékkel ellátott készségekbe
skillek = linuxhint_spark_app.read.csv( 'person_skill.csv' , fejléc = igaz)
# Jelenítse meg a DataFrame-et
skillek.show()
Kimenet:
Magyarázat:
Láthatjuk, hogy a PySpark DataFrame a CSV-fájlból jön létre meghatározott oszlopokkal és sorokkal.
Az oszlopok ellenőrzéséhez használja a következő parancsot:
készségek.oszlopok
2. példa: Fejléc = False
import pysparka pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Töltse be a - person_skill.csv nevű csv-t a fejléc nélküli oszlopcímkékkel ellátott készségekbe
skillek = linuxhint_spark_app.read.csv( 'person_skill.csv' , fejléc =Hamis)
# Jelenítse meg a DataFrame-et
skillek.show()
Kimenet:
Magyarázat:
Láthatjuk, hogy a PySpark DataFrame a CSV-fájlból jön létre, meglévő oszlopok nélkül.
Ezenkívül a meglévő oszlopok sorokként vannak tárolva a PySpark DataFrame-ben.
készségek.oszlopok
A Read.options.csv()
Most a read.options.csv() metódussal olvassuk be a CSV-fájlt. Itt a csv()-ben argumentumként és fájlnévként kell átadnunk az olyan opciókat, mint a határoló, fejléc stb. Adjuk át a fejléc paraméterét „Igaz” értékre állítva.
1. forgatókönyv:
import pysparka pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# A read.options.csv() használata
skillek = linuxhint_spark_app.read. lehetőségek ( fejléc =Igaz).csv( 'person_skill.csv' )
# Jelenítse meg a DataFrame-et
skillek.show()
Kimenet:
2. forgatókönyv: A CSV-fájlhatároló olvasása
A határoló paraméter azt a karaktert veszi fel, amely az egyes mezők elválasztására szolgál. Alapértelmezés szerint vessző (,) szükséges. Használjuk ugyanazt a CSV-fájlt, mint az első forgatókönyvben, és adjuk meg a vesszőt (',') határolóként.
import pysparka pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# A read.options.csv() használata határolóval és fejléccel
skillek = linuxhint_spark_app.read. lehetőségek ( fejléc =Igaz,határoló= ',' .csv( 'person_skill.csv' )
# Jelenítse meg a DataFrame-et
skillek.show()
Kimenet:
Több fájl olvasása
Eddig egyetlen CSV-fájlt olvastunk. Nézzük meg, hogyan lehet egynél több CSV-fájlt olvasni. Ebben a forgatókönyvben a több fájlban lévő sorok egyetlen PySpark DataFrame-hez vannak hozzáfűzve. Csak át kell adnunk a fájlneveket egy listában a metóduson belül.
Példa:
Legyen a következő „person_skill.csv” és „person_skill2.csv” CSV-fájlok a következő adatokkal:
Olvassa el ezt a két CSV-fájlt, és tárolja őket egyetlen PySpark DataFrame-ben.
import pysparka pyspark.sql-ből importálja a SparkSession-t
linuxhint_spark_app = SparkSession.builder.appName( 'Linux tipp' ).getOrCreate()
# Töltsön be 2 csv-fájlt (person_skill.csv és person_skill2.csv) a képességekbe fejléccel ellátott oszlopcímkékkel
skillek = linuxhint_spark_app.read.csv([ 'person_skill.csv' , 'person_skill2.csv' ],szep= ',' , fejléc = igaz)
skillek.show()
Kimenet:
Magyarázat:
Az első CSV 6, a második CSV 3 rekordot tartalmaz. Láthatjuk, hogy először az első CSV kerül betöltésre a DataFrame-be. Ezután a második CSV betöltődik. Végül a PySpark DataFrame 9 rekordot tartalmaz.
Következtetés
A CSV beolvasása a PySpark DataFrame-be meglehetősen egyszerű a pyspark.sql.DataFrameReader.csv() metódussal. Ennek a módszernek a fejléc és a határoló paraméterek átadása lehetséges az oszlopok és a formátum megadása érdekében. A PySpark támogatja több CSV-fájl egyidejű olvasását is a megadott metódusokkal és azok lehetőségeivel. Ebben a cikkben különböző lehetőségeket mérlegelve láttuk a példákat. Ezen kívül két módot láttunk arra, hogy a lehetőségeket átadjuk a metódusnak.