本記事では、Azure Event Hubs(以下、Event Hubs)へのストリーミングデータをApache Kafka(以下、Kafka)コネクタを使用してSnowflakeのテーブルに格納する方法を実際に検証した手順としてご紹介いたします。
なお、検証するにあたっては以下のドキュメントを参考にしました。
Getting Started with Snowpipe Streaming and Azure Event Hubs
- Azure Event HubsとAzure VM を作成
- Azure Event Hubsの作成
- Azure VMの作成
- Linux VM コンソールに接続する
- Snowflakeでの認証に使用するキーペアを作成する
- Snowpipeストリーミング用のKafkaコネクタをインストールする
- Kafkaをインストールする
- Snowflake用のKafkaコネクタをインストールする
- Snowpipe streaming SDKをインストールする
- 接続文字列を取得する
- 接続文字列からKafkaブローカー文字列(BS)を抽出する
- Kafka コネクタの設定ファイルを作成する
- プロデューサーのセキュリティ設定ファイルを作成する
- client.propertiesファイルの作成を確認する
- Event Hubsとの接続を確認する
- ストリーミング用のSnowflakeアカウントを作成
- Snowpipeストリーミング用のKafkaコネクタの設定
- Snowflake Kafka接続設定ファイルの作成
- ストリーミングデータの確認
- Snowflakeで取り込んだデータのクエリ
- 最後に
Azure Event HubsとAzure VM を作成
Azure Event Hubsの作成
Event Hubs名前空間の作成
パブリックアクセスを選択し、Azureポータルから以下の構成で作成します。
Event Hubsの作成
作成したEvent Hubs名前空間の左メニューのエンティティ、Event Hubsを選択し、+イベントハブをクリックします。
任意の名前を入力して「確認と作成」をクリックして作成します。
Azure VMの作成
Azureポータルから以下の構成で作成し、ネットワーク設定でローカルPCからアクセスできるようにしておきます。
Linux VM コンソールに接続する
作成したVMにTeratern等を使用してログインします。
Snowflakeでの認証に使用するキーペアを作成する
以下のコマンドを実行して、ランダムな文字列が出力されることを確認します。パスワードを要求されるので任意の文字列を入力します。
[azureuser@StreamingVM ~]$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
[azureuser@StreamingVM ~]$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
[azureuser@StreamingVM ~]$ grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
[azureuser@StreamingVM ~]$ cat pub.Key
Snowpipeストリーミング用のKafkaコネクタをインストールする
以下のコマンドを実行して、Complete!と出力されることを確認します。
[azureuser@StreamingVM ~]$ passwd=changeit
[azureuser@StreamingVM ~]$ directory=/home/azureuser/snowpipe-streaming
[azureuser@StreamingVM ~]$ mkdir -p $directory
[azureuser@StreamingVM ~]$ cd $directory
[azureuser@StreamingVM snowpipe-streaming]$ pwd=`pwd`
[azureuser@StreamingVM snowpipe-streaming]$ sudo yum clean all
[azureuser@StreamingVM snowpipe-streaming]$ sudo yum -y install openssl vim-common java-1.8.0-openjdk-devel.x86_64 gzip tar jq python3-pip
Kafkaをインストールする
以下のコマンドを実行してKafkaコネクタをインストールします。
[azureuser@StreamingVM snowpipe-streaming]$ wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
[azureuser@StreamingVM snowpipe-streaming]$ tar xvfz kafka_2.12-2.8.1.tgz -C $pwd
[azureuser@StreamingVM snowpipe-streaming]$ rm -rf $pwd/kafka_2.12-2.8.1.tgz
[azureuser@StreamingVM snowpipe-streaming]$ cd /tmp && cp /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts kafka.client.truststore.jks[azureuser@StreamingVM tmp]$ cd /tmp && keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $passwd -keypass $passwd -dname "CN=snowflake.com" -alias snowflake -storetype pkcs12
Snowflake用のKafkaコネクタをインストールする
以下のコマンドを実行してKafkaコネクタをインストールします。
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/2.2.1/snowflake-kafka-connector-2.2.1.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-kafka-connector-2.2.1.jar
Snowpipe streaming SDKをインストールする
以下のコマンドを実行してSnowpipe streaming SDKをインストールします。
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/2.1.0/snowflake-ingest-sdk-2.1.0.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-ingest-sdk-2.1.0.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.5/snowflake-jdbc-3.14.5.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-jdbc-3.14.5.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -O $pwd/kafka_2.12-2.8.1/libs/bc-fips-1.0.1.jar
[azureuser@StreamingVM tmp]$ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -O $pwd/kafka_2.12-2.8.1/libs/bcpkix-fips-1.0.3.jar
接続文字列を取得する
Event Hubs名前空間の左メニューの設定、共有アクセスポリシーを選択し、ポリシーをクリックしてPrimary connection stringをコピーします。
VMで以下のコマンド実行します。CS変数には上記でコピーした内容(接続文字列)を代入します。
[azureuser@StreamingVM tmp]$ export CS="Endpoint=sb://XXXXXXXX.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXL0fa8QZz7Nh7vqe+AEhC6Lwj0="
接続文字列からKafkaブローカー文字列(BS)を抽出する
以下コマンド実行してBSを環境変数として設定します。
[azureuser@StreamingVM tmp]$ export BS=`echo $CS | awk -F\/ '{print $3":9093"}'`
[azureuser@StreamingVM tmp]$ echo "export CS=\"$CS\"" >> ~/.bashrc
[azureuser@StreamingVM tmp]$ echo "export BS=$BS" >> ~/.bashrc
Kafka コネクタの設定ファイルを作成する
以下のコマンドを実行して、クライアントが Event Hubs 名前空間で認証するための構成ファイルをディレクトリに生成します。
[azureuser@StreamingVM tmp]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM tmp]$ mkdir -p $dir && cd $dir
[azureuser@StreamingVM scripts]$ cat << EOF > $dir/connect-standalone.properties
#************CREATING SNOWFLAKE Connector****************
bootstrap.servers=$BS
#************SNOWFLAKE VALUE CONVERSION****************
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#************SNOWFLAKE ****************
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS";
plugin.path=/home/azureuser/snowpipe-streaming/kafka_2.12-2.8.1/libs
EOF
プロデューサーのセキュリティ設定ファイルを作成する
以下のコマンドを実行して、ストリーミングデータを取得するためのKafkaプロデューサー用のセキュリティ構成ファイルを作成します。
[azureuser@StreamingVM scripts]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM scripts]$ cat << EOF > $dir/client.properties
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="\$ConnectionString" password="$CS";
EOF
client.propertiesファイルの作成を確認する
以下のコマンド実行してファイルが作成されていることを確認します。
[azureuser@StreamingVM scripts]$ ll /home/azureuser/snowpipe-streaming/scripts
total 8
-rw-rw-r--. 1 azureuser azureuser 369 Feb 26 02:01 client.properties
-rw-rw-r--. 1 azureuser azureuser 1376 Feb 26 02:00 connect-standalone.properties
Event Hubsとの接続を確認する
以下のコマンドを実行してEvent Hubsへデータが送信されることを確認します。
[azureuser@StreamingVM scripts]$ $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --describe --topic streaming
Topic: streaming PartitionCount: 1 ReplicationFactor: 0 Configs: cleanup.policy=delete,message.timestamp.type=LogAppendTime,retention.ms=3600000,message.timestamp.difference.max.ms=7776000000
Topic: streaming Partition: 0 Leader: 0 Replicas: Isr:
ストリーミング用のSnowflakeアカウントを作成
ACCOUNTADMIN ロールを持つユーザーでSnowflakeアカウントにログインし、以下のSQL コマンドを実行して、ストリーミング処理に必要なユーザー、データベース、およびロールを作成します。
後程使用するため、最後に出力されたハイフンを含めたACCOUNT_IDENTIFER(アカウント識別子)を控えておきます。
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'AZ_STREAMING_DB';
SET WH = 'AZ_STREAMING_WH';
SET ROLE = 'AZ_STREAMING_RL';
USE ROLE ACCOUNTADMIN;
-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER';
-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);
-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';
-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);
-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;
-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636
WITH HOSTLIST AS
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';
以下のコマンドをターミナルから実行して、ストリーミングユーザーがプログラムでSnowflakeにアクセスするための公開鍵を控えておきます。
[azureuser@StreamingVM scripts]$ cat /home/azureuser/pub.Key
Snowflakeワークシートで、次のSQLコマンド<pubKey>を上記で出力した内容に置き換えて実行します。
use role accountadmin;
alter user streaming_user set rsa_public_key='< pubKey >';
Snowflakeからログアウトし、ユーザー:STREAMING_USER/パスワード:Test1234567を使用して再度サインインします。ワークシートで以下のSQLコマンドを実行して、デフォルトのデータベースにスキーマを作成します。
SET DB = 'AZ_STREAMING_DB';
SET SCHEMA = 'AZ_STREAMING_SCHEMA';
USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);
Snowpipeストリーミング用のKafkaコネクタの設定
以下のコマンドを実行して、Kafka コネクタの接続パラメータを収集します。
[azureuser@StreamingVM ~]$ outf=/tmp/params
[azureuser@StreamingVM ~]$ cat << EOF > /tmp/get_params
a=''
until [ ! -z \$a ]
do
read -p "Input Snowflake account identifier: e.g. ylmxgak-wyb53646 ==> " a
done
echo export clstr_url=\$a.snowflakecomputing.com > $outf
export clstr_url=\$a.snowflakecomputing.com
read -p "Snowflake cluster user name: default: streaming_user ==> " user
if [[ \$user == "" ]]
then
user="streaming_user"
fi
echo export user=\$user >> $outf
export user=\$user
pass=''
until [ ! -z \$pass ]
do
read -p "Private key passphrase ==> " pass
done
echo export key_pass=\$pass >> $outf
export key_pass=\$pass
read -p "Full path to your Snowflake private key file, default: /home/azureuser/rsa_key.p8 ==> " p8
if [[ \$p8 == "" ]]
then
p8="/home/azureuser/rsa_key.p8"
fi
priv_key=\`cat \$p8 | grep -v PRIVATE | tr -d '\n'\`
echo export priv_key=\$priv_key >> $outf
export priv_key=\$priv_key
cat $outf >> $HOME/.bashrc
EOF
Private key passphraseの後のXXXを、上記手順で控えておいた$passの内容で置き換えて、以下のコマンドを実行します。
[azureuser@StreamingVM ~]$ . /tmp/get_params
Input Snowflake account identifier: e.g. ylmxgak-wyb53646 ==> XXXXX-XXXXX
Snowflake cluster user name: default: streaming_user ==>
Private key passphrase ==> XXXX
Full path to your Snowflake private key file, default: /home/azureuser/rsa_key.p8 ==>
Snowflake Kafka接続設定ファイルの作成
次のコマンドを実行して、Kafka コネクタの設定ファイルを生成します。
[azureuser@StreamingVM ~]$ dir=/home/azureuser/snowpipe-streaming/scripts
[azureuser@StreamingVM ~]$ cat << EOF > $dir/snowflakeconnectorAZ.properties
name=snowpipeStreaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming
snowflake.private.key.passphrase=$key_pass
snowflake.database.name=AZ_STREAMING_DB
snowflake.schema.name=AZ_STREAMING_SCHEMA
snowflake.topic2table.map=streaming:AZ_STREAMING_TBL
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=$clstr_url
snowflake.user.name=$user
snowflake.private.key=$priv_key
snowflake.role.name=AZ_STREAMING_RL
snowflake.ingestion.method=snowpipe_streaming
snowflake.enable.schematization=false
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
valur.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
errors.tolerance=all
EOF
ストリーミングデータの確認
ここからは、実際にストリーミングデータをSnowflakeのテーブルに格納します。
Snowpipeストリーミング用のKafkaコネクタを開始する
以下のSQLコマンドを実行してVMのIPアドレスを許可します。(XXXはVMのIPアドレスに置換してください)
CREATE NETWORK POLICY kafka_network_policy ALLOWED_IP_LIST = ('XXX.XXX.XXX.XXX');
ALTER USER STREAMING_USER SET NETWORK_POLICY = kafka_network_policy;
Linux コンソールに戻り、次のコマンドを実行して Kafka コネクタを起動します。
[azureuser@StreamingVM ~]$ $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/connect-standalone.sh $HOME/snowpipe-streaming/scripts/connect-standalone.properties $HOME/snowpipe-streaming/scripts/snowflakeconnectorAZ.properties
このようなメッセージが出力されていることを確認します。
上記の画面を開いたままにして、コネクタを引き続き実行させます。
Kafkaプロデューサーを起動する
VM への新しい ssh セッション接続を開き、以下のコマンドを実行します。
while true;
do
curl --connect-timeout 5 -k http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming;
sleep 10;
done
以下の画面のように出力されることを確認します。
上記のスクリプトでは、プロデューサーは、サンフランシスコ ベイエリアのリアルタイムのフライト データを JSON 形式で提供するに対してクエリを実行します。データには、タイムスタンプ、icao番号、フライトID、目的地空港、航空機の経度、緯度、高度などの情報が含まれます。
データはイベントハブのトピックに取り込まれ、SnowpipeストリーミングKafkaコネクタによって取得され、Snowflakeテーブルに直接配信されます。
Snowflakeで取り込んだデータのクエリ
STREAMING_USERでSnowflakeにログインし、データがSnowflakeにストリーミングされたことを確認するために、ワークシートで以下の SQL コマンドを実行します。(10秒ごとにレコードが増えていきます)
select * from az_streaming_tbl;
最後に
Azure Event Hubs経由でストリーミングデータをSnowflakeのテーブルに格納するまでを検証しました、
KafkaコネクタがVM上にあるので、本番環境では冗長化の検討が必要なるかと思います。
今後、機会があれば別の構成でのストリーミングデータの取り込みを試してみたいと思います。
土山 和也(日本ビジネスシステムズ株式会社)
ユーザーサポート、開発、運用、構築業務を経験し、現在はアーキテクトとして提案・プロジェクト支援に従事。専門はデータベースでDBA歴十数年。Azure/AWSを担当。
担当記事一覧