分布式代理运行时#

注意

分布式代理运行时是一个实验性功能。API可能会发生重大变化。

一个分布式代理运行时促进了跨进程边界的通信和代理生命周期管理。它由一个主机服务和至少一个工作运行时组成。

主机服务维护与所有活动工作者运行时的连接,促进消息传递,并为所有直接消息(即RPC)保持会话。工作者运行时处理应用程序代码(代理)并连接到主机服务。它还向主机服务宣传它们支持的代理,以便主机服务可以将消息传递到正确的工作者。

注意

分布式代理运行时需要额外的依赖项,请使用以下命令安装它们:

pip install "autogen-ext[grpc]"

我们可以使用 GrpcWorkerAgentRuntimeHost 来启动一个主机服务。

from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost

host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
host.start()  # Start a host service in the background.

上述代码在后台启动主机服务,并在端口50051上接受工作连接。

在运行工作运行时之前,让我们定义我们的代理。 这个代理将会在每接收到一条消息时发布一条新消息。 它还会记录已经发布的消息数量,并在发布了5条消息后停止发布新消息。

from dataclasses import dataclass

from autogen_core import DefaultTopicId, MessageContext, RoutedAgent, default_subscription, message_handler


@dataclass
class MyMessage:
    content: str


@default_subscription
class MyAgent(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent")
        self._name = name
        self._counter = 0

    @message_handler
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> None:
        self._counter += 1
        if self._counter > 5:
            return
        content = f"{self._name}: Hello x {self._counter}"
        print(content)
        await self.publish_message(MyMessage(content=content), DefaultTopicId())

现在我们可以设置工作代理运行时。 我们使用GrpcWorkerAgentRuntime。 我们设置了两个工作运行时。每个运行时都托管一个代理。 所有代理都发布和订阅默认主题,因此它们可以看到所有发布的消息。

为了运行这些代理,我们从一个工作线程发布了一条消息。

import asyncio

from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime

worker1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker1.start()
await MyAgent.register(worker1, "worker1", lambda: MyAgent("worker1"))

worker2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
await worker2.start()
await MyAgent.register(worker2, "worker2", lambda: MyAgent("worker2"))

await worker2.publish_message(MyMessage(content="Hello!"), DefaultTopicId())

# Let the agents run for a while.
await asyncio.sleep(5)
worker1: Hello x 1
worker2: Hello x 1
worker2: Hello x 2
worker1: Hello x 2
worker1: Hello x 3
worker2: Hello x 3
worker2: Hello x 4
worker1: Hello x 4
worker1: Hello x 5
worker2: Hello x 5

我们可以看到每个代理都发布了恰好5条信息。

要停止工作线程运行时,我们可以调用stop()

await worker1.stop()
await worker2.stop()

# To keep the worker running until a termination signal is received (e.g., SIGTERM).
# await worker1.stop_when_signal()

我们可以调用stop()来停止主机服务。

await host.stop()

# To keep the host service running until a termination signal (e.g., SIGTERM)
# await host.stop_when_signal()

跨语言运行时#

上述流程大体相同,但所有消息类型都必须使用共享的protobuf模式来处理所有跨代理的消息类型。

下一步#

要查看使用分布式运行时的完整示例,请查看以下示例: