Python でのリアルタイム データ ストリーミングの実装をマスターすることは、今日のデータが関与する世界では必須のスキルとして機能します。このガイドでは、Python で信頼性のあるリアルタイム データ ストリーミングを利用するための中心的な手順と重要なツールについて説明します。 Apache Kafka や Apache Pulsar などの適切なフレームワークの選択から、楽なデータ消費、処理、効果的な視覚化のための Python コードの作成まで、アジャイルで効率的なリアルタイム データ チャネルを構築するために必要なスキルを習得します。
例 1: Python でのリアルタイム データ ストリーミングの実装
Python でリアルタイム データ ストリーミングを実装することは、今日のデータドリブンの時代と世界において非常に重要です。この詳細な例では、Google Colab で Apache Kafka と Python を使用してリアルタイム データ ストリーミング システムを構築するプロセスを説明します。
コーディングを始める前にサンプルを初期化するには、Google Colab で特定の環境を構築することが不可欠です。最初に行う必要があるのは、必要なライブラリをインストールすることです。 Kafka の統合には「kafka-python」ライブラリを使用します。
! ピップ インストール カフカPython
このコマンドは、Python 関数と Apache Kafka のバインディングを提供する「kafka-python」ライブラリをインストールします。次に、プロジェクトに必要なライブラリをインポートします。 「KafkaProducer」や「KafkaConsumer」などの必要なライブラリをインポートすることは、Kafka ブローカーとの対話を可能にする「kafka-python」ライブラリのクラスです。 JSON は、メッセージのシリアル化と逆シリアル化に使用する JSON データを操作する Python ライブラリです。
from kafka import KafkaProducer、KafkaConsumer
jsonをインポートする
Kafka プロデューサーの作成
Kafka プロデューサーはデータを Kafka トピックに送信するため、これは重要です。この例では、シミュレートされたリアルタイム データを「real-time-topic」というトピックに送信するプロデューサーを作成します。
Kafka ブローカーのアドレスを「localhost:9092」として指定する「KafkaProducer」インスタンスを作成します。次に、データを Kafka に送信する前にシリアル化する関数「value_serializer」を使用します。この場合、ラムダ関数はデータを UTF-8 でエンコードされた JSON としてエンコードします。次に、リアルタイム データをシミュレートして、Kafka トピックに送信してみましょう。
プロデューサー = カフカプロデューサー ( ブートストラップサーバー = 「ローカルホスト:9092」 、
値シリアライザー =lambda v: json.dumps ( で ) 。エンコード ( 「utf-8」 ) )
# シミュレートされたリアルタイム データ
データ = { 'センサーID' : 1 、 '温度' : 25.5 、 「湿度」 : 60.2 }
# トピックにデータを送信する
プロデューサー.送信 ( 「リアルタイムトピック」 、 データ )
これらの行では、シミュレートされたセンサー データを表す「データ」辞書を定義します。次に、「send」メソッドを使用して、このデータを「リアルタイム トピック」に公開します。
次に、Kafka コンシューマーを作成し、Kafka コンシューマーが Kafka トピックからデータを読み取ります。 「リアルタイム トピック」内のメッセージを消費して処理するコンシューマーを作成します。 「KafkaConsumer」インスタンスを作成し、消費したいトピック (リアルタイムトピック) と Kafka ブローカーのアドレスを指定します。そして「value_deserializer」はKafkaから受け取ったデータをデシリアライズする関数です。この場合、ラムダ関数はデータを UTF-8 でエンコードされた JSON としてデコードします。
Consumer = KafkaConsumer ( 「リアルタイムトピック」 、ブートストラップサーバー = 「ローカルホスト:9092」 、
value_deserializer =ラムダ x: json.loads ( x.デコード ( 「utf-8」 ) ) )
反復ループを使用して、トピックからのメッセージを継続的に消費して処理します。
のために メッセージ で 消費者:
データ = メッセージ.値
印刷する ( f 「受信データ: {data}」 )
各メッセージの値とシミュレートされたセンサー データをループ内で取得し、コンソールに出力します。 Kafka のプロデューサーとコンシューマーを実行するには、このコードを Google Colab で実行し、コード セルを個別に実行する必要があります。プロデューサはシミュレートされたデータを Kafka トピックに送信し、コンシューマは受信したデータを読み取って出力します。
コード実行時の出力の分析
生成および消費されるリアルタイム データを観察します。データ形式は、シミュレーションまたは実際のデータ ソースによって異なる場合があります。この詳細な例では、Google Colab で Apache Kafka と Python を使用してリアルタイム データ ストリーミング システムをセットアップするプロセス全体をカバーします。コードの各行と、このシステムを構築する上でのその重要性について説明します。リアルタイム データ ストリーミングは強力な機能であり、この例は、より複雑な現実世界のアプリケーションの基盤として機能します。
例 2: 株式市場データを使用した Python でのリアルタイム データ ストリーミングの実装
別のシナリオを使用して、Python でリアルタイム データ ストリーミングを実装する別のユニークな例を実行してみましょう。今回は株式市場のデータに焦点を当てます。株価の変化をキャプチャし、Google Colab の Apache Kafka と Python を使用して処理するリアルタイム データ ストリーミング システムを作成します。前の例で示したように、まず Google Colab で環境を構成します。まず、必要なライブラリをインストールします。
! ピップ インストール Kafka-Python yfinance
ここでは、リアルタイムの株式市場データを取得できる「yfinance」ライブラリを追加します。次に、必要なライブラリをインポートします。 Kafka の対話には、「kafka-python」ライブラリの「KafkaProducer」クラスと「KafkaConsumer」クラスを引き続き使用します。 JSON データを操作するために JSON をインポートします。また、リアルタイムの株式市場データを取得するために「yfinance」も使用しています。また、「time」ライブラリをインポートして時間遅延を追加し、リアルタイム更新をシミュレートします。
jsonをインポートする
インポート として そうだね
輸入 時間
次に、ストック データ用の Kafka プロデューサを作成します。 Kafka プロデューサーはリアルタイムの株価データを取得し、それを「株価」という名前の Kafka トピックに送信します。
値シリアライザー =lambda v: json.dumps ( で ) 。エンコード ( 「utf-8」 ) )
その間 真実:
在庫 = yf.ティッカー ( 「AAPL」 ) # 例:Apple Inc.の株式
在庫データ = 在庫.履歴 ( 期間 = 「1d」 )
last_price = 在庫データ [ '近い' 】 .iloc [ - 1 】
データ = { 'シンボル' : 「AAPL」 、 '価格' : last_price }
プロデューサー.送信 ( '株価' 、 データ )
時間.睡眠 ( 10 ) # 10 秒ごとにリアルタイム更新をシミュレートします
このコードでは、Kafka ブローカーのアドレスを使用して「KafkaProducer」インスタンスを作成します。ループ内では、「yfinance」を使用して Apple Inc. (「AAPL」) の最新の株価を取得します。次に、最後の終値を抽出し、それを「株価」トピックに送信します。最終的には、10 秒ごとのリアルタイム更新をシミュレートするために時間遅延を導入します。
「株価」トピックから株価データを読み取り、処理する Kafka コンシューマーを作成してみましょう。
Consumer = KafkaConsumer ( '株価' 、ブートストラップサーバー = 「ローカルホスト:9092」 、
value_deserializer =ラムダ x: json.loads ( x.デコード ( 「utf-8」 ) ) )
のために メッセージ で 消費者:
在庫データ = メッセージ.値
印刷する ( f 「受信した株価データ: {stock_data['symbol']} - 価格: {stock_data['price']}」 )
このコードは、前の例のコンシューマ設定に似ています。 「株価」トピックからのメッセージを継続的に読み取り、処理し、株価記号と価格をコンソールに出力します。コードセルを順番に実行します。たとえば、Google Colab で 1 つずつ実行して、プロデューサーとコンシューマーを実行します。生産者はリアルタイムの株価更新を取得して送信し、消費者はこのデータを読み取って表示します。
from kafka import KafkaProducer、KafkaConsumer
jsonをインポートする
インポート として そうだね
輸入 時間
プロデューサー = カフカプロデューサー ( ブートストラップサーバー = 「ローカルホスト:9092」 、
値シリアライザー =lambda v: json.dumps ( で ) 。エンコード ( 「utf-8」 ) )
その間 真実:
在庫 = yf.ティッカー ( 「AAPL」 ) #アップル社株
在庫データ = 在庫.履歴 ( 期間 = 「1d」 )
last_price = 在庫データ [ '近い' 】 .iloc [ - 1 】
データ = { 'シンボル' : 「AAPL」 、 '価格' : last_price }
プロデューサー.送信 ( '株価' 、 データ )
時間.睡眠 ( 10 ) # 10 秒ごとにリアルタイム更新をシミュレートします
Consumer = KafkaConsumer ( '株価' 、
ブートストラップサーバー = 「ローカルホスト:9092」 、
value_deserializer =ラムダ x: json.loads ( x.デコード ( 「utf-8」 ) ) )
のために メッセージ で 消費者:
在庫データ = メッセージ.値
印刷する ( f 「受信した株価データ: {stock_data['symbol']} - 価格: {stock_data['price']}」 )
コードの実行後の出力の分析では、Apple Inc. のリアルタイムの株価更新が生成および消費されることを観察します。
結論
このユニークな例では、Apache Kafka と「yfinance」ライブラリを使用して株式市場データを取得して処理する Python でのリアルタイム データ ストリーミングの実装を実証しました。コードの各行を徹底的に説明しました。リアルタイム データ ストリーミングは、金融、IoT などの実世界のアプリケーションを構築するために、さまざまな分野に適用できます。