autogen_ext.runtimes.grpc#
- class GrpcWorkerAgentRuntime(host_address: str, tracer_provider: TracerProvider | 无 = None, extra_grpc_config: Sequence[元组[str, 任何]] | 无 = None, payload_serialization_format: str = JSON_DATA_CONTENT_TYPE)[源代码]#
基础类:
AgentRuntime
一个用于运行远程或跨语言代理的代理运行时。
Agent 消息传递使用了来自 agent_worker.proto 的 protobufs 和来自 cloudevent.proto 的
CloudEvent
。跨语言代理将额外要求所有代理之间发送的任何消息类型都使用共享的protobuf模式。
- add_message_serializer(serializer: MessageSerializer[任何] | Sequence[MessageSerializer[任何]]) 无 [源代码]#
在运行时添加一个新的消息序列化序列化器
注意:这将根据type_name和data_content_type属性对序列化器进行去重。
- Parameters:
serializer (MessageSerializer[Any] | Sequence[MessageSerializer[Any]]) – 要添加的序列化器
- async add_subscription(subscription: 订阅) 无 [源代码]#
添加一个新的订阅,运行时应处理发布的消息时履行
- Parameters:
subscription (Subscription) – 添加的订阅
- async agent_metadata(agent: AgentId) AgentMetadata [源代码]#
获取代理的元数据。
- Parameters:
代理 (AgentId) – 代理的 ID。
- Returns:
AgentMetadata – 代理元数据。
- async agent_save_state(agent: AgentId) 映射[str, 任何] [源代码]#
保存单个代理的状态。
状态的结构由实现定义,可以是任何可JSON序列化的对象。
- Parameters:
代理 (AgentId) – 代理的 ID。
- Returns:
Mapping[str, Any] – 保存的状态。
- async get(id_or_type: AgentId | AgentType | str, /, key: str = 'default', *, lazy: bool = True) AgentId [源代码]#
- async load_state(state: 映射[str, 任何]) 无 [源代码]#
加载整个运行时的状态,包括所有托管的代理。该状态应与
save_state()
返回的状态相同。- Parameters:
state (Mapping[str, Any]) – 保存的状态。
- async publish_message(message: 任何, topic_id: TopicId, *, sender: AgentId | 无 = None, cancellation_token: CancellationToken | 无 = None, message_id: str | 无 = None) 无 [源代码]#
向给定命名空间中的所有代理发布消息,如果没有提供命名空间,则使用发送者的命名空间。
发布时不期望有任何响应。
- Parameters:
message (Any) – 要发布的消息。
topic (TopicId) – 发布消息的主题。
sender (AgentId | None, optional) – 发送消息的代理。默认为None。
cancellation_token (CancellationToken | None, optional) – 用于取消正在进行的操作的令牌。默认为None。
message_id (str | None, optional) – 消息ID。如果为None,将生成一个新的消息ID。默认为None。此消息ID必须唯一,并且建议使用UUID。
- Raises:
UndeliverableException – 如果消息无法被送达。
- async register_factory(type: str | AgentType, agent_factory: Callable[[], T | Awaitable[T]], *, expected_class: 类型[T] | 无 = None) AgentType [源代码]#
向与特定类型关联的运行时注册一个代理工厂。该类型必须是唯一的。此API不会添加任何订阅。
注意
这是一个低级API,通常应该使用代理类的register方法,因为这会自动处理订阅。
示例:
from dataclasses import dataclass from autogen_core import AgentRuntime, MessageContext, RoutedAgent, event from autogen_core.models import UserMessage @dataclass class MyMessage: content: str class MyAgent(RoutedAgent): def __init__(self) -> None: super().__init__("My core agent") @event async def handler(self, message: UserMessage, context: MessageContext) -> None: print("Event received: ", message.content) async def my_agent_factory(): return MyAgent() async def main() -> None: runtime: AgentRuntime = ... # type: ignore await runtime.register_factory("my_agent", lambda: MyAgent()) import asyncio asyncio.run(main())
- async remove_subscription(id: str) 无 [源代码]#
从运行时移除一个订阅
- Parameters:
id (str) – 要移除的订阅的id
- Raises:
LookupError – 如果订阅不存在
- async save_state() 映射[str, 任何] [源代码]#
保存整个运行时的状态,包括所有托管的代理。恢复状态的唯一方法是将其传递给
load_state()
。状态的结构由实现定义,可以是任何可JSON序列化的对象。
- Returns:
Mapping[str, Any] – 保存的状态。
- async send_message(message: 任何, recipient: AgentId, *, sender: AgentId | 无 = None, cancellation_token: CancellationToken | 无 = None, message_id: str | 无 = None) 任何 [源代码]#
向代理发送消息并获取回复。
- Parameters:
message (Any) – 要发送的消息。
recipient (AgentId) – 接收消息的代理。
sender (AgentId | None, optional) – 发送消息的代理。如果此消息不是由任何代理发送的,例如直接从外部运行时发送,则只能为None。默认值为None。
cancellation_token (CancellationToken | None, optional) – 用于取消进行中的操作的令牌。默认为 None。
- Raises:
CantHandleException – 如果接收者无法处理该消息。
UndeliverableException – 如果消息无法传递。
其他 – 接收方引发的任何其他异常。
- Returns:
任何 – 来自代理的响应。
- async stop_when_signal(signals: Sequence[信号] = (signal.SIGTERM, signal.SIGINT)) 无 [源代码]#
在接收到信号时停止运行时。
- async try_get_underlying_agent_instance(id: AgentId, type: 类型[T] = Agent) T [源代码]#
尝试通过名称和命名空间获取底层代理实例。通常不推荐这样做(因此名称较长),但在某些情况下可能会有用。
如果底层代理无法访问,这将引发异常。
- Parameters:
id (AgentId) – 代理ID。
type (Type[T], optional) – 代理的预期类型。默认为 Agent。
- Returns:
T – 具体的代理实例。
- Raises:
LookupError – 如果未找到代理。
NotAccessibleError – 如果代理不可访问,例如,如果它位于远程位置。
TypeError – 如果代理不是预期的类型。
- class GrpcWorkerAgentRuntimeHost(address: str, extra_grpc_config: Sequence[元组[str, 任何]] | 无 = None)[源代码]#
基础:
object
- class GrpcWorkerAgentRuntimeHostServicer[源代码]#
基类:
AgentRpcServicer
为代理托管消息传递服务的gRPC服务器。
- async AddSubscription(request: AddSubscriptionRequest, context: ServicerContext[AddSubscriptionRequest, AddSubscriptionResponse]) AddSubscriptionResponse [源代码]#
在.proto文件中缺少关联的文档注释。
- async GetSubscriptions(request: GetSubscriptionsRequest, context: ServicerContext[GetSubscriptionsRequest, GetSubscriptionsResponse]) GetSubscriptionsResponse [源代码]#
.proto文件中缺少关联的文档注释。
- async OpenChannel(request_iterator: AsyncIterator[Message], context: ServicerContext[Message, Message]) AsyncIterator[Message] [源代码]#
在.proto文件中缺少相关的文档注释。
- async OpenControlChannel(request_iterator: AsyncIterator[ControlMessage], context: ServicerContext[ControlMessage, ControlMessage]) AsyncIterator[ControlMessage] [源代码]#
在.proto文件中缺少关联的文档注释。