主题和订阅示例场景#

介绍#

在本指南中,我们通过四种不同的广播场景来探索AutoGen中代理通信的广播机制。这些场景展示了处理和分发消息的不同方法。我们将使用一个一致的示例,即税务管理公司处理客户请求,来演示每种场景。

场景概述#

想象一家提供各种税务管理服务的公司,例如税务规划、争议解决、合规和报税准备。该公司雇用了一支税务专家团队,每位专家在这些领域中都有自己的专长,以及一位负责监督运营的税务系统经理。

客户提交的请求需要由相应的专家进行处理。在该系统中,客户、税务系统管理者和税务专家之间的通信通过广播方式处理。

我们将探讨不同的广播场景如何影响消息在代理之间的分发方式,以及如何利用这些场景来根据特定需求定制通信流程。


广播场景概览#

我们将涵盖以下广播场景:

  1. 单租户,单一发布范围

  2. 多租户,单一发布范围

  3. 单租户,多发布范围

  4. 多租户,多种发布范围

每个场景代表了在系统中消息分发和代理交互的不同方法。通过理解这些场景,您可以设计出最适合您应用需求的代理通信策略。

import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import List

from autogen_core import (
    MessageContext,
    RoutedAgent,
    SingleThreadedAgentRuntime,
    TopicId,
    TypeSubscription,
    message_handler,
)
from autogen_core._default_subscription import DefaultSubscription
from autogen_core._default_topic import DefaultTopicId
from autogen_core.models import (
    SystemMessage,
)
class TaxSpecialty(str, Enum):
    PLANNING = "planning"
    DISPUTE_RESOLUTION = "dispute_resolution"
    COMPLIANCE = "compliance"
    PREPARATION = "preparation"


@dataclass
class ClientRequest:
    content: str


@dataclass
class RequestAssessment:
    content: str


class TaxSpecialist(RoutedAgent):
    def __init__(
        self,
        description: str,
        specialty: TaxSpecialty,
        system_messages: List[SystemMessage],
    ) -> None:
        super().__init__(description)
        self.specialty = specialty
        self._system_messages = system_messages
        self._memory: List[ClientRequest] = []

    @message_handler
    async def handle_message(self, message: ClientRequest, ctx: MessageContext) -> None:
        # Process the client request.
        print(f"\n{'='*50}\nTax specialist {self.id} with specialty {self.specialty}:\n{message.content}")
        # Send a response back to the manager
        if ctx.topic_id is None:
            raise ValueError("Topic ID is required for broadcasting")
        await self.publish_message(
            message=RequestAssessment(content=f"I can handle this request in {self.specialty}."),
            topic_id=ctx.topic_id,
        )

1. 单租户,单一发布范围#

场景解释#

在单租户、单一发布范围的场景中:

  • 所有代理都在单个租户内操作(例如,一个客户端或用户会话)。

  • 消息发布到一个单一的主题,所有代理都订阅这个主题。

  • 每个代理都会接收发布到主题的每一条消息。

此场景适用于所有代理需要了解所有消息,且无需在不同组代理或会话之间隔离通信的情况。

税务专家公司中的应用程序#

在我们的税务专家公司中,这个场景意味着:

  • 所有税务专家都会接收每个客户请求和内部消息。

  • 所有代理紧密合作,所有通信都完全可见。

  • 适用于所有代理需要了解所有消息的任务或团队。

场景如何工作#

  • 订阅:所有代理均使用默认订阅(例如,“default”)。

  • 发布:消息被发布到默认主题。

  • 消息处理:每个代理根据消息的内容和可用的处理程序来决定是否对该消息采取行动。

好处#

  • 简单性:易于设置和理解。

  • 协作:促进代理之间的透明度和协作。

  • 灵活性:代理可以动态决定处理哪些消息。

注意事项#

  • 可扩展性:可能在处理大量代理或消息时表现不佳。

  • 效率问题:代理可能会收到许多无关的消息,导致不必要的处理。

