AutoGen v0.4.7によるマルチエージェントの並列処理

今回は、複数エージェントの並列処理を含む、マルチエージェントを構築に関して記載します。

マルチエージェントの構築はAutoGen v0.4.7のフレームワークを利用して行います。

Semantic Kernelでのエージェントの並列処理に関しては、次の記事に記載しています。

blog.jbs.co.jp

実行環境

  • Python 3.11
  • AutoGen v0.4.7

並列処理のシナリオ

今回は、以下のような流れで回答を生成するマルチエージェントを作成します。

  1. 質問の入力
  2. LLMでエージェントの選択
  3. 選択された複数のエージェントを並列で処理
  4. 回答の集約し最終的な回答を生成


AutoGenでのエージェント並列処理

AutoGen v0.4.7では以下の情報をもとに実装を行います。

AutoGenも更新のペースが速いので、試す際は最新のドキュメントを確認してください。

microsoft.github.io

オーケストレーションのエージェントの定義

オーケストレーションを行うエージェントは、Group Chat — AutoGenのGroup Chat Managerの設定をもとに定義します。

このエージェントの処置内容は以下のものです。

  1. ユーザーのインプットをもとに、回答するエージェントを選択し呼び出す
  2. 選択されたエージェントの回答が完了後、Summarizeエージェントを呼び出し、回答を要約する
# OrchestratorAgentの定義
class OrchestratorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient, agent_types: Dict[str, str], participant_descriptions: Dict[str, str]):
        super().__init__("Orchestrator Agent")
        self.model_client = client
        self.agent_types = agent_types  # エージェント名からエージェントタイプへのマッピング
        self.participant_descriptions = participant_descriptions  # エージェント名から説明へのマッピング
    
    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> FinalResult:
        print(f"{'-'*80}\nOrchestrator-{self.id}:\nReceived task: {message.task}")
        # エージェントを選択するためのプロンプトを準備
        prompt = "以下のエージェントが利用可能です:\n"
        for agent_name, description in self.participant_descriptions.items():
            prompt += f"- {agent_name}: {description}\n"
        prompt += "\nユーザーの質問は次の通りです:\n"
        prompt += message.task
        prompt += "\n\n上記の質問に回答するのに適したエージェントを選択してください。複数のエージェントを選択可能です。最低2つ以上のエージェントを選択してください。"
        prompt += "\n\nエージェント名をカンマで区切って列挙してください。"

        # LLMを使用してエージェントを選択
        selected_agent_names = await self.select_agents(prompt)
        if not selected_agent_names:
            # 有効なエージェントが選択されない場合、general_information_expertをデフォルトで選択
            print(f"no agents selected")
            selected_agent_names = ["general_information_expert", "company_rule_expert"]

        # 選択されたエージェントにタスクをディスパッチ
        tasks = []
        print(f"selected agents: {selected_agent_names}")
        for agent_name in selected_agent_names:
            agent_id = AgentId(self.agent_types[agent_name], agent_name)
            tasks.append(self.send_message(WorkerTask(task=message.task), agent_id))

        # 並列にタスクを実行
        results = await asyncio.gather(*tasks)

        # 結果を集約
        aggregated_result = "\n".join([result.result for result in results])
        print(f"集約された結果:\n{aggregated_result}")

        # SummarizeAgentを使用して結果を要約
        summarize_agent_id = AgentId('summarize_agent', 'default')
        summarize_result = await self.send_message(WorkerTask(task=aggregated_result), summarize_agent_id)

        #return FinalResult(result=aggregated_result)
        return FinalResult(result=summarize_result.result)

    async def select_agents(self, prompt):
        #print(f"prompt: {prompt}")
        # LLMを使用してエージェントを選択
        model_result = await self.model_client.create([SystemMessage(content=prompt)])
        assert isinstance(model_result.content, str)
        selected_agents_text = model_result.content.strip()

        print(f"selected_agents_text: {selected_agents_text}")
        # 選択されたエージェントを解析
        selected_agent_names = [name.strip() for name in selected_agents_text.split(",") if name.strip() in self.agent_types]
        return selected_agent_names

他のエージェントの定義

4つのエージェントと2つのツールを定義します。

CompanyRuleExpertAgentとSalesExpertAgentはそれぞれtoolを利用するように定義します。toolの結果は特定の文字列を返すように設定しています。

Toolが返す内容は、架空のサンプルを利用しています。

まずはエージェントが利用するToolの定義を行います。

# toolの定義
class InternalDocumentationOfSalesRAGPlugin():
    '''
    営業に関するドキュメントのtool
    '''

    async def search_sales_mock(self, query: str) -> Annotated[str, "data of sales"]:
        print(f"query: {query}")
        # Simulate a long-running operation
        await asyncio.sleep(5)
        mock_sales_data = """
        営業の規定:
        営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。
        また、競合情報の取扱いには注意し、守秘義務を厳守してください。
        リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。
        """ 
        print(f'sales tool returns: "{mock_sales_data}"')
        return mock_sales_data

