并发代理#
在本节中,我们将探讨多个代理同时工作的使用。我们涵盖三种主要模式:
单消息 & 多处理器
展示了如何通过订阅同一主题的多个代理同时处理单个消息。多消息和多处理器
展示了如何根据主题将特定的消息类型路由到专用的代理。直接消息
专注于在代理之间以及从运行时向代理发送消息。
import asyncio
from dataclasses import dataclass
from autogen_core import (
AgentId,
ClosureAgent,
ClosureContext,
DefaultTopicId,
MessageContext,
RoutedAgent,
SingleThreadedAgentRuntime,
TopicId,
TypeSubscription,
default_subscription,
message_handler,
type_subscription,
)
@dataclass
class Task:
task_id: str
@dataclass
class TaskResponse:
task_id: str
result: str
单条消息及多处理器#
第一个模式展示了如何通过多个代理同时处理单个消息:
每个
Processor
代理都使用default_subscription()
装饰器订阅默认主题。当向默认主题发布消息时,所有已注册的代理将独立处理该消息。
注意
下面,我们使用default_subscription()
装饰器来订阅Processor
,还有一种不使用装饰器的订阅方式,如Subscribe and Publish to Topics所示,这种方式可以让同一个代理类订阅不同的主题。
@default_subscription
class Processor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"{self._description} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self._description} finished task {message.task_id}")
runtime = SingleThreadedAgentRuntime()
await Processor.register(runtime, "agent_1", lambda: Processor("Agent 1"))
await Processor.register(runtime, "agent_2", lambda: Processor("Agent 2"))
runtime.start()
await runtime.publish_message(Task(task_id="task-1"), topic_id=DefaultTopicId())
await runtime.stop_when_idle()
Agent 1 starting task task-1
Agent 2 starting task task-1
Agent 1 finished task task-1
Agent 2 finished task task-1
多条消息 & 多个处理器#
其次,这种模式展示了将不同类型的消息路由到特定的处理器:
UrgentProcessor
订阅了“urgent”主题NormalProcessor
订阅了“normal”主题
我们使用type_subscription()
装饰器让代理订阅特定的主题类型。
TASK_RESULTS_TOPIC_TYPE = "task-results"
task_results_topic_id = TopicId(type=TASK_RESULTS_TOPIC_TYPE, source="default")
@type_subscription(topic_type="urgent")
class UrgentProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Urgent processor starting task {message.task_id}")
await asyncio.sleep(1) # Simulate work
print(f"Urgent processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Urgent Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
@type_subscription(topic_type="normal")
class NormalProcessor(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> None:
print(f"Normal processor starting task {message.task_id}")
await asyncio.sleep(3) # Simulate work
print(f"Normal processor finished task {message.task_id}")
task_response = TaskResponse(task_id=message.task_id, result="Results by Normal Processor")
await self.publish_message(task_response, topic_id=task_results_topic_id)
在注册代理后,我们可以向“紧急”和“普通”主题发布消息:
runtime = SingleThreadedAgentRuntime()
await UrgentProcessor.register(runtime, "urgent_processor", lambda: UrgentProcessor("Urgent Processor"))
await NormalProcessor.register(runtime, "normal_processor", lambda: NormalProcessor("Normal Processor"))
runtime.start()
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
收集结果#
在前面的例子中,我们依赖控制台打印来验证任务完成。然而,在实际应用中,我们通常希望以编程方式收集和处理结果。
为了收集这些消息,我们将使用一个ClosureAgent
。我们已经定义了一个专门的主题TASK_RESULTS_TOPIC_TYPE
,UrgentProcessor
和NormalProcessor
都会在这个主题上发布他们的结果。然后,ClosureAgent将处理来自该主题的消息。
queue = asyncio.Queue[TaskResponse]()
async def collect_result(_agent: ClosureContext, message: TaskResponse, ctx: MessageContext) -> None:
await queue.put(message)
runtime.start()
CLOSURE_AGENT_TYPE = "collect_result_agent"
await ClosureAgent.register_closure(
runtime,
CLOSURE_AGENT_TYPE,
collect_result,
subscriptions=lambda: [TypeSubscription(topic_type=TASK_RESULTS_TOPIC_TYPE, agent_type=CLOSURE_AGENT_TYPE)],
)
await runtime.publish_message(Task(task_id="normal-1"), topic_id=TopicId(type="normal", source="default"))
await runtime.publish_message(Task(task_id="urgent-1"), topic_id=TopicId(type="urgent", source="default"))
await runtime.stop_when_idle()
Normal processor starting task normal-1
Urgent processor starting task urgent-1
Urgent processor finished task urgent-1
Normal processor finished task normal-1
while not queue.empty():
print(await queue.get())
TaskResponse(task_id='urgent-1', result='Results by Urgent Processor')
TaskResponse(task_id='normal-1', result='Results by Normal Processor')
直接消息#
与之前的模式不同,此模式专注于直接消息。这里我们展示了两种发送方式:
代理之间的直接消息传递
从运行时向特定代理发送消息
在下面的示例中需要考虑的事项:
消息使用
AgentId
进行地址处理。发送者可以期待从目标代理收到响应。
我们只注册一次
WorkerAgent
类;然而,我们将任务发送给两个不同的工作者。如何实现?正如Agent生命周期中所述,当使用
AgentId
发送消息时,运行时将获取实例,如果不存在则创建一个。在这种情况下,运行时在发送这两条消息时创建了两个工作实例。
class WorkerAgent(RoutedAgent):
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"{self.id} starting task {message.task_id}")
await asyncio.sleep(2) # Simulate work
print(f"{self.id} finished task {message.task_id}")
return TaskResponse(task_id=message.task_id, result=f"Results by {self.id}")
class DelegatorAgent(RoutedAgent):
def __init__(self, description: str, worker_type: str):
super().__init__(description)
self.worker_instances = [AgentId(worker_type, f"{worker_type}-1"), AgentId(worker_type, f"{worker_type}-2")]
@message_handler
async def on_task(self, message: Task, ctx: MessageContext) -> TaskResponse:
print(f"Delegator received task {message.task_id}.")
subtask1 = Task(task_id="task-part-1")
subtask2 = Task(task_id="task-part-2")
worker1_result, worker2_result = await asyncio.gather(
self.send_message(subtask1, self.worker_instances[0]), self.send_message(subtask2, self.worker_instances[1])
)
combined_result = f"Part 1: {worker1_result.result}, " f"Part 2: {worker2_result.result}"
task_response = TaskResponse(task_id=message.task_id, result=combined_result)
return task_response
runtime = SingleThreadedAgentRuntime()
await WorkerAgent.register(runtime, "worker", lambda: WorkerAgent("Worker Agent"))
await DelegatorAgent.register(runtime, "delegator", lambda: DelegatorAgent("Delegator Agent", "worker"))
runtime.start()
delegator = AgentId("delegator", "default")
response = await runtime.send_message(Task(task_id="main-task"), recipient=delegator)
print(f"Final result: {response.result}")
await runtime.stop_when_idle()
Delegator received task main-task.
worker/worker-1 starting task task-part-1
worker/worker-2 starting task task-part-2
worker/worker-1 finished task task-part-1
worker/worker-2 finished task task-part-2
Final result: Part 1: Results by worker/worker-1, Part 2: Results by worker/worker-2
其他资源#
如果你对并发处理感兴趣,可以查看混合代理模式,它严重依赖于并发代理。