autogen_agentchat.teams#

该模块提供了各种预定义的多智能体团队的实现。 每个团队都继承自BaseGroupChat类。

class BaseGroupChat(participants: 列表[ChatAgent], group_chat_manager_name: str, group_chat_manager_class: 类型[SequentialRoutedAgent], termination_condition: 终止条件 | = None, max_turns: int | = None, runtime: AgentRuntime | = None)[源代码]#

基础类: Team, ABC, ComponentBase[BaseModel]

群聊团队的基础类。

要实现一个群聊团队,首先创建一个BaseGroupChatManager的子类,然后创建一个使用该群聊管理器的BaseGroupChat的子类。

component_type: ClassVar[ComponentType] = 'team'#

组件的逻辑类型。

async load_state(state: 映射[str, 任何]) [源代码]#

加载外部状态并覆盖群聊团队的当前状态。

状态通过在每个参与者和群聊管理器上调用agent_load_state()方法并传入其内部代理ID来加载。 有关状态的预期格式,请参阅save_state()

async pause() [源代码]#

当团队运行时,通过直接RPC调用暂停其参与者的on_pause()方法。

注意

这是一个在v0.4.9版本中引入的实验性功能,未来可能会发生变化或被移除。

团队必须在暂停之前被初始化。

与终止不同,暂停团队不会导致 run()run_stream() 方法返回。它会在每个参与者上调用 on_pause() 方法,如果参与者没有实现该方法, 这将是一个空操作。

注意

代理类的责任是处理暂停 并确保代理可以在稍后恢复。 请确保在您的代理类中实现on_pause() 方法以自定义暂停行为。 默认情况下,代理在被调用时不会执行任何操作。

Raises:

RuntimeError – 如果团队尚未初始化。当调用参与者的on_pause实现时,从参与者抛出的异常会传播到此方法并引发。

async reset() [源代码]#

重置团队及其参与者到初始状态。

团队必须在重置之前被阻止。

Raises:

RuntimeError – 如果团队尚未初始化或当前正在运行。

使用RoundRobinGroupChat团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Reset the team.
    await team.reset()
    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)


asyncio.run(main())
async resume() [源代码]#

当团队正在运行并被暂停时,通过直接RPC调用其成员的on_resume()方法来恢复它们的执行。

注意

这是一个在v0.4.9版本中引入的实验性功能,未来可能会发生变化或被移除。

团队必须在恢复之前被初始化。

与终止任务并重新开始不同,恢复团队不会导致run()run_stream()方法返回。它会调用每个参与者的on_resume()方法,如果参与者没有实现该方法,则不会执行任何操作。

注意

这是代理类的责任,负责处理恢复并确保代理从其暂停的地方继续。确保在你的代理类中实现on_resume()方法以自定义恢复行为。

Raises:

RuntimeError – 如果团队尚未初始化。调用参与者实现on_resume方法时的异常会传播到该方法并引发。

async run(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Sequence[Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] | = None, cancellation_token: CancellationToken | = None) TaskResult[源代码]#

运行团队并返回结果。基础实现使用 run_stream()来运行团队,然后返回最终结果。 一旦团队停止,终止条件将被重置。

Parameters:
  • 任务 (str | ChatMessage | Sequence[ChatMessage] | None) – 运行团队的任务。可以是一个字符串,单个 ChatMessage,或一个 ChatMessage的列表。

  • cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。 设置取消令牌可能使团队处于不一致状态, 并且可能不会重置终止条件. 要优雅地停止团队,请改用 ExternalTermination

Returns:

结果 – 任务的结果为 TaskResult。结果包含团队生成的消息和停止原因。

使用RoundRobinGroupChat团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    result = await team.run(task="Count from 1 to 10, respond one at a time.")
    print(result)

    # Run the team again without a task to continue the previous task.
    result = await team.run()
    print(result)


asyncio.run(main())

