Microsoft Agent Framework で A2A を試す: Python でリモート AI エージェント連携を最小構成から理解する

異なる環境やサービスで動作するエージェントを、呼び出し側で個別実装の違いを強く意識せずに連携したい場面があります。

Microsoft Agent Framework で A2A を使えば、そのような外部エージェントを「リモートの AI エージェント」として接続し、Agent Framework から統一的な方法で利用できます。

この記事では、ローカルで実行した AI エージェントを、Agent Framework を使ってリモートの AI エージェントとして接続・利用する方法を紹介します。

Agent Framework で A2A 連携を試したい方や、異なる agent を統一的な方法で呼び出す構成を確認したい方を想定した内容です。Python のサンプルコードを読みながら進めるため、Python に触れたことがあると理解しやすくなっています。

実行環境

  • Python: 3.12.10
  • agent-framework 1.0.0rc4
  • agent-framework-a2a 1.0.0b260311

A2A とは

A2A は Agent-to-Agent Protocol の略で、異なるフレームワークや実装で作られたエージェント同士を、共通のプロトコルで接続するための仕組みです。

Agent Framework の文脈では、A2A を次のように捉えると分かりやすいです。

  • MCP: エージェントがツールやデータソースを使うための標準
  • A2A: エージェント同士が接続し、依頼や応答をやり取りするための標準

A2A を利用することで、接続先エージェントの内部実装を知らなくても、Agent Card でスキルを発見し、メッセージやタスクとして依頼を送り、必要に応じて streaming で進捗や応答を受け取れます。

A2A の詳細は、A2A Protocol で確認できます。

Agent Framework における A2A

Agent Framework では、A2A を使うことで「外部の A2A 準拠エージェントを、リモートの AI エージェントとして接続して利用する」ことができます。

Python の A2A integration を通じて、主に次のことが可能です。

  1. Agent Card を解決して、接続先エージェントの名前や能力を把握する
  2. A2AAgent を使って、リモートエージェントを Agent Framework から呼び出す
  3. stream=True により、streaming 応答を逐次受け取る
  4. background=True や continuation token を使って、長時間実行タスクを扱う

加えて、認証が必要な A2A endpoint に対しては `auth_interceptor` を使ってヘッダー付与を差し込めます。

シンプルに実装も行うことが可能です。

  1. A2ACardResolver で Agent Card を取得する
  2. A2AAgent を生成する
  3. agent.run(...) で通常応答を受け取る、または `agent.run(..., stream=True)` で逐次更新を受け取る
  4. 必要であれば background=True で実行し、continuation token を使って後続の取得や再購読を行う

また、Agent Framework は A2A client として使うだけではなく、自身の agent を A2A server として公開することもできます。

詳細は以下の公式ページをご参照ください。

A2A 統合 | Microsoft Learn

A2A client として接続する

ここでは、A2A クライアントが接続先エージェントの情報を取得し、呼び出しを行うための基本的な流れを示します。

コードは、Microsoft Learn の A2A integration と公式 GitHub の Python A2A client sample を参考にしています。

Agent Card を解決して接続

まずは、接続先の A2A endpoint から Agent Card を解決し、そのメタデータを使って A2AAgent を生成し、エージェントを呼び出します。

接続先のエージェントは agent_url で指定します。この例では http://127.0.0.1:5001 を使っており、ローカルで起動した A2A server を指しています。

処理は次の流れで行います。

  1. A2ACardResolver で Agent Card を解決する
  2. A2AAgent を生成する
  3. run(...) で non-streaming 呼び出しをする

実際のコードは次のとおりです。

import asyncio
import httpx
from a2a.client import A2ACardResolver
from a2a.client.errors import A2AClientError
from a2a.utils.constants import PREV_AGENT_CARD_WELL_KNOWN_PATH
from agent_framework.a2a import A2AAgent


async def resolve_agent_card(agent_url: str, card_path: str | None):
    async with httpx.AsyncClient(timeout=60.0) as http_client:
        resolver = A2ACardResolver(httpx_client=http_client, base_url=agent_url)
        try:
            return await resolver.get_agent_card(relative_card_path=card_path)
        except A2AClientError:
            if card_path is not None:
                raise
            return await resolver.get_agent_card(
                relative_card_path=PREV_AGENT_CARD_WELL_KNOWN_PATH
            )


