Azure Machine LearningでトレーニングしたAIモデルをJetBotに自動デプロイする

先端技術部テクノロジーリサーチグループの色部です。

本記事では、以下の記事でトレーニングしたJetBotのAIモデルを、JetBotに自動でデプロイする仕組みについて紹介します。

blog.jbs.co.jp

前提

前提条件については、上記掲載の記事までと同様になりますが、Azure Machine Learning(以下、AML)からの自動デプロイにあたって、下記のリソースを新たに使用します。

  • Azure Blob Storage(以下、Blobストレージ)
  • Azure Functions
  • Device Twins(Iot Hub)

リソース構成

自動デプロイのためのリソース構成は、下図の通りです。

  1. AMLから、トレーニング済みのモデルを、Blobストレージにプッシュします。
  2. Azure Functionsで、モデルのSASトークンを取得し、Device Twinsに追加します。
  3. JetBot側から、SASトークンの更新をトリガーに、モデルをダウンロードします。

モデルのデプロイに使用するリソースとして、IoT HubのIoT Centralの使用も検討しましたが、今回は既にIoT HubをJetBotと連携している背景もあり、IoT HubのDevice Twinsからモデルがダウンロードできないかを考えました。

また、Azure環境の外から委任アクセス(およびダウンロード)できる必要があったため、Azureリソースへの制限付きアクセス権を付与するSAS(Shared Access Signatures)の概念を用いた、SASトークンを使用する仕組みにしました。

順を追って確認していきます。

モデルのプッシュ

まず、AMLから、トレーニングしたモデルを、Blobストレージにプッシュします。

モデルのプッシュは、上記掲載の記事にて作成したモデルトレーニング、モデルの精度評価を行うパイプラインに、YAMLを定義してコンポーネントを追加する形で構築します。

また、モデルのアップロードを実行するスクリプトも、他のパイプラインに倣い、コンポーネント内に含める形としました。

特に下記に留意して、モデルのアップロードを行います。

  • モデルレジストリの「最新」モデルを、Blobストレージにアップロードする
  • モデルレジストリの「すべての」モデルを、Blobストレージにアップロードする

編集後、パイプラインを実行すると、指定したBlobストレージに、モデルが追加されるようになります。

SASトークンの取得と追加

次に、BlobストレージにプッシュされたモデルのSASトークンを、Azure Functionsを使って取得します。

また、取得したSASトークンを、IoT HubのDevice Twinsにアップロードします。

下記手順を参考に、スクリプトを作成します。

learn.microsoft.com

C#、JavaScript等でも作成できますが、今回はPythonで作成することにします。

また今回は、Blobストレージへのアップロードをトリガーとして、Azure Functionsを作動するようにします。 SASトークンの属性値については、必要に応じて変更してください。

Azure Functions内で実行するスクリプトの一例は、以下の通りです。

import logging
import azure.functions as func
import time
from datetime import datetime, timezone, timedelta
from azure.storage.blob import generate_blob_sas, BlobSasPermissions
from azure.iot.hub import IoTHubRegistryManager
import os

def main(myblob: func.InputStream, context: func.Context):
    data = myblob.read()
    data_size = len(data)
    logging.info(f"Data Size: {data_size} bytes")
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.blob_properties['ContentLength']} bytes")

    # 環境変数からBlob Storage情報を取得
    storage_account_name = os.getenv("STORAGE_ACCOUNT_NAME")
    storage_account_key = os.getenv("STORAGE_ACCOUNT_KEY")
    container_name = os.getenv("CONTAINER_NAME", "azureml")
    blob_name = myblob.name.split('/')[-1]

    # ファイルが目的のファイルであるか確認
    if not any(blob_name.endswith(file) for file in ["model_best.pth", "checkpoint.pth"]):
        return

    # SASトークンを生成
    sas_token = generate_blob_sas(account_name=storage_account_name,
                                  container_name=container_name,
                                  blob_name=blob_name,
                                  account_key=storage_account_key,
                                  permission=BlobSasPermissions(read=True),
                                  start=datetime.now(timezone.utc),
                                  expiry=datetime.now(timezone.utc) + timedelta(hours=1))

    # SAS URLを生成
    sas_url = f"https://{storage_account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
    
    # 環境変数からIoT Hub情報を取得
    iothub_connection_string = os.getenv("IOTHUB_CONNECTION_STRING")
    device_id = os.getenv("DEVICE_ID")

    # 同期エラー回避のために3秒待機
    time.sleep(3)

    # IoT HubのDevice Twinsにアップロード
    registry_manager = IoTHubRegistryManager(iothub_connection_string)

    # 既存のBlobを取得
    twin = registry_manager.get_twin(device_id)
    desired_properties = twin.properties.desired
    existing_blobs = desired_properties.get("blobs", [])

    # 新しいBlobを既存のBlobに追加
    new_blob = {"sas_url": sas_url, "name": blob_name}
    if new_blob not in existing_blobs:
        existing_blobs = [blob for blob in existing_blobs if blob["name"] != blob_name]
        existing_blobs.append(new_blob)
        time.sleep(5)
    desired_properties["blobs"] = existing_blobs

    # 更新する前に最新のDevice Twinsを再度取得
    twin = registry_manager.get_twin(device_id)
    twin_patch = {"properties": {"desired": desired_properties}}
    updated_twin = registry_manager.update_twin(device_id, twin_patch, twin.etag)

上記スクリプトを、Azure Functions経由で実行することで、IoT HubのDevice Twins上に、SASトークンが追加されるようになります。

JSON形式のDevice Twins上では、以下のように格納されるはずです。

"properties": {
        "desired": {
            "$metadata": {
                "$lastUpdated": "<lastUpdated>",
                "$lastUpdatedVersion": 201,
                "blobs": {
                    "$lastUpdated": "<lastUpdated>",
                    "$lastUpdatedVersion": 201
                }
            },
            "$version": 201,
            "blobs": [
                {
                    "name": "checkpoint.pth",
                    "sas_url": "<your_sas_url>"
                },
                {
                    "name": "model_best.pth",
                    "sas_url": "<your_sas_url>"
                }
            ]
        },
        "reported": {
            "$metadata": {
                "$lastUpdated": "<lastUpdated>"
            },
            "$version": 1
        }
    },

モデルの自動ダウンロード

最後に、JetBotに紐づいているIoT HubのDevice Twinsから、JetBotにモデルをダウンロードします。

モデルのダウンロードは、JetBot側で定期実行されるスクリプトを利用して行います。

JetBot側での定期実行は、crontabコマンドを使用して設定します。

「crontab -e」をJetBot側のターミナル画面で実行し、下記のように入力することで、例えば、1時間ごとにスクリプトを定期実行するように設定できます。

0 * * * * bash -c 'cd /home/jetbot/jetbot_ros && /usr/bin/python3 /home/jetbot/jetbot_ros/download_model.py'

また、定期実行するスクリプトについては、下記のように実装します。

  • SASトークンを、IoT HubのDevice Twinsで確認し、SASトークンが更新されていなければ、モデルをダウンロードしないようにする
  • モデルダウンロードのために取得したSASトークンを、次回の更新確認用に、環境変数等に格納する
  • スクリプト実行のため、必要に応じて、Dockerfileの編集や.envファイル、logディレクトリを追加する

スクリプトを設定後、実行ログを確認し、既定のディレクトリにモデルがダウンロードされていれば、成功です。

まとめ

Azure Machine Learning上でトレーニングしたjetbot_rosのRoad Followingモデルを、JetBotに自動でデプロイする仕組みについて紹介しました。

本記事で紹介した仕組みが、読者の方の一助になれば幸いです。

執筆担当者プロフィール
色部 晟洋

色部 晟洋(日本ビジネスシステムズ株式会社)

SharePoint Onlineサイト構築・Microsoft365移行等でプリセールス・PMを経験後、AI等の先端技術を扱う部門に異動。好きな映画は『風立ちぬ』です。

担当記事一覧