Azure Data Factoryでパイプラインにストレージイベントトリガーを実装する

前回の記事では、Azure Data Factoryにてパラメーター化されたデータパイプラインを作成し、実行する方法を解説しました。

前回、パイプライン実行は「今すぐトリガー」によって手動で行っていました。

今回は、パイプラインにストレージイベントトリガーを実装し、「ストレージアカウントのイベント発生時にパイプラインが実行されるトリガー」を実装していきたいと思います。

※ パイプラインの実装方法については以前の関連記事を参考にしてください。

実施概要

ストレージアカウントで発生するイベントによって、EventGridを通じてパイプラインの実行をトリガーします。

今回は、ストレージアカウントのコンテナ内にCSVファイルがアップロードされたことをトリガーに、パイプラインを実行します。

[参考]トリガーの種類

  • Schedule
    • スケジュールトリガーは、あらかじめ設定した「実行予定日時」にパイプラインを実行します。
    • トリガーでは、"毎週" または "月曜日午後 5 時と木曜日午後 9 時" のような間隔がサポートされており、データセットパターンに依存しないため、柔軟性があります。
  • Tumbling window
    • タンブリング ウィンドウ トリガーは、状態を維持しながら、指定した開始時刻から定期的に実行される種類のトリガーです。
    • タンブリング ウィンドウとは、固定サイズで重複しない一連の連続する時間間隔です。
  • Storage events
    • ストレージアカウントでのファイルの作成や削除などのイベントに基づいて、パイプラインをトリガーすることが出来ます。
    • Data Factory および Synapse のパイプラインは Azure Event Grid とネイティブに統合されているため、そのようなイベントでパイプラインをトリガーできます。
  • Custom events
    • カスタム イベントトリガー は、Event Grid の カスタム トピックを処理します。

本記事では、トリガーの種類3つ目の「ストレージイベントトリガー」を使用します。

事前準備

1.トリガーを実装するためのパイプラインを作成します。

参考:Data Factoryを使用してAzure Blob StorageからAzure SQL Databaseにデータをコピーする - JBS Tech Blog

2.Azureサブスクリプションが Event Grid リソースプロバイダーに登録されている必要があります。[サブスクリプション]→[リソースプロバイダー]→”Microsoft.EventGrid“が登録されていることを確認します。

実施手順

ファイル作成をトリガーにパイプラインを実行

ストレージアカウントのcontainer02/2023/ 配下のcsvファイルが作成されたことをトリガーとしてパイプラインを実行します。

⑴Data Factory Studioにてパイプラインを開き、[トリガーの追加]から[新規/編集]をクリックします。

⑵トリガーの追加画面にて[トリガーの選択]→[+新規]をクリックします。

⑶トリガーのプロパティを設定します。

⑷パイプラインの実行と確認

ストレージアカウントのコンテナー内にファイルをアップロードします。

SQL Databaseにてクエリの実行等で、テーブルのデータを確認します。

システム変数を使用して動的にパスを取得

上記で実装したトリガー方法に加えて、トリガー実行時にパラメーターによってメタデータを取得し、取得した値をパイプライン内の設定やSQLテーブルへの値の挿入などに使用することも出来ます。

"@trrigerBody().folderPath"パラメーターを使用して、トリガー実行時に取り込むファイルのファイルパスを取得、SQLテーブルに追加の列として挿入する方法をご紹介します。

※パイプラインにトリガーが実装されていることが前提となります

⑴SQL Database内のテーブルに”foldername”カラムを追加します。

カラム追加後、Data Factory StudioのSQL Databaseデータセットのスキーマを再インポートし、スキーマの更新を行います。

※パイプライン実行時に”Failure happened on 'Sink' side.”等のエラーが出る場合、テーブル側のデータ型が間違っている可能性があります。

⑵パイプラインパラメーターに”trigger_folderPath”を追加します。

⑶コピーアクティビティの設定の[ソース]タブにて、追加の列を設定します。

名前に、SQLのテーブルに追加したカラム名と同じ名前を挿入し、値には[動的なコンテンツの追加]からフォルダーパスのパラメーターを設定します。

※確認

⑷コピーアクティビティの設定の[マップ]タブにて、列のマッピングを設定します。

⑸トリガーのパラメーターの値に”@triggerBody().folderPath”を設定し、[OK]をクリックします。

[参考]トリガー作成時に使用可能なシステム関数

変数名 説明
@trigger().outputs.windowStartTime トリガーの実行に関連付けられているウィンドウの開始。
@trigger().outputs.windowEndTime トリガーの実行に関連付けられているウィンドウの終了。
@trigger().scheduledTime トリガーがパイプライン実行を呼び出すようにスケジュールされた時間。
@trigger().startTime トリガーがパイプライン実行を呼び出すために実際に起動した時間。 これは、トリガーのスケジュールされた時刻とはやや異なる場合があります。

システム変数 - Azure Data Factory & Azure Synapse | Microsoft Learn

⑹パイプラインの実行と確認

ストレージアカウントのコンテナー内にファイルをアップロードします。

SQL Databaseにてクエリの実行等で、テーブルのデータを確認します。

foldername列にフォルダーパスが挿入されていることを確認します。

おわりに

本記事では、ストレージイベントトリガーとトリガーパラメーターを使用した簡単なトリガー実装方法を紹介しました。

ストレージアカウントで発生するイベントによってData Factoryのパイプラインをトリガー出来ることに加え、トリガー実行時のメタデータ(トリガー実行開始時間・ファイル名・フォルダーパスなど)を取得・パイプライン内で利用することも出来ます。

執筆担当者プロフィール
和村 桜希

和村 桜希(日本ビジネスシステムズ株式会社)

2022年度入社。データ系サービスを扱うグループに所属しています。 趣味はダンスです。

担当記事一覧