A valós idejű adatfolyam megvalósítása a Pythonban

A Valos Ideju Adatfolyam Megvalositasa A Pythonban



A valós idejű adatfolyam megvalósításának elsajátítása a Pythonban elengedhetetlen készség a mai, adatokkal érintett világban. Ez az útmutató feltárja az alapvető lépéseket és alapvető eszközöket a valós idejű adatfolyam hitelesített használatához a Pythonban. Az olyan illeszkedő keretrendszer kiválasztásától kezdve, mint az Apache Kafka vagy az Apache Pulsar, a könnyed adatfelhasználást, -feldolgozást és hatékony megjelenítést biztosító Python-kód írásáig elsajátítjuk az agilis és hatékony valós idejű adatcsatornák felépítéséhez szükséges készségeket.

1. példa: Valós idejű adatfolyam megvalósítása Pythonban

A valós idejű adatfolyam megvalósítása Pythonban kulcsfontosságú a mai adatvezérelt korban és világban. Ebben a részletes példában egy valós idejű adatfolyam-rendszer felépítésének folyamatát mutatjuk be Apache Kafka és Python segítségével a Google Colabban.







A példa inicializálásához a kódolás megkezdése előtt elengedhetetlen egy adott környezet létrehozása a Google Colabban. Az első dolog, amit tennünk kell, a szükséges könyvtárak telepítése. A „kafka-python” könyvtárat használjuk a Kafka-integrációhoz.



! csipog telepítés kafka-python


Ez a parancs telepíti a „kafka-python” könyvtárat, amely biztosítja a Python függvényeket és az Apache Kafka kötéseit. Ezután importáljuk a projektünkhöz szükséges könyvtárakat. A szükséges könyvtárak, köztük a „KafkaProducer” és a „KafkaConsumer” importálása a „kafka-python” könyvtárból azok az osztályok, amelyek lehetővé teszik számunkra, hogy kapcsolatba léphessünk a Kafka brókerekkel. A JSON a Python-könyvtár, amely a JSON-adatokkal dolgozik, amelyeket az üzenetek sorosítására és deszerializálására használunk.



a kafka importból KafkaProducer, KafkaConsumer
json importálása


Kafka producer létrehozása





Ez azért fontos, mert egy Kafka-producer elküldi az adatokat egy Kafka-témának. Példánkban létrehozunk egy termelőt, amely szimulált valós idejű adatokat küld a „valós idejű témakörnek”.

Létrehozunk egy „KafkaProducer” példányt, amely a Kafka bróker címét „localhost:9092”-ként határozza meg. Ezután a „value_serializer” függvényt használjuk, amely sorba rendezi az adatokat, mielőtt elküldené azokat Kafkának. Esetünkben egy lambda függvény UTF-8 kódolású JSON-ként kódolja az adatokat. Most szimuláljunk néhány valós idejű adatot, és küldjük el a Kafka témához.



producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ban ben ) .kódol ( 'utf-8' ) )
# Szimulált valós idejű adatok
adatok = { 'sensor_id' : 1 , 'hőfok' : 25.5 , 'páratartalom' : 60.2 }
# Adatok küldése a témához
producer.küld ( 'valós idejű téma' , adatok )


Ezekben a sorokban egy „adat” szótárt definiálunk, amely szimulált szenzoradatokat reprezentál. Ezután a „küldés” módszert használjuk az adatok közzétételére a „valós idejű témában”.

Ezután egy Kafka-fogyasztót akarunk létrehozni, és egy Kafka-fogyasztó beolvassa az adatokat egy Kafka-témából. Olyan fogyasztót hozunk létre, aki a „valós idejű témában” fogyasztja és feldolgozza az üzeneteket. Létrehozunk egy „KafkaConsumer” példányt, megadva a felhasználni kívánt témát, például (valós idejű téma) és a Kafka bróker címét. Ezután a „value_deserializer” egy olyan függvény, amely deszerializálja a Kafkától kapott adatokat. Esetünkben egy lambda függvény UTF-8 kódolású JSON-ként dekódolja az adatokat.

fogyasztó = KafkaConsumer ( 'valós idejű téma' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekódolni ( 'utf-8' ) ) )


Iteratív ciklust használunk a téma üzeneteinek folyamatos fogyasztására és feldolgozására.

# Valós idejű adatok olvasása és feldolgozása
számára üzenet ban ben fogyasztó:
adat = üzenet.érték
nyomtatás ( f „Fogadott adatok: {data}” )


Minden egyes üzenet értékét és a szimulált szenzoradatokat lekérjük a hurkon belül, és kinyomtatjuk a konzolra. A Kafka producer és fogyasztó futtatása magában foglalja a kód futtatását a Google Colabban, és a kódcellák egyenkénti végrehajtását. A szimulált adatokat a gyártó elküldi a Kafka témának, a fogyasztó pedig beolvassa és kinyomtatja a kapott adatokat.


A kimenet elemzése a kód futása közben

Valós idejű adatokat fogunk megfigyelni, amelyek előállítása és felhasználása folyamatban van. Az adatformátum a szimulációnktól vagy a tényleges adatforrástól függően változhat. Ebben a részletes példában bemutatjuk a valós idejű adatfolyam-rendszer beállításának teljes folyamatát az Apache Kafka és Python használatával a Google Colabban. Elmagyarázzuk az egyes kódsorokat és azok jelentőségét a rendszer felépítésében. A valós idejű adatfolyam hatékony képesség, és ez a példa alapjául szolgál a bonyolultabb valós alkalmazásokhoz.

