使用代理提取结果#
当运行一个多代理系统来解决某些任务时,一旦系统达到终止,你可能希望提取系统结果。本指南展示了一种实现这一目标的方法。考虑到代理实例无法直接从外部访问,我们将使用一个代理将最终结果发布到一个可访问的位置。
如果你将你的系统建模为发布某种FinalResult
类型,那么你可以创建一个代理,其唯一的工作是订阅这个类型并将其对外提供。对于这样的简单代理,ClosureAgent
是一个减少样板代码的选项。这允许你定义一个函数,该函数将作为代理的消息处理程序。在这个例子中,我们将使用一个在代理和外部代码之间共享的队列来传递结果。
注意
在考虑如何从多代理系统中提取结果时,您必须始终考虑代理的订阅以及它们发布到的话题。这是因为代理只会接收来自它订阅的话题的消息。
import asyncio
from dataclasses import dataclass
from autogen_core import (
ClosureAgent,
ClosureContext,
DefaultSubscription,
DefaultTopicId,
MessageContext,
SingleThreadedAgentRuntime,
)
为最终结果定义一个数据类。
@dataclass
class FinalResult:
value: str
创建一个队列以将代理的结果传递给外部代码。
queue = asyncio.Queue[FinalResult]()
创建一个函数闭包,用于将最终结果输出到队列中。
该函数必须遵循签名
Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]]
其中 T
是代理将接收的消息类型。
您可以使用联合类型来处理多种消息类型。
async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
await queue.put(message)
让我们创建一个运行时并注册一个ClosureAgent
,它将把最终结果发布到队列中。
runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')
我们可以通过直接将最终结果发布到运行时来模拟其收集。
runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()
我们可以查看队列以查看最终结果。
while not queue.empty():
print((result := await queue.get()).value)
Result 1
Result 2