Microsoft Agent Framework では、手順が決まった処理は Workflow(静的フロー)で、実行しながら進め方が変わる協調タスクは Magentic(動的オーケストレーション)で実装できます。
本記事では、Pythonのサンプルコードを通して両者の実装方法と、実務での使い分けの考え方を整理します。
- 実行環境
- 環境変数
- WorkflowとMagenticの関係
- Workflow(ワークフロー基盤):静的な処理パイプライン
- Magentic(オーケストレーション):動的なマルチエージェント協調
- 最後に
実行環境
- Python 3.12
- agent-framework 1.0.0b260107
環境変数
今回の記事では、Azure OpenAI のリソースを利用します。以下の環境変数を設定します。
AZURE_OPENAI_ENDPOINT="https://<リソース名>.openai.azure.com/" AZURE_OPENAI_CHAT_DEPLOYMENT_NAME="<デプロイメント名>" AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME="<Magenticで利用するデプロイメント名>" # Azure CLI 認証を使うため az login が必要です(本記事のコードは APIキーを使用しません)
WorkflowとMagenticの関係
Workflow は、Executor をつないで処理を流すための“ワークフロー基盤”です。実行の進行管理に加えて、メッセージのルーティングやイベント(ストリーミング)出力といった共通機能を提供します。
Magentic は、その Workflow 基盤の上に実装された“マルチエージェント協調パターン(テンプレート)”です。専用のマネージャーが複数の専門エージェントを状況に応じて動的に選択し、反復的にタスクを進める“協調オーケストレーション”を実現します。
なお、Workflow は「静的フロー専用」という意味ではありません。Workflow は基本的に手順や入出力が整理できる処理を、明示的なグラフとして設計するのに向いていますが、実装としては条件分岐(実行結果に応じたルーティング) のような表現も可能です。
一方で Magentic は、分岐条件を事前に固定するのではなく、LLMベースのマネージャーが実行中の状況(進捗・コンテキスト・各エージェントの役割)を見て「次に誰を動かすか」を都度判断する協調パターンです。
使い分けの目安です。
- 手順が事前に描ける(定型パイプライン) → WorkflowBuilder
- 解き方が実行しながら変わる(調査→再計算→再調査…) → MagenticBuilder
Workflow(ワークフロー基盤):静的な処理パイプライン
このサンプルでは、Azure OpenAI で作成した2つのエージェント(Writer → Reviewer)を、それぞれ Executor としてWorkflowに組み込み、固定の順序(静的フロー)で実行します。
サンプル:2つのエージェント実行を直列につなぐ(Azure OpenAI エージェントをWorkflowに組み込む)
このコードでは、次の Writer と Reviewer のフローを実行します。
- Writer(Executor)
- ユーザー入力を受け取り、Azure OpenAI エージェントで下書き(コンテンツ生成)を行い、会話履歴(list[ChatMessage])を次のステップへ渡します。
- Reviewer(Executor)
- Writerが生成した会話履歴を受け取り、Azure OpenAI エージェントでレビュー/推敲して最終結果を作成し、yield_output() でWorkflowの出力として返します。
# Copyright (c) Microsoft. All rights reserved. import asyncio from agent_framework import ( ChatAgent, ChatMessage, Executor, ExecutorFailedEvent, WorkflowBuilder, WorkflowContext, WorkflowFailedEvent, WorkflowRunState, WorkflowStatusEvent, handler, ) from agent_framework._workflows._events import WorkflowOutputEvent from agent_framework.azure import AzureOpenAIChatClient from azure.identity import AzureCliCredential from typing_extensions import Never """ Step 3: Agents in a workflow with streaming A Writer agent generates content, then passes the conversation to a Reviewer agent that finalizes the result. The workflow is invoked with run_stream so you can observe events as they occur. Purpose: Show how to wrap chat agents created by AzureOpenAIChatClient inside workflow executors, wire them with WorkflowBuilder, and consume streaming events from the workflow. Demonstrate the @handler pattern with typed inputs and typed WorkflowContext[T_Out, T_W_Out] outputs. Agents automatically yield outputs when they complete. The streaming loop also surfaces WorkflowEvent.origin so you can distinguish runner-generated lifecycle events from executor-generated data-plane events. Prerequisites: - Azure OpenAI configured for AzureOpenAIChatClient with required environment variables. - Authentication via azure-identity. Use AzureCliCredential and run az login before executing the sample. - Basic familiarity with WorkflowBuilder, executors, edges, events, and streaming runs. """ class Writer(Executor): """Custom executor that owns a domain specific agent for content generation. This class demonstrates: - Attaching a ChatAgent to an Executor so it participates as a node in a workflow. - Using a @handler method to accept a typed input and forward a typed output via ctx.send_message. """ agent: ChatAgent def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "writer"): # Create a domain specific agent using your configured AzureOpenAIChatClient. self.agent = chat_client.create_agent( instructions=( "You are an excellent content writer. You create new content and edit contents based on the feedback." ), ) # Associate this agent with the executor node. The base Executor stores it on self.agent. super().__init__(id=id) @handler async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None: """Generate content and forward the updated conversation. Contract for this handler: - message is the inbound user ChatMessage. - ctx is a WorkflowContext that expects a list[ChatMessage] to be sent downstream. Pattern shown here: 1) Seed the conversation with the inbound message. 2) Run the attached agent to produce assistant messages. 3) Forward the cumulative messages to the next executor with ctx.send_message. """ # Start the conversation with the incoming user message. messages: list[ChatMessage] = [message] # Run the agent and extend the conversation with the agent's messages. response = await self.agent.run(messages) messages.extend(response.messages) # Forward the accumulated messages to the next executor in the workflow. await ctx.send_message(messages) class Reviewer(Executor): """Custom executor that owns a review agent and completes the workflow.""" agent: ChatAgent def __init__(self, chat_client: AzureOpenAIChatClient, id: str = "reviewer"): # Create a domain specific agent that evaluates and refines content. self.agent = chat_client.create_agent( instructions=( "You are an excellent content reviewer. You review the content and provide feedback to the writer." ), ) super().__init__(id=id) @handler async def handle(self, messages: list[ChatMessage], ctx: WorkflowContext[Never, str]) -> None: """Review the full conversation transcript and yield the final output. This node consumes all messages so far. It uses its agent to produce the final text, then yields the output. The workflow completes when it becomes idle. """ response = await self.agent.run(messages) await ctx.yield_output(response.text) async def main(): """Build the two node workflow and run it with streaming to observe events.""" # Create the Azure chat client. AzureCliCredential uses your current az login. chat_client = AzureOpenAIChatClient(credential=AzureCliCredential()) # Instantiate the two agent backed executors. writer = Writer(chat_client) reviewer = Reviewer(chat_client) # Build the workflow using the fluent builder. # Set the start node and connect an edge from writer to reviewer. workflow = WorkflowBuilder().set_start_executor(writer).add_edge(writer, reviewer).build() user_text = await asyncio.to_thread( input, "User message (blank for default): ", ) if not user_text.strip(): user_text = "Create a slogan for a new electric SUV that is affordable and fun to drive." # Run the workflow with the user's initial message and stream events as they occur. # This surfaces executor events, workflow outputs, run-state changes, and errors. async for event in workflow.run_stream( ChatMessage(role="user", text=user_text) ): if isinstance(event, WorkflowStatusEvent): prefix = f"State ({event.origin.value}): " if event.state == WorkflowRunState.IN_PROGRESS: print(prefix + "IN_PROGRESS") elif event.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS: print(prefix + "IN_PROGRESS_PENDING_REQUESTS (requests in flight)") elif event.state == WorkflowRunState.IDLE: print(prefix + "IDLE (no active work)") elif event.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS: print(prefix + "IDLE_WITH_PENDING_REQUESTS (prompt user or UI now)") else: print(prefix + str(event.state)) elif isinstance(event, WorkflowOutputEvent): print(f"Workflow output ({event.origin.value}): {event.data}") elif isinstance(event, ExecutorFailedEvent): print( f"Executor failed ({event.origin.value}): " f"{event.executor_id} {event.details.error_type}: {event.details.message}" ) elif isinstance(event, WorkflowFailedEvent): details = event.details print(f"Workflow failed ({event.origin.value}): {details.error_type}: {details.message}") else: print(f"{event.__class__.__name__} ({event.origin.value}): {event}") if __name__ == "__main__": asyncio.run(main())
実行結果
以下の結果、意図どおりwriterのエージェント後にreviewerのエージェントが実行されています。
User message (blank for default): Microsoft Agent FrameworkのWorkflowとMagenticに関してのタイトル案を考えてください。 WorkflowStartedEvent (FRAMEWORK): WorkflowStartedEvent(origin=WorkflowEventSource.FRAMEWORK, data=None) State (FRAMEWORK): IN_PROGRESS ExecutorInvokedEvent (FRAMEWORK): ExecutorInvokedEvent(executor_id=writer, data=<agent_framework._types.ChatMessage object at 0x00000251D4BE1850>) ExecutorCompletedEvent (FRAMEWORK): ExecutorCompletedEvent(executor_id=writer, data=[[<agent_framework._types.ChatMessage object at 0x00000251D4911430>, <agent_framework._types.ChatMessage object at 0x00000251D5097080>]]) SuperStepStartedEvent (EXECUTOR): SuperStepStartedEvent(iteration=1, data=None) ExecutorInvokedEvent (FRAMEWORK): ExecutorInvokedEvent(executor_id=reviewer, data=[<agent_framework._types.ChatMessage object at 0x00000251D4BE1A60>, <agent_framework._types.ChatMessage object at 0x00000251D4BE1BB0>]) Workflow output (FRAMEWORK): 以下のようなタイトル案はいかがでしょうか。 1. 「Microsoft Agent FrameworkのWorkflow設計とMagentic活用ガイド」 2. 「Microsoft Agent FrameworkにおけるWorkflow管理とMagentic連携手法」 3. 「効率化を実現するMicrosoft Agent Framework WorkflowとMagenticの活用」 4. 「Microsoft Agent Framework Workflowの最適化とMagentic導入事例」 5. 「Magenticで強化するMicrosoft Agent FrameworkのWorkflow設計」 もしご用途やターゲットがあれば、さらにカスタマイズも可能です。お気軽にご連絡ください。 ExecutorCompletedEvent (FRAMEWORK): ExecutorCompletedEvent(executor_id=reviewer, data=['以下のようなタイトル案はいかが でしょうか。\n\n1. 「Microsoft Agent FrameworkのWorkflow設計とMagentic活用ガイド」\n2. 「Microsoft Agent Frameworkにお けるWorkflow管理とMagentic連携手法」\n3. 「効率化を実現するMicrosoft Agent Framework WorkflowとMagenticの活用」\n4. 「Microsoft Agent Framework Workflowの最適化とMagentic導入事例」\n5. 「Magenticで強化するMicrosoft Agent FrameworkのWorkflow設計」\n\nもしご用途やターゲットがあれば、さらにカスタマイズも可能です。お気軽にご連絡ください。']) SuperStepCompletedEvent (EXECUTOR): SuperStepCompletedEvent(iteration=1, data=None) State (FRAMEWORK): IDLE (no active work)
Magentic(オーケストレーション):動的なマルチエージェント協調
Magenticは、研究役(調査)、コード役(計算)のような 専門エージェントを用意し、マネージャーが次に誰を動かすかを都度判断してタスクを進めます。
サンプル:Researcher/Coder + Manager でMagenticワークフロー
このサンプルでは、ResearcherAgent(調査役)と CoderAgent(計算/コード実行役)という 専門エージェントを用意します(Coderは HostedCodeInterpreterTool を利用可能)。
MagenticManager(マネージャー役)が、タスクの進捗に応じて どのエージェントを次に動かすかを動的に選び、最大ラウンド数などの制約(max_round_count 等)の範囲で協調させます。
参照:Microsoft Agent Framework ワークフロー オーケストレーション - Magentic | Microsoft Learn
import asyncio from typing import cast from agent_framework import ( MAGENTIC_EVENT_TYPE_AGENT_DELTA, MAGENTIC_EVENT_TYPE_ORCHESTRATOR, AgentRunUpdateEvent, ChatAgent, ChatMessage, HostedCodeInterpreterTool, MagenticBuilder, WorkflowOutputEvent, ) from agent_framework.azure import AzureOpenAIChatClient, AzureOpenAIResponsesClient from azure.identity import AzureCliCredential async def main(): # Azure CLI で事前にログインしておく # az login credential = AzureCliCredential() # Azure OpenAI クライアント(Agent Framework) chat_client = AzureOpenAIChatClient(credential=credential) responses_client = AzureOpenAIResponsesClient(credential=credential) # Specialist agents researcher_agent = ChatAgent( name="ResearcherAgent", description="Specialist in research and information gathering", instructions=( "You are a Researcher. You find information without additional computation." ), # 検索系モデル/デプロイを使う想定(環境側で対応するデプロイを設定) chat_client=chat_client, ) coder_agent = ChatAgent( name="CoderAgent", description="Writes and executes code to analyze data.", instructions="You solve questions using code.", chat_client=responses_client, tools=HostedCodeInterpreterTool(), ) # Manager agent manager_agent = ChatAgent( name="MagenticManager", description="Orchestrator that coordinates specialist agents", instructions="You coordinate a team to complete complex tasks efficiently.", chat_client=chat_client, ) workflow = ( MagenticBuilder() .participants(researcher=researcher_agent, coder=coder_agent) .with_standard_manager( agent=manager_agent, max_round_count=6, max_stall_count=2, max_reset_count=1, ) .build() ) task = await asyncio.to_thread( input, "Task (blank for default): ", ) if not task.strip(): task = "Compare pros/cons of two approaches and compute a small example with code." output: str | None = None async for event in workflow.run_stream(task): if isinstance(event, AgentRunUpdateEvent): props = event.data.additional_properties if event.data else None et = props.get("magentic_event_type") if props else None if et == MAGENTIC_EVENT_TYPE_ORCHESTRATOR: kind = props.get("orchestrator_message_kind", "") if props else "" text = event.data.text if event.data else "" print(f"\n[ORCH:{kind}]\n{text}\n") elif et == MAGENTIC_EVENT_TYPE_AGENT_DELTA: agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id if event.data and event.data.text: print(f"[STREAM:{agent_id}] {event.data.text}", end="", flush=True) elif isinstance(event, WorkflowOutputEvent): msgs = cast(list[ChatMessage], event.data) if msgs: output = msgs[-1].text if output: print("\n\n=== FINAL OUTPUT ===\n", output) if __name__ == "__main__": asyncio.run(main())
実行結果
以下の結果では、ワークフローの実行・オーケストレーターイベント・エージェントストリーミング・最終出力の一連の流れが実行されています。
出力結果が長かったため、中間は省略しています。
Task: 代表的なマルチモーダル生成モデル(例:画像生成+キャプション)とテキスト専用生成モデルの学習 ・推論の推定 kWh と kgCO2e を表で示し、出力品質(例:FID/ROUGE 等)や速度と合わせて利点・欠点を簡潔に比較してください。前提(データセット規模、GPU仕様、PUE、地域の炭素強度)を明記してください。
[ORCH:user_task]
代表的なマルチモーダル生成モデル(例:画像生成+キャプション)とテキスト専用生成モデルの学習・推論の推定 kWh と kgCO2e を表で示し、出力品質(例:FID/ROUGE 等)や速度と合わせて利点・欠点を簡潔に比較してください。前提(データセット規模、GPU仕様、PUE、地域の炭素強度)を明記してください。
[ORCH:task_ledger]
We are working to address the following user request:
代表的なマルチモーダル生成モデル(例:画像生成+キャプション)とテキスト専用生成モデルの学習・推論の推定 kWh と kgCO2e を表で示し、出力品質(例:FID/ROUGE 等)や速度と合わせて利点・欠点を簡潔に比較してください。前提(データセット規模、GPU仕様、PUE、地域の炭素強度)を明記してください。
To answer this request we have assembled the following team:
- researcher: Specialist in research and information gathering
- coder: Writes and executes code to analyze data.
Here is an initial fact sheet to consider:
1. GIVEN OR VERIFIED FACTS
- The request is for a comparison between representative multimodal generative models (e.g., image generation plus captioning) and text-only generative models.
- Metrics should include estimated kWh and kgCO2e for training and inference, along with output quality metrics (e.g., FID for image, ROUGE for text), speed, advantages, and disadvantages.
- Explicit mention to clarify assumptions/premises: dataset scale, GPU specs, PUE, regional carbon intensity.
2. FACTS TO LOOK UP
- Typical energy consumption (kWh) for training and inference of representative multimodal generation models and text-only language models.
- Typical carbon emissions (kgCO2e) associated with the above based on given parameters like PUE and regional carbon intensity.
- Benchmark output quality values: FID scores for image generation models and ROUGE scores for text generation models.
- Inference speed metrics on typical hardware.
- Dataset scales usually used in multimodal and text-only model training.
- PUE (Power Usage Effectiveness) average values for data centers.
- Carbon intensity values for electricity grids by region.
3. FACTS TO DERIVE
- Estimated kWh and kgCO2e values for both types of models from typical GPU hours used, GPU power consumption, PUE, and carbon intensity.
- Relative speed comparisons based on model size and architecture.
- Table format comparing these factors side by side.
4. EDUCATED GUESSES
- Multimodal models (combining image + text) typically require more compute than text-only models due to larger model sizes and more complex architectures.
- Image generation quality often measured by FID (~lower better), text quality by ROUGE (higher better).
- Text-only models tend to be faster and less energy-consuming in inference.
- Multimodal generation may have higher carbon footprint due to more diverse training data and multi-task architectures.
- Typical modern GPUs used might be NVIDIA A100 or similar, with known power draw (~400W per GPU).
- PUE is often around 1.1–1.3 for efficient data centers.
- Carbon intensity depends on the region; roughly 0.4 kgCO2e/kWh average globally but varies.
- Dataset sizes could be on the order of hundreds of millions of tokens for text-only, and millions of image-caption pairs for multimodal models.
Here is the plan to follow as best as possible:
- Researcher:
- Gather authoritative data on typical training and inference energy consumption (kWh) and carbon emissions (kgCO2e) for representative multimodal and text-only generative models.
- Collect benchmark quality metrics (FID for image, ROUGE for text) and inference speed data from recent papers or benchmark reports.
- Compile typical dataset sizes, GPU specifications (e.g., NVIDIA A100), PUE values, and regional carbon intensity figures as assumptions.
- Coder:
- Perform calculations to estimate kWh and kgCO2e using gathered data and assumptions (e.g., GPU power draw, training hours, PUE, carbon intensity).
- Create a clear comparison table summarizing estimated energy use, emissions, quality scores, speed, and pros/cons of each model type.
- Joint:
- Review and ensure clarity, accuracy, and concise presentation of results in table and bullet points highlighting trade-offs and comparative advantages.
[ORCH:instruction]
Please collect typical training and inference compute time (GPU hours) and power usage, dataset sizes, benchmark output quality (FID for images, ROUGE for text), typical inference speeds, GPU specs (e.g., A100), PUE values, and regional carbon intensity figures for representative multimodal (image+caption) and text-only generation models.
[STREAM:researcher] Certainly[STREAM:researcher] ![STREAM:researcher] Below[STREAM:researcher] is[STREAM:researcher] a[STREAM:researcher] detailed[STREAM:researcher] summary[STREAM:researcher] of[STREAM:researcher] typical[STREAM:researcher] metrics[STREAM:researcher] and[STREAM:researcher] characteristics[STREAM:researcher] for[STREAM:researcher] representative[STREAM:researcher] multim[STREAM:researcher] odal[STREAM:researcher] ([STREAM:researcher] image[STREAM:researcher] +[STREAM:researcher] caption[STREAM:researcher] )[STREAM:researcher] and[STREAM:researcher] text[STREAM:researcher] -only[STREAM:researcher] generation[STREAM:researcher] models[STREAM:researcher] ,[STREAM:researcher] based[STREAM:researcher] on[STREAM:researcher] publicly[STREAM:researcher] available[STREAM:researcher] literature[STREAM:researcher] and[STREAM:researcher] reports[STREAM:researcher] up[STREAM:researcher] to[STREAM:researcher] mid[STREAM:researcher] -[STREAM:researcher] 202[STREAM:researcher] 4[STREAM:researcher] .
[STREAM:researcher] ---
[STREAM:researcher] ##[STREAM:researcher] Mult[STREAM:researcher] im[STREAM:researcher] odal[STREAM:researcher] Models[STREAM:researcher] ([STREAM:researcher] Image[STREAM:researcher] +[STREAM:researcher] Caption[STREAM:researcher] Generation[STREAM:researcher] )
...
=== FINAL OUTPUT ===
以下に、代表的なマルチモーダル生成モデル(画像生成+キャプション)とテキスト専用生成モデルの学習および推論における推定エネルギー消費量(kWh)・CO2排出量(kgCO2e)、出力品質指標、推論速度、そして利点・欠点をまとめた比較表を示します。
---
## 【前提条件】
- **データセット規模:** 大規模(数百万~数億の画像+キャプションペア、数百億~兆トークンのテキスト)
- **GPU仕様:** NVIDIA A100(消費電力 約300W)
- **Power Usage Effectiveness (PUE):** 1.2(効率的なデータセンター想定)
- **地域の炭素強度:** 0.4 kgCO2e / kWh(世界平均に準ずる)
---
## 【比較表】
| 項目 | マルチモーダル生成モデル (画像+キャプション) | テキスト専用生成モデル |
|--------------------|---------------------------------------------|------------------------------------|
| **学習時のGPU時間** | 約30,000 GPU時間 | 約20,000 GPU時間 |
| **学習エネルギー消費量** | 約10,800 kWh | 約7,200 kWh |
| **学習CO2排出量** | 約4,320 kgCO2e | 約2,880 kgCO2e |
| **推論エネルギー消費量** (1Mサンプルあたり) | 約200 kWh | 約50 kWh
|
| **推論CO2排出量** (1Mサンプルあたり) | 約80 kgCO2e | 約20 kgCO2e
|
| **出力品質指標** | FID 25(数値が低いほど良い画像品質の指標) | ROUGE-L 45(数値が高いほど良いテキスト品質) |
| **推論速度** | 約0.5 サンプル/秒 | 約2 サンプル/秒 |
| **利点** | ・複数モダリティを扱い、多彩でリッチな生成が可能<br>・画像とテキストの統合的理解・生成に強い | ・推論が高速かつコストが低い<br>・テキスト生成タスクに特化し高品質な生成が可能 |
| **欠点** | ・学習・推論コストが高い<br>・推論速度が遅め | ・マルチモーダル情報を扱えない<br>・ 画像情報などの非テキスト情報を活用できない |
---
### 【まとめ】
- マルチモーダルモデルは、画像とテキストを統合的に生成できる反面、学習・推論にかかるエネルギーや炭素排出が大きく、推論 速度も遅い傾向にあります。
- テキスト専用モデルは、同等かそれ以上のテキスト生成品質を持ちながら、推論が高速で環境負荷も比較的低い点が特徴です。
- 利用目的や求める出力形式に応じて、環境負荷や速度、品質のバランスを考慮してモデル選択をすることが重要です。
ご要望があれば、さらに詳細なモデル別の比較や、特定の環境・条件下での推定も対応いたします。
最後に
Microsoft Agent Framework は、WorkflowBuilder によって静的な処理フローを堅牢に組み立てることが可能です。一方で、MagenticBuilder を使うことで、状況に応じた動的なマルチエージェント協調も実現できます。
今後は周辺機能の最新アップデートや推奨パターンの動向にも注目していきたいです。