async def run_single_tenant_single_scope() -> None:
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Register TaxSpecialist agents for each specialty
    specialist_agent_type_1 = "TaxSpecialist_1"
    specialist_agent_type_2 = "TaxSpecialist_2"
    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_1,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 1",
            specialty=TaxSpecialty.PLANNING,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    await TaxSpecialist.register(
        runtime=runtime,
        type=specialist_agent_type_2,
        factory=lambda: TaxSpecialist(
            description="A tax specialist 2",
            specialty=TaxSpecialty.DISPUTE_RESOLUTION,
            system_messages=[SystemMessage(content="You are a tax specialist.")],
        ),
    )

    # Add default subscriptions for each agent type
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_1))
    await runtime.add_subscription(DefaultSubscription(agent_type=specialist_agent_type_2))

    # Start the runtime and send a message to agents on default topic
    runtime.start()
    await runtime.publish_message(ClientRequest("I need to have my tax for 2024 prepared."), topic_id=DefaultTopicId())
    await runtime.stop_when_idle()


await run_single_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_1:default with specialty TaxSpecialty.PLANNING:
I need to have my tax for 2024 prepared.

==================================================
Tax specialist TaxSpecialist_2:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need to have my tax for 2024 prepared.

2. 多租户,单一发布范围#

场景解释#

在多租户、单一发布范围的场景中:

  • 有多个租户(例如,多个客户端或用户会话)。

  • 每个租户通过主题源拥有其自己的隔离主题。

  • 租户内的所有代理都订阅了该租户的主题。如果需要,将为每个租户创建新的代理实例。

  • 消息仅对同一租户内的代理可见。

当您需要在不同租户之间隔离通信,但希望同一租户内的所有代理都能了解所有消息时,此场景非常有用。

税务专家公司中的应用程序#

在这种情况下:

  • 公司同时服务多个客户(租户)。

  • 为每个客户端创建一组专用的代理实例。

  • 每个客户端的通信与其他客户端隔离。

  • 客户端的所有代理都会接收到发布到该客户端主题的消息。

场景如何工作#

  • 订阅:代理根据租户的身份订阅主题。

  • 发布:消息被发布到特定租户的主题。

  • 消息处理:代理仅接收与其租户相关的消息。

好处#

  • 租户隔离:确保数据隐私和客户之间的分离。

  • 租户内协作:代理可以自由地在他们的租户内协作。

注意事项#

  • 复杂性:需要管理多组代理和主题。

  • 资源使用:更多的代理实例可能会消耗额外的资源。

