Snowpipeでデータの取り込みを自動化してみた

本記事では、Snowflakeに備わっているSnowpipeという機能を用いてデータベースに自動でデータを格納(ロード)する仕組みを実際に構築してみました。

設定方法に関しては、Snowflake の公式ドキュメントを参考に行っていきます。

docs.snowflake.com

Snowpipeとは

 Snowflake が提供している機能の1つで、公式ドキュメントでは下記のように記載されています。

docs.snowflake.com

Snowpipeを使用すると、ファイルがステージで利用可能になり次第、ファイルからデータをロードできます。つまり、スケジュールで COPY ステートメントを手動で実行してより大きなバッチをロードするのではなく、マイクロバッチのファイルからデータをロードして数分以内にユーザーが利用できるようにします。

Snowpipeを利用することによって、Microsoft Azure などのクラウドストレージを含むステージにファイルが格納されると、そのデータをすぐにテーブルにロードすることができるようになります。

これにより、データが格納されるたびに手動でロードする必要がなくなり、データが利用可能になるまでのラグを少なくすることができます。

今回の設定と事前準備

今回はMicrosoft Azure 上の特定のコンテナに CSV ファイルを格納すると、自動で Snowflake のテーブルにデータがロードされるように設定します。

事前準備

以下のアカウントとファイルを用意します。

  • Microsoft Azure のアカウント
  • Snowflake のアカウント
  • Snowflakeにテストデータを格納するためのCSVファイル

CSVファイルには下記のような、名前、年齢、生年月日が含まれているものを用意しました。

また、Microsoft Azure でのオブジェクトの生成には Azure Cloud Shell を利用したため、下記の Microsoft Learn の記事を参考に事前に設定を行いました。 learn.microsoft.com

構成のイメージ

Microsoft Azure
  • リソースグループ(snowpipe_demo)
    • データ格納用ストレージアカウント
      • CSVファイルを格納するコンテナ(snowpipestrage)
    • キュー格納用ストレージアカウント
      • Event Grid からの通知を受け取るキュー(snowpipequeue)

上記に加え、Event Grid サブスクリプションを作成します。

Snowflake
  • データベース(SNOWPIPE_DEMO_DB)
    • Schema(PUBLIC)
      • Stage(DEMO_STAGE)
      • Table(DEMO_TABLE)

実装

それでは実際に自動化を行っていきます。 実装の手順としては、大きく分けて3つのステップに分かれています。

Microsoft Azure の環境構築

Microsoft Azure に大本となるリソースグループを作成します。今回は snowpipe_demo という名前のリソースグループを作成します。

az group create --name <リソースグループ名> --location japaneast

作成したリソースグループ内に必要なリソースを作成していきます。

ファイルを格納するコンテナの作成

ファイルを格納する用のストレージアカウントを作成します。

az storage account create --resource-group <リソースグループ名> --name <データ格納用ストレージアカウント名> --sku Standard_LRS --location japaneast --kind BlobStorage --access-tier Hot

作成したファイル格納用のストレージアカウント内にコンテナを作成します。

az storage container create --account-name <データ格納用ストレージアカウント名> --name <コンテナ名>

メッセージを格納するキューの作成

メッセージを格納する用のストレージアカウントを作成します。

az storage account create --resource-group <リソースグループ名> --name <メッセージ格納用ストレージアカウント名> --sku Standard_LRS --location japaneast --kind StorageV2

メッセージ格納用のストレージアカウント内にキューを作成します。

 az storage queue create --account-name <メッセージ格納用ストレージアカウント名> --name <キュー名>

イベントを通知する Event Grid サブスクリプションの作成

作成されたコンテナにファイルが格納されると、キューにメッセージが送信されるように Event Grid サブスクリプションを作成していきます。

まず、Microsoft Azure の Azure Event Grid を利用するために、Event Grid リソースプロバイダーが登録済みかを確認します。今回利用した Azure Cloud Shell では既に登録済みの状態となっていました

az provider show --namespace Microsoft.EventGrid --query "registrationState"

"Registered" と表示されない場合は、別途登録を行うそうです。

az provider register --namespace Microsoft.EventGrid

また、Event Grid の拡張機能が必要になるためインストールを行います。

az extension add --name eventgrid

Event Grid サブスクリプションの作成時に使用する、コンテナとキューのIDを環境変数として設定します。

export storageid=$(az storage account show --name <データ格納用ストレージアカウント名> --resource-group <リソースグループ名> --query id --output tsv)
export queuestorageid=$(az storage account show --name <メッセージ格納用ストレージアカウント名> --resource-group <リソースグループ名> --query id --output tsv)
export queueid="$queuestorageid/queueservices/default/queues/<キュー名>"

最後にEvent Grid サブスクリプションを作成します。

az eventgrid event-subscription create --source-resource-id $storageid --name <イベントグリッド名> --endpoint-type storagequeue --endpoint $queueid --advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit

Microsoft Azure と Snowflake 間の統合設定

Microsoft Azure 上のコンテナやキューに Snowflake からアクセスができるように統合設定を行います。

