Как да внедрите поточно предаване на данни в реално време в Python

Kak Da Vnedrite Potocno Predavane Na Danni V Realno Vreme V Python



Овладяването на внедряването на поточно предаване на данни в реално време в Python действа като основно умение в днешния свят, свързан с данни. Това ръководство изследва основните стъпки и основните инструменти за използване на поточно предаване на данни в реално време с автентичност в Python. От избора на подходяща рамка като Apache Kafka или Apache Pulsar до писането на код на Python за безпроблемно потребление на данни, обработка и ефективна визуализация, ние ще придобием необходимите умения за конструиране на гъвкави и ефективни канали за данни в реално време.

Пример 1: Внедряване на поточно предаване на данни в реално време в Python

Внедряването на поточно предаване на данни в реално време в Python е от решаващо значение в днешната епоха и свят, управлявани от данни. В този подробен пример ще преминем през процеса на изграждане на система за поточно предаване на данни в реално време с помощта на Apache Kafka и Python в Google Colab.







За инициализиране на примера, преди да започнем да кодираме, изграждането на специфична среда в Google Colab е от съществено значение. Първото нещо, което трябва да направим, е да инсталираме необходимите библиотеки. Използваме библиотеката „kafka-python“ за интеграция на Kafka.



! пип Инсталирай кафка-питон


Тази команда инсталира библиотеката „kafka-python“, която предоставя функциите на Python и свързванията за Apache Kafka. След това импортираме необходимите библиотеки за нашия проект. Импортирането на необходимите библиотеки, включително „KafkaProducer“ и „KafkaConsumer“ са класовете от библиотеката „kafka-python“, които ни позволяват да взаимодействаме с брокерите на Kafka. JSON е библиотеката на Python за работа с JSON данните, които използваме за сериализиране и десериализиране на съобщенията.



от kafka import KafkaProducer, KafkaConsumer
импортиране на json


Създаване на продуцент на Кафка





Това е важно, защото продуцентът на Kafka изпраща данните в тема на Kafka. В нашия пример създаваме продуцент, който да изпраща симулирани данни в реално време към тема, наречена „тема в реално време“.

Създаваме екземпляр „KafkaProducer“, който посочва адреса на брокера на Kafka като „localhost:9092“. След това използваме „value_serializer“, функция, която сериализира данните, преди да ги изпрати на Kafka. В нашия случай ламбда функция кодира данните като UTF-8-кодиран JSON. Сега, нека симулираме някои данни в реално време и ги изпратим в темата за Kafka.



продуцент = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )
# Симулирани данни в реално време
данни = { 'sensor_id' : 1 , 'температура' : 25.5 , 'влажност' : 60.2 }
# Изпращане на данни към темата
продуцент.изпрати ( 'тема в реално време' , данни )


В тези редове ние дефинираме речник „данни“, който представлява симулирани данни от сензор. След това използваме метода „изпращане“, за да публикуваме тези данни в „темата в реално време“.

След това искаме да създадем Kafka потребител, а Kafka потребител чете данните от Kafka тема. Ние създаваме потребител, който да консумира и обработва съобщенията в „темата в реално време“. Създаваме екземпляр „KafkaConsumer“, като посочваме темата, която искаме да използваме, например (тема в реално време) и адреса на брокера на Kafka. Тогава „value_deserializer“ е функция, която десериализира данните, получени от Kafka. В нашия случай ламбда функция декодира данните като UTF-8-кодиран JSON.

потребител = KafkaConsumer ( 'тема в реално време' ,
bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )


Използваме итеративен цикъл, за да консумираме непрекъснато и обработваме съобщенията от темата.

# Четене и обработка на данни в реално време
за съобщение в консуматор:
данни = съобщение.стойност
печат ( f „Получени данни: {данни}“ )


Ние извличаме стойността на всяко съобщение и нашите симулирани сензорни данни вътре в цикъла и ги отпечатваме на конзолата. Изпълнението на производителя и потребителя на Kafka включва изпълнение на този код в Google Colab и изпълнение на кодовите клетки поотделно. Производителят изпраща симулираните данни към темата Kafka, а потребителят чете и отпечатва получените данни.


Анализ на изхода, докато кодът се изпълнява

Ще наблюдаваме данни в реално време, които се произвеждат и консумират. Форматът на данните може да варира в зависимост от нашата симулация или действителния източник на данни. В този подробен пример ние обхващаме целия процес на настройка на система за поточно предаване на данни в реално време с помощта на Apache Kafka и Python в Google Colab. Ще обясним всеки ред код и значението му при изграждането на тази система. Потокът на данни в реално време е мощна възможност и този пример служи като основа за по-сложни приложения в реалния свят.

