今回は、複数エージェントの並列処理を含む、マルチエージェントの構築に関して記載します。
マルチエージェントの構築は、Semantic Kernelのフレームワーク(Agent Framework)を利用して行います。
※ AutoGen v0.4.7でのエージェントの並列処理に関しては、下記記事にて記載しています。
実行環境
- Python 3.11
- semantic-kernel 1.20.0
並列処理のシナリオ
今回は以下のような流れで回答を生成するマルチエージェントを作成します。
- 質問の入力
- LLMでエージェントの選択
- 選択された複数のエージェントを並列で処理
- 回答の集約し最終的な回答を生成
※toolはSemantic KernelではFunctionのことです。
Semantic Kernelでのエージェント並列実行
Semantic Kernel Agent Frameworkを使い、先ほどのマルチエージェントを構築します。
2025年1月時点では、このフレームワークで、現状複数のエージェントをLLMで自動で選択し、エージェントを並列で実行する機能はないようです。
今回はやや力技にはなりますが、目指すシナリオを実現するための実装について以下に記載します。
※ Semantic Kernel Agent Framework はまだ開発中であり、変更される可能性があります。
※ すべてのドキュメントを完全には網羅できていないので、新しいアップデートや既存の情報を引き続き調査していく予定です。
エージェントが利用するPluginの定義
pluginのfunctionが返す内容は、架空のサンプルを利用しています。
class InternalDocumentationOfSalesRAGPlugin(): ''' 営業に関するドキュメントのRAG plugin ''' @kernel_function(description="search documents of sales") async def search_sales_mock(self, query: str) -> Annotated[str, "data of sales"]: print(f"query: {query}") # Simulate a long-running operation await asyncio.sleep(5) mock_sales_data = """ 営業の規定: 営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。 また、競合情報の取扱いには注意し、守秘義務を厳守してください。 リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。 """ print(f'sales tool returns: "{mock_sales_data}"') return mock_sales_data class InternalDocumentationRAGPlugin(): ''' 社内規定に関するドキュメントのRAG plugin ''' @kernel_function(name="search_rule", description="search documents about internal company rules") async def search_rules_mock(self, query: str) -> Annotated[str, "data of company rules"]: print(f"query: {query}") # Simulate a long-running operation await asyncio.sleep(5) mock_rule_data = """ リモートワークの規定: リモートワークは週に3回まで可能とします。 業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。 セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。 """ print(f'rule tool returns: "{mock_rule_data}"') return mock_rule_data
エージェントの定義
エージェントの名前と指示を定義します。
# エージェントを定義 SUPERVISOR_NAME = "SupervisorAgent" SUPERVISOR_INSTRUCTIONS = """ Your responsibility is to review the content given to you by the other agent. The goal is to determine if the given answer from other agents includes specific answer to the qestion. If so, state that it is approved. Only include the word "approved" in English if it is so. If not or the given answer can be improved, provide insight on how to refine suggested answer without example. You should always tie the conversation back to the researcher by the plugin. Answer in the language the question was asked if answer is not "approved". """ SummarizeAgent_NAME = "SummarizeAgent" SummarizeAgent_INSTRUCTIONS = """ これまでのエージェントからの回答を要約し、エージェントの意見の同じ点と食い違う点、そこから導き出される結論を回答する。 エージェントの意見の同じ点と食い違う点を記載する際には、どのエージェントからの回答かエージェントのnameを明記する。 「research agentsは実行されませんでした。」のように他のエージェントの回答がない場合は、その旨を回答する。 """ CompanyDocumentsResearcher_NAME = "CompanyRuleDocumentsResearcher" CompanyDocumentsResearcher_INSTRUCTIONS =""" あなたは社内規定に関して詳しいエージェントです。トピックに対し社内規定を踏まえた回答をしてください。 InternalDocumentationRAGPluginを利用して社内規定の情報を取得し回答を生成します。 """ SalseSpecialistAgent_NAME = "SalesSpecialistAgent" SalseSpecialistAgent_INSTRUCTIONS = """ あなたは社内の営業に関して詳しいエージェントです。トピックに対し営業の情報を踏まえた回答を提案してください。 InternalDocumentationOfSalesRAGPluginを利用して営業に関する情報を取得し回答を生成します。 """ GeneralAnswerAgent_NAME = "GeneralAnswerAgent" GeneralAnswerAgent_INSTRUCTIONS = """ あなたは一般的な知識や情報に関して詳しいエージェントです。トピックに対し世間一般の知識や情報を踏まえたアイデアを提案してください。 """
エージェントの作成
上記で定義した名前と指示を使い、エージェントを作成します。
def _create_kernel_with_chat_completion(service_id: str) -> Kernel: kernel = Kernel() chat_completion = AzureChatCompletion( deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT"), api_key=os.getenv("deployment_api_key"), base_url=os.getenv("deployment_base_url"), service_id=service_id ) kernel.add_service(chat_completion) @kernel.filter(FilterTypes.AUTO_FUNCTION_INVOCATION) async def auto_function_invocation_filter(context: AutoFunctionInvocationContext, next): logging.info("\nAuto function invocation filter") # 履歴やコール内容のログを追加処理 function_calls = context.chat_history.messages[-1].items logging.info(f"Number of function calls: {len(function_calls)}") await next(context) return kernel def create_chat_agent(service_id: str, name: str, instructions: str, kernel: Kernel, plugin=None, plugin_name=None) -> ChatCompletionAgent: if plugin and plugin_name: kernel.add_plugin(plugin=plugin, plugin_name=plugin_name) # semantic-kernel v1.20 arguments = KernelArguments( settings=PromptExecutionSettings( # Set the function_choice_behavior to auto to let the model # decide which function to use, and let the kernel automatically # execute the functions. function_choice_behavior=FunctionChoiceBehavior.Auto(), ) ) else: return ChatCompletionAgent( service_id=service_id, kernel=kernel, name=name, instructions=instructions ) return ChatCompletionAgent( service_id=service_id, kernel=kernel, name=name, instructions=instructions, #execution_settings=settings, arguments=arguments, ) async def main_async(): #logging.basicConfig(level=logging.INFO) # エージェントを作成 agent_Supervisor = create_chat_agent( service_id="SupervisorAgent", name=SUPERVISOR_NAME, instructions=SUPERVISOR_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("SupervisorAgent") ) agent_SummarizeAgent = create_chat_agent( service_id="SummarizeAgent", name="SummarizeAgent", instructions=SummarizeAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("SummarizeAgent") ) agent_documentsResearcher = create_chat_agent( service_id="company_rule_documents_researcher", name=CompanyDocumentsResearcher_NAME, instructions=CompanyDocumentsResearcher_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("company_rule_documents_researcher"), plugin=InternalDocumentationRAGPlugin(), plugin_name="InternalDocumentaionRAG" ) agent_salesSpecialist = create_chat_agent( service_id="salse_specialist_agent", name=SalseSpecialistAgent_NAME, instructions=SalseSpecialistAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("salse_specialist_agent"), plugin=InternalDocumentationOfSalesRAGPlugin(), plugin_name="SalesRAGPlugin" ) agent_generalAnswer = create_chat_agent( service_id="general_answer_agent", name=GeneralAnswerAgent_NAME, instructions=GeneralAnswerAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("general_answer_agent") ) ...
SelectionStrategyの定義
Semantic Kernel Agent FrameworkのAgentGroupChatでは、SelectionStrategyを使い回答するエージェントを選択します。
デフォルトの設定では1つのエージェントのみが選択されるため、複数のエージェントを選択できるように、該当メソッドをオーバーライドして機能を拡張します。
SelectionStrategyのクラスに関しては、以下のリンクでご確認いただけます。
- KernelFunctionSelectionStrategy Class:
class MultiAgentKernelFunctionSelectionStrategy(KernelFunctionSelectionStrategy): """Custom strategy to select multiple agents based on the Kernel Function.""" async def select_agents( self, agents: List[Agent], history: List[ChatMessageContent], ) -> List[Agent]: """Select multiple agents based on the Kernel Function. Args: agents: The list of agents to select from. history: The history of messages in the conversation. Returns: A list of agents to interact with. Raises: AgentExecutionException: If the strategy fails to select agents. """ if self.history_reducer is not None: self.history_reducer.messages = history reduced_history = await self.history_reducer.reduce() if reduced_history is not None: history = reduced_history.messages original_arguments = self.arguments or KernelArguments() execution_settings = original_arguments.execution_settings or {} messages = [message.to_dict(role_key="role", content_key="content") for message in history] filtered_arguments = { self.agent_variable_name: ",".join(agent.name for agent in agents), self.history_variable_name: messages, } extracted_settings = {key: setting.model_dump() for key, setting in execution_settings.items()} combined_arguments = { **original_arguments, **extracted_settings, **{k: v for k, v in filtered_arguments.items()}, } arguments = KernelArguments( **combined_arguments, ) logging.info( f"Kernel Function Selection Strategy next method called, " f"invoking function: {self.function.plugin_name}, {self.function.name}", ) try: result = await self.function.invoke(kernel=self.kernel, arguments=arguments) except Exception as ex: logger.error("Kernel Function Selection Strategy select_agents method failed", exc_info=ex) raise AgentExecutionException("Agent Failure - Strategy failed to execute function.") from ex logging.info( f"Kernel Function Selection Strategy select_agents method completed: " f"{self.function.plugin_name}, {self.function.name}, result: {result.value if result else None}", ) # Parse the result from LLM to determine selected agents agent_names = self._parse_agent_names(result) #print(f"agent_names: {agent_names}") if isawaitable(agent_names): agent_names = await agent_names if not agent_names: raise AgentExecutionException("Agent Failure - Strategy unable to determine next agents.") # Match names to agent instances selected_agents = [agent for agent in agents if agent.name in agent_names] if not selected_agents: raise AgentExecutionException(f"Agent Failure - Strategy unable to select any agents from: {agent_names}") return selected_agents async def next( self, agents: List[Agent], history: List[ChatMessageContent], ) -> List[Agent]: """Override next to return a list of agents.""" if not agents and self.initial_agent is None: raise AgentExecutionException("Agent Failure - No agents present to select.") if not self.has_selected and self.initial_agent is not None: selected_agents = [self.initial_agent] else: selected_agents = await self.select_agents(agents, history) self.has_selected = True return selected_agents def _parse_agent_names(self, result) -> List[str]: """Parse LLM result to extract agent names. Args: result: The result from the Kernel Function. Returns: A list of agent names. """ if result is None or result.value is None: return [] if isinstance(result.value, ChatMessageContent): # デバッグ: 内部構造をチェック logging.info(f"Result content: {result.value.inner_content}") # inner_content を直接返す return result.value.inner_content return str(result.value)
SelectionStrategyで利用するfunctionは以下のように定義し、2エージェント以上を選択するように設定します。
selection_function = KernelFunctionFromPrompt( function_name="selection", prompt=f""" Determine which more than Two participant takes the next turn in a conversation based on the the most recent participant. State only the name of the participants to take the next turn based on the following agents' description. "{CompanyDocumentsResearcher_NAME}": "社内規定に関して詳しいエージェント。", "{SalseSpecialistAgent_NAME}": "営業の社内情報やルールに詳しいエージェント。", "{GeneralAnswerAgent_NAME}": "一般的な知識や情報に詳しいエージェント。" Choose only from these participants: - {CompanyDocumentsResearcher_NAME} - {SalseSpecialistAgent_NAME} - {GeneralAnswerAgent_NAME} make sure that more than 2 agents are selected. History: {{{{$history}}}} """, )
AgentChatGroupの設定
上記のSelectionStrategyで複数のエージェントの選択が可能になったので、次は選択されたエージェントを並列で実行していきます。
AgentChatGroupでinvoke_agent(agent)のメソッドを複数のエージェントで同時に実行すると、次のエラーが発生します。
Exception: Unable to proceed while another agent is active.
1つのAgentChatGroupでは、複数のエージェントが同時にアクティブな状態にならないように制御されているようです。
この制限を回避するために、エージェントを並列で実行する際はそれぞれ別のAgentChatGroupを作成し、同時に処理を行います。
並列での処理終了後、もともとのAgentChatGroupに各エージェントの回答を追加することで統合します。
async def process_agent_with_individual_chat(agent, userInput): # エージェントごとに独立したAgentChatインスタンスを作成 individual_chat = AgentGroupChat(agents=[agent]) # ユーザーからの質問をエージェントの履歴に追加 await individual_chat.add_chat_message(ChatMessageContent(role=AuthorRole.USER, content=userInput)) # メッセージ履歴を保持するリスト agent_messages = [] start_time = datetime.datetime.now() print(f"Starting processing for agent: {agent.name} at {start_time}") # エージェントの応答を処理 async for response in individual_chat.invoke_agent(agent): print(f"# Response from {agent.name}: {response.content}") # 各エージェントのメッセージを収集 agent_messages.append(response) end_time = datetime.datetime.now() print(f"Finished processing for agent: {agent.name} at {end_time}") print(f"Duration for agent {agent.name}: {end_time - start_time}") # 処理が終わったら履歴を返す return agent_messages async def main_async(): .... # エージェントごとに処理を並列で実行し、履歴を収集 tasks = [process_agent_with_individual_chat(agent, userInput) for agent in selected_agents] all_agent_histories = await asyncio.gather(*tasks) # 各エージェントの履歴をメインのチャット履歴に統合 for agent_history in all_agent_histories: for message in agent_history: await chat.add_chat_message(message) # 各メッセージをメインチャット履歴に追加 ....
コード全体
長いため折りたたみます。
Semantic Kernelコード全体(クリックで開く)
# 標準ライブラリ import asyncio import datetime import logging import os from inspect import isawaitable from typing import Annotated, List # Semantic Kernelモジュール from semantic_kernel import Kernel from semantic_kernel.agents import Agent, AgentGroupChat, ChatCompletionAgent from semantic_kernel.agents.strategies.selection.kernel_function_selection_strategy import ( KernelFunctionSelectionStrategy, ) from semantic_kernel.agents.strategies.termination.kernel_function_termination_strategy import ( KernelFunctionTerminationStrategy, ) from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings from semantic_kernel.contents.chat_message_content import ChatMessageContent from semantic_kernel.contents.utils.author_role import AuthorRole from semantic_kernel.exceptions.agent_exceptions import AgentExecutionException from semantic_kernel.filters.auto_function_invocation.auto_function_invocation_context import AutoFunctionInvocationContext from semantic_kernel.filters.filter_types import FilterTypes from semantic_kernel.functions import kernel_function from semantic_kernel.functions.kernel_arguments import KernelArguments from semantic_kernel.functions.kernel_function_from_prompt import KernelFunctionFromPrompt # エージェントを定義 SUPERVISOR_NAME = "SupervisorAgent" SUPERVISOR_INSTRUCTIONS = """ Your responsibility is to review the content given to you by the other agent. The goal is to determine if the given answer from other agents includes specific answer to the qestion. If so, state that it is approved. Only include the word "approved" in English if it is so. If not or the given answer can be improved, provide insight on how to refine suggested answer without example. You should always tie the conversation back to the researcher by the plugin. Answer in the language the question was asked if answer is not "approved". """ SummarizeAgent_NAME = "SummarizeAgent" SummarizeAgent_INSTRUCTIONS = """ これまでのエージェントからの回答を要約し、エージェントの意見の同じ点と食い違う点、そこから導き出される結論を回答する。 エージェントの意見の同じ点と食い違う点を記載する際には、どのエージェントからの回答かエージェントのnameを明記する。 「research agentsは実行されませんでした。」のように他のエージェントの回答がない場合は、その旨を回答する。 """ CompanyDocumentsResearcher_NAME = "CompanyRuleDocumentsResearcher" CompanyDocumentsResearcher_INSTRUCTIONS =""" あなたは社内規定に関して詳しいエージェントです。トピックに対し社内規定を踏まえた回答をしてください。 InternalDocumentationRAGPluginを利用して社内規定の情報を取得し回答を生成します。 """ GeneralAnswerAgent_NAME = "GeneralAnswerAgent" GeneralAnswerAgent_INSTRUCTIONS = """ あなたは一般的な知識や情報に関して詳しいエージェントです。トピックに対し世間一般の知識や情報を踏まえたアイデアを提案してください。 """ SalseSpecialistAgent_NAME = "SalesSpecialistAgent" SalseSpecialistAgent_INSTRUCTIONS = """ あなたは社内の営業に関して詳しいエージェントです。トピックに対し営業の情報を踏まえた回答を提案してください。 InternalDocumentationOfSalesRAGPluginを利用して営業に関する情報を取得し回答を生成します。 """ class InternalDocumentationOfSalesRAGPlugin(): ''' 営業に関するドキュメントのRAG plugin ''' @kernel_function(description="search documents of sales") async def search_sales_mock(self, query: str) -> Annotated[str, "data of sales"]: print(f"query: {query}") # Simulate a long-running operation await asyncio.sleep(5) mock_sales_data = """ 営業の規定: 営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。 また、競合情報の取扱いには注意し、守秘義務を厳守してください。 リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。 """ print(f'sales tool returns: "{mock_sales_data}"') return mock_sales_data class InternalDocumentationRAGPlugin(): ''' 社内規定に関するドキュメントのRAG plugin ''' @kernel_function(name="search_rule", description="search documents about internal company rules") async def search_rules_mock(self, query: str) -> Annotated[str, "data of company rules"]: print(f"query: {query}") # Simulate a long-running operation await asyncio.sleep(5) mock_rule_data = """ リモートワークの規定: リモートワークは週に3回まで可能とします。 業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。 セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。 """ print(f'rule tool returns: "{mock_rule_data}"') return mock_rule_data def _create_kernel_with_chat_completion(service_id: str) -> Kernel: kernel = Kernel() chat_completion = AzureChatCompletion( deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT"), api_key=os.getenv("deployment_api_key"), base_url=os.getenv("deployment_base_url"), service_id=service_id ) kernel.add_service(chat_completion) @kernel.filter(FilterTypes.AUTO_FUNCTION_INVOCATION) async def auto_function_invocation_filter(context: AutoFunctionInvocationContext, next): logging.info("\nAuto function invocation filter") # 履歴やコール内容のログを追加処理 function_calls = context.chat_history.messages[-1].items logging.info(f"Number of function calls: {len(function_calls)}") await next(context) return kernel def create_chat_agent(service_id: str, name: str, instructions: str, kernel: Kernel, plugin=None, plugin_name=None) -> ChatCompletionAgent: if plugin and plugin_name: kernel.add_plugin(plugin=plugin, plugin_name=plugin_name) # v1.20 arguments = KernelArguments( settings=PromptExecutionSettings( # Set the function_choice_behavior to auto to let the model # decide which function to use, and let the kernel automatically # execute the functions. function_choice_behavior=FunctionChoiceBehavior.Auto(), ) ) else: #settings = None return ChatCompletionAgent( service_id=service_id, kernel=kernel, name=name, instructions=instructions ) return ChatCompletionAgent( service_id=service_id, kernel=kernel, name=name, instructions=instructions, arguments=arguments, ) class MultiAgentKernelFunctionSelectionStrategy(KernelFunctionSelectionStrategy): """Custom strategy to select multiple agents based on the Kernel Function.""" async def select_agents( self, agents: List[Agent], history: List[ChatMessageContent], ) -> List[Agent]: """Select multiple agents based on the Kernel Function. Args: agents: The list of agents to select from. history: The history of messages in the conversation. Returns: A list of agents to interact with. Raises: AgentExecutionException: If the strategy fails to select agents. """ if self.history_reducer is not None: self.history_reducer.messages = history reduced_history = await self.history_reducer.reduce() if reduced_history is not None: history = reduced_history.messages original_arguments = self.arguments or KernelArguments() execution_settings = original_arguments.execution_settings or {} messages = [message.to_dict(role_key="role", content_key="content") for message in history] filtered_arguments = { self.agent_variable_name: ",".join(agent.name for agent in agents), self.history_variable_name: messages, } extracted_settings = {key: setting.model_dump() for key, setting in execution_settings.items()} combined_arguments = { **original_arguments, **extracted_settings, **{k: v for k, v in filtered_arguments.items()}, } arguments = KernelArguments( **combined_arguments, ) logging.info( f"Kernel Function Selection Strategy next method called, " f"invoking function: {self.function.plugin_name}, {self.function.name}", ) try: result = await self.function.invoke(kernel=self.kernel, arguments=arguments) except Exception as ex: logger.error("Kernel Function Selection Strategy select_agents method failed", exc_info=ex) raise AgentExecutionException("Agent Failure - Strategy failed to execute function.") from ex logging.info( f"Kernel Function Selection Strategy select_agents method completed: " f"{self.function.plugin_name}, {self.function.name}, result: {result.value if result else None}", ) # Parse the result from LLM to determine selected agents agent_names = self._parse_agent_names(result) #print(f"agent_names: {agent_names}") if isawaitable(agent_names): agent_names = await agent_names if not agent_names: raise AgentExecutionException("Agent Failure - Strategy unable to determine next agents.") # Match names to agent instances selected_agents = [agent for agent in agents if agent.name in agent_names] if not selected_agents: raise AgentExecutionException(f"Agent Failure - Strategy unable to select any agents from: {agent_names}") return selected_agents async def next( self, agents: List[Agent], history: List[ChatMessageContent], ) -> List[Agent]: """Override next to return a list of agents.""" if not agents and self.initial_agent is None: raise AgentExecutionException("Agent Failure - No agents present to select.") if not self.has_selected and self.initial_agent is not None: selected_agents = [self.initial_agent] else: selected_agents = await self.select_agents(agents, history) self.has_selected = True return selected_agents def _parse_agent_names(self, result) -> List[str]: """Parse LLM result to extract agent names. Args: result: The result from the Kernel Function. Returns: A list of agent names. """ if result is None or result.value is None: return [] if isinstance(result.value, ChatMessageContent): # デバッグ: 内部構造をチェック logging.info(f"Result content: {result.value.inner_content}") # inner_content を直接返す return result.value.inner_content return str(result.value) async def process_agent_with_individual_chat(agent, userInput): # エージェントごとに独立したAgentChatインスタンスを作成 individual_chat = AgentGroupChat(agents=[agent]) # ユーザーからの質問をエージェントの履歴に追加 await individual_chat.add_chat_message(ChatMessageContent(role=AuthorRole.USER, content=userInput)) # メッセージ履歴を保持するリスト agent_messages = [] start_time = datetime.datetime.now() print(f"Starting processing for agent: {agent.name} at {start_time}") # エージェントの応答を処理 async for response in individual_chat.invoke_agent(agent): print(f"# Response from {agent.name}: {response.content}") # 各エージェントのメッセージを収集 agent_messages.append(response) end_time = datetime.datetime.now() print(f"Finished processing for agent: {agent.name} at {end_time}") print(f"Duration for agent {agent.name}: {end_time - start_time}") # 処理が終わったら履歴を返す return agent_messages async def main_async(): #logging.basicConfig(level=logging.INFO) # 各エージェントを作成 agent_Supervisor = create_chat_agent( service_id="SupervisorAgent", name=SUPERVISOR_NAME, instructions=SUPERVISOR_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("SupervisorAgent") ) agent_SummarizeAgent = create_chat_agent( service_id="SummarizeAgent", name="SummarizeAgent", instructions=SummarizeAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("SummarizeAgent") ) agent_documentsResearcher = create_chat_agent( service_id="company_rule_documents_researcher", name=CompanyDocumentsResearcher_NAME, instructions=CompanyDocumentsResearcher_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("company_rule_documents_researcher"), plugin=InternalDocumentationRAGPlugin(), plugin_name="InternalDocumentaionRAG" ) agent_salesSpecialist = create_chat_agent( service_id="salse_specialist_agent", name=SalseSpecialistAgent_NAME, instructions=SalseSpecialistAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("salse_specialist_agent"), plugin=InternalDocumentationOfSalesRAGPlugin(), plugin_name="SalesRAGPlugin" ) # generalAnswerAgentの作成 agent_generalAnswer = create_chat_agent( service_id="general_answer_agent", name=GeneralAnswerAgent_NAME, instructions=GeneralAnswerAgent_INSTRUCTIONS, kernel=_create_kernel_with_chat_completion("general_answer_agent") ) selection_function = KernelFunctionFromPrompt( function_name="selection", prompt=f""" Determine which more than Two participant takes the next turn in a conversation based on the the most recent participant. State only the name of the participants to take the next turn based on the following agents' description. "{CompanyDocumentsResearcher_NAME}": "社内規定に関して詳しいエージェント。", "{SalseSpecialistAgent_NAME}": "営業の社内情報やルールに詳しいエージェント。", "{GeneralAnswerAgent_NAME}": "一般的な知識や情報に詳しいエージェント。" Choose only from these participants: - {CompanyDocumentsResearcher_NAME} - {SalseSpecialistAgent_NAME} - {GeneralAnswerAgent_NAME} make sure that more than 2 agents are selected. History: {{{{$history}}}} """, ) TERMINATION_KEYWORD = "satisfactory" termination_function = KernelFunctionFromPrompt( function_name="termination", prompt=f""" Examine the RESPONSE and determine whether the content has been deemed satisfactory. If {SUPERVISOR_NAME}'s response includes "approved", it is satisfactory. If RESPONSE includes more than two {SUPERVISOR_NAME}'s responses, it is satisfactory. If content is satisfactory, respond with single word without explanation: {TERMINATION_KEYWORD}. RESPONSE: {{{{$history}}}} """, ) # 新しい選択戦略を利用 selection_strategy = MultiAgentKernelFunctionSelectionStrategy( function=selection_function, kernel=_create_kernel_with_chat_completion("selection"), result_parser=lambda result: result.value, agent_variable_name="agents", history_variable_name="history", ) # Create chat with participating agents agents = [agent_Supervisor, agent_SummarizeAgent, agent_documentsResearcher, agent_generalAnswer, agent_salesSpecialist] chat = AgentGroupChat( agents=agents, selection_strategy=selection_strategy, termination_strategy=KernelFunctionTerminationStrategy( agents=[agent_Supervisor], function=termination_function, kernel = _create_kernel_with_chat_completion("termination"), result_parser=lambda result: TERMINATION_KEYWORD in str(result.value[0]).lower(), history_variable_name="history", maximum_iterations=6 ), ) is_completed: bool = False while not is_completed: # Collect user input userInput = input("User > ") if not userInput: print('empty input') continue # Terminate the loop if the user says "exit" if userInput == "exit": break if userInput.lower() == "reset": await chat.reset() print('[conversation has been reset]') continue # メッセージを追加 await chat.add_chat_message(ChatMessageContent(role=AuthorRole.USER, content=userInput)) # SelectionStrategyで選択された複数エージェントを処理 selected_agents = await chat.selection_strategy.next(chat.agents, chat.history.messages) # エージェントごとに処理を並列で実行し、履歴を収集 tasks = [process_agent_with_individual_chat(agent, userInput) for agent in selected_agents] all_agent_histories = await asyncio.gather(*tasks) # 各エージェントの履歴をメインのチャット履歴に統合 for agent_history in all_agent_histories: for message in agent_history: await chat.add_chat_message(message) # 各メッセージをメインチャット履歴に追加 # SummarizeAgentの処理 # Invoke an agent for its response(s) async for response in chat.invoke(agent_SummarizeAgent): # process agent response(s) print(f"# {response.role} - {response.name or '*'}: '{response.content}'") # Supervisorの処理 # Invoke an agent for its response(s) async for response in chat.invoke(agent_Supervisor): # process agent response(s) print(f"# {response.role} - {response.name or '*'}: '{response.content}'") should_terminate = await chat.termination_strategy.should_terminate(agent_Supervisor, chat.history.messages) if should_terminate: print("Conversation has reached termination conditions.") else: print("Conversation will continue.") print(f"termination result: {should_terminate}") if should_terminate: print("chat is complete") is_completed = True break # asyncioイベントループを起動 asyncio.run(main_async())
実行結果
以下が実行結果です。3つのエージェントが並列で呼び出され、集約された回答がSummarizeAgentによって要約されたことが確認できます。
User > リモートの決まりを教えてください Starting processing for agent: CompanyRuleDocumentsResearcher at 2025-02-26 09:43:00.203815 Starting processing for agent: GeneralAnswerAgent at 2025-02-26 09:43:00.221718 Starting processing for agent: SalesSpecialistAgent at 2025-02-26 09:43:00.227719 --------------------------------------------------------------- # Response from GeneralAnswerAgent: リモートワーク(テレワーク)に関する決まりやルールは、企業や組織によって異なる場合がありますが、一般的に考慮すべきポイントやアプローチは以下の通りです。 ### 1. 勤務時間 - **勤務時間の設定**: 企業が定める勤務時間を遵守する。フレックスタイム制を導入する場合もある。 - **休憩時間**: 定期的に休憩をとることを推奨。 ### 2. コミュニケーション - **定期的なミーティング**: チームでの情報共有や進捗確認を目的とした定例会議の設定。 - **連絡手段の明確化**: メール、チャット、電話などの利用方法を決める。 ### 3. 作業環境 - **快適な作業スペースの確保**: 集中できる環境を整えることを推奨。 - **セキュリティ対策**: 自宅での作業時におけるデータ管理やセキュリティ対策についての指針を明記。 ### 4. タスク管理 - **タスクの明確化**: 各メンバーの役割やタスクの明確化を行う。進捗管理のツール活用も有効。 - **成果のフィードバック**: 定期的に成果に対しての評価やフィードバックを行う。 ### 5. バランス・メンタルヘルス - **ワークライフバランス**: 仕事とプライベートのバランスを保つためのガイドラインを設ける。 - **メンタルヘルスの支援**: 必要に応じて、メンタルヘルスに関するサポートや相談窓口を提供。 ### 6. 管理職のサポート - **透明性のあるリーダーシップ**: 管理職がメンバーの状況を把握しやすい環境を作る。 - **エンゲージメントの促進**: メンバーのモチベーションを高める取り組みを行う。 これらの決まりやルールはあくまで一般的なガイドラインですので、各組織の文化や状況に応じてカスタマイズすることが重要です。また、定期的に見直しを行い、改善を図ることも大切です。 Finished processing for agent: GeneralAnswerAgent at 2025-02-26 09:43:05.823301 Duration for agent GeneralAnswerAgent: 0:00:05.601583 --------------------------------------------------------------- rule tool returns: " リモートワークの規定: リモートワークは週に3回まで可能とします。 業務時間中の勤務専念義務を厳守し、指定ツールで業務報告と進捗共有を行うこと。 セキュリティポリシーを徹底遵守し、機密情報の取り扱いに十分注意してください。 " sales tool returns: " 営業の規定: 営業活動では、顧客情報は社内ツールに正確に記録し、提案資料や契約書は事前承認を得て使用すること。 また、競合情報の取扱いには注意し、守秘義務を厳守してください。 リモート営業時はオンライン会議ツールを活用し、進捗報告を定期的に上司へ行い、情報セキュリティを遵守してください。 " --------------------------------------------------------------- # Response from CompanyRuleDocumentsResearcher: 社内のリモートワークに関する規定は以下の通りです: 1. **リモートワークの回数**: リモートワークは週に3回まで可能です。 2. **勤務専念義務**: 業務時間中は勤務に専念する義務があります。また、指定されたツールを用いて業務報告や進捗共有を行う必要があります。 3. **セキュリティポリシー**: セキュリティポリシーを厳守し、機密情報の取り扱いには十分な注意が必要です。 これらの規定を守り、効果的なリモートワークを行ってください。 Finished processing for agent: CompanyRuleDocumentsResearcher at 2025-02-26 09:43:09.024622 Duration for agent CompanyRuleDocumentsResearcher: 0:00:08.820807 --------------------------------------------------------------- # Response from SalesSpecialistAgent: リモート営業に関する決まりは以下の通りです: 1. **顧客情報の記録**:営業活動では、顧客情報を社内ツールに正確に記録することが求められます。 2. **資料の利用**:提案資料や契約書は事前に承認を得てから使用すること。 3. **競合情報の取扱い**:競合情報の取り扱いには注意が必要で、守秘義務を厳守してください。 4. **オンライン会議の活用**:リモート営業時にはオンライン会議ツールを利用することが推奨されています。 5. **進捗報告**:定期的に上司へ進捗報告を行い、情報セキュリティを遵守することが重要です。 これらのルールを守ることで、リモート営業を円滑に行うことができます。 Finished processing for agent: SalesSpecialistAgent at 2025-02-26 09:43:09.332482 Duration for agent SalesSpecialistAgent: 0:00:09.104763 --------------------------------------------------------------- # AuthorRole.ASSISTANT - SummarizeAgent: 'エージェントからのリモートワークに関する回答を要約すると、以下のようになります。 ### 同じ点 - **業務時間の遵守**: どのエージェントも、業務時間中は勤務に専念することが重要だと述べています(CompanyRuleDocumentsResearcher、GeneralAnswerAgent)。 - **コミュニケーションの重要性**: 定期的なミーティングや報告を通じて情報共有を行うことが推奨されています(GeneralAnswerAgent、SalesSpecialistAgent)。 - **セキュリティ対策**: 機密情報の取り扱いやデータ管理に関する規定が強調されています(CompanyRuleDocumentsResearcher、SalesSpecialistAgent)。 ### 食い違う点 - **リモート勤務の回数**: 一部エージェントはリモートワークの回数を明記していますが(CompanyRuleDocumentsResearcher: 週に3回まで)、他のエージェントは具体的な回数には言及していません(GeneralAnswerAgent、SalesSpecialistAgent)。 - **タスク管理の方法**: タスク管理について具体的な方法やツールの提案はGeneralAnswerAgentにのみ見られ、他のエージェントでは触れられていません。 ### 導き出される結論 リモートワークにおいては、業務時間を守ること、コミュニケーションを重視すること、そしてセキュリティに注意を払うことが共通して重要であるという点が強調されます。しかし、リモート勤務の回数や具体的なタスク管理方法についての方針は組織によって異なるため、各組織の方針に従った運用が必要です。このため、自身の組織内での方針を明確に理解し、それに基づいて行動することが円滑なリモートワークにつながるでしょう。' --------------------------------------------------------------- # AuthorRole.ASSISTANT - SupervisorAgent: 'リモートの決まりに関する具体的な情報が含まれており、内容が明確に示されています。したがって、これは承認されました。' Conversation has reached termination conditions. termination result: True chat is complete
別の方法でのSemantic Kernelでの並列実行の検討
AgentGroupChatを利用せず、Agentを並列で実行するように実装する
AgentGroupChatでは複数のエージェントを同時に実行することが想定されていません。
そのため、AgentGroupChatを利用せずに独自でエージェントの並列処理とチャット履歴の管理機能を実装することで、シンプルな並列実行が可能になるかもしれません。
Functionを並列で実行する
Semantic kernelでは、pluginに含めた複数のfunctionを並列で実行することが可能です。
厳密にはエージェントの並列実行ではありませんが、RAGなどをfunctionとして利用する場合は、複数のRAGを並列で実行し1つのエージェントに回答を生成させることはできそうです。
※toolはSemantic KernelではFunctionのことです。
終わりに
今回はSemantic Kernelでのエージェントの並列処理に関して記載しました。
元々AgentGroupChatで複数のエージェントを同時に実行できないことを知らなかったため、AgentGroupChatを利用した方法になっていますが、利用せずに実装すればもう少しシンプルになるかもしれません。
AutoGenやLangGraphでのマルチエージェント、複数のエージェントの並列処理も試したので、AutoGenは次の記事で、それ以外も時間があれば別記事で改めて記載したいと思います。