統合に必要な情報を収集

Snowflake で統合の設定をする際に必要な Microsoft Azure の情報を事前に収集します。

  • テナントID
    • Microsoft Entra ID に記載されている値をそのまま利用します。
  • 作成したキューのURL
    • メッセージ格納用のストレージアカウント内に作成したキューのページにあるURLを利用します。
NOTIFICATION INTEGRATION の作成

Snowflake から 作成した QUEUE へアクセス可能にするために NOTIFICATION INTEGRATION を作成します。

CREATE NOTIFICATION INTEGRATION azure_queue_int
    ENABLED = true
    TYPE = QUEUE
    NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE
    AZURE_STORAGE_QUEUE_PRIMARY_URI = '<キューのURL>'
    AZURE_TENANT_ID = '<テナントID>';

作成した NOTIFICATION INTEGRATION の情報から、AZURE_CONSENT_URL と AZURE_MULTI_TENANT_APP_NAME のデータを確認します。

DESC NOTIFICATION INTEGRATION azure_queue_int;

AZURE_CONSENT_URL に記載された URL にアクセスすると、Microsoftの許可リクエストが表示されるので許可します。これにより、Snowflake用の サービスプリンシパルが Microsoft Azure に作成されます。

Microsoft Azure 内のエンタープライズアプリケーションを確認し、AZURE_MULTI_TENANT_APP_NAME の「_」以前の名前と一致するサービスプリンシパルが作成されていることを確認します。

作成したストレージにアクセスするために必要な権限をサービスプリンシパルに付与します。

  • ロール:ストレージ キュー データ共同作成者
  • アクセスの割り当て先:ユーザー、グループ、またはサービス プリンシパル
  • メンバー:サービスプリンシパルの名前

STORAGE INTEGRATION の作成

Snowflake から 作成したコンテナへアクセス可能にするために STORAGE INTEGRATION を作成します。

CREATE STORAGE INTEGRATION azure_storage_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'AZURE'
  ENABLED = TRUE
  AZURE_TENANT_ID = '<テナントID>'
  STORAGE_ALLOWED_LOCATIONS = ('azure://<データ格納用のストレージアカウント名>.blob.core.windows.net/<コンテナ名>/')

こちらでもSTORAGE INTEGRATION の情報から、AZURE_CONSENT_URL と AZURE_MULTI_TENANT_APP_NAME のデータを確認します。

DESC STORAGE INTEGRATION azure_storage_int;

NOTIFICATION INTEGRATION と同様に AZURE_CONSENT_URL に記載された URL にアクセスし、Snowflake 用の サービスプリンシパルの作成と確認を行います。

作成したストレージにアクセスするために必要な権限をサービスプリンシパルに付与します。

  • ロール:ストレージ BLOB データ共同作成者
  • アクセスの割り当て先:ユーザー、グループ、またはサービス プリンシパル
  • メンバー:サービスプリンシパルの名前

自動取り込み の設定

ステージの作成

ストレージ統合に利用する STAGE を作成します。今回はDBの作成から行います。

CREATE DATABASE SNOWPIPE_DEMO_DB;

CREATE STAGE SNOWPIPE_DEMO_DB.PUBLIC.demo_stage
    URL = 'azure://<データ格納用のストレージアカウント名>.blob.core.windows.net/<コンテナ名>/'
    STORAGE_INTEGRATION = <STORAGE INTEGRATION 名>;

テーブルの作成

データの格納先となるテーブルを作成します。

CREATE TABLE SNOWPIPE_DEMO_DB.PUBLIC.DEMO_TABLE(
    name String,
    age Int,
    DoB DATE
);

Snowpipeの作成

最後に Snowpipe の設定を行います。 NOTIFICATION INTEGRATION 名は必ず大文字に設定する必要があります。 file_format についてはヘッダー付のCSVファイルを対象としているため、他の形式の場合は下記を参考に調整が必要です。

docs.snowflake.com

CREATE SNOWPIPE_DEMO_DB.PUBLIC.DEMO_PIPE
  auto_ingest = true
  integration = '<NOTIFICATION INTEGRATION 名の大文字表記>'
  as
  copy into <データ格納先のテーブル>
  from @<ストレージ統合した STAGE>
  file_format = (type = 'CSV' SKIP_HEADER = 1);

実装内容の確認とまとめ

ここまで実装した内容を確認するために、Microsoft Azure のコンテナに実際にファイルを格納します。

格納後に Snowflake のテーブルを確認すると、上手くデータが取り込まれていました。

まとめ

本記事では、Microsoft Azure 上に CSV ファイルを格納すると、自動で Snowflake にロードされる仕組みの実装を行ってみました。

初めてSnowpipeを実装される方の手助けに少しでも繋がっていれば幸いです。

執筆担当者プロフィール
武信 雄平

武信 雄平(日本ビジネスシステムズ株式会社)

Data&AI プラットフォーム部 Data ソリューション1グループ。Snowflakeを中心としたデータ基盤の構築・運用を経験。趣味は美味しいもの(特に甘味)を食べに行くことです。

担当記事一覧