2. példa: Valós idejű adatfolyam megvalósítása Pythonban tőzsdei adatok használatával

Vegyünk egy másik egyedi példát a valós idejű adatfolyam megvalósítására Pythonban egy másik forgatókönyv használatával; ezúttal a tőzsdei adatokra koncentrálunk. Valós idejű adatfolyam-rendszert hozunk létre, amely rögzíti a részvényárfolyam változásait, és feldolgozza azokat az Apache Kafka és a Python segítségével a Google Colabban. Ahogy az előző példában is bemutattuk, először a környezetünk konfigurálásával kezdjük a Google Colabban. Először telepítjük a szükséges könyvtárakat:

! csipog telepítés kafka-python yfinance


Itt hozzáadjuk az „yfinance” könyvtárat, amely lehetővé teszi, hogy valós idejű tőzsdei adatokat kapjunk. Ezután importáljuk a szükséges könyvtárakat. Továbbra is a „KafkaProducer” és a „KafkaConsumer” osztályokat használjuk a „kafka-python” könyvtárból a Kafka interakcióhoz. JSON-t importálunk a JSON-adatokkal való együttműködéshez. Az „yfinance”-t is használjuk, hogy valós idejű tőzsdei adatokat kapjunk. Importáljuk az „idő” könyvtárat is, hogy késleltetést adjunk hozzá a valós idejű frissítések szimulálásához.

a kafka importból KafkaProducer, KafkaConsumer
json importálása
import yfinance mint yf
import idő


Most létrehozunk egy Kafka gyártót a készletadatokhoz. Kafka gyártónk valós idejű készletadatokat kap, és elküldi a „készletár” nevű Kafka-témának.

producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ban ben ) .kódol ( 'utf-8' ) )

míg Igaz:
stock = yf.Ticker ( 'AAPL' ) # Példa: Apple Inc. részvény
stock_data = stock.history ( időszak = '1d' )
utolsó_ár = készletadatok [ 'Bezárás' ] .iloc [ - 1 ]
adatok = { 'szimbólum' : 'AAPL' , 'ár' : utolsó ár }
producer.küld ( 'részvényárfolyam' , adatok )
idő.alvás ( 10 ) # Szimulálja a valós idejű frissítéseket 10 másodpercenként


Létrehozunk egy „KafkaProducer” példányt a Kafka bróker címével ebben a kódban. A hurkon belül az „yfinance”-t használjuk az Apple Inc. (“AAPL”) legfrissebb részvényárfolyamának lekéréséhez. Ezután kivonjuk az utolsó záró árat és elküldjük a „tőzsdei ár” témakörbe. Végül bevezetünk egy késleltetést, amely 10 másodpercenként szimulálja a valós idejű frissítéseket.

Hozzunk létre egy Kafka-fogyasztót, aki elolvassa és feldolgozza a részvényárfolyam adatait a „tőzsdei ár” témakörből.

fogyasztó = KafkaConsumer ( 'részvényárfolyam' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekódolni ( 'utf-8' ) ) )

számára üzenet ban ben fogyasztó:
stock_data = üzenet.érték
nyomtatás ( f 'Fogadott készletadatok: {stock_data['symbol']} - Ár: {stock_data['price']}' )


Ez a kód hasonló az előző példa fogyasztói beállításához. Folyamatosan olvassa és feldolgozza a „tőzsdei ár” témakör üzeneteit, és kinyomtatja a konzolra a részvény szimbólumot és az árat. A kódcellákat szekvenciálisan hajtjuk végre, például egyenként a Google Colabban a gyártó és a fogyasztó futtatásához. A gyártó megkapja és elküldi a valós idejű részvényárfolyam-frissítéseket, miközben a fogyasztó elolvassa és megjeleníti ezeket az adatokat.

! csipog telepítés kafka-python yfinance
a kafka importból KafkaProducer, KafkaConsumer
json importálása
import yfinance mint yf
import idő
producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( ban ben ) .kódol ( 'utf-8' ) )

míg Igaz:
stock = yf.Ticker ( 'AAPL' ) # Apple Inc. részvény
stock_data = stock.history ( időszak = '1d' )
utolsó_ár = készletadatok [ 'Bezárás' ] .iloc [ - 1 ]

adatok = { 'szimbólum' : 'AAPL' , 'ár' : utolsó ár }

producer.küld ( 'részvényárfolyam' , adatok )

idő.alvás ( 10 ) # Szimulálja a valós idejű frissítéseket 10 másodpercenként
fogyasztó = KafkaConsumer ( 'részvényárfolyam' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.dekódolni ( 'utf-8' ) ) )

számára üzenet ban ben fogyasztó:
stock_data = üzenet.érték
nyomtatás ( f 'Fogadott készletadatok: {stock_data['symbol']} - Ár: {stock_data['price']}' )


A kód lefutása utáni kimenet elemzése során megfigyeljük az Apple Inc. valós idejű részvényárfolyam-frissítéseit, amelyek előállítása és felhasználása történik.

Következtetés

Ebben az egyedülálló példában bemutattuk a valós idejű adatfolyam megvalósítását Pythonban az Apache Kafka és az „yfinance” könyvtár segítségével a tőzsdei adatok rögzítésére és feldolgozására. Alaposan elmagyaráztuk a kód minden sorát. A valós idejű adatfolyam különféle területeken alkalmazható valós alkalmazások létrehozásához a pénzügy, az IoT és egyebek területén.