Пример 2: Внедряване на поточно предаване на данни в реално време в Python с помощта на данни от фондовия пазар

Нека направим още един уникален пример за внедряване на поточно предаване на данни в реално време в Python, използвайки различен сценарий; този път ще се фокусираме върху данните от борсата. Ние създаваме система за поточно предаване на данни в реално време, която улавя промените в цената на акциите и ги обработва с помощта на Apache Kafka и Python в Google Colab. Както беше показано в предишния пример, започваме с конфигуриране на нашата среда в Google Colab. Първо инсталираме необходимите библиотеки:

! пип Инсталирай kafka-python yfinance


Тук добавяме библиотеката „yfinance“, която ни позволява да получаваме данни за фондовия пазар в реално време. След това импортираме необходимите библиотеки. Продължаваме да използваме класовете „KafkaProducer“ и „KafkaConsumer“ от библиотеката „kafka-python“ за взаимодействие с Kafka. Ние импортираме JSON, за да работим с JSON данните. Ние също използваме „yfinance“, за да получим данни за фондовия пазар в реално време. Също така импортираме библиотеката „време“, за да добавим забавяне във времето, за да симулираме актуализациите в реално време.

от kafka import KafkaProducer, KafkaConsumer
импортиране на json
импортиране на финанси като yf
импортиране време


Сега създаваме производител на Kafka за данни за запаси. Нашият производител на Kafka получава данни за акциите в реално време и ги изпраща в тема на Kafka, наречена „stock-price“.

продуцент = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )

докато Вярно:
запас = yf.Ticker ( 'AAPL' ) # Пример: акции на Apple Inc
stock_data = stock.history ( Период = '1d' )
последна_цена = складови_данни [ 'Близо' ] .iloc [ - 1 ]
данни = { 'символ' : 'AAPL' , 'цена' : последна цена }
продуцент.изпрати ( 'цена на акции' , данни )
време.сън ( 10 ) # Симулирайте актуализации в реално време на всеки 10 секунди


Създаваме екземпляр „KafkaProducer“ с адреса на брокера на Kafka в този код. Вътре в цикъла използваме „yfinance“, за да получим най-новата цена на акциите на Apple Inc. („AAPL“). След това извличаме последната цена на затваряне и я изпращаме в темата „цена на акции“. В крайна сметка въвеждаме забавяне във времето, за да симулираме актуализациите в реално време на всеки 10 секунди.

Нека създадем потребител на Kafka, който да чете и обработва данните за цената на акциите от темата „цена на акции“.

потребител = KafkaConsumer ( 'цена на акции' ,
bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )

за съобщение в консуматор:
stock_data = съобщение.стойност
печат ( f „Получени борсови данни: {stock_data['symbol']} – Цена: {stock_data['price']}“ )


Този код е подобен на потребителската настройка в предишния пример. Той непрекъснато чете и обработва съобщенията от темата „цена на акции“ и отпечатва борсовия символ и цената на конзолата. Ние изпълняваме кодовите клетки последователно, например една по една в Google Colab, за да стартираме производителя и потребителя. Производителят получава и изпраща актуализациите на цените на акциите в реално време, докато потребителят чете и показва тези данни.

! пип Инсталирай kafka-python yfinance
от kafka import KafkaProducer, KafkaConsumer
импортиране на json
импортиране на финанси като yf
импортиране време
продуцент = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )

докато Вярно:
запас = yf.Ticker ( 'AAPL' ) # акции на Apple Inc
stock_data = stock.history ( Период = '1d' )
последна_цена = складови_данни [ 'Близо' ] .iloc [ - 1 ]

данни = { 'символ' : 'AAPL' , 'цена' : последна цена }

продуцент.изпрати ( 'цена на акции' , данни )

време.сън ( 10 ) # Симулирайте актуализации в реално време на всеки 10 секунди
потребител = KafkaConsumer ( 'цена на акции' ,
bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )

за съобщение в консуматор:
stock_data = съобщение.стойност
печат ( f „Получени борсови данни: {stock_data['symbol']} – Цена: {stock_data['price']}“ )


При анализа на изхода след изпълнение на кода ще наблюдаваме актуализациите на цените на акциите в реално време за Apple Inc., които се произвеждат и консумират.

Заключение

В този уникален пример ние демонстрирахме внедряването на поточно предаване на данни в реално време в Python, използвайки Apache Kafka и библиотеката „yfinance“ за улавяне и обработка на данните от фондовия пазар. Обяснихме подробно всеки ред от кода. Поточното предаване на данни в реално време може да се приложи в различни области за изграждане на приложения в реалния свят във финансите, IoT и др.