使用CancellationToken来取消任务的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        team.run(
            task="Count from 1 to 10, respond one at a time.",
            cancellation_token=cancellation_token,
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
async run_stream(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Sequence[Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] | = None, cancellation_token: CancellationToken | = None) AsyncGenerator[Annotated[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | TaskResult, ][源代码]#

运行团队并生成一系列消息和TaskResult类型的最终结果作为流中的最后一项。一旦团队停止,终止条件将被重置。

注意

如果代理生成 ModelClientStreamingChunkEvent, 消息将在流中产生,但不会包含在 messages 中。

Parameters:
  • 任务 (str | ChatMessage | Sequence[ChatMessage] | None) – 与团队一起运行的任务。可以是一个字符串,一个单一的ChatMessage,或一个ChatMessage的列表。

  • cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。 设置取消令牌可能使团队处于不一致状态, 并且可能不会重置终止条件. 要优雅地停止团队,请改用 ExternalTermination

Returns:

stream – 一个AsyncGenerator,生成AgentEventChatMessage,并将最终结果TaskResult作为流中的最后一项。

使用RoundRobinGroupChat团队的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="Count from 1 to 10, respond one at a time.")
    async for message in stream:
        print(message)

    # Run the team again without a task to continue the previous task.
    stream = team.run_stream()
    async for message in stream:
        print(message)


asyncio.run(main())

使用CancellationToken来取消任务的示例:

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = MaxMessageTermination(3)
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)

    cancellation_token = CancellationToken()

    # Create a task to run the team in the background.
    run_task = asyncio.create_task(
        Console(
            team.run_stream(
                task="Count from 1 to 10, respond one at a time.",
                cancellation_token=cancellation_token,
            )
        )
    )

    # Wait for 1 second and then cancel the task.
    await asyncio.sleep(1)
    cancellation_token.cancel()

    # This will raise a cancellation error.
    await run_task


asyncio.run(main())
async save_state() 映射[str, 任何][源代码]#

保存群聊团队的状态。

状态通过调用每个参与者和群聊管理器的agent_save_state()方法保存,使用它们各自的内部代理ID。状态以嵌套字典的形式返回:一个带有键agent_states的字典,该字典以代理名称为键,状态为值。

{
    "agent_states": {
        "agent1": ...,
        "agent2": ...,
        "RoundRobinGroupChatManager": ...
    }
}

注意

从v0.4.9开始,状态使用代理名称作为键,而不是代理ID, 并且team_id字段已从状态中移除。这是为了使状态能够在不同的团队和运行时之间可移植。使用旧格式保存的状态在未来可能与新格式不兼容。

注意

当在团队运行时调用save_state()时,状态可能不一致,并可能导致意外状态。建议在团队未运行或停止后调用此方法。

class MagenticOneGroupChat(participants: 列表[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | = None, max_turns: int | = 20, runtime: AgentRuntime | = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT)[源代码]#

基础:BaseGroupChat, Component[MagenticOneGroupChatConfig]

一个团队,通过MagenticOneOrchestrator管理参与者,并运行一个群聊。

协调器处理对话流程,通过管理参与者的互动来确保任务高效完成。

编排器基于Magnetic-One架构,这是一个用于解决复杂任务的通用多代理系统(请参阅下面的参考资料)。

Parameters:
  • participants (列表[ChatAgent]) – 参与群聊的成员。

  • model_client (ChatCompletionClient) – 用于生成响应的模型客户端。

  • termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将基于协调器逻辑运行,或者直到达到最大轮次。

  • max_turns (int, optional) – 组内聊天在停止前的最大轮次。默认为 20。

  • max_stalls (int, 可选) – 允许在重新规划之前的最大停滞次数。默认值为3。

  • final_answer_prompt (str, optional) – 用于从团队对话记录中生成最终答案或响应的LLM提示。提供了一个默认值(适用于GPT-4o类模型)。

Raises:

ValueError – 在编排逻辑中,如果进度分类账没有所需的键或下一个发言者无效。

示例:

MagenticOneGroupChat 与一个助理代理:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import MagenticOneGroupChat
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
    )
    team = MagenticOneGroupChat([assistant], model_client=model_client)
    await Console(team.run_stream(task="Provide a different proof to Fermat last theorem"))


asyncio.run(main())

参考文献

如果您在工作中使用了MagenticOneGroupChat,请引用以下论文:

@article{fourney2024magentic,
    title={Magentic-one: A generalist multi-agent system for solving complex tasks},
    author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others},
    journal={arXiv preprint arXiv:2411.04468},
    year={2024}
}
classmethod _from_config(config: MagenticOneGroupChatConfig) 自我[源代码]#

从配置对象创建组件的新实例。

Parameters:

config (T) – 配置对象。

Returns:

Self – 组件的新实例。

_to_config() MagenticOneGroupChatConfig[源代码]#

导出配置,该配置将用于创建一个与此实例配置相匹配的组件新实例。

Returns:

T – 组件的配置。

component_config_schema#

MagenticOneGroupChatConfig 的别名

component_provider_override: ClassVar[str | ] = 'autogen_agentchat.teams.MagenticOneGroupChat'#

覆盖组件的提供商字符串。这应用于防止内部模块名称成为模块名称的一部分。

