先日、Semantic Kernelを使って、複数エージェントからの回答をストリーミングで返す方法を試してみました。
今回は、Azure Functions でエンドポイントを実装し、エージェントの回答をストリーミングで送信する方法を紹介します。
実装環境
- semantic-kernel 1.37.0
- Python 3.12
Azure Functionsの準備
今回は、Azure FunctionsをPythonで実装を行います。
Python での実装でストリーミングを行うには、拡張機能「azurefunctions-extensions-http-fastapi」のインストールと環境変数の設定が必要です。
詳しくは以下のリンクを参照してください。
実装
Azure Functionsからクライアントへのストリーミングは、HTTP/1.1のTransfer-Encoding: chunkedを用い、StreamingResponseに非同期ジェネレータを渡してNDJSONを改行区切りで逐次送信する方式で行います。
ストリーミングで送信される内容は、エージェントの作成完了などの処理進捗と、エージェントによって生成された回答です。
Azure Functions の処理の流れ
- HTTPトリガーで処理を開始
- エージェント生成(get_agents)
- AzureChatCompletion に認証情報を渡し、ChatCompletionAgent を2体生成(PhysicsExpert / ChemistryExpert)。
- NDJSONStreamer を準備し、並列オーケストレーション本体は run_orchestration() をバックグラウンドタスクとして起動。
- 並列オーケストレーション(run_orchestration)
- 進捗通知: 最初に status 行("start creating agents" → "agents created")を送信。
- エージェント生成の失敗時: error 行を送って close(早期終了)。
- status("orchestration-start") を送信。
- invoke(task=ユーザー質問) で並列推論スタート。
- 生成される回答をストリーミングで送信。
- 終了処理と例外: Timeout やその他例外は error 行で通知。最後に runtime.stop_when_idle()、status("orchestration-end") を送り、ストリーム close。
コード
環境変数には、次の値を設定してください。
AZURE_OPENAI_DEPLOYMENT="gpt-4.1-mini" AZURE_OPENAI_ENDPOINT="<openai_endpoint>"
function_app.py に次のコードを記載します。
import asyncio import json import os from typing import AsyncIterator, Optional import azure.functions as func from azure.identity import AzureCliCredential from azurefunctions.extensions.http.fastapi import ( Request, StreamingResponse, JSONResponse, ) from semantic_kernel.agents import Agent, ChatCompletionAgent, ConcurrentOrchestration from semantic_kernel.agents.runtime import InProcessRuntime from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent from semantic_kernel.agents.orchestration.orchestration_base import ChatMessageContent # ローカル環境で.envから環境変数を読み取る場合に利用 from dotenv import load_dotenv load_dotenv(dotenv_path=".env", override=True) app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS) def get_agents() -> list[Agent]: """Create and return a list of agents for concurrent orchestration.""" # Azure OpenAI settings are read by AzureChatCompletion from environment variables. AZURE_CHAT_DEPLOYMENT = os.getenv("AZURE_OPENAI_DEPLOYMENT") # 本番/Functions ではDefaultAzureCredential()が推奨 credential = AzureCliCredential() physics_agent = ChatCompletionAgent( name="PhysicsExpert", instructions="You are an expert in physics. You answer questions from a physics perspective.", service=AzureChatCompletion(credential=credential, deployment_name=AZURE_CHAT_DEPLOYMENT), ) chemistry_agent = ChatCompletionAgent( name="ChemistryExpert", instructions="You are an expert in chemistry. You answer questions from a chemistry perspective.", service=AzureChatCompletion(credential=credential,deployment_name=AZURE_CHAT_DEPLOYMENT), ) return [physics_agent, chemistry_agent] class NDJSONStreamer: """Utility to stream newline-delimited JSON (NDJSON) via an asyncio Queue.""" def __init__(self) -> None: self._queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue() async def send(self, payload: dict) -> None: # NDJSON: one JSON object per line line = json.dumps(payload, ensure_ascii=False) + "\n" await self._queue.put(line.encode("utf-8")) async def close(self) -> None: await self._queue.put(None) # sentinel to end the stream async def aiter(self) -> AsyncIterator[bytes]: while True: item = await self._queue.get() if item is None: break yield item @app.route(route="agents/stream", methods=["POST"], auth_level=func.AuthLevel.ANONYMOUS) async def stream_agents(req: Request): """Run multiple agents concurrently and stream responses via HTTP chunked (NDJSON).""" try: body = await req.json() except Exception: return JSONResponse({"error": "Invalid JSON payload."}, status_code=400) task = body.get("task") timeout = body.get("timeout", 30) # seconds if not isinstance(task, str) or not task.strip(): return JSONResponse({"error": "Request must include a non-empty 'task' string."}, status_code=400) streamer = NDJSONStreamer() async def run_orchestration() -> None: await streamer.send({"type": "status", "message": "start creating agents"}) try: agents = get_agents() await streamer.send({"type": "status", "message": "agents created"}) except Exception as ex: # get_agents 失敗時はここで終了 await streamer.send({"type": "error", "message": f"get_agents failed: {type(ex).__name__}: {str(ex)}"}) await streamer.close() return async def on_streaming(chunk: StreamingChatMessageContent, is_complete: bool) -> None: # Attempt to extract agent name; fallback to "unknown" if not present agent_name = None # Some SK builds attach agent name to chunk.metadata or chunk.name; guard for availability if hasattr(chunk, "name") and chunk.name: agent_name = chunk.name elif hasattr(chunk, "metadata") and isinstance(chunk.metadata, dict): agent_name = chunk.metadata.get("name") if not agent_name: agent_name = "unknown" await streamer.send({ "type": "token", "agent": agent_name, "content": chunk.content or "", "is_complete": bool(is_complete), }) async def on_agent_response(message: ChatMessageContent) -> None: agent_name = getattr(message, "name", None) or "unknown" await streamer.send({ "type": "final", "agent": agent_name, "content": message.content or "", }) concurrent_orchestration = ConcurrentOrchestration( members=agents, agent_response_callback=on_agent_response, streaming_agent_response_callback=on_streaming, ) runtime = InProcessRuntime() runtime.start() try: await streamer.send({"type": "status", "message": "orchestration-start"}) # Invoke orchestration orchestration_result = await concurrent_orchestration.invoke( task=task, runtime=runtime, ) # Wait for collected results (from CollectionActor) results = await orchestration_result.get(timeout=timeout) except asyncio.TimeoutError: await streamer.send({"type": "error", "message": "Timeout waiting for agent results."}) except Exception as ex: await streamer.send({"type": "error", "message": f"{type(ex).__name__}: {str(ex)}"}) finally: # Stop runtime after completion await runtime.stop_when_idle() await streamer.send({"type": "status", "message": "orchestration-end"}) await streamer.close() # Kick off orchestration in background while returning a streaming response asyncio.create_task(run_orchestration()) # StreamingResponse with NDJSON content-type; the host will use chunked transfer return StreamingResponse( streamer.aiter(), media_type="application/x-ndjson", #media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", }, )
クライアント側のコード(Blazor WASM)
本記事の主題は Azure Functions の実装のため、クライアント側はコードのみを掲載します。
長いため折りたたみます。
Blazor WASMのコード(クリックで開く)
pages/AgentsStreamWasm.razor
@page "/agents-stream-wasm" @using YourApp.Models @inject IJSRuntime JS @inject NavigationManager Nav @using System.Text <h3>Agents Stream (Blazor WASM)</h3> <div class="mb-2"> <label>Function Base URL:</label> <input class="form-control" @bind="BaseUrl" placeholder="https://localhost:7071" /> </div> <div class="mb-2"> <label>Task:</label> <input class="form-control" @bind="TaskText" placeholder="What is temperature?" /> </div> <div class="mb-2"> <label>Timeout (sec):</label> <input type="number" class="form-control" @bind="TimeoutSec" /> </div> <div class="mb-2"> <button class="btn btn-primary" @onclick="Start">Start</button> <button class="btn btn-secondary" @onclick="Stop" disabled="@(_streamModule is null || _streamId is null)">Stop</button> </div> <h5>Stream Log</h5> <ul> @foreach (var log in Logs) { <li>@log</li> } </ul> <h5>Live Outputs (streaming tokens)</h5> <ul> @foreach (var kvp in _agentBuffers) { <li> <b>@kvp.Key</b>: <span>@kvp.Value.ToString()</span> </li> } </ul> <h5>Messages</h5> <ul> @foreach (var m in Messages) { <li> <b>@m.Type</b> [@m.Agent]: @if (m.Type == "status" || m.Type == "error") { <span>@m.Message</span> } else { <span>@m.Content</span> } @if (m.Results != null) { <div> <i>Summary:</i> <ul> @foreach (var r in m.Results) { <li>@r.Agent: @r.Content</li> } </ul> </div> } </li> } </ul> @code { private string BaseUrl = "http://localhost:7071"; private string TaskText = "What is temperature?"; private int TimeoutSec = 30; private List<string> Logs = new(); private List<AgentStreamMessage> Messages = new(); // エージェント別のライブ出力用バッファ private Dictionary<string, StringBuilder> _agentBuffers = new(); private IJSObjectReference? _streamModule; // JS モジュール参照 private DotNetObjectReference<AgentsStreamWasm>? _selfRef; // このコンポーネント参照 private string? _streamId; // abort のための ID protected override async Task OnAfterRenderAsync(bool firstRender) { if (firstRender) { var modulePath = new Uri(new Uri(Nav.BaseUri), "js/agentsStream.js").ToString(); _streamModule = await JS.InvokeAsync<IJSObjectReference>("import", modulePath); Logs.Add($"[ui] JS module loaded: {modulePath}"); StateHasChanged(); } } public async ValueTask DisposeAsync() { if (_streamModule is not null) { await _streamModule.DisposeAsync(); _streamModule = null; } _selfRef?.Dispose(); _selfRef = null; } private async Task Start() { await Stop(); // 既存ストリームがあれば停止 Logs.Clear(); Messages.Clear(); _agentBuffers.Clear(); // ライブ出力をクリア var url = new Uri(new Uri(BaseUrl), "/api/agents/stream").ToString(); Logs.Add($"[ui] start streaming: {url} task={TaskText} timeout={TimeoutSec}"); if (_streamModule is null) { var modulePath = new Uri(new Uri(Nav.BaseUri), "js/agentsStream.js").ToString(); _streamModule = await JS.InvokeAsync<IJSObjectReference>("import", modulePath); Logs.Add($"[ui] JS module loaded: {modulePath}"); } _selfRef = DotNetObjectReference.Create(this); _streamId = Guid.NewGuid().ToString("N"); var payload = new { task = TaskText, timeout = TimeoutSec }; string? functionKey = null; // 必要なら関数キー var returnedId = await _streamModule.InvokeAsync<string>( "startagentStream", url, payload, _selfRef, functionKey, _streamId); _streamId = returnedId; // 念のため返却値を保持 Logs.Add($"[ui] streaming started (streamId={_streamId})"); } private async Task Stop() { if (_streamModule is not null && _streamId is not null) { await _streamModule.InvokeVoidAsync("abortagentStream", _streamId); Logs.Add("[ui] stop requested"); } _selfRef?.Dispose(); _selfRef = null; _streamId = null; } [JSInvokable] public void OnStreamMessage(AgentStreamMessage msg) { // トークンはライブ出力へ反映(ChatGPT のように徐々に表示) if (string.Equals(msg.Type, "token", StringComparison.OrdinalIgnoreCase)) { var agent = string.IsNullOrWhiteSpace(msg.Agent) ? "unknown" : msg.Agent; var token = msg.Content ?? string.Empty; if (!_agentBuffers.TryGetValue(agent, out var sb)) { sb = new StringBuilder(); _agentBuffers[agent] = sb; } sb.Append(token); // 完了フラグが付いていたら確定(optional: Messages にも追加) if (msg.IsComplete == true) { Messages.Add(new AgentStreamMessage { Type = "final", Agent = agent, Content = sb.ToString() }); } //Logs.Add($"[ui] token agent={agent} len={token.Length} total={sb.Length}"); StateHasChanged(); return; // token は Messages に積まない(ライブ領域に表示) } // final/summary/status/error は従来通りメッセージ一覧に追加 Messages.Add(msg); Logs.Add($"[ui] recv type={msg.Type} agent={msg.Agent} len={msg.Content?.Length ?? 0}"); // final が来たら、ライブバッファを最新内容で上書き or 破棄(好みに応じて) if (string.Equals(msg.Type, "final", StringComparison.OrdinalIgnoreCase)) { var agent = string.IsNullOrWhiteSpace(msg.Agent) ? "unknown" : msg.Agent; if (!_agentBuffers.TryGetValue(agent, out var sb)) { sb = new StringBuilder(); _agentBuffers[agent] = sb; } sb.Clear(); sb.Append(msg.Content ?? string.Empty); } StateHasChanged(); } [JSInvokable] public void OnStreamError(string error) { Logs.Add($"[ui] error: {error}"); StateHasChanged(); } [JSInvokable] public void OnStreamCompleted() { Logs.Add("[ui] stream completed"); StateHasChanged(); } [JSInvokable] public void OnStreamLog(string text) { Logs.Add(text); StateHasChanged(); } }
js/agentsStream.js
// wwwroot/js/agentsStream.js // NDJSON/SSE ストリーミング受信を Blazor に橋渡しする JS モジュール const controllers = new Map(); function appendQuery(url, q) { const sep = url.includes('?') ? '&' : '?'; return url + sep + q; } function normalizeDevUrl(url) { try { const u = new URL(url); // 開発時に https://localhost:* へ接続しているがページが http の場合は http にフォールバック if (u.hostname === 'localhost' && u.protocol === 'https:' && window.location.protocol === 'http:') { u.protocol = 'http:'; return u.toString(); } } catch (_) {} return url; } export function startagentStream(url, body, dotNetRef, functionKey, streamId) { const id = streamId || (globalThis.crypto?.randomUUID?.() || String(Date.now())); const controller = new AbortController(); const signal = controller.signal; controllers.set(id, controller); const finalUrl = normalizeDevUrl(url); const fetchUrl = (functionKey && functionKey.length > 0) ? appendQuery(finalUrl, `code=${encodeURIComponent(functionKey)}`) : finalUrl; dotNetRef.invokeMethodAsync('OnStreamLog', `[js] start stream id=${id} url=${fetchUrl}`); fetch(fetchUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream, application/x-ndjson' }, body: JSON.stringify(body), signal }).then(resp => { dotNetRef.invokeMethodAsync('OnStreamLog', `[js] response status=${resp.status} content-type=${resp.headers.get('content-type')}`); const reader = resp.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; const pump = () => { reader.read().then(({ done, value }) => { if (done) { dotNetRef.invokeMethodAsync('OnStreamCompleted'); return; } buffer += decoder.decode(value, { stream: true }); // 行単位で処理(SSE: data: <json> / NDJSON: <json>) const lines = buffer.split(/\r?\n/); buffer = lines.pop(); // 未完の行は次回へ for (let raw of lines) { const line = raw.trim(); if (!line) continue; // 空行はスキップ let jsonText = line; if (jsonText.startsWith('data:')) { jsonText = jsonText.slice(5).trim(); } try { const obj = JSON.parse(jsonText); dotNetRef.invokeMethodAsync('OnStreamMessage', obj); } catch (e) { dotNetRef.invokeMethodAsync('OnStreamLog', `[js] parse error: ${e?.message ?? e}. line=${line}`); } } pump(); }).catch(err => { dotNetRef.invokeMethodAsync('OnStreamError', err?.message ?? String(err)); }); }; pump(); }).catch(err => { dotNetRef.invokeMethodAsync('OnStreamError', err?.message ?? String(err)); }); return id; } export function abortagentStream(streamId) { const c = controllers.get(streamId); if (c) { c.abort(); controllers.delete(streamId); } }
Models/AgentStreamMessage.cs
using System.Collections.Generic; namespace YourApp.Models { public sealed class AgentStreamMessage { public string Type { get; set; } // token | final | summary | status | error public string Agent { get; set; } public string Content { get; set; } public bool? IsComplete { get; set; } public List<SummaryItem> Results { get; set; } public string Message { get; set; } public double? ElapsedSec { get; set; } //public string Request_Id { get; set; } // サーバが request_id を付ける場合に備えて } public sealed class SummaryItem { public string Agent { get; set; } public string Content { get; set; } } }
最後に
今回は、Azure Functions と Semantic Kernel を使い、複数エージェントの回答をストリーミングで送信する方法を記載しました。
マルチエージェントなど複雑な処理を含むシステムでは、最終的な回答生成まで時間がかかり、ユーザーエクスペリエンスが低下することがあります。
進捗の送信や回答をストリーミングで送信することで体感の待ち時間を減少させ、ユーザーは処理の過程を把握しつつ最終的な結果に到達できます。