async def run_multi_tenant_single_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # List of clients (tenants)
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize sessions and map the topic type to each TaxSpecialist agent type
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = DefaultSubscription(agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish client requests to their respective topics
    for tenant in tenants:
        topic_source = tenant  # The topic source is the client name
        topic_id = DefaultTopicId(source=topic_source)
        await runtime.publish_message(
            ClientRequest(f"{tenant} requires tax services."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_single_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC requires tax services.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ requires tax services.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ requires tax services.

3. 单租户,多发布范围#

场景解释#

在单租户、多个发布范围的场景中:

  • 所有代理都在单个租户内操作。

  • 消息被发布到不同的主题。

  • 代理订阅与其角色或专业相关的特定主题。

  • 消息根据主题定向发送给代理的子集。

该场景允许在租户内进行有针对性的通信,从而实现对消息分发的更精细控制。

在税务管理公司中的应用#

在这种情况下:

  • 税务系统管理器根据其专业领域与特定专家进行沟通。

  • 不同主题代表不同的专业领域(例如,“规划”,“合规”)。

  • 专家只订阅与他们的专业相匹配的话题。

  • 管理者通过发布消息到特定主题来触达目标专家。

场景如何工作#

  • 订阅:代理订阅与其专业相关的主题。

  • 发布:消息根据预期的收件人发布到主题。

  • 消息处理:只有订阅了某个主题的代理才能接收到该主题的消息。

好处#

  • 针对性通信:消息仅传递给相关代理。

  • 效率:通过代理减少不必要的消息处理。

注意事项#

  • 设置复杂性:需要仔细管理主题和订阅。

  • 灵活性:通信场景的变化可能需要更新订阅。

async def run_single_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()
    # Register TaxSpecialist agents for each specialty and add subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        specialist_subscription = TypeSubscription(topic_type=specialty.value, agent_type=specialist_agent_type)
        await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Publish a ClientRequest to each specialist's topic
    for specialty in TaxSpecialty:
        topic_id = TopicId(type=specialty.value, source="default")
        await runtime.publish_message(
            ClientRequest(f"I need assistance with {specialty.value} taxes."),
            topic_id=topic_id,
        )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_single_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:default with specialty TaxSpecialty.PLANNING:
I need assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:default with specialty TaxSpecialty.DISPUTE_RESOLUTION:
I need assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:default with specialty TaxSpecialty.COMPLIANCE:
I need assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:default with specialty TaxSpecialty.PREPARATION:
I need assistance with preparation taxes.

4. 多租户,多发布范围#

场景解释#

在多租户、多范围的发布场景中:

  • 有多个租户,每个租户都有自己的代理集。

  • 消息会发布到每个租户内的多个主题。

  • 代理订阅与其角色相关的特定租户主题。

  • 将租户隔离与定向通信结合。

此场景提供了对消息分发的最高级别控制,适用于具有多个客户端和特殊通信需求的复杂系统。

在税务管理公司中的应用#

在这种情况下:

  • 公司服务于多个客户,每个客户都有专用的代理实例。

  • 在每个客户端内,代理根据专业领域通过多个主题进行通信。

  • 例如,客户A的规划专家订阅了来源为“ClientA”的“规划”主题。

  • 每个客户的税务系统经理通过特定租户的主题与他们的专家进行沟通。

场景如何工作#

  • 订阅:代理根据租户身份和专业订阅主题。

  • 发布:消息被发布到特定租户和特定专业的主题中。

  • 消息处理:只有匹配租户和主题的代理才能接收消息。

好处#

  • 完全隔离:确保租户和通信的隔离。

  • 细粒度控制:能够将消息精确地路由到目标代理。

注意事项#

  • 复杂性:需要仔细管理话题、租户和订阅。

  • 资源使用情况:增加的代理实例和主题数量可能会影响资源。

async def run_multi_tenant_multiple_scope() -> None:
    # Create the runtime
    runtime = SingleThreadedAgentRuntime()

    # Define TypeSubscriptions for each specialty and tenant
    tenants = ["ClientABC", "ClientXYZ"]

    # Initialize agents for all specialties and add type subscriptions
    for specialty in TaxSpecialty:
        specialist_agent_type = f"TaxSpecialist_{specialty.value}"
        await TaxSpecialist.register(
            runtime=runtime,
            type=specialist_agent_type,
            factory=lambda specialty=specialty: TaxSpecialist(  # type: ignore
                description=f"A tax specialist in {specialty.value}.",
                specialty=specialty,
                system_messages=[SystemMessage(content=f"You are a tax specialist in {specialty.value}.")],
            ),
        )
        for tenant in tenants:
            specialist_subscription = TypeSubscription(
                topic_type=f"{tenant}_{specialty.value}", agent_type=specialist_agent_type
            )
            await runtime.add_subscription(specialist_subscription)

    # Start the runtime
    runtime.start()

    # Send messages for each tenant to each specialty
    for tenant in tenants:
        for specialty in TaxSpecialty:
            topic_id = TopicId(type=f"{tenant}_{specialty.value}", source=tenant)
            await runtime.publish_message(
                ClientRequest(f"{tenant} needs assistance with {specialty.value} taxes."),
                topic_id=topic_id,
            )

    # Allow time for message processing
    await asyncio.sleep(1)

    # Stop the runtime when idle
    await runtime.stop_when_idle()


await run_multi_tenant_multiple_scope()
==================================================
Tax specialist TaxSpecialist_planning:ClientABC with specialty TaxSpecialty.PLANNING:
ClientABC needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientABC with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientABC needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientABC with specialty TaxSpecialty.COMPLIANCE:
ClientABC needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientABC with specialty TaxSpecialty.PREPARATION:
ClientABC needs assistance with preparation taxes.

==================================================
Tax specialist TaxSpecialist_planning:ClientXYZ with specialty TaxSpecialty.PLANNING:
ClientXYZ needs assistance with planning taxes.

==================================================
Tax specialist TaxSpecialist_dispute_resolution:ClientXYZ with specialty TaxSpecialty.DISPUTE_RESOLUTION:
ClientXYZ needs assistance with dispute_resolution taxes.

==================================================
Tax specialist TaxSpecialist_compliance:ClientXYZ with specialty TaxSpecialty.COMPLIANCE:
ClientXYZ needs assistance with compliance taxes.

==================================================
Tax specialist TaxSpecialist_preparation:ClientXYZ with specialty TaxSpecialty.PREPARATION:
ClientXYZ needs assistance with preparation taxes.