Azure Stream Analytics 構築編

前回の記事において、Stream Analyticsの基礎的な情報をご紹介させていただきました。

今回は実際にStream Analytics ジョブの作成およびデータ連携を行っていきます。

シナリオ

本記事では、EventHubsで受け取ったメッセージをStream Analyticsを通してAzure Data Lake Storage Gen2(ADLS Gen2)に格納する、という処理について考えます。

Stream Analyticsジョブ

Stream Analytics ジョブ作成

Stream Analyticsの構築は最初に「ジョブ」の作成を行います。

以下の手順でStream Analyticsのジョブ作成を行います。

プロジェクトの詳細

「プロジェクトの詳細」セクションでは下記の2つを選択します。

  • サブスクリプションの選択
  • リソースグループの選択

プロジェクトの詳細

インスタンスの詳細

「インスタンスの詳細」セクションでは下記の3つの項目で記入および選択をします。

  • 名前空間の名前 を記入
    •  Stream Analyticsジョブ の「名前」を記入します
  • リージョンを選択
    • StreamAnalyticsジョブを作成するリージョンを選択します
    • 今回は「Japan East」を選択します
  • ホスティング環境の選択
    • Stream Analyticsジョブをデプロイする環境を選択します
    • 選択可能な環境は「クラウド」「Edge」の2種類です
    • クラウドの場合は Azure 上に、Edgeの場合はIoT Edge デバイス上に構築することが可能です

インスタンスの詳細

ストリーミングユニットの詳細

「ストリーミングユニットの詳細」セクションでは、Stream Analyticsジョブを実行するためのリソースを設定します。

今回は設定値として「1」を設定します。

ストリーミングユニットの詳細

入力、出力、クエリの作成

Stream Analyticsジョブ の作成が完了したら、次に「入力」、「出力」、「クエリ」の作成を行います。

作成したStream Analyiticsの「ジョブトポロジ」からそれぞれ選択します。

ジョブ作成

入力の作成

入力の作成では、先ず、どの入力先からのデータを受け取るのかサービスの選択を行います。

今回は、「イベントハブ」を入力に設定します。

入力の作成

入力の作成に当たって、手動もしくは自動での設定が選択可能となります。

  • 手動の場合 
    • イベントハブ設定を手動で行う
  • 自動の場合
    •  サブスクリプションからイベントハブを選択する

今回は「サブスクリプションからイベントハブを選択する」を選択します。

作成済みのイベントハブを元に下記の4項目で選択します。

  • サブスクリプション
  • イベントハブの名前空間
  • イベントハブ名
  • イベントハブコンシューマグループ

イベントハブ編集
認証モード

「認証モード」はイベント ハブへの接続に使用する認証の種類を指定します。

下記3種類から選択することが可能です。

  • 接続文字列
  • システム割り当てマネージドIDの作成
  • ユーザ割り当て済みマネージドIDの選択

今回は「接続文字列」を選択します。

接続文字列

認証モードに「接続文字列」を選択すると、 Event Hubs にアクセスできるようにする共有アクセス ポリシーである「Event Hub ポリシー名」を新規で作成するか、もしくは既存のものを使用することができます。

※このオプションは、Event Hubs 設定を手動で指定するオプションを選択しない限り、自動的に設定されます。

ポリシー作成
イベントシリアル化形式

「イベントシリアル化形式」では受信するデータストリームのシリアル化形式を指定します。

選択可能な形式はJSON、CSV、Avroなどがあります

今回は「JSON」を選択します。

イベントシリアル化
イベントの圧縮タイプ

「イベントの圧縮タイプ」では受信データ ストリームを読み取るために使用される圧縮の種類を指定します。

選択可能な形式の例です。

  • なし
  • Gzip
  • Deflate

今回は「なし」を選択します。

圧縮タイプ

出力の作成

出力の作成では先ず、どの出力先に対してデータを連携するのかサービスの選択を行います。

今回は、「Blob Storage または ADLS Gen2」を入力に設定します。

出力設定

出力の作成に当たって、手動もしくは自動での設定が選択可能となります。

  • 手動の場合
    • Blob Storage または ADLS Gen2設定を手動で行う
  • 自動の場合
    • サブスクリプションからBlob Storage または ADLS Gen2 を選択する

今回は「サブスクリプションからBlob Storageまたは ADLS Gen2 を選択する」を選択します。

次に、作成済みのイベントハブを元に「サブスクリプション」、「ストレージアカウント」、「コンテナ」を設定します。

出力設定
認証モード

「認証モード」はイベント ハブへの接続に使用する認証の種類を指定します。

下記3種類から選択することが可能です。

  • 接続文字列
  • システム割り当てマネージドIDの作成
  • ユーザ割り当て済みマネージドIDの選択

