Azure Event HubsからSnowflakeへのストリーミング

本記事では、Azure Event Hubs(以下、Event Hubs)へのストリーミングデータをApache Kafka(以下、Kafka)コネクタを使用してSnowflakeのテーブルに格納する方法を実際に検証した手順としてご紹介いたします。

なお、検証するにあたっては以下のドキュメントを参考にしました。

Getting Started with Snowpipe Streaming and Azure Event Hubs

Azure Event HubsとAzure VM を作成

Azure Event Hubsの作成

Event Hubs名前空間の作成

パブリックアクセスを選択し、Azureポータルから以下の構成で作成します。

Event Hubsの作成

作成したEvent Hubs名前空間の左メニューのエンティティ、Event Hubsを選択し、+イベントハブをクリックします。

任意の名前を入力して「確認と作成」をクリックして作成します。

Azure VMの作成

Azureポータルから以下の構成で作成し、ネットワーク設定でローカルPCからアクセスできるようにしておきます。

Linux VM コンソールに接続する

作成したVMにTeratern等を使用してログインします。

Snowflakeでの認証に使用するキーペアを作成する

以下のコマンドを実行して、ランダムな文字列が出力されることを確認します。パスワードを要求されるので任意の文字列を入力します。

 [azureuser@StreamingVM ~]$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
 [azureuser@StreamingVM ~]$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
 [azureuser@StreamingVM ~]$ grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
 [azureuser@StreamingVM ~]$ cat pub.Key
Snowpipeストリーミング用のKafkaコネクタをインストールする

以下のコマンドを実行して、Complete!と出力されることを確認します。

[azureuser@StreamingVM ~]$ passwd=changeit
[azureuser@StreamingVM ~]$ directory=/home/azureuser/snowpipe-streaming
[azureuser@StreamingVM ~]$ mkdir -p $directory
[azureuser@StreamingVM ~]$ cd $directory
[azureuser@StreamingVM snowpipe-streaming]$ pwd=`pwd`
[azureuser@StreamingVM snowpipe-streaming]$ sudo yum clean all
[azureuser@StreamingVM snowpipe-streaming]$ sudo yum -y install openssl vim-common java-1.8.0-openjdk-devel.x86_64 gzip tar jq python3-pip
Kafkaをインストールする

以下のコマンドを実行してKafkaコネクタをインストールします。

[azureuser@StreamingVM snowpipe-streaming]$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
[azureuser@StreamingVM snowpipe-streaming]$ tar xvfz kafka_2.12-2.8.1.tgz -C $pwd
[azureuser@StreamingVM snowpipe-streaming]$ rm -rf $pwd/kafka_2.12-2.8.1.tgz
[azureuser@StreamingVM snowpipe-streaming]$ cd /tmp && cp /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts kafka.client.truststore.jks[azureuser@StreamingVM tmp]$ cd /tmp && keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $passwd -keypass $passwd -dname "CN=snowflake.com" -alias snowflake -storetype pkcs12
Snowflake用のKafkaコネクタをインストールする

以下のコマンドを実行してKafkaコネクタをインストールします。

[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.2.1/snowflake-kafka-connector-2.2.1.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-kafka-connector-2.2.1.jar
Snowpipe streaming SDKをインストールする

以下のコマンドを実行してSnowpipe streaming SDKをインストールします。

[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/2.1.0/snowflake-ingest-sdk-2.1.0.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-ingest-sdk-2.1.0.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.5/snowflake-jdbc-3.14.5.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-jdbc-3.14.5.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -O $pwd/kafka_2.12-2.8.1/libs/bc-fips-1.0.1.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -O $pwd/kafka_2.12-2.8.1/libs/bcpkix-fips-1.0.3.jar
接続文字列を取得する

Event Hubs名前空間の左メニューの設定、共有アクセスポリシーを選択し、ポリシーをクリックしてPrimary connection stringをコピーします。

VMで以下のコマンド実行します。CS変数には上記でコピーした内容(接続文字列)を代入します。

[azureuser@StreamingVM tmp]$ export CS="Endpoint=sb://XXXXXXXX.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXL0fa8QZz7Nh7vqe+AEhC6Lwj0="
接続文字列からKafkaブローカー文字列(BS)を抽出する

以下コマンド実行してBSを環境変数として設定します。

[azureuser@StreamingVM tmp]$ export BS=`echo $CS | awk -F\/ '{print $3":9093"}'`
[azureuser@StreamingVM tmp]$ echo "export CS=\"$CS\"" >> ~/.bashrc
[azureuser@StreamingVM tmp]$ echo "export BS=$BS" >> ~/.bashrc
Kafka コネクタの設定ファイルを作成する

以下のコマンドを実行して、クライアントが Event Hubs 名前空間で認証するための構成ファイルをディレクトリに生成します。

[azureuser@StreamingVM tmp]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM tmp]$ mkdir -p $dir && cd $dir
[azureuser@StreamingVM scripts]$ cat << EOF > $dir/connect-standalone.properties
#************CREATING SNOWFLAKE Connector****************
bootstrap.servers=$BS
#************SNOWFLAKE VALUE CONVERSION****************
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true #************SNOWFLAKE **************** offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # required EH Kafka security settings
security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS"; consumer.security.protocol=SASL_SSL consumer.sasl.mechanism=PLAIN consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS"; plugin.path=/home/azureuser/snowpipe-streaming/kafka_2.12-2.8.1/libs EOF
プロデューサーのセキュリティ設定ファイルを作成する

以下のコマンドを実行して、ストリーミングデータを取得するためのKafkaプロデューサー用のセキュリティ構成ファイルを作成します。

[azureuser@StreamingVM scripts]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM scripts]$ cat << EOF > $dir/client.properties
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS";
EOF
client.propertiesファイルの作成を確認する

