agentscope.pipeline

AgentScope 中的 pipeline 模块,为复杂工作流和多智能体对话提供语法糖。

class MsgHub[source]

基础:object

MsgHub 类,用于控制参与智能体的订阅。

示例

在以下示例中,来自 agent1agent2agent3 的回复消息将广播给 MsgHub 中的所有其他智能体。

with MsgHub(participant=[agent1, agent2, agent3]):
    agent1()
    agent2()

实际上,它与以下代码具有相同效果,但更加简单优雅!

x1 = agent1()
agent2.observe(x1)
agent3.observe(x1)

x2 = agent2()
agent1.observe(x2)
agent3.observe(x2)
__init__(participants, announcement=None, enable_auto_broadcast=True, name=None)[source]

初始化一个MsgHub上下文管理器。

Parameters:
  • participants (list[AgentBase]) – 参与MsgHub的智能体列表。

  • None) (announcement (列表[Msg] | Msg |) – 进入消息中心时将向所有参与者广播的消息。

  • enable_auto_broadcast (bool, 默认为 True) – 是否启用自动将来自任何参与者的回复消息广播至所有其他参与者。若禁用此功能,MsgHub 将仅作为手动消息广播器,通过 announcement 参数和 broadcast() 方法进行广播。

  • name (str | None) - 此MsgHub的名称。如果未提供,将生成一个随机ID。

  • announcement (Msg | list[Msg] | None)

Return type:

add(new_participant)[source]

向这个集线器添加新的参与者

Parameters:

new_participant (列表[AgentBase] | AgentBase

Return type:

delete(participant)[source]

从参与者中删除智能体。

Parameters:

参与者 (列表[AgentBase] | AgentBase)

Return type:

async broadcast(msg)[source]

将消息广播给所有参与者。

Parameters:

msg (list[Msg] | Msg) – 在所有参与者之间广播的消息。

Return type:

set_auto_broadcast(enable)[source]

启用自动广播功能,将来自任何参与者的回复消息自动发送给所有其他参与者。

Parameters:

enable (bool) – 是否启用自动广播功能。若禁用,MsgHub将仅作为手动消息广播器,通过announcement参数和broadcast()方法进行操作。

Return type:

class SequentialPipeline[source]

基础:object

一个异步顺序流水线类,它按顺序执行一系列智能体。与函数式流水线相比,这个类可以被重复使用。

__init__(agents)[source]

初始化一个顺序流水线类

Parameters:

agents (list[AgentBase]) – 一个智能体列表。

Return type:

async __call__(msg=None)[source]

执行顺序流程

Parameters:

msg (Msg | list[Msg] | None, defaults to None) – 传递给第一个智能体的初始输入。

Return type:

Msg | 列表[Msg] | 无

async sequential_pipeline(agents, msg=None)[source]

一种异步语法糖管道,按顺序执行一系列智能体。前一个智能体的输出将被作为输入传递给下一个智能体。最终输出将是最后一个智能体的输出。

示例

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

msg_output = await sequential_pipeline(
    [agent1, agent2, agent3],
    msg_input
)
Parameters:
  • agents (list[AgentBase]) – 一个智能体列表。

  • msg (Msg | list[Msg] | None, 默认值为 None) – 将被传递给第一个智能体的初始输入。

Returns:

序列中最后一位智能体的输出。

Return type:

消息 | 消息列表 | 无

class FanoutPipeline[source]

基础:object

一个异步广播式流水线类,可将相同输入分发给多个智能体。与功能式流水线相比,该类可重复使用,并且可以使用默认参数进行配置。

__init__(agents, enable_gather=True)[source]

初始化一个扇出流水线类

Parameters:
  • 智能体 (list[AgentBase]) – 要执行的智能体列表。

  • enable_gather (bool, 默认为 True) – 是否使用 asyncio.gather() 并发执行智能体。如果为 False,则会按顺序依次执行智能体。

Return type:

async __call__(msg=None, **kwargs)[source]

执行扇形管道

Parameters:
  • msg (Msg | list[Msg] | None, 默认值为 None) – 将要分发给所有智能体的输入消息。

  • **kwargs (Any) – 执行期间传递给每个智能体的额外关键字参数。

Returns:

所有智能体的输出消息列表。

Return type:

消息列表

async fanout_pipeline(agents, msg=None, enable_gather=True, **kwargs)[source]

一种扇出流水线,将相同的输入分发给多个智能体。 该流水线将相同的消息(或其深拷贝)发送给所有智能体 并收集它们的响应。根据enable_gather参数,智能体可以 通过asyncio.gather()并发执行或顺序执行。

示例

agent1 = ReActAgent(...)
agent2 = ReActAgent(...)
agent3 = ReActAgent(...)

msg_input = Msg("user", "Hello", "user")

# Concurrent execution (default)
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input
)

# Sequential execution
results = await fanout_pipeline(
    [agent1, agent2, agent3],
    msg_input,
    enable_gather=False
)
Parameters:
  • agents (list[AgentBase]) – 一个智能体列表。

  • msg (Msg | list[Msg] | None, 默认值为 None) – 将传递给所有智能体的初始输入。

  • enable_gather (bool, 默认为 True) – 是否使用 asyncio.gather() 并行执行智能体。 如果为 False, 智能体会按顺序执行。

  • **kwargs (任意) – 在执行期间传递给每个智能体的附加关键字参数。

Returns:

来自每个智能体的响应消息列表。

Return type:

列表[消息]