Snowflakeの動的テーブルを触ってみた

本記事では、Snowflakeの動的テーブルの動作を確認します。

従来ではタスクとストリームを使って実装していたリアルタイム処理が、動的テーブルを使うことで簡潔に実装できるようです。

概要

まずは、Snowflakeの動的テーブルがどのような機能をなのかを確認するために、公式ドキュメントを確認します。

docs.snowflake.com

動的テーブルは、指定したクエリの結果を具体化します。ターゲットテーブルを別に作成し、そのテーブルのデータを変換および更新するコードを記述する代わりに、ターゲットテーブルを動的テーブルとして定義し、変換を実行する SQL ステートメントを指定することができます。自動プロセスでは、定期的な リフレッシュ によって、具体化された結果が自動的にリフレッシュされます。

動的テーブルのコンテンツは特定のクエリによって決定されるため、 DML を使用してコンテンツを変更することはできません。動的テーブルの行を挿入、更新、削除することはありません。自動リフレッシュプロセスは、クエリ結果を動的テーブルに具体化します。

タスクとストリームを使って実装していた一部の処理を、動的テーブルを使って簡潔に実装できるようです。

従来では、Snowflakeの機能のみでリアルタイム処理をしたい場合には、タスクとストリームを組み合わせて実装していました。

例えば、rawテーブルに取り込んだid,first_name,last_nameからなるJSONデータをパースし、id,first_name,last_nameの3列からなるnamesテーブルに格納する処理では、まずrawテーブルに対してストリーム(rawstream1)を作成します。

パース処理を実行するタスク(raw to names)のスケジュールはできる限りリアルタイムで処理するために毎分実行するように設定します。

タスクが実行されるとストリーム(rawstream1)を通してrawテーブルが更新されているかどうかを確認します。

更新されている場合には定義した処理(新しいJSONデータが入っていた場合には、それをパースしnamesテーブルに格納。既存データが更新、削除されていた場合にはそれを更新、削除)を実行します。

更新されていない場合には処理はスキップされます。

-- rawテーブル作成
CREATE OR REPLACE TABLE raw
(var VARIANT);
 
-- ストリーム作成
CREATE OR REPLACE STREAM rawstream1
ON TABLE raw;
 
-- namesテーブル作成
CREATE OR REPLACE TABLE names
(
id INT,
first_name STRING,
last_name STRING
);

-- タスク作成
CREATE OR REPLACE TASK raw_to_names
WAREHOUSE = mywh
SCHEDULE = '1 minute'
WHEN
SYSTEM$STREAM_HAS_DATA('rawstream1')
AS
MERGE INTO names n
USING (
SELECT
var:id id,
var:fname fname,
var:lname lname
FROM rawstream1
)
r1 ON n.id = TO_NUMBER(r1.id)
WHEN MATCHED AND metadata$action = 'DELETE' THEN
DELETE
WHEN MATCHED AND metadata$action = 'INSERT' THEN
UPDATE SET n.first_name = r1.fname, n.last_name = r1.lname
WHEN NOT MATCHED AND metadata$action = 'INSERT' THEN
INSERT (id, first_name, last_name)
VALUES (r1.id, r1.fname, r1.lname);

一方、動的テーブルを使った場合には、以下のようにrawテーブルとnamesテーブル(動的テーブル)の2つのオブジェクトのみで実装できるようになるとのことです。

-- rawテーブル作成
CREATE OR REPLACE TABLE raw
(var VARIANT);

-- Dynamic Table作成
CREATE OR REPLACE DYNAMIC TABLE names
TARGET_LAG = '1 minute'
WAREHOUSE = mywh
AS
SELECT
var:id::int id,
var:fname::string first_name,
var:lname::string last_name
FROM raw;

コスト

タスクとストリーム、動的テーブルのどちらもコストの請求は変わらなさそうです。

  • ストレージ
    • テーブルの容量に応じたコスト
  • クラウドサービス
    • ベースとなるテーブルが更新されているかどうか確認するためにかかるコスト
  • コンピュート
    • ベースとなるテーブルが更新されていた場合に実行される処理にかかるコスト
    • 更新されていない場合は処理がスキップされるのでコストがかからない

クラウドサービスのコストは日々のコンピュートコストの10%まで無料です。

ベーステーブルの更新確認1回あたりにかかるコストは小さいため、基本的に上記の無料枠に収まります。

一方で、以下のような場合には無料枠に収まらず追加で徴収される可能性があります。

  • タスクとストリーム、動的テーブルを無数に作成し更新頻度を1分にする
  • 動いているジョブが少なく日々のコンピュートコストが低い

