注意
Go to the end 下载完整示例代码。
流程管道¶
对于多智能体协同编排,agentscope 提供 agentscope.pipeline 模块
作为语法糖,用于将多个智能体连接起来,包括
MsgHub: 用于在多个智能体间广播消息的消息中心
sequential_pipeline 和 SequentialPipeline: 一种基于函数和类的方式,用于将智能体按照顺序连接起来
fanout_pipeline 和 FanoutPipeline:一种函数式和基于类的实现,将相同输入分发给多个智能体
import os, asyncio
from agentscope.formatter import DashScopeMultiAgentFormatter
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.agent import ReActAgent
from agentscope.pipeline import MsgHub
使用MsgHub进行广播¶
MsgHub 类是一个异步上下文管理器,接收一系列智能体作为其参与者。
当某位参与者生成回复消息时,所有其他参与者将通过调用它们的 observe 方法接收该消息。
MsgHub上下文中, 开发者无需手动将回复消息从一个智能体发送到另一个智能体。
广播功能会自动处理。
这里我们创建四个智能体:Alice、Bob、Charlie和David。 然后通过互相介绍,我们开始一个包含Alice、Bob和Charlie的会议。 请注意David未参与此次会议。
def create_agent(name: str, age: int, career: str) -> ReActAgent:
"""Create agent object by the given information."""
return ReActAgent(
name=name,
sys_prompt=f"You're {name}, a {age}-year-old {career}",
model=DashScopeChatModel(
model_name="qwen-max",
api_key=os.environ["DASHSCOPE_API_KEY"],
),
formatter=DashScopeMultiAgentFormatter(),
)
alice = create_agent("Alice", 50, "teacher")
bob = create_agent("Bob", 35, "engineer")
charlie = create_agent("Charlie", 28, "designer")
david = create_agent("David", 30, "developer")
然后我们开始会议,并让它们自我介绍,无需手动传递消息:
提示
announcement 中的消息将在进入 MsgHub 上下文时广播给所有参与者。
async def example_broadcast_message():
"""Example of broadcasting messages with MsgHub."""
# Create a message hub
async with MsgHub(
participants=[alice, bob, charlie],
announcement=Msg(
"user",
"Now introduce yourself in one sentence, including your name, age and career.",
"user",
),
) as hub:
# Group chat without manual message passing
await alice()
await bob()
await charlie()
asyncio.run(example_broadcast_message())
Alice: Hello, I'm Alice, a 50-year-old teacher with a passion for helping students learn and grow.
Bob: Hello, I'm Bob, a 35-year-old engineer who enjoys solving complex problems and designing innovative solutions.
Charlie: Hello, I'm Charlie, a 28-year-old designer with a keen interest in creating visually appealing and user-friendly designs.
现在让我们检查Bob、Charlie和David是否收到了Alice的消息。
async def check_broadcast_message():
"""Check if the messages are broadcast correctly."""
user_msg = Msg(
"user",
"Do you know who's Alice, and what she does? Answer me briefly.",
"user",
)
await bob(user_msg)
await charlie(user_msg)
await david(user_msg)
asyncio.run(check_broadcast_message())
Bob: Alice is a 50-year-old teacher who is passionate about helping students learn and grow.
Charlie: Alice is a 50-year-old teacher who is passionate about helping students learn and grow.
David: I don't have specific information about who Alice is or what she does, as it could refer to many different people. Could you provide more context or details?
现在我们观察发现 Bob 和 Charlie 知道 Alice 和她的职业,而 David 对 Alice 一无所知,因为他未包含在 MsgHub 上下文中。
动态参与者管理¶
此外,MsgHub 支持通过以下方法动态管理参与者:
add: 添加一个或多个智能体作为新的参与者delete: 从参与者中移除一个或多个智能体,它们将不再接收广播消息broadcast: 向所有当前参与者广播一条消息
注意
新增的参与者将不会收到先前的消息。
async with MsgHub(participants=[alice]) as hub:
# Add new participants
hub.add(david)
# Remove participants
hub.delete(alice)
# Broadcast to all current participants
await hub.broadcast(
Msg("system", "Now we begin to ...", "system"),
)
流水线¶
Pipeline作为多智能体协调的语法糖。
目前,agentscope 提供两种主要管道实现:
顺序流程: 按照预定义顺序逐个执行智能体
扇出管道: 将相同输入分发给多个智能体并收集它们的响应
顺序管道¶
sequential pipeline 会逐个执行智能体,其中上一个智能体的输出会成为下一个智能体的输入。
例如,以下两段代码是等价的:msg = None
msg = await alice(msg)
msg = await bob(msg)
msg = await charlie(msg)
msg = await david(msg)
from agentscope.pipeline import sequential_pipeline
msg = await sequential_pipeline(
# List of agents to be executed in order
agents=[alice, bob, charlie, david],
# The first input message, can be None
msg=None
)
扇出管道¶
扇形输出流水线将相同的输入消息同时分发给多个智能体,并收集它们的所有响应。当你想收集同一主题的不同视角或专业知识时,这非常有用。
例如,以下两个代码片段是等价的:
from copy import deepcopy
msgs = []
msg = None
for agent in [alice, bob, charlie, david]:
msgs.append(await agent(deepcopy(msg)))
from agentscope.pipeline import fanout_pipeline
msgs = await fanout_pipeline(
# List of agents to be executed in order
agents=[alice, bob, charlie, david],
# The first input message, can be None
msg=None,
enable_gather=False,
)
注意
enable_gather 参数控制扇出管道的执行模式:
enable_gather=True(默认): 使用asyncio.gather()并发执行所有智能体。对于I/O密集型操作(如API调用),这种方式能提供更好的性能,因为智能体会并行运行。enable_gather=False: 以顺序方式逐个执行智能体。这在您需要确定性的执行顺序或希望避免并发请求压垮外部服务时非常有用。
选择并发执行以获得更佳性能,或选择顺序执行以实现可预测的次序和资源控制。
提示
通过结合MsgHub与sequential_pipeline或fanout_pipeline,您可以非常轻松地创建更复杂的工作流。
高级管道特性¶
此外,为实现可重用性,我们也提供了基于类的实现:
from agentscope.pipeline import SequentialPipeline
# Create a pipeline object
pipeline = SequentialPipeline(agents=[alice, bob, charlie, david])
# Call the pipeline
msg = await pipeline(msg=None)
# Reuse the pipeline with different input
msg = await pipeline(msg=Msg("user", "Hello!", "user"))
from agentscope.pipeline import FanoutPipeline
# Create a pipeline object
pipeline = FanoutPipeline(agents=[alice, bob, charlie, david])
# Call the pipeline
msgs = await pipeline(msg=None)
# Reuse the pipeline with different input
msgs = await pipeline(msg=Msg("user", "Hello!", "user"))
脚本总运行时间: (0分钟16.833秒)