async def main():
    agent_url = "http://127.0.0.1:5001"
    message = "Tell me about yourself"

    agent_card = await resolve_agent_card(agent_url, None)
    print(f"[agent] name={agent_card.name}")
    print(f"[agent] description={agent_card.description}")
    print(f"[agent] url={agent_card.url}")

    async with A2AAgent(
        name=agent_card.name,
        agent_card=agent_card,
        url=agent_url,
    ) as agent:
        response = await agent.run(message)
        print("[response]")
        print(response.text or "<empty>")

if __name__ == "__main__":
    asyncio.run(main())

Agent Framework agent を A2A server として公開する


続いて、ローカルで動く Agent Framework のエージェントを A2A endpoint として公開する単一ファイルの server サンプルです。

今回のローカルサンプルは、Azure OpenAI などの外部リソースは使わず、Agent Framework agent を A2A server として公開する流れをローカルで追いやすい最小構成にしています。

役割ごとに小分けして確認したあと、そのまま利用できる全体コードをまとめます。

ローカル A2A server は、次の 4つの部品で構成されます。

  1. LocalAgentFrameworkAgent: 実際の応答を返す Agent Framework 側の agent 本体
  2. AgentFrameworkA2AExecutor: A2A request を agent 呼び出しへ変換する橋渡し
  3. build_agent_card() と build_app(): 公開情報と HTTP アプリケーションの組み立て
  4. main(): uvicorn で server を起動するエントリポイント

Agent 本体を用意する

公開対象となる Agent Framework 側の agent を用意します。ここでは BaseAgent を継承し、受け取ったメッセージを短いテキストで返すだけのローカル agent にしています。

あわせて、Azure OpenAI を利用するメソッドも同じ class に追加し、run_stream() から切り替えられる形にしておきます。

処理内容は次のとおりです。

  • run_stream() に実際の応答ロジックを書きます
  • OpenAI や Azure OpenAI などを利用する agent にしたい場合も、基本的にはこの run_stream() の中、または run_stream() から呼び出す補助関数にモデル呼び出し処理を記載します
    • run_stream_local() は現在のローカル固定メッセージ版です
    • run_stream_with_azure_openai() は Azure OpenAI を使う差し替え例です
    • Azure OpenAI を使う場合は AZURE_OPENAI_ENDPOINT と AZURE_OPENAI_CHAT_DEPLOYMENT_NAME など AzureOpenAIChatClient が参照する設定を事前に用意します
  • normalize_messages() で入力を正規化し、最後のユーザー入力を取り出します
  • AgentResponseUpdate は contents=[Content.from_text(...)] で返します

Agent 本体のコードは次のとおりです。

class LocalAgentFrameworkAgent(BaseAgent):
    async def run(self, messages=None, *, thread=None, **kwargs):
        updates = [
            update async for update in self.run_stream(messages, thread=thread, **kwargs)
        ]
        return AgentResponse.from_updates(updates)

    async def run_stream(self, messages=None, *, thread=None, **kwargs):
        async for update in self.run_stream_local(messages, thread=thread, **kwargs):
            yield update

        # Azure OpenAI を使う場合は、上の 2 行を次の 2 行に置き換えます。
        # async for update in self.run_stream_with_azure_openai(messages, thread=thread, **kwargs):
        #     yield update

    async def run_stream_local(self, messages=None, *, thread=None, **kwargs):
        normalized_messages = normalize_messages(messages)
        user_text = normalized_messages[-1].text.strip() if normalized_messages else ""
        if not user_text:
            user_text = "(empty input)"

        chunks = [
            "Hello from the Agent Framework A2A demo agent.\n",
            f"Received message: {user_text}\n",
            "This response is generated by a local BaseAgent implementation and exposed over A2A.",
        ]
        response_id = str(uuid4())

        for chunk in chunks:
            await asyncio.sleep(0.05)
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=chunk)],
                role="assistant",
                response_id=response_id,
            )

    async def run_stream_with_azure_openai(self, messages=None, *, thread=None, **kwargs):
        normalized_messages = normalize_messages(messages)
        user_text = normalized_messages[-1].text.strip() if normalized_messages else ""
        if not user_text:
            user_text = "(empty input)"

        chat_client = AzureOpenAIChatClient(
            credential=AzureCliCredential(),
            endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            deployment_name=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"),
        )
        llm_agent = Agent(
            name="LocalAgentFrameworkAgent",
            description="A2A-backed agent using Azure OpenAI",
            instructions=(
                "You are a helpful assistant exposed through an A2A endpoint. "
                "Answer the user's request concisely in Japanese."
            ),
            client=chat_client,
        )

        response_id = str(uuid4())
        stream = llm_agent.run(user_text, stream=True)
        async for update in stream:
            text = getattr(update, "text", None)
            if not text:
                continue

            yield AgentResponseUpdate(
                contents=[Content.from_text(text=text)],
                role="assistant",
                response_id=response_id,
            )

        await stream.get_final_response()

