使用干预处理程序终止#
注意
在使用SingleThreadedAgentRuntime
时,此方法是有效的。
在autogen_core
中有许多不同的方式来处理终止问题。最终目标是检测到运行时不再需要执行,并且你可以继续进行最终化任务。一种方法是使用autogen_core.base.intervention.InterventionHandler
来检测终止消息,然后根据它采取行动。
from dataclasses import dataclass
from typing import Any
from autogen_core import (
DefaultInterventionHandler,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
default_subscription,
message_handler,
)
首先,我们为常规消息和用于发出终止信号的消息定义一个数据类。
@dataclass
class Message:
content: Any
@dataclass
class Termination:
reason: str
我们将代码编写为当代理决定终止时发布终止消息。
@default_subscription
class AnAgent(RoutedAgent):
def __init__(self) -> None:
super().__init__("MyAgent")
self.received = 0
@message_handler
async def on_new_message(self, message: Message, ctx: MessageContext) -> None:
self.received += 1
if self.received > 3:
await self.publish_message(Termination(reason="Reached maximum number of messages"), DefaultTopicId())
接下来,我们创建一个InterventionHandler,它将检测终止消息并采取相应的行动。这个处理程序会挂钩到发布中,当它遇到Termination
时,它会改变其内部状态以指示已请求终止。
class TerminationHandler(DefaultInterventionHandler):
def __init__(self) -> None:
self._termination_value: Termination | None = None
async def on_publish(self, message: Any, *, message_context: MessageContext) -> Any:
if isinstance(message, Termination):
self._termination_value = message
return message
@property
def termination_value(self) -> Termination | None:
return self._termination_value
@property
def has_terminated(self) -> bool:
return self._termination_value is not None
最后,我们将这个处理器添加到运行时中,并使用它来检测终止,并在接收到终止消息时停止运行时。
termination_handler = TerminationHandler()
runtime = SingleThreadedAgentRuntime(intervention_handlers=[termination_handler])
await AnAgent.register(runtime, "my_agent", AnAgent)
runtime.start()
# Publish more than 3 messages to trigger termination.
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
await runtime.publish_message(Message("hello"), DefaultTopicId())
# Wait for termination.
await runtime.stop_when(lambda: termination_handler.has_terminated)
print(termination_handler.termination_value)
Termination(reason='Reached maximum number of messages')