使用代理提取结果#

当运行一个多代理系统来解决某些任务时,一旦系统达到终止,你可能希望提取系统结果。本指南展示了一种实现这一目标的方法。考虑到代理实例无法直接从外部访问,我们将使用一个代理将最终结果发布到一个可访问的位置。

如果你将你的系统建模为发布某种FinalResult类型,那么你可以创建一个代理,其唯一的工作是订阅这个类型并将其对外提供。对于这样的简单代理,ClosureAgent是一个减少样板代码的选项。这允许你定义一个函数,该函数将作为代理的消息处理程序。在这个例子中,我们将使用一个在代理和外部代码之间共享的队列来传递结果。

注意

在考虑如何从多代理系统中提取结果时,您必须始终考虑代理的订阅以及它们发布到的话题。这是因为代理只会接收来自它订阅的话题的消息。

import asyncio
from dataclasses import dataclass

from autogen_core import (
    ClosureAgent,
    ClosureContext,
    DefaultSubscription,
    DefaultTopicId,
    MessageContext,
    SingleThreadedAgentRuntime,
)

为最终结果定义一个数据类。

@dataclass
class FinalResult:
    value: str

创建一个队列以将代理的结果传递给外部代码。

queue = asyncio.Queue[FinalResult]()

创建一个函数闭包,用于将最终结果输出到队列中。 该函数必须遵循签名 Callable[[AgentRuntime, AgentId, T, MessageContext], Awaitable[Any]] 其中 T 是代理将接收的消息类型。 您可以使用联合类型来处理多种消息类型。

async def output_result(_agent: ClosureContext, message: FinalResult, ctx: MessageContext) -> None:
    await queue.put(message)

让我们创建一个运行时并注册一个ClosureAgent,它将把最终结果发布到队列中。

runtime = SingleThreadedAgentRuntime()
await ClosureAgent.register_closure(
    runtime, "output_result", output_result, subscriptions=lambda: [DefaultSubscription()]
)
AgentType(type='output_result')

我们可以通过直接将最终结果发布到运行时来模拟其收集。

runtime.start()
await runtime.publish_message(FinalResult("Result 1"), DefaultTopicId())
await runtime.publish_message(FinalResult("Result 2"), DefaultTopicId())
await runtime.stop_when_idle()

我们可以查看队列以查看最终结果。

while not queue.empty():
    print((result := await queue.get()).value)
Result 1
Result 2