Microsoft Fabric Apache Sparkジョブ定義を使ってみた

Data&AI事業本部所属の福濵です。2023年に入社しMicrosoft Fabricについて基礎から勉強しています。

本記事では、Apache Sparkジョブ定義の実装方法についてご紹介します。

はじめに

Apache Sparkジョブ定義(以降、Sparkジョブ定義)とは、Data Engineeringにあるアイテムの1つです。

Sparkを使用してデータの変換や分析などを行うには、NotebookまたはApache Sparkジョブ定義を使用した方法があります。Notebookはデータを対話的に変換、分析できるのに対し、Apache Sparkジョブ定義ではスケジュールに基づいた実行が可能です。

参考:Apache Spark ジョブ定義 - Microsoft Fabric | Microsoft Learn

本記事では、ローカルのスクリプトファイル(.py)を使用したSparkジョブ定義の実装をご紹介します。

Apache Sparkジョブ定義実装の全体像

本記事では、レイクハウスのFilesフォルダー内にある「JapanPopulation_2023.csv」のテーブル読み込みを、手動および定期実行するSparkジョブ定義を作成します。

今回は、SKUがF2のFabric容量を割り当てたワークスペース内で実装を行います。

※ 実装は以下のページを参考にしています。

参考:Apache Spark ジョブ定義を作成する - Microsoft Fabric | Microsoft Learn

作成物は以下の通りです。

  • レイクハウス:Lakehouse01
    • csvファイル、テーブルを格納
  • 参照スクリプトファイル:createTablefromCSV.py
    • レイクハウス内の対象ファイルを対象テーブルに読み込む
  • Apache Sparkジョブ定義:Spark_tes
    • 参照するスクリプトファイル、実行先および定期実行の設定

構成図

今回の構成図は下記の通りです。

主な処理は下記の通りです。

  • Sparkジョブ定義の実行先にJapanPopulation_2023.csvファイルを指定し、JapanPopulationテーブルを読み込み先として指定
  • Sparkジョブ定義は5分毎に定期実行

実装

以下の流れで実装を行います。

  1. 前準備(csvファイル、参照スクリプトファイルの準備)
  2. レイクハウス(Lakehouse01)の作成
  3. csvファイル(JapanPopulation_2023.csv)の格納
  4. Sparkジョブ定義の作成(Spark_test)
  5. Sparkジョブ定義の実行
  6. 確認

前準備

csvファイルの準備

1. 以下のページからExcelファイルをダウンロードします。

総務省統計局(人口推計 全国 1 年齢(各歳)、男女別人口及び人口性比-総人口、日本人人口(2023年10月1日現在) | ファイル | 統計データを探す | 政府統計の総合窓口 (e-stat.go.jp)

2. 以下のように体裁を修正し、「JapanPopulation_2023.csv」としてローカルに保存します。

スクリプトフファイルの準備

以下のコードを記載した「createTablefromCSV.py」ファイルを作成します。

import sys
import os
#import Constant
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf


if __name__ == "__main__":

    #Spark session builder
    spark_session = (SparkSession
          .builder
          .appName("sjdsampleapp") 
          .config("spark.some.config.option", "some-value")
          .getOrCreate())
    
    spark_context = spark_session.sparkContext
    spark_context.setLogLevel("DEBUG")



    print("spark.synapse.pool.name : " + spark_session.sparkContext.getConf().get("spark.synapse.pool.name")) 
    print() 
    print("spark.driver.cores : " + spark_context.getConf().get("spark.driver.cores")) 
    print("spark.driver.memory : " + spark_context.getConf().get("spark.driver.memory")) 
    print("spark.executor.cores : " + spark_context.getConf().get("spark.executor.cores")) 
    print("spark.executor.memory : " + spark_context.getConf().get("spark.executor.memory")) 
    print("spark.executor.instances: " + spark_context.getConf().get("spark.executor.instances")) 
    print() 
    print("spark.dynamicAllocation.enabled : " + spark_context.getConf().get("spark.dynamicAllocation.enabled")) 
    print("spark.dynamicAllocation.maxExecutors : " + spark_context.getConf().get("spark.dynamicAllocation.maxExecutors")) 
    print("spark.dynamicAllocation.minExecutors : " + spark_context.getConf().get("spark.dynamicAllocation.minExecutors")) 
    
    #tableName = "JapanPopulation"
    # You can download the sample CSV file from this site "https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page" and upload it to the files section of the lakehouse. 
    csvFilePath = "Files/JapanPopulation_2023.csv"
    #deltaTablePath = SaveToLH + "/Tables/" + tableName
    deltaTablePath = "Tables/JapanPopulation"

    df = spark_session.read.format('csv').options(header='true', inferschema='true').load(csvFilePath)
    df.write.mode('overwrite').format('delta').save(deltaTablePath)

※以下のページを参考し、一部を書き換えて作成をしています。

fabric-samples/docs-samples/data-engineering/createTablefromCSV.py at main · microsoft/fabric-samples · GitHub

レイクハウスの作成

1. Fabricを開き「Data Engineering」を選択します。

2. 「新規」の「レイクハウス」を選択し、レイクハウス名を入力して「作成」を押下します。

3. ワークスペースにレイクハウスが追加されたことを確認します。

サンプルファイルの格納

1. レイクハウスの画面に切り替わったら「データの取得」>「ファイルのアップロード」を押下します。

 

2. 事前に準備したcsvファイル(JapanPopulation_2023.csv)を選択し、「アップロード」を押下します。

3. Filesフォルダー内にcsvファイルが格納されました。

Sparkジョブ定義の作成

1. ワークスペースに切り替え、「新規」>「Sparkジョブ定義」を選択します。

2. 任意の名前を入力し、「作成」を押下します。

3. Sparkジョブ定義の画面に切り替わります。言語が「PySpark(Python)」であることを確認し、参照するスクリプトファイルをアップロードします。

4. メイン定義ファイルの「アップロード」を押下し、事前に準備した「createTabefromCSV.py」ファイルを選択します。

5. Lakehouse参照の「+追加」を押下し、作成したレイクハウスを選択し「追加」を押下します。
※ 今回は、追加の参照ファイルおよびコマンドライン引数の設定は行いません。

Sparkジョブ定義の実行

1. 「実行」を押下すると、Sparkジョブ定義の手動実行ができます。実行後、下の実行タブから実行ログを確認できます。

2. 「設定(歯車のボタン)」>「スケジュール」からスケジュール実行の設定を行うことができます。今回は5分毎に実行するスケジュールを設定しました。

3. 実行ログを確認します。5分毎に実行されていることが確認できます。

実行結果の確認

レイクハウスを開き、Tablesフォルダー内にJapanPopulationテーブルが追加されていることを確認します。

これで、Sparkジョブ定義を使用した、csvファイルのテーブル読み込みの実装が完了しました。

おわりに

今回はMicrosoft FabricのSparkジョブ定義の実装を行いました。

本実装では、ローカルのスクリプトを参照しましたが、Sparkジョブ定義ではGit統合を使用したスクリプトの参照(プレビュー※2024/12/04時点)等も可能です。

参考:Spark ジョブ定義のソース管理 - Microsoft Fabric | Microsoft Learn

本記事が、Microsoft Fabric学習の参考になれば幸いです。

執筆担当者プロフィール
福濵 比南

福濵 比南(日本ビジネスシステムズ株式会社)

2023年度入社。Data&AIプラットフォーム部所属。

担当記事一覧