以下のコマンド実行してファイルが作成されていることを確認します。

[azureuser@StreamingVM scripts]$ ll /home/azureuser/snowpipe-streaming/scripts
total 8
-rw-rw-r--. 1 azureuser azureuser  369 Feb 26 02:01 client.properties
-rw-rw-r--. 1 azureuser azureuser 1376 Feb 26 02:00 connect-standalone.properties
Event Hubsとの接続を確認する

以下のコマンドを実行してEvent Hubsへデータが送信されることを確認します。

[azureuser@StreamingVM scripts]$ $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --describe --topic streaming
Topic: streaming        PartitionCount: 1       ReplicationFactor: 0    Configs: cleanup.policy=delete,message.timestamp.type=LogAppendTime,retention.ms=3600000,message.timestamp.difference.max.ms=7776000000
Topic: streaming        Partition: 0    Leader: 0       Replicas:       Isr:

ストリーミング用のSnowflakeアカウントを作成

ACCOUNTADMIN ロールを持つユーザーでSnowflakeアカウントにログインし、以下のSQL コマンドを実行して、ストリーミング処理に必要なユーザー、データベース、およびロールを作成します。

後程使用するため、最後に出力されたハイフンを含めたACCOUNT_IDENTIFER(アカウント識別子)を控えておきます。

SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'AZ_STREAMING_DB';
SET WH = 'AZ_STREAMING_WH';
SET ROLE = 'AZ_STREAMING_RL';
USE ROLE ACCOUNTADMIN;
-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER';
-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);
-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';
-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);
-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;
-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636
WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

以下のコマンドをターミナルから実行して、ストリーミングユーザーがプログラムでSnowflakeにアクセスするための公開鍵を控えておきます。

[azureuser@StreamingVM scripts]$ cat /home/azureuser/pub.Key

Snowflakeワークシートで、次のSQLコマンド<pubKey>を上記で出力した内容に置き換えて実行します。

use role accountadmin;
alter user streaming_user set rsa_public_key='< pubKey >';

Snowflakeからログアウトし、ユーザー:STREAMING_USER/パスワード:Test1234567を使用して再度サインインします。ワークシートで以下のSQLコマンドを実行して、デフォルトのデータベースにスキーマを作成します。

SET DB = 'AZ_STREAMING_DB';
SET SCHEMA = 'AZ_STREAMING_SCHEMA';
USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);

Snowpipeストリーミング用のKafkaコネクタの設定

以下のコマンドを実行して、Kafka コネクタの接続パラメータを収集します。

[azureuser@StreamingVM ~]$ outf=/tmp/params
[azureuser@StreamingVM ~]$ cat << EOF > /tmp/get_params
a=''
until [ ! -z \$a ]
do
  read -p "Input Snowflake account identifier: e.g. ylmxgak-wyb53646 ==> " a
done
echo export clstr_url=\$a.snowflakecomputing.com > $outf
export clstr_url=\$a.snowflakecomputing.com
read -p "Snowflake cluster user name: default: streaming_user ==> " user
if [[ \$user == "" ]]
then
  user="streaming_user"
fi
echo export user=\$user >> $outf
export user=\$user
pass=''
until [ ! -z \$pass ]
do
  read -p "Private key passphrase ==> " pass