Azure OpenAI を利用する場合は、この節のコードに加えて次の import も必要です。

import os

from azure.identity import AzureCliCredential
from agent_framework import Agent
from agent_framework.azure import AzureOpenAIChatClient

executor で A2A request を橋渡しする

次に、A2A request を受け取って LocalAgentFrameworkAgent を呼び出す executor を用意します。

class AgentFrameworkA2AExecutor(AgentExecutor):
    def __init__(self, agent: BaseAgent):
        self._agent = agent

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        user_text = context.get_user_input() or "Hello"
        task_id = context.task_id or str(uuid4())
        context_id = context.context_id or str(uuid4())

        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.working),
                final=False,
            )
        )

        response = await self._agent.run(user_text)
        response_parts = [
            Part(root=TextPart(text=msg.text))
            for msg in response.messages
            if msg.text
        ]

        if not response_parts:
            response_parts.append(Part(root=TextPart(text=str(response.text or "<empty>"))))

        await event_queue.enqueue_event(
            TaskArtifactUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                artifact=Artifact(
                    artifact_id=str(uuid4()),
                    name="response",
                    parts=response_parts,
                ),
                append=False,
                last_chunk=True,
            )
        )

        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.completed),
                final=True,
            )
        )

Agent Card と HTTP アプリケーションを組み立てる

server として公開するには、Agent Card と request handler を組み立てる必要があります。

ここでは次の対応関係を押さえると分かりやすいです。

  • build_agent_card() は公開する agent の名前や説明、URL、skills を定義します
  • DefaultRequestHandler は executor を受け取り、A2A request を処理します
  • A2AStarletteApplication(...).build() が実際に起動可能な ASGI アプリケーションを返します

公開情報と HTTP アプリケーションの組み立ては次のコードで実行します。

def build_agent_card(host: str, port: int) -> AgentCard:
    return AgentCard(
        name="Agent Framework Local A2A Demo Agent",
        description="Minimal Agent Framework agent exposed as an A2A endpoint for local testing.",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        capabilities=AgentCapabilities(streaming=False),
        skills=[
            AgentSkill(
                id="agent-framework-echo-summary",
                name="Agent Framework Echo Summary",
                description="Returns a short text response containing the received input.",
                tags=["agent-framework", "demo", "echo", "a2a"],
                examples=["Tell me about yourself", "Summarize this request"],
            )
        ],
    )


def build_app(host: str, port: int):
    framework_agent = LocalAgentFrameworkAgent(
        name="LocalAgentFrameworkAgent",
        description="Minimal BaseAgent used as the backing implementation for an A2A endpoint.",
    )
    request_handler = DefaultRequestHandler(
        agent_executor=AgentFrameworkA2AExecutor(framework_agent),
        task_store=InMemoryTaskStore(),
    )
    return A2AStarletteApplication(
        agent_card=build_agent_card(host, port),
        http_handler=request_handler,
    ).build()

全体コード

ここまでの部品を 1 つにまとめたコードです。

import argparse
import asyncio
import os
from uuid import uuid4