検証

実際にSnowflakeで動的テーブルを作成し、動作を確認します。

オブジェクト作成

以下のオブジェクトを作成

  • DB
  • スキーマ
  • テーブル(動的テーブルが参照するテーブル)
  • 動的テーブル

動的テーブルのCREATE文のtarget_lagオプションで指定した時間間隔で、動的テーブルが更新を試みます。最小値は1分です。

warehouseオプションでは、更新処理する際のウェアハウス(計算リソース)を指定します。今回はデフォルトで用意されているcompute_whを指定しました。

AS以下でnamesテーブルをどのように更新するかを定義しています。

今回はrawテーブルに格納したvariant型のJSONデータをパースするように定義します。

--DB、スキーマを作成
create database test;
create schema test.test;

-- rawテーブル作成
CREATE OR REPLACE TABLE raw
(var VARIANT);

-- Dynamic Table作成
CREATE OR REPLACE DYNAMIC TABLE names
TARGET_LAG = '1 minute'
WAREHOUSE = compute_wh
AS
SELECT
var:id::int id,
var:fname::string first_name,
var:lname::string last_name
FROM raw;

動作確認

動的テーブルが参照するテーブルに、データを挿入します。

-- rawテーブルにJSONデータをINSERT
insert into raw
  select parse_json(column1)
  from values
('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'),
('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}');

データ挿入後すぐにそれぞれのテーブルを確認したところ、rawテーブルにはデータが入っていますが、namesテーブルは空のままです。

rawテーブル

namesテーブル

データ挿入から1分後にnamesテーブルを確認したところ、rawテーブルに挿入したデータがパースされたデータが入っていることが確認できました。

次に、rawテーブルから2つあるレコードのうち1つを削除してみます。

削除から1分後にnamesテーブルを確認したところ、rawテーブルから削除したレコードがnamesテーブルからも削除されていることが確認できました。

更新履歴

動的テーブルの更新履歴は、INFORMATION_SCHEMAのDYNAMIC_TABLE_REFRESH_HISTORY()から確認できます。

INFORMATION_SCHEMAは、アカウントで作成されたオブジェクトに関する広範なメタデータ情報を提供するシステム定義のビューとテーブル関数のセットで構成されており、テーブルのメタデータやタスクの実行履歴などさまざまな情報を確認することができます。

実際に実行して確認した結果がこちらです。

  • NAME:動的テーブル名
  • STATE:動的テーブルのリフレッシュのステータス
  • STATE_CODE:リフレッシュの現行状態を表すコード
  • STATE_MESSAGE:リフレッシュの現行状態の説明
  • QUERY_ID:動的テーブルの結果を生成した SQL ステートメントの ID
  • DATA_TIMESTAMP:リフレッシュが評価されたトランザクションのタイムスタプ。このタイムスタンプより前に到着したベースオブジェクトのデータが動的テーブルに含まれる。
  • REFRESH_START_TIME:リフレッシュジョブの開始時刻
  • REFRESH_END_TIME:リフレッシュの完了時刻

スケジュールは1分に設定していましたが、REFRSH_START_TIME列を見るとそれより短い間隔で更新しているようです。

レコードの増減などの情報は特になく、どのタイミングで増減したのか履歴から確認することはできないようです。

更新停止

動的テーブルは以下のALTER文を実行することで更新停止することができます。

更新再開させたい場合には、suspendをresumeに置き換えて実行します。

使い分け

  • タスクとストリーム
    • 複雑な処理の実行
      • タスクからストアドプロシージャを呼び出すことができる
        • 複数のSQLの実行
        • PythonやJavaScriptなどで定義した処理の実行
  • 動的テーブル
    • 単一テーブルを変換するパイプライン
    • SQLベースのパイプライン 
    • ビューの代わり(マテリアライズドビューと動的テーブルのどちらが良いかは要検討)

まとめ

今回は、JSONデータをパースする処理を動的テーブルを用いて実装しました。

今まで、Snowflakeの機能だけで実装しようとするとタスクとストリームで実装していたものが、動的テーブルを使えば簡単に実装できるようになりました。

タスクとストリームの実装と比較して管理するオブジェクトが減るのは大きなメリットです。

複雑な処理の実装は難しそうですが、リアルタイムで簡単な処理をしないといけない場合は動的テーブルを使っていこうと思います。

執筆担当者プロフィール
平木 伊織

平木 伊織(日本ビジネスシステムズ株式会社)

ハイブリッドクラウド本部ハイブリッドクラウド5部4グループ。Snowflake、AWS、Azureなどを扱っています。

担当記事一覧