Овладяването на внедряването на поточно предаване на данни в реално време в Python действа като основно умение в днешния свят, свързан с данни. Това ръководство изследва основните стъпки и основните инструменти за използване на поточно предаване на данни в реално време с автентичност в Python. От избора на подходяща рамка като Apache Kafka или Apache Pulsar до писането на код на Python за безпроблемно потребление на данни, обработка и ефективна визуализация, ние ще придобием необходимите умения за конструиране на гъвкави и ефективни канали за данни в реално време.
Пример 1: Внедряване на поточно предаване на данни в реално време в Python
Внедряването на поточно предаване на данни в реално време в Python е от решаващо значение в днешната епоха и свят, управлявани от данни. В този подробен пример ще преминем през процеса на изграждане на система за поточно предаване на данни в реално време с помощта на Apache Kafka и Python в Google Colab.
За инициализиране на примера, преди да започнем да кодираме, изграждането на специфична среда в Google Colab е от съществено значение. Първото нещо, което трябва да направим, е да инсталираме необходимите библиотеки. Използваме библиотеката „kafka-python“ за интеграция на Kafka.
! пип Инсталирай кафка-питон
Тази команда инсталира библиотеката „kafka-python“, която предоставя функциите на Python и свързванията за Apache Kafka. След това импортираме необходимите библиотеки за нашия проект. Импортирането на необходимите библиотеки, включително „KafkaProducer“ и „KafkaConsumer“ са класовете от библиотеката „kafka-python“, които ни позволяват да взаимодействаме с брокерите на Kafka. JSON е библиотеката на Python за работа с JSON данните, които използваме за сериализиране и десериализиране на съобщенията.
от kafka import KafkaProducer, KafkaConsumer
импортиране на json
Създаване на продуцент на Кафка
Това е важно, защото продуцентът на Kafka изпраща данните в тема на Kafka. В нашия пример създаваме продуцент, който да изпраща симулирани данни в реално време към тема, наречена „тема в реално време“.
Създаваме екземпляр „KafkaProducer“, който посочва адреса на брокера на Kafka като „localhost:9092“. След това използваме „value_serializer“, функция, която сериализира данните, преди да ги изпрати на Kafka. В нашия случай ламбда функция кодира данните като UTF-8-кодиран JSON. Сега, нека симулираме някои данни в реално време и ги изпратим в темата за Kafka.
продуцент = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )
# Симулирани данни в реално време
данни = { 'sensor_id' : 1 , 'температура' : 25.5 , 'влажност' : 60.2 }
# Изпращане на данни към темата
продуцент.изпрати ( 'тема в реално време' , данни )
В тези редове ние дефинираме речник „данни“, който представлява симулирани данни от сензор. След това използваме метода „изпращане“, за да публикуваме тези данни в „темата в реално време“.
След това искаме да създадем Kafka потребител, а Kafka потребител чете данните от Kafka тема. Ние създаваме потребител, който да консумира и обработва съобщенията в „темата в реално време“. Създаваме екземпляр „KafkaConsumer“, като посочваме темата, която искаме да използваме, например (тема в реално време) и адреса на брокера на Kafka. Тогава „value_deserializer“ е функция, която десериализира данните, получени от Kafka. В нашия случай ламбда функция декодира данните като UTF-8-кодиран JSON.
потребител = KafkaConsumer ( 'тема в реално време' ,bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )
Използваме итеративен цикъл, за да консумираме непрекъснато и обработваме съобщенията от темата.
за съобщение в консуматор:
данни = съобщение.стойност
печат ( f „Получени данни: {данни}“ )
Ние извличаме стойността на всяко съобщение и нашите симулирани сензорни данни вътре в цикъла и ги отпечатваме на конзолата. Изпълнението на производителя и потребителя на Kafka включва изпълнение на този код в Google Colab и изпълнение на кодовите клетки поотделно. Производителят изпраща симулираните данни към темата Kafka, а потребителят чете и отпечатва получените данни.
Анализ на изхода, докато кодът се изпълнява
Ще наблюдаваме данни в реално време, които се произвеждат и консумират. Форматът на данните може да варира в зависимост от нашата симулация или действителния източник на данни. В този подробен пример ние обхващаме целия процес на настройка на система за поточно предаване на данни в реално време с помощта на Apache Kafka и Python в Google Colab. Ще обясним всеки ред код и значението му при изграждането на тази система. Потокът на данни в реално време е мощна възможност и този пример служи като основа за по-сложни приложения в реалния свят.
Пример 2: Внедряване на поточно предаване на данни в реално време в Python с помощта на данни от фондовия пазар
Нека направим още един уникален пример за внедряване на поточно предаване на данни в реално време в Python, използвайки различен сценарий; този път ще се фокусираме върху данните от борсата. Ние създаваме система за поточно предаване на данни в реално време, която улавя промените в цената на акциите и ги обработва с помощта на Apache Kafka и Python в Google Colab. Както беше показано в предишния пример, започваме с конфигуриране на нашата среда в Google Colab. Първо инсталираме необходимите библиотеки:
! пип Инсталирай kafka-python yfinance
Тук добавяме библиотеката „yfinance“, която ни позволява да получаваме данни за фондовия пазар в реално време. След това импортираме необходимите библиотеки. Продължаваме да използваме класовете „KafkaProducer“ и „KafkaConsumer“ от библиотеката „kafka-python“ за взаимодействие с Kafka. Ние импортираме JSON, за да работим с JSON данните. Ние също използваме „yfinance“, за да получим данни за фондовия пазар в реално време. Също така импортираме библиотеката „време“, за да добавим забавяне във времето, за да симулираме актуализациите в реално време.
импортиране на json
импортиране на финанси като yf
импортиране време
Сега създаваме производител на Kafka за данни за запаси. Нашият производител на Kafka получава данни за акциите в реално време и ги изпраща в тема на Kafka, наречена „stock-price“.
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )
докато Вярно:
запас = yf.Ticker ( 'AAPL' ) # Пример: акции на Apple Inc
stock_data = stock.history ( Период = '1d' )
последна_цена = складови_данни [ 'Близо' ] .iloc [ - 1 ]
данни = { 'символ' : 'AAPL' , 'цена' : последна цена }
продуцент.изпрати ( 'цена на акции' , данни )
време.сън ( 10 ) # Симулирайте актуализации в реално време на всеки 10 секунди
Създаваме екземпляр „KafkaProducer“ с адреса на брокера на Kafka в този код. Вътре в цикъла използваме „yfinance“, за да получим най-новата цена на акциите на Apple Inc. („AAPL“). След това извличаме последната цена на затваряне и я изпращаме в темата „цена на акции“. В крайна сметка въвеждаме забавяне във времето, за да симулираме актуализациите в реално време на всеки 10 секунди.
Нека създадем потребител на Kafka, който да чете и обработва данните за цената на акциите от темата „цена на акции“.
потребител = KafkaConsumer ( 'цена на акции' ,bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )
за съобщение в консуматор:
stock_data = съобщение.стойност
печат ( f „Получени борсови данни: {stock_data['symbol']} – Цена: {stock_data['price']}“ )
Този код е подобен на потребителската настройка в предишния пример. Той непрекъснато чете и обработва съобщенията от темата „цена на акции“ и отпечатва борсовия символ и цената на конзолата. Ние изпълняваме кодовите клетки последователно, например една по една в Google Colab, за да стартираме производителя и потребителя. Производителят получава и изпраща актуализациите на цените на акциите в реално време, докато потребителят чете и показва тези данни.
от kafka import KafkaProducer, KafkaConsumer
импортиране на json
импортиране на финанси като yf
импортиране време
продуцент = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
стойност_сериализатор =ламбда v: json.dumps ( в ) .кодирам ( 'utf-8' ) )
докато Вярно:
запас = yf.Ticker ( 'AAPL' ) # акции на Apple Inc
stock_data = stock.history ( Период = '1d' )
последна_цена = складови_данни [ 'Близо' ] .iloc [ - 1 ]
данни = { 'символ' : 'AAPL' , 'цена' : последна цена }
продуцент.изпрати ( 'цена на акции' , данни )
време.сън ( 10 ) # Симулирайте актуализации в реално време на всеки 10 секунди
потребител = KafkaConsumer ( 'цена на акции' ,
bootstrap_servers = 'localhost:9092' ,
десериализатор_на_стойност =ламбда x: json.loads ( x.decode ( 'utf-8' ) ) )
за съобщение в консуматор:
stock_data = съобщение.стойност
печат ( f „Получени борсови данни: {stock_data['symbol']} – Цена: {stock_data['price']}“ )
При анализа на изхода след изпълнение на кода ще наблюдаваме актуализациите на цените на акциите в реално време за Apple Inc., които се произвеждат и консумират.
Заключение
В този уникален пример ние демонстрирахме внедряването на поточно предаване на данни в реално време в Python, използвайки Apache Kafka и библиотеката „yfinance“ за улавяне и обработка на данните от фондовия пазар. Обяснихме подробно всеки ред от кода. Поточното предаване на данни в реално време може да се приложи в различни области за изграждане на приложения в реалния свят във финансите, IoT и др.