これまで、Informatica 社の提供するInformatica Intelligent Cloud Service (以降、IICS)と呼ばれるデータ活用のプラットフォームについてご紹介してきました。
最初の記事では、データのETL (Extract/Transform/Load)処理を行うために、データの処理を実行するコンポーネントである、「Secure Agent」の構築について書きました。
blog.jbs.co.jp
次の記事では、オンプレミスのSQLサーバやクラウドサービスで提供されているストレージ・データウェアハウスのサービスと連携を行うための「接続(Connector)」の構築について書きました。
blog.jbs.co.jp
本記事では、いよいよETL処理の方法を設計する「マッピング」について、説明します。
マッピング作成
マッピングとは
マッピングは、IICSのサービスの一つである「Cloud Data Integration」の中に含まれる一つの機能です。
何のデータを読み込んで、どういった加工を実施して、何処にデータを格納するかをWeb UIの画面で設定するものです。
以下は、マッピングを新規に作成する際に表示される最初の画面です。はじめは「ソース」と「ターゲット」と書かれている2つの四角いアイコンに、矢印の線が結ばれている状態になっています。
それぞれのアイコンを「トランスフォーメーション」と呼びます。
「ソース」と書かれているものは、「ソーストランスフォーメーション」、「ターゲット」と書かれているものは「ターゲットトランスフォーメーション」と呼ばれ、それぞれ、データの読み込む元と、格納先を示す「トランスフォーメーション」です。
また、矢印は「データフロー」と呼び、データの加工の流れを示します。
トランスフォーメーション同士を複数データフローで繋いくことにより、ETLに必要なデータ加工処理一連の流れを開発することができます。
環境
今回、Azure Data Lake Storage Gen2(以降、ADLS)から、2つのディレクトリ内に格納されている csv ファイルを読み込み、加工を実施し、Snowflake にある1つのテーブルに書き込む、といった処理を実施します。
マッピングで定義した加工処理を Secure Agent が処理を実施し、Snowflake に加工後のデータを格納します。
加工シナリオ
とあるかき氷屋さんでは、販売しているかき氷のデータを「商品マスタデータ」で管理しています。
また、日々の売り上げ情報を「売上データ」で管理しています。
それぞれの csv データから必要なデータを抽出し Snowflake 上にある「sales_tb」というテーブルにデータを格納します。
Snowflake のテーブルの定義は以下のようになっています。
テーブルの各カラム(列)が持つ情報としては、以下の図のようになっています。
どのかき氷がいつ、いくらで販売されたかを商品名で確認できるシンプルなテーブルをデータ加工によって得ることが今回のゴールです。
読み込むファイル
読み込む csv ファイルは、コンテナ「maindir」配下の「01_MASTER」と「02_LIST」に格納されています。
Informatica の接続(Connector)の設定では、コンテナ「maindir」配下のデータすべてにアクセス可能です。
「01_MASTER」に「商品マスターデータ」が、「02_LIST」に「売上データ」がcsvファイル形式で格納されています。
書き込むテーブル
加工後のデータのテーブルは以下のデータベース、スキーマに配置しています。
Informatica の接続(Connector)の設定では、上図のデータベース、スキーマ、テーブルすべてに対する OWNERSHIP 権限を持ったロールを付与しているユーザ情報を利用しています。
新規マッピングの作成
お待たせしました。いよいよマッピングの作成に移ります。
サービスのメニュー画面から、「データ統合」をクリックします。
「参照」をクリックし、プロジェクトの一覧を表示します。
マッピングなど、加工処理のロジックなどは、「アセット」と呼ばれ、プロジェクトや、そのプロジェクト配下のフォルダに作成、保存します。
それぞれ管理単位や、用途ごとに作成し、アセットの管理を行えます。
今回、新規に「01_project」という名前のプロジェクトを作成し、その中でマッピングの作成を実施します。
すべてのプロジェクト一覧画面の右上「新規プロジェクト」をクリックし、新しくプロジェクトを作成します。
プロジェクト名を入力後、「保存」をクリックで、一覧画面から作成したプロジェクトをクリックします。
続いて、新規フォルダを作成します。画面右上の「新規フォルダ」をクリックします。
フォルダ名を入力後、「保存」をクリックで、一覧画面から作成したフォルダをクリックします。
作成したフォルダに移行したら、「新規」アイコンをクリックします。新規をクリックすることで、参照しているプロジェクトやフォルダ配下にアセットが作成されます。
新しいアセットのメニューが開きます。「マッピング」を選択し、作成をクリックします。
新しいマッピングが作成されます。この画面から、マッピングの設定を進めていきます。
マッピングの名前は画面下の「名前」から変更できます。今回は「ETL_Sales_1」と名前を付けます。実行時のログなどから、ジョブを特定する際にも使われるジョブの名前です。
ソーストランスフォーメーションの設定
デザインの枠の中にあらかじめ存在している「ソース」と書かれた四角のアイコンをクリックします。
これは冒頭説明した「ソーストランスフォーメーション」です。ソースをクリックしたら画面下のメニューがソーストランスフォーメーションの設定内容が表示されるようになります。
まず「全般」で「ソーストランスフォーメーション」の名前を変更します。その後、「商品マスターデータ」を読み出すソーストランスフォーメーションを作成します。名前を「master」としました。
次に、「ソース」をクリックし、接続を選択します。ソースに指定したいファイルは ADLS 上にありますので、ADLS のコネクタの接続を選択します。
オブジェクトの選択をクリックし、ADLS 上にある「商品マスターデータ」の csv ファイルを選択します。
ファイル選択後に「形式」という欄が出現します。今回読み出すファイル形式の csv ファイルに対応する形式「Flat」を選択します。選択後は「形式オプション」というボタンが出現しますので、そちらをクリックします。
形式オプションの設定では、読み取るファイルが csv (カンマ区切り)なのか? tsv (タブ区切り)なのか?文字コードは何なのか?などなど、これから読み取るフラットファイルの内容を決定する非常に重要な設定内容になっています。
こちらの設定をおろそかにしてしまうと、取り込んだデータが文字化けしたり、本来データが格納されるカラムとは別のカラムにデータが入ってしまうといった事象を引き起こしてしまいます。
デフォルト値は以下の画像の内容で設定されています。
今回のデータはデフォルトの設定のまま利用します。
利用経験上、以下の項目は変更する機会が多いです。
- コードページ:文字コードに合わせて変更します。
- 区切り文字:どの文字が列の区切りとして利用されているか。
- ヘッダー行番号:どの行がヘッダー(列名・カラム名)として利用されているか。
- 最初のデータ行:データはどの行から始まっているか。
- ターゲットヘッダー:ヘッダを読み込ませるかどうか。
各項目の詳細な説明は以下の公式ドキュメントをご確認ください。
docs.informatica.com
ここまで設定した内容で、正しくデータが取得できているかを「プレビュー」で確認ができます。
プレビュータブから「プレビューの実行」をクリックします。
マッピングの保存を求められます。「はい」をクリックして先に進みます。
その後、プレビューの条件を設定します。行数はデフォルトで100行までとなっています。今回はそのままにして実行します。ランタイム環境は自身で作成した Secure Agent を指定します。
プレビューを実行すると、無事にデータが取れていることが確認できます。「FileName」という新規の列が自動で付与されます。
同様の手順で「売上データ」を読み込む「ソーストランスフォーメーション」も設定します。
加工処理を行うトランスフォーメーションの設定
ジョイナトランスフォーメーションの設定 -2つのデータソースの結合 -
「商品マスターデータ」の「a1」カラムと、「売上データ」の「c3」カラムは同じ「商品コード」を示しており、これらを用いて2つのテーブルを結合します。
2つデータソースを結合する際は「ジョイナトランスフォーメーション」を利用して実施します。
ジョイナトランスフォーメーションをマッピングに加えた後に、結合したいデータソースをMaster、Detailのいずれかにつなげます。結合条件の違いで、意識すべきポイントとなります。
今回、「商品マスターデータ」を参照して「売上データ」に情報を付与する、といった条件下の元加工を実施します。SQLの操作でいうところの「内部結合」を実施します。
そのため、Masterに「商品マスターデータ」を、Detailに「売上データ」をつなぎます。
それぞれのソースを接続した後に、受信フィールドを確認すると「フィールド名」と呼ばれる、読み込んだデータの列名が競合しているといった警告が出ています。
これは、ソースデータを読み込んだ際に付与される「FileName」という名前の列が、「master」,「list」双方のソースに存在するために発生しています。
競合を解決するために「フィールド名の競合の解決」をクリックし、フィールド名を編集します。
一括名前変更オプションを活用してフィールド名を変更します。一括オプションの内容で、どの位置に文字を付与するかを指定できます。
今回は「master」のフィールド名の末尾に共通して「_master」という文字を加えるように設定を行います。
指定後のフィールド名を確認すると、「master」 に指定したデータのフィールド名に「_master」が付与されたことが確認できます。
競合がなくなった後は、結合条件の設定を行います。
- 結合タイプ:ノーマル。内部結合を行います。
- 結合条件:簡易。各データソースから1つの列を1つの演算子で比較する条件を設定していきます。
結合条件は「+」アイコンをクリックして作成します。
条件のレコードが追加され、マスタ(Master に指定したデータソース)、詳細(Detail に指定したデータソース)から条件にするカラムを選択します。
各カラムを選択した後には演算子を選択します。今回は、「商品マスターデータ」の「商品コード」と「売上データ」の「商品コード」が同じもの同士をつなぎ合わせるので、「=(等号)」を入力します。
ここまで設定を終えた後は、作成したジョイナトランスフォーメーションのプレビューを利用し、情報を結合できているかを確認します。無事に、結合元のそれぞれが持つ「商品コード」に相当する列の情報を利用して結合ができていることが確認できます。
式トランスフォーメーションの設定 - データの型変換 -
ジョイナトランスフォーメーションやソーストランスフォーメーションの操作では、SQL やコードを書かずとも、データを抽出したり、結合の操作を行ってきました。
データに対して細かい条件をもとに処理をしたい場合には「式トランスフォーメーション」を利用して、関数を組み合わせて処理の記述することで、データの加工が行えます。
関数一覧はこちらから確認できます。
docs.informatica.com
今回は、取得したデータの型を格納先に合わせて変更する、といった処理を式トランスフォーメーションで実施します。
実施したい加工としては、文字列として取り込んだ「c2」カラムの日付を、Informatica 上の日付の型「date/time」の型として認識するカラムを作成する。といったことを実施します。
マッピングに「式トランスフォーメーション」を追加します。「式」をクリックし「+」アイコンをクリックすることでフィールドを作成することができます。
フィールドの設定画面です。式トランスフォーメーションでは、作成する式によって処理されたカラムを新しい列、つまり新しいフィールドとして持つことになります。処理前の列も処理後の列も両方保持することができます。作成するフィールドの名前を「名前」に入力し、フィールドの型を「タイプ」で指定します。
フィールドの設定を行った後は、追加されたフィールドの「式」の列にある「設定」をクリックし、式の内容を記述していきます。
式には関数や参照するフィールド名を記述し、処理の内容を定義します。今回は「TO_DATE 関数」を利用して、フィールド「c2」の内容を date/time の型に変換しています。
TO_DATE 関数の記述方法については以下を参照しています。
docs.informatica.com
式に記載した関数の文法確認は、右上の「検証」をクリックすることで確認ができます。
画面左に、「式は有効です。」と、緑のチェックアイコンが表示されていれば、文法上の誤りはなく、プレビューやマッピング自体が実行できるようになります。
式は設定後、入力内容が式の列から確認できるようになります。
式設定後に、式トランスフォーメーション内でプレビューを実施します。
ターゲットトランスフォーメーションの設定
最後に、加工したデータを必要なものだけ格納するターゲットトランスフォーメーションの設定を行います。
ターゲットトランスフォーメーションの「ターゲット」を選択し「接続」から、データを格納する先の接続(Connector)を選択します。
今回は Snowflake のテーブルに書き込むので、Snowflake 用の接続(Connector)を選択します。
接続設定後は「オブジェクト」の「選択」から、データを書き込むテーブルを選択します。
「データベース」→「スキーマ」と順番に選択していき、対象のテーブルを選択します。
オブジェクトが設定されていることを確認したら、そのテーブルに対する操作内容も確認します。今回はデータを格納するだけですので、デフォルトの「Insert」を選択します。
ターゲットのテーブルの設定が終わり、最後に「フィールドマッピング」を行います。
フィールドマッピングは、「どのフィールドをターゲットに書き込むか」を設定する作業です。
以下画像左側の受信フィールドは、ターゲットトランスフォーメーションまでに作成されたフィールドが並べられています。画像右側の「ターゲットフィールド」の「フィールド名」は Snowflake のテーブルにあるカラム名です。
「受信フィールド」の「フィールド名を選択」し、「ターゲットフィールド」の「マッピングされたフィールド」に紐づけることで、Snowflake のテーブルにフィールドの値を格納することができます。
改めて、テーブルに欲しい情報は
- 販売した年月日の情報
- 商品名
- 販売価格
です。
必要なデータを以下の、直前の式トランスフォーメーションのプレビューから検討すると、赤い四角で囲ったカラムがそれに該当します。
このフィールドを適切なターゲットのフィールドに紐づける必要があります。
基本的にはマウスのドラッグアンドドロップの操作で、左の「受信フィールド」の「フィールド名」から右の「ターゲットフィールド」の「マッピングされたフィールド」へ紐づけるのですが、フィールド名から紐づけるフィールドを推測し、紐づけを行う機能、「スマートマップ」があります。
「自動マップ」をクリックして「スマートマップ」を選択します。
スマートマップで、Snowflakeのテーブル存在するカラム名と全く同じ「sales_date」フィールドが受信フィールドからターゲットフィールドに選択されました。
残りは手動で紐づけを行います。
product_name:a2_master(商品名)
price:a4_master(販売価格)
ここまで設定を行ったらいよいよ実行です。
マッピング実行
実行と進行状況の確認
無事マッピングの設定が終わり、ジョブの状態が「有効」であれば、マッピングを実行することができます。
画面右上の「実行」ボタンをクリックします。
自動で選択されますが、必要に応じて実行するランタイム環境を選択します。内容に問題なければ、画面右下の実行ボタンをクリックで、マッピングが開始されます。
実行後は「マイジョブ」から実行状況を確認することができます。
「マイジョブ」は画面左のメニューから確認ができます。
ジョブステータスの画面です。ジョブの実行結果や、読み書きしたデータの行数を確認することが可能です。そのほか、詳細なジョブ実行状況や、エラーログの内容を確認することができる「セッションログ」もこちらの画面からダウンロードして確認することが可能です。
結果の確認
Snowflakeに格納されているテーブルを確認すると、必要なデータがしっかり格納されていることがわかります。
おわりに
本記事では、実際にマッピングを作成し、データ加工一連の処理を実践しました。
本記事までに、Informatica製品を利用してデータ加工をするために必要な環境構築から、処理までを記事化してきました。
今後は実ケースなどを元に、加工処理や機能のナレッジ展開を実施できればと思います。