done
echo export key_pass=\$pass >> $outf
export key_pass=\$pass
read -p "Full path to your Snowflake private key file, default: /home/azureuser/rsa_key.p8 ==> " p8
if [[ \$p8 == "" ]]
then
  p8="/home/azureuser/rsa_key.p8"
fi
priv_key=\`cat \$p8 | grep -v PRIVATE | tr -d '\n'\`
echo export priv_key=\$priv_key  >> $outf
export priv_key=\$priv_key
cat $outf >> $HOME/.bashrc
EOF

Private key passphraseの後のXXXを、上記手順で控えておいた$passの内容で置き換えて、以下のコマンドを実行します。

[azureuser@StreamingVM ~]$ . /tmp/get_params
Input Snowflake account identifier: e.g. ylmxgak-wyb53646 ==> XXXXX-XXXXX
Snowflake cluster user name: default: streaming_user ==>
Private key passphrase ==> XXXX
Full path to your Snowflake private key file, default: /home/azureuser/rsa_key.p8 ==>

Snowflake Kafka接続設定ファイルの作成

次のコマンドを実行して、Kafka コネクタの設定ファイルを生成します。

[azureuser@StreamingVM ~]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM ~]$ cat << EOF > $dir/snowflakeconnectorAZ.properties
name=snowpipeStreaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming
snowflake.private.key.passphrase=$key_pass
snowflake.database.name=AZ_STREAMING_DB
snowflake.schema.name=AZ_STREAMING_SCHEMA
snowflake.topic2table.map=streaming:AZ_STREAMING_TBL
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=$clstr_url
snowflake.user.name=$user
snowflake.private.key=$priv_key
snowflake.role.name=AZ_STREAMING_RL
snowflake.ingestion.method=snowpipe_streaming
snowflake.enable.schematization=false
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
valur.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
errors.tolerance=all
EOF

ストリーミングデータの確認

ここからは、実際にストリーミングデータをSnowflakeのテーブルに格納します。

Snowpipeストリーミング用のKafkaコネクタを開始する

以下のSQLコマンドを実行してVMのIPアドレスを許可します。(XXXはVMのIPアドレスに置換してください)

CREATE NETWORK POLICY kafka_network_policy ALLOWED_IP_LIST = ('XXX.XXX.XXX.XXX');
ALTER USER STREAMING_USER SET NETWORK_POLICY = kafka_network_policy;

Linux コンソールに戻り、次のコマンドを実行して Kafka コネクタを起動します。

[azureuser@StreamingVM ~]$ $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/connect-standalone.sh $HOME/snowpipe-streaming/scripts/connect-standalone.properties $HOME/snowpipe-streaming/scripts/snowflakeconnectorAZ.properties

このようなメッセージが出力されていることを確認します。

上記の画面を開いたままにして、コネクタを引き続き実行させます。

Kafkaプロデューサーを起動する

VM への新しい ssh セッション接続を開き、以下のコマンドを実行します。

while true; 
do
  curl --connect-timeout 5 -k http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming;
  sleep 10; 
done

以下の画面のように出力されることを確認します。

上記のスクリプトでは、プロデューサーは、サンフランシスコ ベイエリアのリアルタイムのフライト データを JSON 形式で提供するに対してクエリを実行します。データには、タイムスタンプ、icao番号、フライトID、目的地空港、航空機の経度、緯度、高度などの情報が含まれます。

データはイベントハブのトピックに取り込まれ、SnowpipeストリーミングKafkaコネクタによって取得され、Snowflakeテーブルに直接配信されます。

Snowflakeで取り込んだデータのクエリ

STREAMING_USERでSnowflakeにログインし、データがSnowflakeにストリーミングされたことを確認するために、ワークシートで以下の SQL コマンドを実行します。(10秒ごとにレコードが増えていきます)

select * from az_streaming_tbl;

最後に

Azure Event Hubs経由でストリーミングデータをSnowflakeのテーブルに格納するまでを検証しました、

KafkaコネクタがVM上にあるので、本番環境では冗長化の検討が必要なるかと思います。

今後、機会があれば別の構成でのストリーミングデータの取り込みを試してみたいと思います。

執筆担当者プロフィール
土山 和也

土山 和也(日本ビジネスシステムズ株式会社)

ユーザーサポート、開発、運用、構築業務を経験し、現在はアーキテクトとして提案・プロジェクト支援に従事。専門はデータベースでDBA歴十数年。Azure/AWSを担当。

担当記事一覧