import uvicorn
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps.jsonrpc import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import (
    Artifact,
    AgentCapabilities,
    AgentCard,
    AgentSkill,
    Message,
    Part,
    Role,
    TaskArtifactUpdateEvent,
    TaskState,
    TaskStatus,
    TaskStatusUpdateEvent,
    TextPart,
)
from azure.identity import AzureCliCredential
from agent_framework import Agent, AgentResponse, AgentResponseUpdate, BaseAgent, Content, normalize_messages
from agent_framework.azure import AzureOpenAIChatClient


class LocalAgentFrameworkAgent(BaseAgent):
    async def run(self, messages=None, *, thread=None, **kwargs):
        updates = [
            update async for update in self.run_stream(messages, thread=thread, **kwargs)
        ]
        return AgentResponse.from_updates(updates)

    async def run_stream(self, messages=None, *, thread=None, **kwargs):
        async for update in self.run_stream_local(messages, thread=thread, **kwargs):
            yield update

        # Azure OpenAI を使う場合は、上の 2 行を次の 2 行に置き換えます。
        #async for update in self.run_stream_with_azure_openai(messages, thread=thread, **kwargs):
        #    yield update

    async def run_stream_local(self, messages=None, *, thread=None, **kwargs):
        normalized_messages = normalize_messages(messages)
        user_text = normalized_messages[-1].text.strip() if normalized_messages else ""
        if not user_text:
            user_text = "(empty input)"

        chunks = [
            "Hello from the Agent Framework A2A demo agent.\n",
            f"Received message: {user_text}\n",
            "This response is generated by a local BaseAgent implementation and exposed over A2A.",
        ]
        response_id = str(uuid4())

        for chunk in chunks:
            await asyncio.sleep(0.05)
            yield AgentResponseUpdate(
                contents=[Content.from_text(text=chunk)],
                role="assistant",
                response_id=response_id,
            )

    async def run_stream_with_azure_openai(self, messages=None, *, thread=None, **kwargs):
        normalized_messages = normalize_messages(messages)
        user_text = normalized_messages[-1].text.strip() if normalized_messages else ""
        if not user_text:
            user_text = "(empty input)"

        chat_client = AzureOpenAIChatClient(
            credential=AzureCliCredential(),
            endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
            deployment_name=os.getenv("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME"),
        )
        llm_agent = Agent(
            name="LocalAgentFrameworkAgent",
            description="A2A-backed agent using Azure OpenAI",
            instructions=(
                "You are a helpful assistant exposed through an A2A endpoint. "
                "Answer the user's request concisely in Japanese."
            ),
            client=chat_client,
        )

        response_id = str(uuid4())
        stream = llm_agent.run(user_text, stream=True)
        async for update in stream:
            text = getattr(update, "text", None)
            if not text:
                continue

            yield AgentResponseUpdate(
                contents=[Content.from_text(text=text)],
                role="assistant",
                response_id=response_id,
            )

        await stream.get_final_response()


class AgentFrameworkA2AExecutor(AgentExecutor):
    def __init__(self, agent: BaseAgent):
        self._agent = agent

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        user_text = context.get_user_input() or "Hello"
        task_id = context.task_id or str(uuid4())
        context_id = context.context_id or str(uuid4())

        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.working),
                final=False,
            )
        )

        response = await self._agent.run(user_text)
        response_parts = [
            Part(root=TextPart(text=msg.text))
            for msg in response.messages
            if msg.text
        ]

        if not response_parts:
            response_parts.append(Part(root=TextPart(text=str(response.text or "<empty>"))))

        await event_queue.enqueue_event(
            TaskArtifactUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                artifact=Artifact(
                    artifact_id=str(uuid4()),
                    name="response",
                    parts=response_parts,
                ),
                append=False,
                last_chunk=True,
            )
        )

        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(state=TaskState.completed),
                final=True,
            )
        )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_id = context.task_id or str(uuid4())
        context_id = context.context_id or str(uuid4())
        await event_queue.enqueue_event(
            TaskStatusUpdateEvent(
                task_id=task_id,
                context_id=context_id,
                status=TaskStatus(
                    state=TaskState.canceled,
                    message=Message(
                        role=Role.agent,
                        message_id=str(uuid4()),
                        task_id=task_id,
                        context_id=context_id,
                        parts=[Part(root=TextPart(text="The Agent Framework A2A demo task was canceled."))],
                    ),
                ),
                final=True,
            )
        )


