Paano Magpapatupad ng Real-Time na Data Streaming sa Python

Paano Magpapatupad Ng Real Time Na Data Streaming Sa Python



Ang pag-master sa pagpapatupad ng real-time na data streaming sa Python ay gumaganap bilang isang mahalagang kasanayan sa mundong may kinalaman sa data ngayon. Sinasaliksik ng gabay na ito ang mga pangunahing hakbang at mahahalagang tool para sa paggamit ng real-time na data streaming na may authenticity sa Python. Mula sa pagpili ng angkop na balangkas tulad ng Apache Kafka o Apache Pulsar hanggang sa pagsulat ng Python code para sa walang hirap na pagkonsumo ng data, pagproseso, at epektibong visualization, makukuha natin ang mga kinakailangang kasanayan upang makabuo ng maliksi at mahusay na real-time na mga channel ng data.

Halimbawa 1: Pagpapatupad ng Real-Time na Data Streaming sa Python

Ang pagpapatupad ng real-time na streaming ng data sa Python ay napakahalaga sa panahon at mundong hinihimok ng data ngayon. Sa detalyadong halimbawang ito, tatalakayin natin ang proseso ng pagbuo ng real-time na data streaming system gamit ang Apache Kafka at Python sa Google Colab.







Upang masimulan ang halimbawa bago tayo magsimulang mag-coding, ang pagbuo ng isang partikular na kapaligiran sa Google Colab ay mahalaga. Ang unang bagay na kailangan nating gawin ay i-install ang mga kinakailangang aklatan. Ginagamit namin ang library na 'kafka-python' para sa pagsasama ng Kafka.



! pip i-install kafka-python


Ini-install ng command na ito ang library na 'kafka-python' na nagbibigay ng mga function ng Python at mga binding para sa Apache Kafka. Susunod, ini-import namin ang mga kinakailangang aklatan para sa aming proyekto. Ang pag-import ng mga kinakailangang aklatan kasama ang “KafkaProducer” at “KafkaConsumer” ay ang mga klase mula sa library ng “kafka-python” na nagpapahintulot sa amin na makipag-ugnayan sa mga Kafka broker. Ang JSON ay ang Python library para gumana sa JSON data na ginagamit namin para i-serialize at deserialize ang mga mensahe.



mula sa kafka import KafkaProducer, KafkaConsumer
import json


Paglikha ng isang Kafka Producer





Mahalaga ito dahil ipinapadala ng isang producer ng Kafka ang data sa isang paksa ng Kafka. Sa aming halimbawa, lumikha kami ng isang producer upang magpadala ng isang simulate na real-time na data sa isang paksa na tinatawag na 'real-time-topic.'

Gumagawa kami ng instance na 'KafkaProducer' na tumutukoy sa address ng Kafka broker bilang 'localhost:9092'. Pagkatapos, ginagamit namin ang 'value_serializer', isang function na nagse-serialize ng data bago ito ipadala sa Kafka. Sa aming kaso, ang isang lambda function ay nag-encode ng data bilang UTF-8-encoded JSON. Ngayon, gayahin natin ang ilang real-time na data at ipadala ito sa paksang Kafka.



producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( sa ) .encode ( 'utf-8' ) )
# Simulated real-time na data
data = { 'sensor_id' : 1 , 'temperatura' : 25.5 , 'halumigmig' : 60.2 }
# Pagpapadala ng data sa paksa
producer.send ( 'real-time-topic' , data )


Sa mga linyang ito, tinukoy namin ang isang diksyunaryo ng 'data' na kumakatawan sa isang kunwa na data ng sensor. Pagkatapos ay ginagamit namin ang paraan ng 'ipadala' upang i-publish ang data na ito sa 'real-time-topic.'

Pagkatapos, gusto naming lumikha ng isang Kafka consumer, at isang Kafka consumer ang nagbabasa ng data mula sa isang Kafka na paksa. Gumagawa kami ng isang mamimili upang ubusin at iproseso ang mga mensahe sa 'real-time-topic.' Gumagawa kami ng instance na “KafkaConsumer,” na tumutukoy sa paksang gusto naming gamitin, hal., (real-time-topic) at address ng Kafka broker. Pagkatapos, ang 'value_deserializer' ay isang function na nagde-deserialize sa data na natanggap mula sa Kafka. Sa aming kaso, ang isang lambda function ay nagde-decode ng data bilang UTF-8-encoded JSON.

