agentscope.pipeline¶
AgentScope 中的 pipeline 模块,为复杂工作流和多智能体对话提供语法糖。
- class MsgHub[source]¶
基础:
objectMsgHub 类,用于控制参与智能体的订阅。
示例
在以下示例中,来自 agent1、agent2 和 agent3 的回复消息将广播给 MsgHub 中的所有其他智能体。
with MsgHub(participant=[agent1, agent2, agent3]): agent1() agent2()
实际上,它与以下代码具有相同效果,但更加简单优雅!
x1 = agent1() agent2.observe(x1) agent3.observe(x1) x2 = agent2() agent1.observe(x2) agent3.observe(x2)
- __init__(participants, announcement=None, enable_auto_broadcast=True, name=None)[source]¶
初始化一个MsgHub上下文管理器。
- Parameters:
- Return type:
无
- class SequentialPipeline[source]¶
基础:
object一个异步顺序流水线类,它按顺序执行一系列智能体。与函数式流水线相比,这个类可以被重复使用。
- async sequential_pipeline(agents, msg=None)[source]¶
一种异步语法糖管道,按顺序执行一系列智能体。前一个智能体的输出将被作为输入传递给下一个智能体。最终输出将是最后一个智能体的输出。
示例
agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") msg_output = await sequential_pipeline( [agent1, agent2, agent3], msg_input )
- Parameters:
agents (list[AgentBase]) – 一个智能体列表。
msg (Msg | list[Msg] | None, 默认值为 None) – 将被传递给第一个智能体的初始输入。
- Returns:
序列中最后一位智能体的输出。
- Return type:
消息 | 消息列表 | 无
- class FanoutPipeline[source]¶
基础:
object一个异步广播式流水线类,可将相同输入分发给多个智能体。与功能式流水线相比,该类可重复使用,并且可以使用默认参数进行配置。
- async fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs)[source]¶
一种扇出流水线,将相同的输入分发给多个智能体。 该流水线将相同的消息(或其深拷贝)发送给所有智能体 并收集它们的响应。根据enable_gather参数,智能体可以 通过asyncio.gather()并发执行或顺序执行。
示例
agent1 = ReActAgent(...) agent2 = ReActAgent(...) agent3 = ReActAgent(...) msg_input = Msg("user", "Hello", "user") # Concurrent execution (default) results = await fanout_pipeline( [agent1, agent2, agent3], msg_input ) # Sequential execution results = await fanout_pipeline( [agent1, agent2, agent3], msg_input, enable_gather=False )
- Parameters:
agents (list[AgentBase]) – 一个智能体列表。
msg (Msg | list[Msg] | None, 默认值为 None) – 将传递给所有智能体的初始输入。
enable_gather (bool, 默认为 True) – 是否使用 asyncio.gather() 并行执行智能体。 如果为 False, 智能体会按顺序执行。
**kwargs (任意) – 在执行期间传递给每个智能体的附加关键字参数。
- Returns:
来自每个智能体的响应消息列表。
- Return type:
列表[消息]