def build_agent_card(host: str, port: int) -> AgentCard:
    return AgentCard(
        name="Agent Framework Local A2A Demo Agent",
        description="Minimal Agent Framework agent exposed as an A2A endpoint for local testing.",
        url=f"http://{host}:{port}/",
        version="1.0.0",
        default_input_modes=["text/plain"],
        default_output_modes=["text/plain"],
        capabilities=AgentCapabilities(streaming=False),
        skills=[
            AgentSkill(
                id="agent-framework-echo-summary",
                name="Agent Framework Echo Summary",
                description="Returns a short text response containing the received input.",
                tags=["agent-framework", "demo", "echo", "a2a"],
                examples=["Tell me about yourself", "Summarize this request"],
            )
        ],
    )


def build_app(host: str, port: int):
    framework_agent = LocalAgentFrameworkAgent(
        name="LocalAgentFrameworkAgent",
        description="Minimal BaseAgent used as the backing implementation for an A2A endpoint.",
    )
    request_handler = DefaultRequestHandler(
        agent_executor=AgentFrameworkA2AExecutor(framework_agent),
        task_store=InMemoryTaskStore(),
    )
    return A2AStarletteApplication(
        agent_card=build_agent_card(host, port),
        http_handler=request_handler,
    ).build()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Run a local A2A server backed by an Agent Framework BaseAgent."
    )
    parser.add_argument("--host", default="127.0.0.1", help="Host to bind the local A2A server.")
    parser.add_argument("--port", default=5001, type=int, help="Port to bind the local A2A server.")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    uvicorn.run(
        build_app(args.host, args.port),
        host=args.host,
        port=args.port,
        log_level="warning",
    )


if __name__ == "__main__":
    main()

ローカル実行例

「4. A2A client として接続する」のコードを a2a_client.py、「5. Agent Framework agent を A2A server として公開する」のコードを a2a_server.py として、次の手順で実行します。

1. server を起動する

.\.venv\Scripts\python.exe a2a_server.py --host 127.0.0.1 --port 5001

2. client から接続する

.\.venv\Scripts\python.exe a2a_client.py

実行結果例

ここでは、呼び出し先のエージェントが Azure OpenAI を利用するように設定されている場合の実行例を示します。
"Tell me about yourself" を送信すると、次のような応答が返ります。

[agent] name=Agent Framework Local A2A Demo Agent
[agent] description=Minimal Agent Framework agent exposed as an A2A endpoint for local testing.
[agent] url=http://127.0.0.1:5001/
[response]
こんにちは!私はAIアシスタントです。あなたの質問に答えたり、情報提供や 文章作成のサポートをしたりするためにここにいます。何か知りたいことや相談があれば、気軽に教えてくださいね。

最後に

Agent Framework で A2A を使うときの最初の理解ポイントは、client 側では A2ACardResolver と A2AAgent、server 側では AgentExecutor ベースのブリッジです。

この機能は、異なる環境やサービスの agent を統一的な方法で呼び出したい場面で利用可能です。検索 agent、業務特化 agent などを A2A endpoint として公開しておけば、呼び出し側は個別実装の違いを強く意識せずに連携できます。

Agent Framework を使うと、その接続先をリモートの AI エージェントとして扱いやすくなるため、既存 Agent の再利用や疎結合な連携を進めやすくなります。

最新版 SDK では API の変化が比較的速いため、公式 sample をそのまま読むだけでなく、実際に手元で動かして task lifecycle や artifact の返し方まで確認するのが重要です。

執筆担当者プロフィール
寺澤 駿

寺澤 駿(日本ビジネスシステムズ株式会社)

IoTやAzure Cognitive ServicesのAIを活用したデモ環境・ソリューション作成を担当。

担当記事一覧