autogen_agentchat.teams#
该模块提供了各种预定义的多智能体团队的实现。 每个团队都继承自BaseGroupChat类。
- class BaseGroupChat(participants: 列表[ChatAgent], group_chat_manager_name: str, group_chat_manager_class: 类型[SequentialRoutedAgent], termination_condition: 终止条件 | 无 = None, max_turns: int | 无 = None, runtime: AgentRuntime | 无 = None)[源代码]#
基础类:
Team
,ABC
,ComponentBase
[BaseModel
]群聊团队的基础类。
要实现一个群聊团队,首先创建一个
BaseGroupChatManager
的子类,然后创建一个使用该群聊管理器的BaseGroupChat
的子类。- component_type: ClassVar[ComponentType] = 'team'#
组件的逻辑类型。
- async load_state(state: 映射[str, 任何]) 无 [源代码]#
加载外部状态并覆盖群聊团队的当前状态。
状态通过在每个参与者和群聊管理器上调用
agent_load_state()
方法并传入其内部代理ID来加载。 有关状态的预期格式,请参阅save_state()
。
- async pause() 无 [源代码]#
当团队运行时,通过直接RPC调用暂停其参与者的
on_pause()
方法。注意
这是一个在v0.4.9版本中引入的实验性功能,未来可能会发生变化或被移除。
团队必须在暂停之前被初始化。
与终止不同,暂停团队不会导致
run()
或run_stream()
方法返回。它会在每个参与者上调用on_pause()
方法,如果参与者没有实现该方法, 这将是一个空操作。注意
代理类的责任是处理暂停 并确保代理可以在稍后恢复。 请确保在您的代理类中实现
on_pause()
方法以自定义暂停行为。 默认情况下,代理在被调用时不会执行任何操作。- Raises:
RuntimeError – 如果团队尚未初始化。当调用参与者的
on_pause
实现时,从参与者抛出的异常会传播到此方法并引发。
- async reset() 无 [源代码]#
重置团队及其参与者到初始状态。
团队必须在重置之前被阻止。
- Raises:
RuntimeError – 如果团队尚未初始化或当前正在运行。
使用
RoundRobinGroupChat
团队的示例:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) # Reset the team. await team.reset() stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) asyncio.run(main())
- async resume() 无 [源代码]#
当团队正在运行并被暂停时,通过直接RPC调用其成员的
on_resume()
方法来恢复它们的执行。注意
这是一个在v0.4.9版本中引入的实验性功能,未来可能会发生变化或被移除。
团队必须在恢复之前被初始化。
与终止任务并重新开始不同,恢复团队不会导致
run()
或run_stream()
方法返回。它会调用每个参与者的on_resume()
方法,如果参与者没有实现该方法,则不会执行任何操作。注意
这是代理类的责任,负责处理恢复并确保代理从其暂停的地方继续。确保在你的代理类中实现
on_resume()
方法以自定义恢复行为。- Raises:
RuntimeError – 如果团队尚未初始化。调用参与者实现
on_resume
方法时的异常会传播到该方法并引发。
- async run(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Sequence[Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] | 无 = None, cancellation_token: CancellationToken | 无 = None) TaskResult [源代码]#
运行团队并返回结果。基础实现使用
run_stream()
来运行团队,然后返回最终结果。 一旦团队停止,终止条件将被重置。- Parameters:
任务 (str | ChatMessage | Sequence[ChatMessage] | None) – 运行团队的任务。可以是一个字符串,单个
ChatMessage
,或一个ChatMessage
的列表。cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。 设置取消令牌可能使团队处于不一致状态, 并且可能不会重置终止条件. 要优雅地停止团队,请改用
ExternalTermination
。
- Returns:
结果 – 任务的结果为
TaskResult
。结果包含团队生成的消息和停止原因。
使用
RoundRobinGroupChat
团队的示例:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) result = await team.run(task="Count from 1 to 10, respond one at a time.") print(result) # Run the team again without a task to continue the previous task. result = await team.run() print(result) asyncio.run(main())
使用
CancellationToken
来取消任务的示例:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_core import CancellationToken from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) cancellation_token = CancellationToken() # Create a task to run the team in the background. run_task = asyncio.create_task( team.run( task="Count from 1 to 10, respond one at a time.", cancellation_token=cancellation_token, ) ) # Wait for 1 second and then cancel the task. await asyncio.sleep(1) cancellation_token.cancel() # This will raise a cancellation error. await run_task asyncio.run(main())
- async run_stream(*, task: str | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Sequence[Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]] | 无 = None, cancellation_token: CancellationToken | 无 = None) AsyncGenerator[Annotated[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | TaskResult, 无] [源代码]#
运行团队并生成一系列消息和
TaskResult
类型的最终结果作为流中的最后一项。一旦团队停止,终止条件将被重置。注意
如果代理生成
ModelClientStreamingChunkEvent
, 消息将在流中产生,但不会包含在messages
中。- Parameters:
任务 (str | ChatMessage | Sequence[ChatMessage] | None) – 与团队一起运行的任务。可以是一个字符串,一个单一的
ChatMessage
,或一个ChatMessage
的列表。cancellation_token (CancellationToken | None) – 用于立即终止任务的取消令牌。 设置取消令牌可能使团队处于不一致状态, 并且可能不会重置终止条件. 要优雅地停止团队,请改用
ExternalTermination
。
- Returns:
stream – 一个
AsyncGenerator
,生成AgentEvent
,ChatMessage
,并将最终结果TaskResult
作为流中的最后一项。
使用
RoundRobinGroupChat
团队的示例:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="Count from 1 to 10, respond one at a time.") async for message in stream: print(message) # Run the team again without a task to continue the previous task. stream = team.run_stream() async for message in stream: print(message) asyncio.run(main())
使用
CancellationToken
来取消任务的示例:import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination from autogen_agentchat.ui import Console from autogen_agentchat.teams import RoundRobinGroupChat from autogen_core import CancellationToken from autogen_ext.models.openai import OpenAIChatCompletionClient async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = MaxMessageTermination(3) team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) cancellation_token = CancellationToken() # Create a task to run the team in the background. run_task = asyncio.create_task( Console( team.run_stream( task="Count from 1 to 10, respond one at a time.", cancellation_token=cancellation_token, ) ) ) # Wait for 1 second and then cancel the task. await asyncio.sleep(1) cancellation_token.cancel() # This will raise a cancellation error. await run_task asyncio.run(main())
- async save_state() 映射[str, 任何] [源代码]#
保存群聊团队的状态。
状态通过调用每个参与者和群聊管理器的
agent_save_state()
方法保存,使用它们各自的内部代理ID。状态以嵌套字典的形式返回:一个带有键agent_states的字典,该字典以代理名称为键,状态为值。{ "agent_states": { "agent1": ..., "agent2": ..., "RoundRobinGroupChatManager": ... } }
注意
从v0.4.9开始,状态使用代理名称作为键,而不是代理ID, 并且team_id字段已从状态中移除。这是为了使状态能够在不同的团队和运行时之间可移植。使用旧格式保存的状态在未来可能与新格式不兼容。
注意
当在团队运行时调用
save_state()
时,状态可能不一致,并可能导致意外状态。建议在团队未运行或停止后调用此方法。
- class MagenticOneGroupChat(participants: 列表[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: TerminationCondition | 无 = None, max_turns: int | 无 = 20, runtime: AgentRuntime | 无 = None, max_stalls: int = 3, final_answer_prompt: str = ORCHESTRATOR_FINAL_ANSWER_PROMPT)[源代码]#
基础:
BaseGroupChat
,Component
[MagenticOneGroupChatConfig
]一个团队,通过MagenticOneOrchestrator管理参与者,并运行一个群聊。
协调器处理对话流程,通过管理参与者的互动来确保任务高效完成。
编排器基于Magnetic-One架构,这是一个用于解决复杂任务的通用多代理系统(请参阅下面的参考资料)。
- Parameters:
participants (列表[ChatAgent]) – 参与群聊的成员。
model_client (ChatCompletionClient) – 用于生成响应的模型客户端。
termination_condition (TerminationCondition, optional) – 群聊的终止条件。默认为 None。如果没有终止条件,群聊将基于协调器逻辑运行,或者直到达到最大轮次。
max_turns (int, optional) – 组内聊天在停止前的最大轮次。默认为 20。
max_stalls (int, 可选) – 允许在重新规划之前的最大停滞次数。默认值为3。
final_answer_prompt (str, optional) – 用于从团队对话记录中生成最终答案或响应的LLM提示。提供了一个默认值(适用于GPT-4o类模型)。
- Raises:
ValueError – 在编排逻辑中,如果进度分类账没有所需的键或下一个发言者无效。
示例:
MagenticOneGroupChat 与一个助理代理:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import MagenticOneGroupChat from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") assistant = AssistantAgent( "Assistant", model_client=model_client, ) team = MagenticOneGroupChat([assistant], model_client=model_client) await Console(team.run_stream(task="Provide a different proof to Fermat last theorem")) asyncio.run(main())
参考文献
如果您在工作中使用了MagenticOneGroupChat,请引用以下论文:
@article{fourney2024magentic, title={Magentic-one: A generalist multi-agent system for solving complex tasks}, author={Fourney, Adam and Bansal, Gagan and Mozannar, Hussein and Tan, Cheng and Salinas, Eduardo and Niedtner, Friederike and Proebsting, Grace and Bassman, Griffin and Gerrits, Jack and Alber, Jacob and others}, journal={arXiv preprint arXiv:2411.04468}, year={2024} }
- classmethod _from_config(config: MagenticOneGroupChatConfig) 自我 [源代码]#
从配置对象创建组件的新实例。
- Parameters:
config (T) – 配置对象。
- Returns:
Self – 组件的新实例。
- component_config_schema#
MagenticOneGroupChatConfig
的别名
- class RoundRobinGroupChat(participants: 列表[ChatAgent], termination_condition: 终止条件 | 无 = None, max_turns: int | 无 = None, runtime: AgentRuntime | 无 = None)[源代码]#
基类:
BaseGroupChat
,Component
[RoundRobinGroupChatConfig
]一个团队以轮转的方式运作群聊,参与者轮流向所有人发布消息。
如果团队中只有一个参与者,则该参与者将是唯一的发言人。
- Parameters:
participants (List[BaseChatAgent]) – 群聊中的参与者。
termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。
max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。
- Raises:
ValueError – 如果没有提供参与者或参与者名称不唯一。
示例:
一个带有一名参与者的团队,使用工具:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") async def get_weather(location: str) -> str: return f"The weather in {location} is sunny." assistant = AssistantAgent( "Assistant", model_client=model_client, tools=[get_weather], ) termination = TextMentionTermination("TERMINATE") team = RoundRobinGroupChat([assistant], termination_condition=termination) await Console(team.run_stream(task="What's the weather in New York?")) asyncio.run(main())
一个拥有多名参与者的团队:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent("Assistant1", model_client=model_client) agent2 = AssistantAgent("Assistant2", model_client=model_client) termination = TextMentionTermination("TERMINATE") team = RoundRobinGroupChat([agent1, agent2], termination_condition=termination) await Console(team.run_stream(task="Tell me some jokes.")) asyncio.run(main())
- classmethod _from_config(config: RoundRobinGroupChatConfig) 自我 [源代码]#
从配置对象创建组件的新实例。
- Parameters:
config (T) – 配置对象。
- Returns:
Self – 组件的新实例。
- component_config_schema#
RoundRobinGroupChatConfig
的别名
- class SelectorGroupChat(participants: 列表[ChatAgent], model_client: ChatCompletionClient, *, termination_condition: 终止条件 | 无 = None, max_turns: int | 无 = None, runtime: AgentRuntime | 无 = None, selector_prompt: str = 'You are in a role play game. The following roles are available:\n{roles}.\nRead the following conversation. Then select the next role from {participants} to play. Only return the role.\n\n{history}\n\nRead the above conversation. Then select the next role from {participants} to play. Only return the role.\n', allow_repeated_speaker: bool = False, max_selector_attempts: int = 3, selector_func: Callable[[Sequence[Annotated[ToolCallRequestEvent | ToolCallExecutionEvent | MemoryQueryEvent | UserInputRequestedEvent | ModelClientStreamingChunkEvent | ThoughtEvent, FieldInfo(annotation=NoneType, required=True, discriminator='type')] | Annotated[TextMessage | MultiModalMessage | StopMessage | 工具调用摘要信息 | HandoffMessage, FieldInfo(annotation=NoneType, required=True, discriminator='type')]]], str | 无] | 无 = None)[源代码]#
基类:
BaseGroupChat
,Component
[SelectorGroupChatConfig
]一个小组聊天团队,参与者依次向所有人发布消息,使用ChatCompletion模型在每条消息后选择下一个发言者。
- Parameters:
participants (List[ChatAgent]) – 群聊中的参与者,必须具有唯一的名称且至少有两个参与者。
model_client (ChatCompletionClient) – 用于选择下一个发言者的ChatCompletion模型客户端。
termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。
max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。
selector_prompt (str, optional) – 用于选择下一个发言者的提示模板。 可用的字段:‘{roles}’, ‘{participants}’, 和 ‘{history}’。
allow_repeated_speaker (bool, 可选) – 是否将上一个发言者包含在下一轮选择的候选人列表中。 默认为 False。模型仍可能选择上一个发言者——如果发生这种情况,将记录一条警告。
max_selector_attempts (int, 可选) – 使用模型选择发言者的最大尝试次数。默认为 3。 如果模型在达到最大尝试次数后仍未成功选择发言者,将使用上一个发言者(如果可用), 否则将使用第一个参与者。
selector_func (Callable[[Sequence[AgentEvent | ChatMessage]], str | None], optional) – 一个自定义的选择器函数,它接收对话历史并返回下一个发言者的名称。如果提供了这个函数,它将用于覆盖模型来选择下一个发言者。如果函数返回None,将使用模型来选择下一个发言者。
- Raises:
ValueError – 如果参与者的数量少于两个,或者选择器提示无效。
示例:
一个拥有多名参与者的团队:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import SelectorGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") async def lookup_hotel(location: str) -> str: return f"Here are some hotels in {location}: hotel1, hotel2, hotel3." async def lookup_flight(origin: str, destination: str) -> str: return f"Here are some flights from {origin} to {destination}: flight1, flight2, flight3." async def book_trip() -> str: return "Your trip is booked!" travel_advisor = AssistantAgent( "Travel_Advisor", model_client, tools=[book_trip], description="Helps with travel planning.", ) hotel_agent = AssistantAgent( "Hotel_Agent", model_client, tools=[lookup_hotel], description="Helps with hotel booking.", ) flight_agent = AssistantAgent( "Flight_Agent", model_client, tools=[lookup_flight], description="Helps with flight booking.", ) termination = TextMentionTermination("TERMINATE") team = SelectorGroupChat( [travel_advisor, hotel_agent, flight_agent], model_client=model_client, termination_condition=termination, ) await Console(team.run_stream(task="Book a 3-day trip to new york.")) asyncio.run(main())
一个带有自定义选择器函数的团队:
import asyncio from typing import Sequence from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import SelectorGroupChat from autogen_agentchat.conditions import TextMentionTermination from autogen_agentchat.ui import Console from autogen_agentchat.messages import AgentEvent, ChatMessage async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") def check_calculation(x: int, y: int, answer: int) -> str: if x + y == answer: return "Correct!" else: return "Incorrect!" agent1 = AssistantAgent( "Agent1", model_client, description="For calculation", system_message="Calculate the sum of two numbers", ) agent2 = AssistantAgent( "Agent2", model_client, tools=[check_calculation], description="For checking calculation", system_message="Check the answer and respond with 'Correct!' or 'Incorrect!'", ) def selector_func(messages: Sequence[AgentEvent | ChatMessage]) -> str | None: if len(messages) == 1 or messages[-1].content == "Incorrect!": return "Agent1" if messages[-1].source == "Agent1": return "Agent2" return None termination = TextMentionTermination("Correct!") team = SelectorGroupChat( [agent1, agent2], model_client=model_client, selector_func=selector_func, termination_condition=termination, ) await Console(team.run_stream(task="What is 1 + 1?")) asyncio.run(main())
- classmethod _from_config(config: SelectorGroupChatConfig) 自我 [源代码]#
从配置对象创建组件的新实例。
- Parameters:
config (T) – 配置对象。
- Returns:
Self – 组件的新实例。
- component_config_schema#
SelectorGroupChatConfig
的别名
- class Swarm(participants: 列表[ChatAgent], termination_condition: 终止条件 | 无 = None, max_turns: int | 无 = None, runtime: AgentRuntime | 无 = None)[源代码]#
基础类:
BaseGroupChat
,Component
[SwarmConfig
]一个群聊团队,仅根据交接消息选择下一个发言者。
参与者列表中的第一个参与者是初始发言人。下一个发言人根据当前发言人发送的
HandoffMessage
消息来选择。如果没有发送移交消息,当前发言人将继续作为发言人。- Parameters:
participants (List[ChatAgent]) – 参与群聊的代理。列表中的第一个代理是初始发言人。
termination_condition (TerminationCondition, optional) – 群组聊天的终止条件。默认为 None。如果没有终止条件,群组聊天将无限期运行。
max_turns (int, 可选) – 在停止之前在群聊中的最大轮次。默认为 None,表示没有限制。
基本示例:
import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import Swarm from autogen_agentchat.conditions import MaxMessageTermination async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent1 = AssistantAgent( "Alice", model_client=model_client, handoffs=["Bob"], system_message="You are Alice and you only answer questions about yourself.", ) agent2 = AssistantAgent( "Bob", model_client=model_client, system_message="You are Bob and your birthday is on 1st January." ) termination = MaxMessageTermination(3) team = Swarm([agent1, agent2], termination_condition=termination) stream = team.run_stream(task="What is bob's birthday?") async for message in stream: print(message) asyncio.run(main())
使用
HandoffTermination
进行人机交互交接:import asyncio from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import Swarm from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination from autogen_agentchat.ui import Console from autogen_agentchat.messages import HandoffMessage async def main() -> None: model_client = OpenAIChatCompletionClient(model="gpt-4o") agent = AssistantAgent( "Alice", model_client=model_client, handoffs=["user"], system_message="You are Alice and you only answer questions about yourself, ask the user for help if needed.", ) termination = HandoffTermination(target="user") | MaxMessageTermination(3) team = Swarm([agent], termination_condition=termination) # Start the conversation. await Console(team.run_stream(task="What is bob's birthday?")) # Resume with user feedback. await Console( team.run_stream( task=HandoffMessage(source="user", target="Alice", content="Bob's birthday is on 1st January.") ) ) asyncio.run(main())
- classmethod _from_config(config: SwarmConfig) Swarm [源代码]#
从配置对象创建组件的新实例。
- Parameters:
config (T) – 配置对象。
- Returns:
Self – 组件的新实例。
- component_config_schema#
SwarmConfig
的别名