class RoundRobinGroupChat(participants: 列表[ChatAgent], termination_condition: 终止条件 | = None, max_turns: int | = None, runtime: AgentRuntime | = None)[源代码]#

基类: BaseGroupChat, Component[RoundRobinGroupChatConfig]

一个团队以轮转的方式运作群聊,参与者轮流向所有人发布消息。

如果团队中只有一个参与者,则该参与者将是唯一的发言人。

Parameters:
  • participants (List[BaseChatAgent]) – 群聊中的参与者。

  • termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。

  • max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。

Raises:

ValueError – 如果没有提供参与者或参与者名称不唯一。

示例:

一个带有一名参与者的团队,使用工具:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def get_weather(location: str) -> str:
        return f"The weather in {location} is sunny."

    assistant = AssistantAgent(
        "Assistant",
        model_client=model_client,
        tools=[get_weather],
    )
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([assistant], termination_condition=termination)
    await Console(team.run_stream(task="What's the weather in New York?"))


asyncio.run(main())

一个拥有多名参与者的团队:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent("Assistant1", model_client=model_client)
    agent2 = AssistantAgent("Assistant2", model_client=model_client)
    termination = TextMentionTermination("TERMINATE")
    team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination)
    await Console(team.run_stream(task="Tell me some jokes."))


asyncio.run(main())
classmethod _from_config(config: RoundRobinGroupChatConfig) 自我[源代码]#

从配置对象创建组件的新实例。

Parameters:

config (T) – 配置对象。

Returns:

Self – 组件的新实例。

_to_config() RoundRobinGroupChatConfig[源代码]#

导出配置,该配置将用于创建一个与此实例配置相匹配的组件新实例。

Returns:

T – 组件的配置。

component_config_schema#

RoundRobinGroupChatConfig的别名

component_provider_override: ClassVar[str | ] = 'autogen_agentchat.teams.RoundRobinGroupChat'#

覆盖组件的提供商字符串。这应用于防止内部模块名称成为模块名称的一部分。

class SelectorGroupChat(participants: 列表[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: 终止条件 | = None, max_turns: int | = None, runtime: AgentRuntime | = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[Annotated[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]], str | ] | = None)[源代码]#

基类:BaseGroupChat, Component[SelectorGroupChatConfig]

一个小组聊天团队,参与者依次向所有人发布消息,使用ChatCompletion模型在每条消息后选择下一个发言者。

Parameters:
  • participants (List[ChatAgent]) – 群聊中的参与者,必须具有唯一的名称且至少有两个参与者。

  • model_client (ChatCompletionClient) – 用于选择下一个发言者的ChatCompletion模型客户端。

  • termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。

  • max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。

  • selector_prompt (str, optional) – 用于选择下一个发言者的提示模板。 可用的字段:‘{roles}’, ‘{participants}’, 和 ‘{history}’。

  • allow_repeated_speaker (bool, 可选) – 是否将上一个发言者包含在下一轮选择的候选人列表中。 默认为 False。模型仍可能选择上一个发言者——如果发生这种情况,将记录一条警告。

  • max_selector_attempts (int, 可选) – 使用模型选择发言者的最大尝试次数。默认为 3。 如果模型在达到最大尝试次数后仍未成功选择发言者,将使用上一个发言者(如果可用), 否则将使用第一个参与者。

  • selector_func (Callable[[Sequence[AgentEvent | ChatMessage]], str | None], optional) – 一个自定义的选择器函数,它接收对话历史并返回下一个发言者的名称。如果提供了这个函数,它将用于覆盖模型来选择下一个发言者。如果函数返回None,将使用模型来选择下一个发言者。

Raises:

ValueError – 如果参与者的数量少于两个,或者选择器提示无效。

示例:

一个拥有多名参与者的团队:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    async def lookup_hotel(location: str) -> str:
        return f"Here are some hotels in {location}: hotel1, hotel2, hotel3."

    async def lookup_flight(origin: str, destination: str) -> str:
        return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3."

    async def book_trip() -> str:
        return "Your trip is booked!"

    travel_advisor = AssistantAgent(
        "Travel_Advisor",
        model_client,
        tools=[book_trip],
        description="Helps with travel planning.",
    )
    hotel_agent = AssistantAgent(
        "Hotel_Agent",
        model_client,
        tools=[lookup_hotel],
        description="Helps with hotel booking.",
    )
    flight_agent = AssistantAgent(
        "Flight_Agent",
        model_client,
        tools=[lookup_flight],
        description="Helps with flight booking.",
    )
    termination = TextMentionTermination("TERMINATE")
    team = SelectorGroupChat(
        [travel_advisor, hotel_agent, flight_agent],
        model_client=model_client,
        termination_condition=termination,
    )
    await Console(team.run_stream(task="Book a 3-day trip to new york."))