mamimili = KafkaConsumer ( 'real-time-topic' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


Gumagamit kami ng umuulit na loop upang patuloy na ubusin at iproseso ang mga mensahe mula sa paksa.

# Pagbabasa at pagproseso ng real-time na data
para sa mensahe sa mamimili:
data = mensahe.halaga
print ( f 'Natanggap na Data: {data}' )


Kinukuha namin ang halaga ng bawat mensahe at ang aming na-simulate na data ng sensor sa loob ng loop at i-print ito sa console. Ang pagpapatakbo sa producer at consumer ng Kafka ay nagsasangkot ng pagpapatakbo ng code na ito sa Google Colab at pag-execute ng mga cell ng code nang paisa-isa. Ipinapadala ng producer ang simulate na data sa paksang Kafka, at binabasa at ini-print ng consumer ang natanggap na data.


Pagsusuri ng Output Habang Tumatakbo ang Code

Obserbahan namin ang isang real-time na data na ginagawa at ginagamit. Maaaring mag-iba ang format ng data depende sa aming simulation o aktwal na data source. Sa detalyadong halimbawang ito, sinasaklaw namin ang buong proseso ng pag-set up ng real-time na data streaming system gamit ang Apache Kafka at Python sa Google Colab. Ipapaliwanag namin ang bawat linya ng code at ang kahalagahan nito sa pagbuo ng sistemang ito. Ang real-time na data streaming ay isang malakas na kakayahan, at ang halimbawang ito ay nagsisilbing pundasyon para sa mas kumplikadong mga real-world na application.

Halimbawa 2: Pagpapatupad ng Real-Time na Data Streaming sa Python Gamit ang Stock Market Data

Gumawa tayo ng isa pang natatanging halimbawa ng pagpapatupad ng real-time na data streaming sa Python gamit ang ibang senaryo; sa pagkakataong ito, tututukan natin ang data ng stock market. Gumagawa kami ng real-time na data streaming system na kumukuha ng mga pagbabago sa presyo ng stock at pinoproseso ang mga ito gamit ang Apache Kafka at Python sa Google Colab. Gaya ng ipinakita sa nakaraang halimbawa, nagsisimula tayo sa pamamagitan ng pag-configure ng ating kapaligiran sa Google Colab. Una, i-install namin ang mga kinakailangang aklatan:

! pip i-install kafka-python yfinance


Dito, idinaragdag namin ang library ng 'yfinance' na nagbibigay-daan sa amin na makakuha ng real-time na data ng stock market. Susunod, ini-import namin ang mga kinakailangang aklatan. Patuloy naming ginagamit ang mga klase ng “KafkaProducer” at “KafkaConsumer” mula sa library ng “kafka-python” para sa pakikipag-ugnayan ng Kafka. Nag-import kami ng JSON para gumana sa data ng JSON. Ginagamit din namin ang 'yfinance' para makakuha ng real-time na data ng stock market. Ini-import din namin ang library ng 'oras' upang magdagdag ng pagkaantala sa oras upang gayahin ang mga real-time na update.

mula sa kafka import KafkaProducer, KafkaConsumer
import json
import yfinance bilang yf
angkat oras


Ngayon, lumikha kami ng isang producer ng Kafka para sa data ng stock. Ang aming producer ng Kafka ay nakakakuha ng real-time na data ng stock at ipinapadala ito sa isang paksa ng Kafka na pinangalanang 'stock-price'.

producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( sa ) .encode ( 'utf-8' ) )

habang totoo:
stock = yf.Ticker ( 'AAPL' ) # Halimbawa: stock ng Apple Inc
stock_data = stock.history ( panahon = '1d' )
huling_presyo = stock_data [ 'malapit' ] .iloc [ - 1 ]
data = { 'simbolo' : 'AAPL' , 'presyo' : huling presyo }
producer.send ( 'stock-presyo' , data )
oras.tulog ( 10 ) # Gayahin ang mga real-time na update bawat 10 segundo


Gumagawa kami ng instance na 'KafkaProducer' kasama ang address ng Kafka broker sa code na ito. Sa loob ng loop, ginagamit namin ang 'yfinance' para makuha ang pinakabagong presyo ng stock para sa Apple Inc. ('AAPL'). Pagkatapos, kinukuha namin ang huling presyo ng pagsasara at ipinadala ito sa paksang 'stock-price'. Sa kalaunan, ipinakilala namin ang isang pagkaantala ng oras upang gayahin ang mga real-time na update bawat 10 segundo.

Gumawa tayo ng isang consumer ng Kafka upang basahin at iproseso ang data ng presyo ng stock mula sa paksang 'stock-price'.

mamimili = KafkaConsumer ( 'stock-presyo' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

para sa mensahe sa mamimili:
stock_data = message.value
print ( f 'Nakatanggap na Data ng Stock: {stock_data['symbol']} - Presyo: {stock_data['price']}' )


Ang code na ito ay katulad ng setup ng consumer ng nakaraang halimbawa. Patuloy nitong binabasa at pinoproseso ang mga mensahe mula sa paksang 'stock-price' at ini-print ang simbolo ng stock at presyo sa console. Isinasagawa namin ang mga cell ng code nang sunud-sunod, hal., isa-isa sa Google Colab upang patakbuhin ang producer at consumer. Nakukuha at ipinapadala ng producer ang real-time na mga update sa presyo ng stock habang binabasa at ipinapakita ng consumer ang data na ito.

! pip i-install kafka-python yfinance
mula sa kafka import KafkaProducer, KafkaConsumer
import json
import yfinance bilang yf
angkat oras
producer = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( sa ) .encode ( 'utf-8' ) )

habang totoo:
stock = yf.Ticker ( 'AAPL' ) # stock ng Apple Inc
stock_data = stock.history ( panahon = '1d' )
huling_presyo = stock_data [ 'malapit' ] .iloc [ - 1 ]

data = { 'simbolo' : 'AAPL' , 'presyo' : huling presyo }

producer.send ( 'stock-presyo' , data )

oras.tulog ( 10 ) # Gayahin ang mga real-time na update bawat 10 segundo
mamimili = KafkaConsumer ( 'stock-presyo' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

para sa mensahe sa mamimili:
stock_data = message.value
print ( f 'Nakatanggap na Data ng Stock: {stock_data['symbol']} - Presyo: {stock_data['price']}' )


Sa pagsusuri ng output pagkatapos tumakbo ang code, mapapansin namin ang real-time na mga update sa presyo ng stock para sa Apple Inc. na ginagawa at ginagamit.

Konklusyon

Sa natatanging halimbawang ito, ipinakita namin ang pagpapatupad ng real-time na data streaming sa Python gamit ang Apache Kafka at ang library ng 'yfinance' upang makuha at iproseso ang data ng stock market. Lubusan naming ipinaliwanag ang bawat linya ng code. Maaaring ilapat ang real-time na data streaming sa iba't ibang larangan upang bumuo ng mga real-world na application sa pananalapi, IoT, at higit pa.