本記事では、Snowflakeに備わっているSnowpipeという機能を用いてデータベースに自動でデータを格納(ロード)する仕組みを実際に構築してみました。
設定方法に関しては、Snowflake の公式ドキュメントを参考に行っていきます。
Snowpipeとは
Snowflake が提供している機能の1つで、公式ドキュメントでは下記のように記載されています。
Snowpipeを使用すると、ファイルがステージで利用可能になり次第、ファイルからデータをロードできます。つまり、スケジュールで COPY ステートメントを手動で実行してより大きなバッチをロードするのではなく、マイクロバッチのファイルからデータをロードして数分以内にユーザーが利用できるようにします。
Snowpipeを利用することによって、Microsoft Azure などのクラウドストレージを含むステージにファイルが格納されると、そのデータをすぐにテーブルにロードすることができるようになります。
これにより、データが格納されるたびに手動でロードする必要がなくなり、データが利用可能になるまでのラグを少なくすることができます。
今回の設定と事前準備
今回はMicrosoft Azure 上の特定のコンテナに CSV ファイルを格納すると、自動で Snowflake のテーブルにデータがロードされるように設定します。
事前準備
以下のアカウントとファイルを用意します。
- Microsoft Azure のアカウント
- Snowflake のアカウント
- Snowflakeにテストデータを格納するためのCSVファイル
CSVファイルには下記のような、名前、年齢、生年月日が含まれているものを用意しました。
また、Microsoft Azure でのオブジェクトの生成には Azure Cloud Shell を利用したため、下記の Microsoft Learn の記事を参考に事前に設定を行いました。 learn.microsoft.com
構成のイメージ
Microsoft Azure
- リソースグループ(snowpipe_demo)
- データ格納用ストレージアカウント
- CSVファイルを格納するコンテナ(snowpipestrage)
- キュー格納用ストレージアカウント
- Event Grid からの通知を受け取るキュー(snowpipequeue)
- データ格納用ストレージアカウント
上記に加え、Event Grid サブスクリプションを作成します。
Snowflake
- データベース(SNOWPIPE_DEMO_DB)
- Schema(PUBLIC)
- Stage(DEMO_STAGE)
- Table(DEMO_TABLE)
- Schema(PUBLIC)
実装
それでは実際に自動化を行っていきます。 実装の手順としては、大きく分けて3つのステップに分かれています。
Microsoft Azure の環境構築
Microsoft Azure に大本となるリソースグループを作成します。今回は snowpipe_demo という名前のリソースグループを作成します。
az group create --name <リソースグループ名> --location japaneast
作成したリソースグループ内に必要なリソースを作成していきます。
ファイルを格納するコンテナの作成
ファイルを格納する用のストレージアカウントを作成します。
az storage account create --resource-group <リソースグループ名> --name <データ格納用ストレージアカウント名> --sku Standard_LRS --location japaneast --kind BlobStorage --access-tier Hot
作成したファイル格納用のストレージアカウント内にコンテナを作成します。
az storage container create --account-name <データ格納用ストレージアカウント名> --name <コンテナ名>
メッセージを格納するキューの作成
メッセージを格納する用のストレージアカウントを作成します。
az storage account create --resource-group <リソースグループ名> --name <メッセージ格納用ストレージアカウント名> --sku Standard_LRS --location japaneast --kind StorageV2
メッセージ格納用のストレージアカウント内にキューを作成します。
az storage queue create --account-name <メッセージ格納用ストレージアカウント名> --name <キュー名>
イベントを通知する Event Grid サブスクリプションの作成
作成されたコンテナにファイルが格納されると、キューにメッセージが送信されるように Event Grid サブスクリプションを作成していきます。
まず、Microsoft Azure の Azure Event Grid を利用するために、Event Grid リソースプロバイダーが登録済みかを確認します。今回利用した Azure Cloud Shell では既に登録済みの状態となっていました
az provider show --namespace Microsoft.EventGrid --query "registrationState"
"Registered" と表示されない場合は、別途登録を行うそうです。
az provider register --namespace Microsoft.EventGrid
また、Event Grid の拡張機能が必要になるためインストールを行います。
az extension add --name eventgrid
Event Grid サブスクリプションの作成時に使用する、コンテナとキューのIDを環境変数として設定します。
export storageid=$(az storage account show --name <データ格納用ストレージアカウント名> --resource-group <リソースグループ名> --query id --output tsv) export queuestorageid=$(az storage account show --name <メッセージ格納用ストレージアカウント名> --resource-group <リソースグループ名> --query id --output tsv) export queueid="$queuestorageid/queueservices/default/queues/<キュー名>"
最後にEvent Grid サブスクリプションを作成します。
az eventgrid event-subscription create --source-resource-id $storageid --name <イベントグリッド名> --endpoint-type storagequeue --endpoint $queueid --advanced-filter data.api stringin CopyBlob PutBlob PutBlockList FlushWithClose SftpCommit
Microsoft Azure と Snowflake 間の統合設定
Microsoft Azure 上のコンテナやキューに Snowflake からアクセスができるように統合設定を行います。
統合に必要な情報を収集
Snowflake で統合の設定をする際に必要な Microsoft Azure の情報を事前に収集します。
- テナントID
- Microsoft Entra ID に記載されている値をそのまま利用します。
- 作成したキューのURL
- メッセージ格納用のストレージアカウント内に作成したキューのページにあるURLを利用します。
NOTIFICATION INTEGRATION の作成
Snowflake から 作成した QUEUE へアクセス可能にするために NOTIFICATION INTEGRATION を作成します。
CREATE NOTIFICATION INTEGRATION azure_queue_int ENABLED = true TYPE = QUEUE NOTIFICATION_PROVIDER = AZURE_STORAGE_QUEUE AZURE_STORAGE_QUEUE_PRIMARY_URI = '<キューのURL>' AZURE_TENANT_ID = '<テナントID>';
作成した NOTIFICATION INTEGRATION の情報から、AZURE_CONSENT_URL と AZURE_MULTI_TENANT_APP_NAME のデータを確認します。
DESC NOTIFICATION INTEGRATION azure_queue_int;
AZURE_CONSENT_URL に記載された URL にアクセスすると、Microsoftの許可リクエストが表示されるので許可します。これにより、Snowflake用の サービスプリンシパルが Microsoft Azure に作成されます。
Microsoft Azure 内のエンタープライズアプリケーションを確認し、AZURE_MULTI_TENANT_APP_NAME の「_」以前の名前と一致するサービスプリンシパルが作成されていることを確認します。
作成したストレージにアクセスするために必要な権限をサービスプリンシパルに付与します。
- ロール:ストレージ キュー データ共同作成者
- アクセスの割り当て先:ユーザー、グループ、またはサービス プリンシパル
- メンバー:サービスプリンシパルの名前
STORAGE INTEGRATION の作成
Snowflake から 作成したコンテナへアクセス可能にするために STORAGE INTEGRATION を作成します。
CREATE STORAGE INTEGRATION azure_storage_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'AZURE' ENABLED = TRUE AZURE_TENANT_ID = '<テナントID>' STORAGE_ALLOWED_LOCATIONS = ('azure://<データ格納用のストレージアカウント名>.blob.core.windows.net/<コンテナ名>/')
こちらでもSTORAGE INTEGRATION の情報から、AZURE_CONSENT_URL と AZURE_MULTI_TENANT_APP_NAME のデータを確認します。
DESC STORAGE INTEGRATION azure_storage_int;
NOTIFICATION INTEGRATION と同様に AZURE_CONSENT_URL に記載された URL にアクセスし、Snowflake 用の サービスプリンシパルの作成と確認を行います。
作成したストレージにアクセスするために必要な権限をサービスプリンシパルに付与します。
- ロール:ストレージ BLOB データ共同作成者
- アクセスの割り当て先:ユーザー、グループ、またはサービス プリンシパル
- メンバー:サービスプリンシパルの名前
自動取り込み の設定
ステージの作成
ストレージ統合に利用する STAGE を作成します。今回はDBの作成から行います。
CREATE DATABASE SNOWPIPE_DEMO_DB; CREATE STAGE SNOWPIPE_DEMO_DB.PUBLIC.demo_stage URL = 'azure://<データ格納用のストレージアカウント名>.blob.core.windows.net/<コンテナ名>/' STORAGE_INTEGRATION = <STORAGE INTEGRATION 名>;
テーブルの作成
データの格納先となるテーブルを作成します。
CREATE TABLE SNOWPIPE_DEMO_DB.PUBLIC.DEMO_TABLE( name String, age Int, DoB DATE );
Snowpipeの作成
最後に Snowpipe の設定を行います。 NOTIFICATION INTEGRATION 名は必ず大文字に設定する必要があります。 file_format についてはヘッダー付のCSVファイルを対象としているため、他の形式の場合は下記を参考に調整が必要です。
CREATE SNOWPIPE_DEMO_DB.PUBLIC.DEMO_PIPE auto_ingest = true integration = '<NOTIFICATION INTEGRATION 名の大文字表記>' as copy into <データ格納先のテーブル> from @<ストレージ統合した STAGE> file_format = (type = 'CSV' SKIP_HEADER = 1);
実装内容の確認とまとめ
ここまで実装した内容を確認するために、Microsoft Azure のコンテナに実際にファイルを格納します。
格納後に Snowflake のテーブルを確認すると、上手くデータが取り込まれていました。
まとめ
本記事では、Microsoft Azure 上に CSV ファイルを格納すると、自動で Snowflake にロードされる仕組みの実装を行ってみました。
初めてSnowpipeを実装される方の手助けに少しでも繋がっていれば幸いです。
武信 雄平(日本ビジネスシステムズ株式会社)
Data&AI プラットフォーム部 Data ソリューション1グループ。Snowflakeを中心としたデータ基盤の構築・運用を経験。趣味は美味しいもの(特に甘味)を食べに行くことです。
担当記事一覧