今回は「接続文字列」を選択します

認証モード
イベントシリアル化形式

「イベントシリアル化形式」では、出力するデータストリームのシリアル化形式を指定します。

選択可能な形式はJSON、CSV、Avro、Parquet、Delta Lake などがあります

今回は「JSON」を選択します。

シリアル化形式
フォーマット

「フォーマット」では出力するデータストリームのフォーマットを指定します。

  • 改行区切り
    • 各 JSON オブジェクトを改行で区切って出力をフォーマットします
  • アレイ
    • JSON オブジェクトのアレイとして出力をフォーマットします

今回は「改行区切り」を選択します。

出力フォーマット
書き込みモード

「書き込みモード」では出力するデータストリームをファイルに出力する方式を指定します。

  • アペンド(追記) モード
    •  クエリの結果は使用可能になるとすぐに書き出されます
    •  同じファイルに複数の追加が行われる可能性があります
  • 1 回実行 モード
    • 時間パーティションのすべての結果が使用可能になったときに、ジョブは出力ファイルに 1 回だけ書き込みます。(※プレビュー)
    • 重複を避けることが可能になります

今回は「結果が到着したときにアペンドする」を選択します。

出力形式
パスパターン

パス パターンでは、日付と時刻の変数 ({date}、{time}) のインスタンスを 1 つ以上使用して、BLOB が書き込まれる際のディレクトリを指定できます。

今回は「test」ディレクトリを指定します。

※後ほど、日付と時刻の変数を出力先に指定した場合の処理も実施します。

パスパターン

クエリの作成

「入力」、「出力」の作成後、「ジョブトポロジ」からクエリを選択します。

クエリウィンドウを開くと作成した「入力」、「出力」から自動でクエリが生成されます。

クエリ作成

もし、出力するデータを変更したい場合は任意のクエリに直接編集することが可能です。

動作確認

以下の手順で作成したStream Analyticsジョブを実際に動作させて行きます。

ジョブの開始

「入力」、「出力」、「クエリ」の作成が完了したら、「概要」からジョブを開始します。

「ジョブの開始」をクリックし、「ジョブ出力の開始時刻」を選択します。

「ジョブ出力の開始時刻」は以下の3種類が選択可能です

  • 現在
    •  出力の開始点をジョブの開始時刻と同じにします
  • カスタム
    • 出力の開始点を指定可能
  • 最終停止時刻
    • 以前にジョブが開始されたが、手動で停止されたか失敗した場合に使用可能
    • このオプションを選択すると、データが失われないように、最後の出力時刻を使用してジョブが再開される

今回は「現在」を選択しジョブを開始します。

ジョブ開始

ジョブ実行結果

今回は、EventHubに対してPythonコードを利用してメッセージを送信し、Stream Analyticsで処理され、Blob Storageにログが出力されることを確認します。

出力で設定したBlob Storageのtestディレクトリにメッセージが出力されたことを確認できました。

実行結果確認

格納されたメッセージを確認すると、処理の中で3つの値が付与されることが分かります。

送信したメッセージ

"id":1,"username":"Masunaga, Hayato"

受信したメッセージ

{"id":1,"username":"Masunaga, Hayato","EventProcessedUtcTime":"2024-03-06T03:58:15.4286014Z","PartitionId":0,"EventEnqueuedUtcTime":"2024-03-06T03:58:15.1970000Z"}」

付与された値は以下の通りです。

  • EventProcessedUtcTime
    • Stream Analytics でイベントを処理する日時
  • EventEnqueuedUtcTime
    • 「イベントハブ」でイベントを受信した日時
  • PartitionId
    • パーティションキー

出力結果確認
日付形式によるデータの出力

出力の部分で設定を行ったパスパターンを変更することで、日付形式のディレクトリに対してデータを出力することが可能です。

日付形式
クエリによる出力データの変更

クエリを用いて出力するデータを「username」のみに変更するといった使い方をすることも可能です。

※記載方法はSQLと基本的には同様になります。

クエリの変更による出力結果

おわりに

今回の記事では、Stream Analytics構築編ということで、具体的な構築の流れと実際に構築したジョブの動作確認について紹介させていただきました。

  • Stream Analytics ジョブの構築の手順
  • 実際にメッセージをEvent Hubs に送信し、 Event Hubsに送信されたメッセージをStream Analyticsで受信し、Blob Storageに格納する

前回の入門編と合わせて読んでいただければ Stream Analytics についての最低限の知識と構築の流れを理解していただけるかと思います。

執筆担当者プロフィール
増永 隼人

増永 隼人(日本ビジネスシステムズ株式会社)

エンジニアとして、主にLinux、Hadoop、Azure、AWS等を担当領域としております。

担当記事一覧