class InternalDocumentationRAGPlugin():
    '''
    社内規定に関するドキュメントのtool
    '''
    async def search_rules_mock(self, query: str) -> Annotated[str, "data of company rules"]:
        print(f"query: {query}")
        # Simulate a long-running operation
        await asyncio.sleep(5)
        mock_rule_data = """
        リモートワークの規定:
        リモートワークは週に3回まで可能とします。
        業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。
        セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。
        """ 
        print(f'tool returns: "{mock_rule_data}"')
        return mock_rule_data


# 営業用ツールを定義
async def search_sales_tool(input: Annotated[str, "Input to the sales search tool."]) -> str:
    """
    Tool for searching internal sales documentation.
    """
    sales_rag_plugin = InternalDocumentationOfSalesRAGPlugin()
    result = await sales_rag_plugin.search_sales_mock(input)
    return result


# 社内規定用ツールを定義
async def search_rules_tool(input: Annotated[str, "Input to the rules search tool."]) -> str:
    """
    Tool for searching internal company rules documentation.
    """
    rules_rag_plugin = InternalDocumentationRAGPlugin()
    result = await rules_rag_plugin.search_rules_mock(input)
    print(f"input: {input}")
    return result

次にエージェントを定義します。

# エージェントの定義
class CompanyRuleExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("Company Rule Expert")
        self.description = description
        self.model_client = model_client
        self.tools = [FunctionTool(search_rules_tool, description="Search the internal docuemnts of company rules.")]
        self.system_prompt = """
あなたは社内規定に関して詳しいエージェントです。トピックに対し社内規定を踏まえた回答をしてください。
search_rules_toolツールを利用して社内規定の情報を取得し回答を生成します。
"""

    @message_handler
    async def handle_user_message(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        # Create a session of messages.
        system_prompt = [SystemMessage(content=self.system_prompt)]
        session: List[LLMMessage] = system_prompt + [UserMessage(content=message.task, source="user")]

        # Run the chat completion with the tools.
        create_result = await self.model_client.create(
            messages=session,
            tools=self.tools,
            cancellation_token=ctx.cancellation_token,
        )

        print(f"chat completion with the tools: {create_result.content}")

        # If there are no tool calls, return the result.
        if isinstance(create_result.content, str):
            create_result.content = f"{self.id}:\n" + create_result.content
            return TaskResult(result=create_result.content)
        assert isinstance(create_result.content, list) and all(
            isinstance(call, FunctionCall) for call in create_result.content
        )

        # Add the first model create result to the session.
        session.append(AssistantMessage(content=create_result.content, source="assistant"))

        # Execute the tool calls.
        results = await asyncio.gather(
            *[self._execute_tool_call(call, ctx.cancellation_token) for call in create_result.content]
        )

        # Add the function execution results to the session.
        session.append(FunctionExecutionResultMessage(content=results))

        # Run the chat completion again to reflect on the history and function execution results.
        create_result = await self.model_client.create(
            messages=session,
            cancellation_token=ctx.cancellation_token,
        )
        assert isinstance(create_result.content, str)

        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        
        # Return the result as a message.
        create_result.content = f"company_rule_expert:\n" + create_result.content
        return TaskResult(result=create_result.content)

    
    async def _execute_tool_call(
        self, call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        # Find the tool by name.
        tool = next((tool for tool in self.tools if tool.name == call.name), None)
        assert tool is not None

        # Run the tool and capture the result.
        try:
            arguments = json.loads(call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return FunctionExecutionResult(call_id=call.id, content=tool.return_value_as_string(result), is_error=False)
        except Exception as e:
            return FunctionExecutionResult(call_id=call.id, content=str(e), is_error=True)


class SalesExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("Sales Expert")
        self.description = description
        self.model_client = model_client
        self.tools = [FunctionTool(search_sales_tool, description="Search the internal docuemnts of sales.")]
        self.system_prompt = """
あなたは社内の営業に関して詳しいエージェントです。トピックに対し営業の情報を踏まえた回答を提案してください。
search_sales_toolツールを利用して営業に関する情報を取得し回答を生成します。
"""

    @message_handler
    async def handle_user_message(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        # Create a session of messages.
        system_prompt = [SystemMessage(content=self.system_prompt)]
        session: List[LLMMessage] = system_prompt + [UserMessage(content=message.task, source="user")]

        # Run the chat completion with the tools.
        create_result = await self.model_client.create(
            messages=session,
            tools=self.tools,
            cancellation_token=ctx.cancellation_token,
        )

        print(f"chat completion with the tools: {create_result.content}")

        # If there are no tool calls, return the result.
        if isinstance(create_result.content, str):
            create_result.content = f"{self.id}:\n" + create_result.content
            return TaskResult(result=create_result.content)
        assert isinstance(create_result.content, list) and all(
            isinstance(call, FunctionCall) for call in create_result.content
        )

        # Add the first model create result to the session.
        session.append(AssistantMessage(content=create_result.content, source="assistant"))

        # Execute the tool calls.
        results = await asyncio.gather(
            *[self._execute_tool_call(call, ctx.cancellation_token) for call in create_result.content]
        )

        # Add the function execution results to the session.
        session.append(FunctionExecutionResultMessage(content=results))

        # Run the chat completion again to reflect on the history and function execution results.
        create_result = await self.model_client.create(
            messages=session,
            cancellation_token=ctx.cancellation_token,
        )
        assert isinstance(create_result.content, str)

        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        
        # Return the result as a message.
        create_result.content = f"sales_expert:\n" + create_result.content
        return TaskResult(result=create_result.content)

    
    async def _execute_tool_call(
        self, call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        # Find the tool by name.
        tool = next((tool for tool in self.tools if tool.name == call.name), None)
        assert tool is not None

        # Run the tool and capture the result.
        try:
            arguments = json.loads(call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return FunctionExecutionResult(call_id=call.id, content=tool.return_value_as_string(result), is_error=False)
        except Exception as e:
            return FunctionExecutionResult(call_id=call.id, content=str(e), is_error=True)


class GeneralInformationExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("General Information Expert")
        self.description = description
        self.model_client = model_client
        self.system_prompt = """
あなたは一般的な知識や情報に関して詳しいエージェントです。トピックに対し世間一般の知識や情報を踏まえたアイデアを提案してください。
"""
    
    @message_handler
    async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        prompt = self.system_prompt + f"\n\nユーザーからの質問: {message.task}"
        model_result = await self.model_client.create([SystemMessage(content=prompt), UserMessage(content=message.task, source="user")]) 
        model_result.content = f"general_information_expert:\n" + model_result.content
        assert isinstance(model_result.content, str)

        await asyncio.sleep(2)  # Simulate work
        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        return TaskResult(result=model_result.content)


class SummarizeAgent(RoutedAgent):
    def __init__(self, description:str, model_client: ChatCompletionClient):
        super().__init__("Summarize Agent")
        self.description = description
        self.model_client = model_client
        self.system_prompt = """
これまでのエージェントからの回答を要約し、エージェントの意見の同じ点と食い違う点、そこから導き出される結論を回答する。
エージェントの意見の同じ点と食い違う点を記載する際には、どのエージェントからの回答かエージェントのnameを明記する。
「research agentsは実行されませんでした。」のように他のエージェントの回答がない場合は、その旨を回答する。
"""
    
    @message_handler
    async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nSummarizor-{self.id}:\nReceived task:\n {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        prompt = self.system_prompt + f"\n\n他エージェントからの回答: {message.task}"
        model_result = await self.model_client.create([SystemMessage(content=prompt)])
        assert isinstance(model_result.content, str)

        await asyncio.sleep(2)  # Simulate work
        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        return TaskResult(result=model_result.content)

処理の実行

メインの処理では、以下の処置を行います。

  1. モデルクライアントの初期化。
  2. エージェントランタイム (Runtime) の作成。
  3. エージェントの登録。
  4. ユーザー入力タスクを Orchestrator エージェントに送信。
  5. Orchestrator を通して、複数のエージェントによるタスク処理。
  6. 処理結果の集約および出力。
  7. ランタイムのクリーンアップ。
# モデルクライアントを定義 # Agentによってモデルを変えることも可能
    orchestrator_model_client = client #OpenAIChatCompletionClient(model="gpt-4")
    agent_model_client = client

    # ランタイムを作成
    runtime = SingleThreadedAgentRuntime()

    # エージェントタイプと説明を定義
    agent_types = {
        "company_rule_expert": "company_rule_expert",
        "sales_expert": "sales_expert",
        "general_information_expert": "general_information_expert"
    }

    participant_descriptions = {
        "company_rule_expert": "社内規定に関して詳しいエージェント。",
        "sales_expert": "営業の社内情報やルールに詳しいエージェント。",
        "general_information_expert": "一般的な知識や情報に詳しいエージェント。"
    }
    
    # エージェントを登録
    await CompanyRuleExpertAgent.register(runtime, "company_rule_expert", lambda: CompanyRuleExpertAgent(description="社内規定に関して詳しいエージェント。", model_client=agent_model_client))
    await SalesExpertAgent.register(runtime, "sales_expert", lambda: SalesExpertAgent(description="営業の社内情報に詳しいエージェント。", model_client=agent_model_client))
    await GeneralInformationExpertAgent.register(runtime, "general_information_expert", lambda: GeneralInformationExpertAgent(description="一般的な知識や情報に詳しいエージェント。", model_client=agent_model_client))
    await SummarizeAgent.register(runtime, "summarize_agent", lambda: SummarizeAgent(description="他のエージェントの回答を要約するエージェント。", model_client=agent_model_client))
    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent(
        model_client=orchestrator_model_client,
        agent_types=agent_types,
        participant_descriptions=participant_descriptions
    ))

    # ランタイムを開始
    runtime.start()

    # ユーザーのタスクをOrchestratorAgentに送信
    user_task = UserTask(task="リモートワークの決まりを教えてください。")
    final_result = await runtime.send_message(user_task, AgentId("orchestrator", "default"))

    # 結果を表示
    print(f"{'-'*80}\n最終結果:\n{final_result.result}")

    # ランタイムを停止
    await runtime.stop_when_idle()

コード全体

長いため折りたたみます。


AutoGen v0.4.7 コード全体

import asyncio
import datetime
import json
import logging
import os
from dataclasses import dataclass
from typing import Annotated, Dict, List, Literal, Optional

from autogen_core import (
    AgentId,
    CancellationToken,
    FunctionCall,
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    message_handler,
)
from autogen_core.models import (
    AssistantMessage,
    ChatCompletionClient,
    FunctionExecutionResult,
    FunctionExecutionResultMessage,
    SystemMessage,
    UserMessage,
)
from autogen_core.tools import FunctionTool
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient


class InternalDocumentationOfSalesRAGPlugin():
    '''
    営業に関するドキュメントのRAG plugin
    '''
    async def search_sales_mock(self, query: str) -> Annotated[str, "data of sales"]:
        print(f"query: {query}")
        # Simulate a long-running operation
        await asyncio.sleep(5)
        mock_sales_data = """
        営業の規定:
        営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。
        また、競合情報の取扱いには注意し、守秘義務を厳守してください。
        リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。
        """ 
        print(f'sales tool returns: "{mock_sales_data}"')
        return mock_sales_data
        

class InternalDocumentationRAGPlugin():
    '''
    社内規定に関するドキュメントのRAG plugin
    '''

    async def search_rules_mock(self, query: str) -> Annotated[str, "data of company rules"]:
        print(f"query: {query}")
        # Simulate a long-running operation
        await asyncio.sleep(5)
        mock_rule_data = """
        リモートワークの規定:
        リモートワークは週に3回まで可能とします。
        業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。
        セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。
        """ 
        print(f'rule tool returns: "{mock_rule_data}"')
        return mock_rule_data


# 営業用ツールを定義
async def search_sales_tool(input: Annotated[str, "Input to the sales search tool."]) -> str:
    """
    Tool for searching internal sales documentation.
    """
    sales_rag_plugin = InternalDocumentationOfSalesRAGPlugin()
    result = await sales_rag_plugin.search_sales_mock(input)
    return result


# 社内規定用ツールを定義
async def search_rules_tool(input: Annotated[str, "Input to the rules search tool."]) -> str:
    """
    Tool for searching internal company rules documentation.
    """
    rules_rag_plugin = InternalDocumentationRAGPlugin()
    result = await rules_rag_plugin.search_rules_mock(input)
    print(f"input: {input}")
    return result


client = AzureOpenAIChatCompletionClient(
    azure_deployment=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
    model=os.getenv("AZURE_OPENAI_DEPLOYMENT"),
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
    api_version=os.getenv("OPENAI_API_VERSION"),
    azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
)


# メッセージプロトコルの定義
@dataclass
class UserTask:
    task: str

@dataclass
class WorkerTask:
    task: str

@dataclass
class TaskResult:
    result: str

@dataclass
class FinalResult:
    result: str


# エージェントの定義
class CompanyRuleExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("Company Rule Expert")
        self.description = description
        self.model_client = model_client
        #self.tools = [search_rules_tool]
        self.tools = [FunctionTool(search_rules_tool, description="Search the internal docuemnts of company rules.")]
        self.system_prompt = """
あなたは社内規定に関して詳しいエージェントです。トピックに対し社内規定を踏まえた回答をしてください。
search_rules_toolツールを利用して社内規定の情報を取得し回答を生成します。
"""
    ###
    #Toolのドキュメントを参考に実装(上のメソッドでも機能する)
    ###
    @message_handler
    async def handle_user_message(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        # Create a session of messages.
        system_prompt = [SystemMessage(content=self.system_prompt)]
        session: List[LLMMessage] = system_prompt + [UserMessage(content=message.task, source="user")]

        # Run the chat completion with the tools.
        create_result = await self.model_client.create(
            messages=session,
            tools=self.tools,
            cancellation_token=ctx.cancellation_token,
        )

        print(f"chat completion with the tools: {create_result.content}")

        # If there are no tool calls, return the result.
        if isinstance(create_result.content, str):
            create_result.content = f"{self.id}:\n" + create_result.content
            return TaskResult(result=create_result.content)
        assert isinstance(create_result.content, list) and all(
            isinstance(call, FunctionCall) for call in create_result.content
        )

        # Add the first model create result to the session.
        session.append(AssistantMessage(content=create_result.content, source="assistant"))

        # Execute the tool calls.
        results = await asyncio.gather(
            *[self._execute_tool_call(call, ctx.cancellation_token) for call in create_result.content]
        )

        # Add the function execution results to the session.
        session.append(FunctionExecutionResultMessage(content=results))

        # Run the chat completion again to reflect on the history and function execution results.
        create_result = await self.model_client.create(
            messages=session,
            cancellation_token=ctx.cancellation_token,
        )
        assert isinstance(create_result.content, str)

        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        
        # Return the result as a message.
        create_result.content = f"company_rule_expert:\n" + create_result.content
        return TaskResult(result=create_result.content)

    
    async def _execute_tool_call(
        self, call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        # Find the tool by name.
        tool = next((tool for tool in self.tools if tool.name == call.name), None)
        assert tool is not None

        # Run the tool and capture the result.
        try:
            arguments = json.loads(call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return FunctionExecutionResult(call_id=call.id, content=tool.return_value_as_string(result), is_error=False)
        except Exception as e:
            return FunctionExecutionResult(call_id=call.id, content=str(e), is_error=True)
    

class SalesExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("Sales Expert")
        self.description = description
        self.model_client = model_client
        #self.tools = [search_sales_tool]
        self.tools = [FunctionTool(search_sales_tool, description="Search the internal docuemnts of sales.")]
        self.system_prompt = """
あなたは社内の営業に関して詳しいエージェントです。トピックに対し営業の情報を踏まえた回答を提案してください。
search_sales_toolツールを利用して営業に関する情報を取得し回答を生成します。
"""

    ###
    #Toolのドキュメントを参考に実装(上のメソッドでも機能する)
    ###
    @message_handler
    async def handle_user_message(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        # Create a session of messages.
        system_prompt = [SystemMessage(content=self.system_prompt)]
        session: List[LLMMessage] = system_prompt + [UserMessage(content=message.task, source="user")]

        # Run the chat completion with the tools.
        create_result = await self.model_client.create(
            messages=session,
            tools=self.tools,
            cancellation_token=ctx.cancellation_token,
        )

        print(f"chat completion with the tools: {create_result.content}")

        # If there are no tool calls, return the result.
        if isinstance(create_result.content, str):
            create_result.content = f"{self.id}:\n" + create_result.content
            return TaskResult(result=create_result.content)
        assert isinstance(create_result.content, list) and all(
            isinstance(call, FunctionCall) for call in create_result.content
        )

        # Add the first model create result to the session.
        session.append(AssistantMessage(content=create_result.content, source="assistant"))

        # Execute the tool calls.
        results = await asyncio.gather(
            *[self._execute_tool_call(call, ctx.cancellation_token) for call in create_result.content]
        )

        # Add the function execution results to the session.
        session.append(FunctionExecutionResultMessage(content=results))

        # Run the chat completion again to reflect on the history and function execution results.
        create_result = await self.model_client.create(
            messages=session,
            cancellation_token=ctx.cancellation_token,
        )
        assert isinstance(create_result.content, str)

        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        
        # Return the result as a message.
        create_result.content = f"sales_expert:\n" + create_result.content
        return TaskResult(result=create_result.content)

    
    async def _execute_tool_call(
        self, call: FunctionCall, cancellation_token: CancellationToken
    ) -> FunctionExecutionResult:
        # Find the tool by name.
        tool = next((tool for tool in self.tools if tool.name == call.name), None)
        assert tool is not None

        # Run the tool and capture the result.
        try:
            arguments = json.loads(call.arguments)
            result = await tool.run_json(arguments, cancellation_token)
            return FunctionExecutionResult(call_id=call.id, content=tool.return_value_as_string(result), is_error=False)
        except Exception as e:
            return FunctionExecutionResult(call_id=call.id, content=str(e), is_error=True)


class GeneralInformationExpertAgent(RoutedAgent):
    def __init__(self, description, model_client: ChatCompletionClient):
        super().__init__("General Information Expert")
        self.description = description
        self.model_client = model_client
        self.system_prompt = """
あなたは一般的な知識や情報に関して詳しいエージェントです。トピックに対し世間一般の知識や情報を踏まえたアイデアを提案してください。
"""
    
    @message_handler
    async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nReseachAgent-{self.id}:\nReceived task: {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        prompt = self.system_prompt + f"\n\nユーザーからの質問: {message.task}"
        #model_result = await self.model_client.create([SystemMessage(content=prompt)])
        model_result = await self.model_client.create([SystemMessage(content=prompt), UserMessage(content=message.task, source="user")]) 
        model_result.content = f"general_information_expert:\n" + model_result.content
        assert isinstance(model_result.content, str)

        await asyncio.sleep(2)  # Simulate work
        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        return TaskResult(result=model_result.content)


class SummarizeAgent(RoutedAgent):
    def __init__(self, description:str, model_client: ChatCompletionClient):
        super().__init__("Summarize Agent")
        self.description = description
        self.model_client = model_client
        self.system_prompt = """
これまでのエージェントからの回答を要約し、エージェントの意見の同じ点と食い違う点、そこから導き出される結論を回答する。
エージェントの意見の同じ点と食い違う点を記載する際には、どのエージェントからの回答かエージェントのnameを明記する。
「research agentsは実行されませんでした。」のように他のエージェントの回答がない場合は、その旨を回答する。
"""
    
    @message_handler
    async def handle_task(self, message: WorkerTask, ctx: MessageContext) -> TaskResult:
        print(f"{'-'*80}\nSummarizor-{self.id}:\nReceived task:\n {message.task}")
        start_time = datetime.datetime.now()
        print(f"{self.id} starting task at {start_time}")
        
        prompt = self.system_prompt + f"\n\n他エージェントからの回答: {message.task}"
        model_result = await self.model_client.create([SystemMessage(content=prompt)])
        assert isinstance(model_result.content, str)

        await asyncio.sleep(2)  # Simulate work
        end_time = datetime.datetime.now()
        print(f"{self.id} finished task at {end_time}")
        print(f"Duration for agent {self.id}: {end_time - start_time}")
        return TaskResult(result=model_result.content)


# OrchestratorAgentの定義
class OrchestratorAgent(RoutedAgent):
    def __init__(self, model_client: ChatCompletionClient, agent_types: Dict[str, str], participant_descriptions: Dict[str, str]):
        super().__init__("Orchestrator Agent")
        self.model_client = model_client
        self.agent_types = agent_types  # エージェント名からエージェントタイプへのマッピング
        self.participant_descriptions = participant_descriptions  # エージェント名から説明へのマッピング
    
    @message_handler
    async def handle_task(self, message: UserTask, ctx: MessageContext) -> FinalResult:
        print(f"{'-'*80}\nOrchestrator-{self.id}:\nReceived task: {message.task}")
        # エージェントを選択するためのプロンプトを準備
        prompt = "以下のエージェントが利用可能です:\n"
        for agent_name, description in self.participant_descriptions.items():
            prompt += f"- {agent_name}: {description}\n"
        prompt += "\nユーザーの質問は次の通りです:\n"
        prompt += message.task
        prompt += "\n\n上記の質問に回答するのに適したエージェントを選択してください。複数のエージェントを選択可能です。最低2つ以上のエージェントを選択してください。"
        prompt += "\n\nエージェント名をカンマで区切って列挙してください。"

        # LLMを使用してエージェントを選択
        selected_agent_names = await self.select_agents(prompt)
        if not selected_agent_names:
            # 有効なエージェントが選択されない場合、general_information_expertをデフォルトで選択
            print(f"no agents selected")
            selected_agent_names = ["general_information_expert", "company_rule_expert"]

        # 選択されたエージェントにタスクをディスパッチ
        tasks = []
        print(f"selected agents: {selected_agent_names}")
        for agent_name in selected_agent_names:
            agent_id = AgentId(self.agent_types[agent_name], agent_name)
            tasks.append(self.send_message(WorkerTask(task=message.task), agent_id))

        # 並列にタスクを実行
        results = await asyncio.gather(*tasks)

        # 結果を集約
        aggregated_result = "\n".join([result.result for result in results])
        print(f"集約された結果:\n{aggregated_result}")

        # SummarizeAgentを使用して結果を要約
        summarize_agent_id = AgentId('summarize_agent', 'default')
        summarize_result = await self.send_message(WorkerTask(task=aggregated_result), summarize_agent_id)

        return FinalResult(result=summarize_result.result)

    async def select_agents(self, prompt):# -> List[Literal[*accessible_research_agents]]:
        # LLMを使用してエージェントを選択
        model_result = await self.model_client.create([SystemMessage(content=prompt)])
        assert isinstance(model_result.content, str)
        selected_agents_text = model_result.content.strip()

        print(f"selected_agents_text: {selected_agents_text}")
        # 選択されたエージェントを解析
        selected_agent_names = [name.strip() for name in selected_agents_text.split(",") if name.strip() in self.agent_types]
        return selected_agent_names
    


async def autogen(input):
    # モデルクライアントを定義
    orchestrator_model_client = client
    agent_model_client = client

    # ランタイムを作成
    runtime = SingleThreadedAgentRuntime()

    # エージェントタイプと説明を定義
    agent_types = {
        "company_rule_expert": "company_rule_expert",
        "sales_expert": "sales_expert",
        "general_information_expert": "general_information_expert"
    }

    participant_descriptions = {
        "company_rule_expert": "社内規定に関して詳しいエージェント。",
        "sales_expert": "営業の社内情報やルールに詳しいエージェント。",
        "general_information_expert": "一般的な知識や情報に詳しいエージェント。"
    }


    # エージェントを登録
    await CompanyRuleExpertAgent.register(runtime, "company_rule_expert", lambda: CompanyRuleExpertAgent(description="社内規定に関して詳しいエージェント。", model_client=agent_model_client))
    await SalesExpertAgent.register(runtime, "sales_expert", lambda: SalesExpertAgent(description="営業の社内情報に詳しいエージェント。", model_client=agent_model_client))
    await GeneralInformationExpertAgent.register(runtime, "general_information_expert", lambda: GeneralInformationExpertAgent(description="一般的な知識や情報に詳しいエージェント。", model_client=agent_model_client))
    await SummarizeAgent.register(runtime, "summarize_agent", lambda: SummarizeAgent(description="他のエージェントの回答を要約するエージェント。", model_client=agent_model_client))
    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent(
        model_client=orchestrator_model_client,
        agent_types=agent_types,
        participant_descriptions=participant_descriptions
    ))

    # ランタイムを開始
    runtime.start()

    # ユーザーのタスクをOrchestratorAgentに送信
    user_task = UserTask(task=input)
    final_result = await runtime.send_message(user_task, AgentId("orchestrator", "default"))

    # 結果を表示
    print(f"{'-'*80}\n最終結果:\n{final_result.result}")

    # ランタイムを停止
    await runtime.stop_when_idle()


asyncio.run(autogen("リモートワークの決まりを教えてください。"))

実行結果

以下が実行結果です。2つのエージェントが並列で実行され、集約された回答がsummarize_agentによって要約されていることが確認できます。

--------------------------------------------------------------------------------
Orchestrator-orchestrator/default:
Received task: リモートワークの決まりを教えてください。
selected_agents_text: company_rule_expert, sales_expert
selected agents: ['company_rule_expert', 'sales_expert']
--------------------------------------------------------------------------------
ReseachAgent-company_rule_expert/company_rule_expert:
Received task: リモートワークの決まりを教えてください。
company_rule_expert/company_rule_expert starting task at 2025-02-25 14:37:25.375512
--------------------------------------------------------------------------------
ReseachAgent-sales_expert/sales_expert:
Received task: リモートワークの決まりを教えてください。
sales_expert/sales_expert starting task at 2025-02-25 14:37:25.382560
chat completion with the tools: [FunctionCall(id='call_Ih1eSCanFgyQEJsGhinUopVb', arguments='{"input":"リモートワーク"}', name='search_rules_tool')]
query: リモートワーク
chat completion with the tools: [FunctionCall(id='call_dWmqWeXPNiltji5UdkDpdSk5', arguments='{"input":"リモートワークの決まり"}', name='search_sales_tool')]

company_rule_expertのtool結果
query: リモートワークの決まり
rule tool returns: "
        リモートワークの規定:
        リモートワークは週に3回まで可能とします。
        業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。
        セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。
        "

sales_expertのtool結果
input: リモートワーク
sales tool returns: "
        営業の規定:
        営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。
        また、競合情報の取扱いには注意し、守秘義務を厳守してください。
        リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。
        "

company_rule_expert/company_rule_expert finished task at 2025-02-25 14:37:33.372355
Duration for agent company_rule_expert/company_rule_expert: 0:00:07.996843
sales_expert/sales_expert finished task at 2025-02-25 14:37:35.285284
Duration for agent sales_expert/sales_expert: 0:00:09.902724

集約された結果:
company_rule_expert:
リモートワークに関する社内規定は以下の通りです。

1. **リモートワークの頻度**: 週に3回までのリモートワークが許可されています。
2. **業務時間中の勤務専念義務**: 業務時間中は勤務に専念する義務があります。業務報告や進捗共有は指定されたツールを使用して行う必要があります。
3. **セキュリティポリシーの遵守**: 機密情報の取り扱いに関しては、セキュリティポリシーを厳守し、特に注意を払うことが求められます。

これらの規定を守ることで、リモートワークの効果を最大限に引き出すことができます。
sales_expert:
リモートワークに関する決まりは以下の通りです:

1. **顧客情報の管理**:顧客情報は社内の指定ツールに正確に記録することが求められます。これにより、チーム全体が最新の情報を把握できます。
2. **提案資料と契約書**:営業で使用する提案資料や契約書は、事前に承認を得てから使用する必要があります。これにより、内容の確認や法的なリスクを回避することができます。
3. **競合情報の取扱い**:競合情報の扱いには注意が必要で、守秘義務を厳守することが求められます。
4. **オンライン会議の活用**:リモート営業時には、オンライン会議ツールを活用して顧客やチームとのコミュニケーションを行います。
5. **進捗報告**:定期的に上司へ営業の進捗報告を行うことが重要です。この報告は、情報共有や業務改善に役立ちます。
6. **情報セキュリティの遵守**:リモートワークにおいては、情報セキュリティを特に重視し、安全な環境で業務を遂行することが求められます。

これらのルールを遵守することで、リモートワークでもスムーズな営業活動が行えるようになります。

--------------------------------------------------------------------------------
Summarizor-summarize_agent/default:
Received task:
 company_rule_expert:
リモートワークに関する社内規定は以下の通りです。

1. **リモートワークの頻度**: 週に3回までのリモートワークが許可されています。
2. **業務時間中の勤務専念義務**: 業務時間中は勤務に専念する義務があります。業務報告や進捗共有は指定されたツールを使用して行う必要があります。
3. **セキュリティポリシーの遵守**: 機密情報の取り扱いに関しては、セキュリティポリシーを厳守し、特に注意を払うことが求められます。

これらの規定を守ることで、リモートワークの効果を最大限に引き出すことができます。
sales_expert:
リモートワークに関する決まりは以下の通りです:

1. **顧客情報の管理**:顧客情報は社内の指定ツールに正確に記録することが求められます。これにより、チーム全体が最新の情報を把握できます。
2. **提案資料と契約書**:営業で使用する提案資料や契約書は、事前に承認を得てから使用する必要があります。これにより、内容の確認や法的なリスクを回避することができます。
3. **競合情報の取扱い**:競合情報の扱いには注意が必要で、守秘義務を厳守することが求められます。
4. **オンライン会議の活用**:リモート営業時には、オンライン会議ツールを活用して顧客やチームとのコミュニケーションを行います。
5. **進捗報告**:定期的に上司へ営業の進捗報告を行うことが重要です。この報告は、情報共有や業務改善に役立ちます。
6. **情報セキュリティの遵守**:リモートワークにおいては、情報セキュリティを特に重視し、安全な環境で業務を遂行することが求められます。

これらのルールを遵守することで、リモートワークでもスムーズな営業活動が行えるようになります。
summarize_agent/default starting task at 2025-02-25 14:37:35.287333
summarize_agent/default finished task at 2025-02-25 14:37:42.996001
Duration for agent summarize_agent/default: 0:00:07.708668
--------------------------------------------------------------------------------

summarize_agentの最終回答
最終結果:
### 要約
- **company_rule_expert** はリモートワークに関する社内規定を以下のように示しました:
  1. 週に3回までのリモートワークの許可
  2. 業務時間中は勤務に専念
  3. セキュリティポリシーの遵守

- **sales_expert** はリモートワークの決まりを以下のように述べました:
  1. 顧客情報の管理は指定ツール
  2. 提案資料や契約書は事前承認が必要
  3. 競合情報の守秘義務
  4. オンライン会議の活用
  5. 進捗報告の定期的実施
  6. 情報セキュリティの遵守

### 意見の同じ点
- **情報セキュリティの遵守**:
  - **company_rule_expert** と **sales_expert** の両方で、情報セキュリティの遵守が重要であるとされています。

- **コミュニケーションの重要性**:
  - **sales_expert** はオンライン会議の活用を挙げており、**company_rule_expert** の業務報告や進捗共有の指定ツール使用も同様にコミュニケーションの重要性を示しています。

### 食い違う点
- **リモートワークの頻度**:
  - **company_rule_expert** は週に3回までのリモートワークを許可していますが、**sales_expert** では具体的なリモートワークの頻度についての言及はありません。

- **業務内容の特定性**:
  - **sales_expert** は営業に特化した規定(顧客情報の管理や進捗報告など)を挙げているのに対し、**company_rule_expert** は広範な業務に関する一般的なルールを提示しています。

### 結論
リモートワークに関する規定は、全体として情報セキュリティや業務専念の重要性を強調しているものの、リモートワークの頻度や業務内容に対する具体的な規定には差異があります。従って、リモートワークを行う際には、両者の規定を統合し、業務の特性に応じて適切に対応することが求められます。

終わりに

今回はAutoGen v0.4.7でのエージェントの並列処理に関して記載しました。

AutoGenはv0.4.xでstableになっていますが、バージョンの更新が非常に速いと感じています。検証を始めたタイミングではv0.4.6だったのですが、1か月の間にv0.4.7になっていましたので、実装する際は最新のバージョンの確認が必須だと思います。

Semantic KernelやLangGraphでのマルチエージェント、複数のエージェントの並列処理も試したので、そのあたりに関しても時間があれば記載していきたいです。

執筆担当者プロフィール
寺澤 駿

寺澤 駿(日本ビジネスシステムズ株式会社)

IoTやAzure Cognitive ServicesのAIを活用したデモ環境・ソリューション作成を担当。

担当記事一覧