autogen_ext.runtimes.grpc#

class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | = None, extra_grpc_config: Sequence[元组[str, 任何]] | = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[源代码]#

基础类: AgentRuntime

一个用于运行远程或跨语言代理的代理运行时。

Agent 消息传递使用了来自 agent_worker.proto 的 protobufs 和来自 cloudevent.protoCloudEvent

跨语言代理将额外要求所有代理之间发送的任何消息类型都使用共享的protobuf模式。

add_message_serializer(serializer: MessageSerializer[任何] | Sequence[MessageSerializer[任何]]) [源代码]#

在运行时添加一个新的消息序列化序列化器

注意:这将根据type_name和data_content_type属性对序列化器进行去重。

Parameters:

serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器

async add_subscription(subscription: 订阅) [源代码]#

添加一个新的订阅,运行时应处理发布的消息时履行

Parameters:

subscription (Subscription) – 添加的订阅

async agent_load_state(agent: AgentId, state: 映射[str, 任何]) [源代码]#

加载单个代理的状态。

Parameters:
  • 代理 (AgentId) – 代理ID。

  • state (Mapping[str, Any]) – 保存的状态。

async agent_metadata(agent: AgentId) AgentMetadata[源代码]#

获取代理的元数据。

Parameters:

代理 (AgentId) – 代理的 ID。

Returns:

AgentMetadata – 代理元数据。

async agent_save_state(agent: AgentId) 映射[str, 任何][源代码]#

保存单个代理的状态。

状态的结构由实现定义,可以是任何可JSON序列化的对象。

Parameters:

代理 (AgentId) – 代理的 ID。

Returns:

Mapping[str, Any] – 保存的状态。

async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId[源代码]#
async load_state(state: 映射[str, 任何]) [源代码]#

加载整个运行时的状态,包括所有托管的代理。该状态应与save_state()返回的状态相同。

Parameters:

state (Mapping[str, Any]) – 保存的状态。

async publish_message(message: 任何, topic_id: TopicId, *, sender: AgentId | = None, cancellation_token: CancellationToken | = None, message_id: str | = None) [源代码]#

向给定命名空间中的所有代理发布消息,如果没有提供命名空间,则使用发送者的命名空间。

发布时不期望有任何响应。

Parameters:
  • message (Any) – 要发布的消息。

  • topic (TopicId) – 发布消息的主题。

  • sender (AgentId | None, optional) – 发送消息的代理。默认为None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的操作的令牌。默认为None。

  • message_id (str | None, optional) – 消息ID。如果为None,将生成一个新的消息ID。默认为None。此消息ID必须唯一,并且建议使用UUID。

Raises:

UndeliverableException – 如果消息无法被送达。

async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: 类型[T] | = None) AgentType[源代码]#

向与特定类型关联的运行时注册一个代理工厂。该类型必须是唯一的。此API不会添加任何订阅。

注意

这是一个低级API,通常应该使用代理类的register方法,因为这会自动处理订阅。

示例:

from dataclasses import dataclass

from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event
from autogen_core.models import UserMessage


@dataclass
class MyMessage:
    content: str


class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        super().__init__("My core agent")

    @event
    async def handler(self, message: UserMessage, context: MessageContext) -> None:
        print("Event received: ", message.content)


async def my_agent_factory():
    return MyAgent()


async def main() -> None:
    runtime: AgentRuntime = ...  # type: ignore
    await runtime.register_factory("my_agent", lambda: MyAgent())


import asyncio

asyncio.run(main())
Parameters:
  • type (str) – 该工厂创建的代理的类型。它与代理类名不同。type参数用于区分不同的工厂函数,而不是代理类。

  • agent_factory (Callable[[], T]) – 用于创建代理的工厂,其中T是具体的Agent类型。在工厂内部,使用autogen_core.AgentInstantiationContext来访问诸如当前运行时和代理ID等变量。

  • expected_class (type[T] | None, optional) – 代理的预期类,用于工厂的运行时验证。默认值为 None。如果为 None,则不执行验证。

async remove_subscription(id: str) [源代码]#

从运行时移除一个订阅

Parameters:

id (str) – 要移除的订阅的id

Raises:

LookupError – 如果订阅不存在

async save_state() 映射[str, 任何][源代码]#

保存整个运行时的状态,包括所有托管的代理。恢复状态的唯一方法是将其传递给load_state()

状态的结构由实现定义,可以是任何可JSON序列化的对象。

Returns:

Mapping[str, Any] – 保存的状态。

async send_message(message: 任何, recipient: AgentId, *, sender: AgentId | = None, cancellation_token: CancellationToken | = None, message_id: str | = None) 任何[源代码]#

向代理发送消息并获取回复。

Parameters:
  • message (Any) – 要发送的消息。

  • recipient (AgentId) – 接收消息的代理。

  • sender (AgentId | None, optional) – 发送消息的代理。如果此消息不是由任何代理发送的,例如直接从外部运行时发送,则只能为None。默认值为None。

  • cancellation_token (CancellationToken | None, optional) – 用于取消进行中的操作的令牌。默认为 None。

Raises:
Returns:

任何 – 来自代理的响应。

async start() [源代码]#

在后台任务中启动运行时。

async stop() [源代码]#

立即停止运行时。

async stop_when_signal(signals: Sequence[信号] = (signal.SIGTERM, signal.SIGINT)) [源代码]#

在接收到信号时停止运行时。

async try_get_underlying_agent_instance(id: AgentId, type: 类型[T] = Agent) T[源代码]#

尝试通过名称和命名空间获取底层代理实例。通常不推荐这样做(因此名称较长),但在某些情况下可能会有用。

如果底层代理无法访问,这将引发异常。

Parameters:
  • id (AgentId) – 代理ID。

  • type (Type[T], optional) – 代理的预期类型。默认为 Agent。

Returns:

T – 具体的代理实例。

Raises:
class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[元组[str, 任何]] | = None)[源代码]#

基础: object

start() [源代码]#

在后台任务中启动服务器。

async stop(grace: int = 5) [源代码]#

停止服务器。

async stop_when_signal(grace: int = 5, signals: Sequence[信号] = (signal.SIGTERM, signal.SIGINT)) [源代码]#

当接收到信号时停止服务器。

class GrpcWorkerAgentRuntimeHostServicer[源代码]#

基类:AgentRpcServicer

为代理托管消息传递服务的gRPC服务器。

async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) AddSubscriptionResponse[源代码]#

在.proto文件中缺少关联的文档注释。

async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) GetSubscriptionsResponse[源代码]#

.proto文件中缺少关联的文档注释。

async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) AsyncIterator[Message][源代码]#

在.proto文件中缺少相关的文档注释。

async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) AsyncIterator[ControlMessage][源代码]#

在.proto文件中缺少关联的文档注释。

async RegisterAgent(request: RegisterAgentTypeRequest, context: ServicerContext[RegisterAgentTypeRequest, RegisterAgentTypeResponse]) RegisterAgentTypeResponse[源代码]#

在.proto文件中缺少关联的文档注释。

async RemoveSubscription(request: RemoveSubscriptionRequest, context: ServicerContext[RemoveSubscriptionRequest, RemoveSubscriptionResponse]) RemoveSubscriptionResponse[源代码]#

在.proto文件中缺少关联的文档注释。