Azure Functions × Semantic Kernelで複数エージェントの回答をストリーミング配信するエンドポイントの実装

先日、Semantic Kernelを使って、複数エージェントからの回答をストリーミングで返す方法を試してみました。

今回は、Azure Functions でエンドポイントを実装し、エージェントの回答をストリーミングで送信する方法を紹介します。

実装環境

  • semantic-kernel 1.37.0
  • Python 3.12

Azure Functionsの準備

今回は、Azure FunctionsをPythonで実装を行います。

Python での実装でストリーミングを行うには、拡張機能「azurefunctions-extensions-http-fastapi」のインストールと環境変数の設定が必要です。

詳しくは以下のリンクを参照してください。

ohina.work

実装

Azure Functionsからクライアントへのストリーミングは、HTTP/1.1のTransfer-Encoding: chunkedを用い、StreamingResponseに非同期ジェネレータを渡してNDJSONを改行区切りで逐次送信する方式で行います。

ストリーミングで送信される内容は、エージェントの作成完了などの処理進捗と、エージェントによって生成された回答です。

Azure Functions の処理の流れ

  1. HTTPトリガーで処理を開始
  2. エージェント生成(get_agents)
    1. AzureChatCompletion に認証情報を渡し、ChatCompletionAgent を2体生成(PhysicsExpert / ChemistryExpert)。
  3. NDJSONStreamer を準備し、並列オーケストレーション本体は run_orchestration() をバックグラウンドタスクとして起動。
  4. 並列オーケストレーション(run_orchestration)
    1. 進捗通知: 最初に status 行("start creating agents" → "agents created")を送信。
    2. エージェント生成の失敗時: error 行を送って close(早期終了)。
    3. status("orchestration-start") を送信。
    4. invoke(task=ユーザー質問) で並列推論スタート。
    5. 生成される回答をストリーミングで送信。
  5. 終了処理と例外: Timeout やその他例外は error 行で通知。最後に runtime.stop_when_idle()、status("orchestration-end") を送り、ストリーム close。

コード

環境変数には、次の値を設定してください。

AZURE_OPENAI_DEPLOYMENT="gpt-4.1-mini"
AZURE_OPENAI_ENDPOINT="<openai_endpoint>"

function_app.py に次のコードを記載します。

import asyncio
import json
import os
from typing import AsyncIterator, Optional

import azure.functions as func
from azure.identity import AzureCliCredential

from azurefunctions.extensions.http.fastapi import (
    Request,
    StreamingResponse,
    JSONResponse,
)

from semantic_kernel.agents import Agent, ChatCompletionAgent, ConcurrentOrchestration
from semantic_kernel.agents.runtime import InProcessRuntime
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.agents.orchestration.orchestration_base import ChatMessageContent

# ローカル環境で.envから環境変数を読み取る場合に利用
from dotenv import load_dotenv
load_dotenv(dotenv_path=".env", override=True)

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

def get_agents() -> list[Agent]:
    """Create and return a list of agents for concurrent orchestration."""
    # Azure OpenAI settings are read by AzureChatCompletion from environment variables.
    AZURE_CHAT_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT")

    # 本番/Functions ではDefaultAzureCredential()が推奨
    credential = AzureCliCredential()

    physics_agent = ChatCompletionAgent(
        name="PhysicsExpert",
        instructions="You are an expert in physics. You answer questions from a physics perspective.",
        service=AzureChatCompletion(credential=credential, deployment_name=AZURE_CHAT_DEPLOYMENT),
    )
    chemistry_agent = ChatCompletionAgent(
        name="ChemistryExpert",
        instructions="You are an expert in chemistry. You answer questions from a chemistry perspective.",
        service=AzureChatCompletion(credential=credential,deployment_name=AZURE_CHAT_DEPLOYMENT),
    )

    return [physics_agent, chemistry_agent]


class NDJSONStreamer:
    """Utility to stream newline-delimited JSON (NDJSON) via an asyncio Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue()

    async def send(self, payload: dict) -> None:
        # NDJSON: one JSON object per line
        line = json.dumps(payload, ensure_ascii=False) + "\n"
        await self._queue.put(line.encode("utf-8"))

    async def close(self) -> None:
        await self._queue.put(None)  # sentinel to end the stream

    async def aiter(self) -> AsyncIterator[bytes]:
        while True:
            item = await self._queue.get()
            if item is None:
                break
            yield item


@app.route(route="agents/stream", methods=["POST"], auth_level=func.AuthLevel.ANONYMOUS)
async def stream_agents(req: Request): 
    """Run multiple agents concurrently and stream responses via HTTP chunked (NDJSON)."""
    try:
        body = await req.json()
    except Exception:
        return JSONResponse({"error": "Invalid JSON payload."}, status_code=400)

    task = body.get("task")
    timeout = body.get("timeout", 30)  # seconds
    if not isinstance(task, str) or not task.strip():
        return JSONResponse({"error": "Request must include a non-empty 'task' string."}, status_code=400)

    streamer = NDJSONStreamer()

    async def run_orchestration() -> None:

        await streamer.send({"type": "status", "message": "start creating agents"})
        try:
            agents = get_agents()
            await streamer.send({"type": "status", "message": "agents created"})
        except Exception as ex:
            # get_agents 失敗時はここで終了
            await streamer.send({"type": "error", "message": f"get_agents failed: {type(ex).__name__}: {str(ex)}"})
            await streamer.close()
            return

        async def on_streaming(chunk: StreamingChatMessageContent, is_complete: bool) -> None:
            # Attempt to extract agent name; fallback to "unknown" if not present
            agent_name = None
            # Some SK builds attach agent name to chunk.metadata or chunk.name; guard for availability
            if hasattr(chunk, "name") and chunk.name:
                agent_name = chunk.name
            elif hasattr(chunk, "metadata") and isinstance(chunk.metadata, dict):
                agent_name = chunk.metadata.get("name")
            if not agent_name:
                agent_name = "unknown"

            await streamer.send({
                "type": "token",
                "agent": agent_name,
                "content": chunk.content or "",
                "is_complete": bool(is_complete),
            })

        async def on_agent_response(message: ChatMessageContent) -> None:
            agent_name = getattr(message, "name", None) or "unknown"
            await streamer.send({
                "type": "final",
                "agent": agent_name,
                "content": message.content or "",
            })

        concurrent_orchestration = ConcurrentOrchestration(
            members=agents,
            agent_response_callback=on_agent_response,
            streaming_agent_response_callback=on_streaming,
        )

        runtime = InProcessRuntime()
        runtime.start()

        try:
            await streamer.send({"type": "status", "message": "orchestration-start"})

            # Invoke orchestration
            orchestration_result = await concurrent_orchestration.invoke(
                task=task,
                runtime=runtime,
            )

            # Wait for collected results (from CollectionActor)
            results = await orchestration_result.get(timeout=timeout)

        except asyncio.TimeoutError:
            await streamer.send({"type": "error", "message": "Timeout waiting for agent results."})
        except Exception as ex:
            await streamer.send({"type": "error", "message": f"{type(ex).__name__}: {str(ex)}"})
        finally:
            # Stop runtime after completion
            await runtime.stop_when_idle()
            await streamer.send({"type": "status", "message": "orchestration-end"})
            await streamer.close()

    # Kick off orchestration in background while returning a streaming response
    asyncio.create_task(run_orchestration())

    # StreamingResponse with NDJSON content-type; the host will use chunked transfer
    return StreamingResponse(
        streamer.aiter(),
        media_type="application/x-ndjson",
        #media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

動作画面

以下の動作では、2つのエージェントの回答がストリーミングで送信されていることが確認できます。

youtu.be

クライアント側のコード(Blazor WASM)

本記事の主題は Azure Functions の実装のため、クライアント側はコードのみを掲載します。

長いため折りたたみます。


Blazor WASMのコード(クリックで開く)

pages/AgentsStreamWasm.razor

@page "/agents-stream-wasm"
@using YourApp.Models
@inject IJSRuntime JS
@inject NavigationManager Nav
@using System.Text

<h3>Agents Stream (Blazor WASM)</h3>
<div class="mb-2">
    <label>Function Base URL:</label>
    <input class="form-control" @bind="BaseUrl" placeholder="https://localhost:7071" />
</div>
<div class="mb-2">
    <label>Task:</label>
    <input class="form-control" @bind="TaskText" placeholder="What is temperature?" />
</div>
<div class="mb-2">
    <label>Timeout (sec):</label>
    <input type="number" class="form-control" @bind="TimeoutSec" />
</div>
<div class="mb-2">
    <button class="btn btn-primary" @onclick="Start">Start</button>
    <button class="btn btn-secondary" @onclick="Stop" disabled="@(_streamModule is null || _streamId is null)">Stop</button>
</div>

<h5>Stream Log</h5>
<ul>
    @foreach (var log in Logs)
    {
        <li>@log</li>
    }
</ul>

<h5>Live Outputs (streaming tokens)</h5>
<ul>
    @foreach (var kvp in _agentBuffers)
    {
        <li>
            <b>@kvp.Key</b>:
            <span>@kvp.Value.ToString()</span>
        </li>
    }
</ul>

<h5>Messages</h5>
<ul>
    @foreach (var m in Messages)
    {
        <li>
            <b>@m.Type</b> [@m.Agent]:
            @if (m.Type == "status" || m.Type == "error")
            {
                <span>@m.Message</span>
            }
            else
            {
                <span>@m.Content</span>
            }
            @if (m.Results != null)
            {
                <div>
                    <i>Summary:</i>
                    <ul>
                        @foreach (var r in m.Results)
                        {
                            <li>@r.Agent: @r.Content</li>
                        }
                    </ul>
                </div>
            }
        </li>
    }
</ul>

@code {
    private string BaseUrl = "http://localhost:7071";
    private string TaskText = "What is temperature?";
    private int TimeoutSec = 30;

    private List<string> Logs = new();
    private List<AgentStreamMessage> Messages = new();

    // エージェント別のライブ出力用バッファ
    private Dictionary<string, StringBuilder> _agentBuffers = new();

    private IJSObjectReference? _streamModule; // JS モジュール参照
    private DotNetObjectReference<AgentsStreamWasm>? _selfRef; // このコンポーネント参照
    private string? _streamId; // abort のための ID

    protected override async Task OnAfterRenderAsync(bool firstRender)
    {
        if (firstRender)
        {
            var modulePath = new Uri(new Uri(Nav.BaseUri), "js/agentsStream.js").ToString();
            _streamModule = await JS.InvokeAsync<IJSObjectReference>("import", modulePath);
            Logs.Add($"[ui] JS module loaded: {modulePath}");
            StateHasChanged();
        }
    }

    public async ValueTask DisposeAsync()
    {
        if (_streamModule is not null)
        {
            await _streamModule.DisposeAsync();
            _streamModule = null;
        }
        _selfRef?.Dispose();
        _selfRef = null;
    }

    private async Task Start()
    {
        await Stop(); // 既存ストリームがあれば停止
        Logs.Clear();
        Messages.Clear();
        _agentBuffers.Clear(); // ライブ出力をクリア

        var url = new Uri(new Uri(BaseUrl), "/api/agents/stream").ToString();
        Logs.Add($"[ui] start streaming: {url} task={TaskText} timeout={TimeoutSec}");

        if (_streamModule is null)
        {
            var modulePath = new Uri(new Uri(Nav.BaseUri), "js/agentsStream.js").ToString();
            _streamModule = await JS.InvokeAsync<IJSObjectReference>("import", modulePath);
            Logs.Add($"[ui] JS module loaded: {modulePath}");
        }

        _selfRef = DotNetObjectReference.Create(this);
        _streamId = Guid.NewGuid().ToString("N");

        var payload = new { task = TaskText, timeout = TimeoutSec };
        string? functionKey = null; // 必要なら関数キー

        var returnedId = await _streamModule.InvokeAsync<string>(
            "startagentStream", url, payload, _selfRef, functionKey, _streamId);

        _streamId = returnedId; // 念のため返却値を保持
        Logs.Add($"[ui] streaming started (streamId={_streamId})");
    }

    private async Task Stop()
    {
        if (_streamModule is not null && _streamId is not null)
        {
            await _streamModule.InvokeVoidAsync("abortagentStream", _streamId);
            Logs.Add("[ui] stop requested");
        }
        _selfRef?.Dispose();
        _selfRef = null;
        _streamId = null;
    }

    [JSInvokable]
    public void OnStreamMessage(AgentStreamMessage msg)
    {
        // トークンはライブ出力へ反映(ChatGPT のように徐々に表示)
        if (string.Equals(msg.Type, "token", StringComparison.OrdinalIgnoreCase))
        {
            var agent = string.IsNullOrWhiteSpace(msg.Agent) ? "unknown" : msg.Agent;
            var token = msg.Content ?? string.Empty;

            if (!_agentBuffers.TryGetValue(agent, out var sb))
            {
                sb = new StringBuilder();
                _agentBuffers[agent] = sb;
            }
            sb.Append(token);

            // 完了フラグが付いていたら確定(optional: Messages にも追加)
            if (msg.IsComplete == true)
            {
                Messages.Add(new AgentStreamMessage
                {
                    Type = "final",
                    Agent = agent,
                    Content = sb.ToString()
                });
            }

            //Logs.Add($"[ui] token agent={agent} len={token.Length} total={sb.Length}");
            StateHasChanged();
            return; // token は Messages に積まない(ライブ領域に表示)
        }

        // final/summary/status/error は従来通りメッセージ一覧に追加
        Messages.Add(msg);
        Logs.Add($"[ui] recv type={msg.Type} agent={msg.Agent} len={msg.Content?.Length ?? 0}");

        // final が来たら、ライブバッファを最新内容で上書き or 破棄(好みに応じて)
        if (string.Equals(msg.Type, "final", StringComparison.OrdinalIgnoreCase))
        {
            var agent = string.IsNullOrWhiteSpace(msg.Agent) ? "unknown" : msg.Agent;
            if (!_agentBuffers.TryGetValue(agent, out var sb))
            {
                sb = new StringBuilder();
                _agentBuffers[agent] = sb;
            }
            sb.Clear();
            sb.Append(msg.Content ?? string.Empty);
        }

        StateHasChanged();
    }

    [JSInvokable]
    public void OnStreamError(string error)
    {
        Logs.Add($"[ui] error: {error}");
        StateHasChanged();
    }

    [JSInvokable]
    public void OnStreamCompleted()
    {
        Logs.Add("[ui] stream completed");
        StateHasChanged();
    }

    [JSInvokable]
    public void OnStreamLog(string text)
    {
        Logs.Add(text);
        StateHasChanged();
    }
}


js/agentsStream.js

// wwwroot/js/agentsStream.js
// NDJSON/SSE ストリーミング受信を Blazor に橋渡しする JS モジュール

const controllers = new Map();

function appendQuery(url, q) {
  const sep = url.includes('?') ? '&' : '?';
  return url + sep + q;
}

function normalizeDevUrl(url) {
  try {
    const u = new URL(url);
    // 開発時に https://localhost:* へ接続しているがページが http の場合は http にフォールバック
    if (u.hostname === 'localhost' && u.protocol === 'https:' && window.location.protocol === 'http:') {
      u.protocol = 'http:';
      return u.toString();
    }
  } catch (_) {}
  return url;
}

export function startagentStream(url, body, dotNetRef, functionKey, streamId) {
  const id = streamId || (globalThis.crypto?.randomUUID?.() || String(Date.now()));
  const controller = new AbortController();
  const signal = controller.signal;
  controllers.set(id, controller);

  const finalUrl = normalizeDevUrl(url);
  const fetchUrl = (functionKey && functionKey.length > 0)
    ? appendQuery(finalUrl, `code=${encodeURIComponent(functionKey)}`)
    : finalUrl;

  dotNetRef.invokeMethodAsync('OnStreamLog', `[js] start stream id=${id} url=${fetchUrl}`);

  fetch(fetchUrl, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'text/event-stream, application/x-ndjson'
    },
    body: JSON.stringify(body),
    signal
  }).then(resp => {
    dotNetRef.invokeMethodAsync('OnStreamLog', `[js] response status=${resp.status} content-type=${resp.headers.get('content-type')}`);
    const reader = resp.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    const pump = () => {
      reader.read().then(({ done, value }) => {
        if (done) {
          dotNetRef.invokeMethodAsync('OnStreamCompleted');
          return;
        }
        buffer += decoder.decode(value, { stream: true });

        // 行単位で処理(SSE: data: <json> / NDJSON: <json>)
        const lines = buffer.split(/\r?\n/);
        buffer = lines.pop(); // 未完の行は次回へ

        for (let raw of lines) {
          const line = raw.trim();
          if (!line) continue; // 空行はスキップ
          let jsonText = line;
          if (jsonText.startsWith('data:')) {
            jsonText = jsonText.slice(5).trim();
          }
          try {
            const obj = JSON.parse(jsonText);
            dotNetRef.invokeMethodAsync('OnStreamMessage', obj);
          } catch (e) {
            dotNetRef.invokeMethodAsync('OnStreamLog', `[js] parse error: ${e?.message ?? e}. line=${line}`);
          }
        }
        pump();
      }).catch(err => {
        dotNetRef.invokeMethodAsync('OnStreamError', err?.message ?? String(err));
      });
    };
    pump();
  }).catch(err => {
    dotNetRef.invokeMethodAsync('OnStreamError', err?.message ?? String(err));
  });

  return id;
}

export function abortagentStream(streamId) {
  const c = controllers.get(streamId);
  if (c) {
    c.abort();
    controllers.delete(streamId);
  }
}


Models/AgentStreamMessage.cs

using System.Collections.Generic;

namespace YourApp.Models
{
    public sealed class AgentStreamMessage
    {
        public string Type { get; set; } // token | final | summary | status | error
        public string Agent { get; set; }
        public string Content { get; set; }
        public bool? IsComplete { get; set; }
        public List<SummaryItem> Results { get; set; }
        public string Message { get; set; }
        public double? ElapsedSec { get; set; }
        //public string Request_Id { get; set; } // サーバが request_id を付ける場合に備えて
    }

    public sealed class SummaryItem
    {
        public string Agent { get; set; }
        public string Content { get; set; }
    }
}

最後に

今回は、Azure Functions と Semantic Kernel を使い、複数エージェントの回答をストリーミングで送信する方法を記載しました。

マルチエージェントなど複雑な処理を含むシステムでは、最終的な回答生成まで時間がかかり、ユーザーエクスペリエンスが低下することがあります。

進捗の送信や回答をストリーミングで送信することで体感の待ち時間を減少させ、ユーザーは処理の過程を把握しつつ最終的な結果に到達できます。

執筆担当者プロフィール
寺澤 駿

寺澤 駿(日本ビジネスシステムズ株式会社)

IoTやAzure Cognitive ServicesのAIを活用したデモ環境・ソリューション作成を担当。

担当記事一覧