asyncio.run(main())

一个带有自定义选择器函数的团队:

import asyncio
from typing import Sequence
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import AgentEvent, ChatMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    def check_calculation(x: int, y: int, answer: int) -> str:
        if x + y == answer:
            return "Correct!"
        else:
            return "Incorrect!"

    agent1 = AssistantAgent(
        "Agent1",
        model_client,
        description="For calculation",
        system_message="Calculate the sum of two numbers",
    )
    agent2 = AssistantAgent(
        "Agent2",
        model_client,
        tools=[check_calculation],
        description="For checking calculation",
        system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'",
    )

    def selector_func(messages: Sequence[AgentEvent | ChatMessage]) -> str | None:
        if len(messages) == 1 or messages[-1].content == "Incorrect!":
            return "Agent1"
        if messages[-1].source == "Agent1":
            return "Agent2"
        return None

    termination = TextMentionTermination("Correct!")
    team = SelectorGroupChat(
        [agent1, agent2],
        model_client=model_client,
        selector_func=selector_func,
        termination_condition=termination,
    )

    await Console(team.run_stream(task="What is 1 + 1?"))


asyncio.run(main())
classmethod _from_config(config: SelectorGroupChatConfig) 自我[源代码]#

从配置对象创建组件的新实例。

Parameters:

config (T) – 配置对象。

Returns:

Self – 组件的新实例。

_to_config() SelectorGroupChatConfig[源代码]#

导出配置,该配置将用于创建一个与此实例配置相匹配的组件新实例。

Returns:

T – 组件的配置。

component_config_schema#

SelectorGroupChatConfig的别名

component_provider_override: ClassVar[str | ] = 'autogen_agentchat.teams.SelectorGroupChat'#

覆盖组件的提供商字符串。这应用于防止内部模块名称成为模块名称的一部分。

class Swarm(participants: 列表[ChatAgent], termination_condition: 终止条件 | = None, max_turns: int | = None, runtime: AgentRuntime | = None)[源代码]#

基础类:BaseGroupChat, Component[SwarmConfig]

一个群聊团队,仅根据交接消息选择下一个发言者。

参与者列表中的第一个参与者是初始发言人。下一个发言人根据当前发言人发送的HandoffMessage消息来选择。如果没有发送移交消息,当前发言人将继续作为发言人。

Parameters:
  • participants (List[ChatAgent]) – 参与群聊的代理。列表中的第一个代理是初始发言人。

  • termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。

  • max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。

基本示例:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import MaxMessageTermination


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent1 = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["Bob"],
        system_message="You are Alice and you only answer questions about yourself.",
    )
    agent2 = AssistantAgent(
        "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January."
    )

    termination = MaxMessageTermination(3)
    team = Swarm([agent1, agent2], termination_condition=termination)

    stream = team.run_stream(task="What is bob's birthday?")
    async for message in stream:
        print(message)


asyncio.run(main())

使用 HandoffTermination 进行人机交互交接:

import asyncio
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
from autogen_agentchat.ui import Console
from autogen_agentchat.messages import HandoffMessage


async def main() -> None:
    model_client = OpenAIChatCompletionClient(model="gpt-4o")

    agent = AssistantAgent(
        "Alice",
        model_client=model_client,
        handoffs=["user"],
        system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.",
    )
    termination = HandoffTermination(target="user") | MaxMessageTermination(3)
    team = Swarm([agent], termination_condition=termination)

    # Start the conversation.
    await Console(team.run_stream(task="What is bob's birthday?"))

    # Resume with user feedback.
    await Console(
        team.run_stream(
            task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.")
        )
    )


asyncio.run(main())
classmethod _from_config(config: SwarmConfig) Swarm[源代码]#

从配置对象创建组件的新实例。

Parameters:

config (T) – 配置对象。

Returns:

Self – 组件的新实例。

_to_config() SwarmConfig[源代码]#

导出配置,该配置将用于创建一个与此实例配置相匹配的组件新实例。

Returns:

T – 组件的配置。

component_config_schema#

SwarmConfig 的别名

component_provider_override: ClassVar[str | ] = 'autogen_agentchat.teams.Swarm'#

覆盖组件的提供商字符串。这应用于防止内部模块名称成为模块名称的一部分。