跳过内容

多智能体应用

在使用PydanticAI构建应用程序时,大致有四个复杂程度:

  1. 单代理工作流程 — 大多数 pydantic_ai 文档所覆盖的内容
  2. 代理委派 — 通过工具使用另一个代理的代理
  3. 程序化代理交接 — 一个代理运行,然后应用代码调用另一个代理
  4. 基于图的控制流 — 对于最复杂的情况,可以使用基于图的状态机来控制多个代理的执行

当然,您可以在一个应用程序中结合多种策略。

代理委托

“代理委托”是指代理将工作委托给另一个代理的场景,然后在被委托的代理(从工具内部调用的代理)完成后重新掌控。

由于代理是无状态的并且被设计为全球性的,因此您无需在代理依赖项中包含代理本身。

您通常需要将 ctx.usage 传递给委托代理运行的 usage 关键字参数,以便该运行中的使用量计入父代理运行的总使用量。

多个模型

代理委派不需要为每个代理使用相同的模型。如果您选择在一次运行中使用不同的模型,则无法根据运行的最终 result.usage() 来计算货币成本,但您仍然可以使用 UsageLimits 来避免意外费用。

agent_delegation_simple.py
from pydantic_ai import Agent, RunContext
from pydantic_ai.usage import UsageLimits

joke_selection_agent = Agent(  # (1)!
    'openai:gpt-4o',
    system_prompt=(
        'Use the `joke_factory` to generate some jokes, then choose the best. '
        'You must return just a single joke.'
    ),
)
joke_generation_agent = Agent(  # (2)!
    'google-gla:gemini-1.5-flash', result_type=list[str]
)


@joke_selection_agent.tool
async def joke_factory(ctx: RunContext[None], count: int) -> list[str]:
    r = await joke_generation_agent.run(  # (3)!
        f'Please generate {count} jokes.',
        usage=ctx.usage,  # (4)!
    )
    return r.data  # (5)!


result = joke_selection_agent.run_sync(
    'Tell me a joke.',
    usage_limits=UsageLimits(request_limit=5, total_tokens_limit=300),
)
print(result.data)
#> Did you hear about the toothpaste scandal? They called it Colgate.
print(result.usage())
"""
Usage(
    requests=3, request_tokens=204, response_tokens=24, total_tokens=228, details=None
)
"""
  1. “父”或控制代理。
  2. “代理”代理,从父代理的工具内部调用。
  3. 在父代理的工具内调用代理代理。
  4. 将父代理的使用情况传递给委托代理,以便最终的 result.usage() 包括两个代理的使用情况。
  5. 由于函数返回 list[str],而 joke_generation_agentresult_type 也是 list[str],我们可以简单地从工具中返回 r.data

(这个例子是完整的,可以“原样”运行)

这个示例的控制流程非常简单,可以总结如下:

graph TD
  START --> joke_selection_agent
  joke_selection_agent --> joke_factory["joke_factory (tool)"]
  joke_factory --> joke_generation_agent
  joke_generation_agent --> joke_factory
  joke_factory --> joke_selection_agent
  joke_selection_agent --> END

代理委托和依赖关系

通常情况下,委托代理需要具有与调用代理相同的 依赖项,或具有调用代理依赖项的子集。

正在初始化依赖项

我们在上面说“通常”是因为没有什么可以阻止你在工具调用中初始化依赖,因此在委托代理中使用在父代理上不可用的相互依赖,这种做法通常应该避免,因为它可能比从父代理重用连接等要慢得多。

agent_delegation_deps.py
from dataclasses import dataclass

import httpx

from pydantic_ai import Agent, RunContext


@dataclass
class ClientAndKey:  # (1)!
    http_client: httpx.AsyncClient
    api_key: str


joke_selection_agent = Agent(
    'openai:gpt-4o',
    deps_type=ClientAndKey,  # (2)!
    system_prompt=(
        'Use the `joke_factory` tool to generate some jokes on the given subject, '
        'then choose the best. You must return just a single joke.'
    ),
)
joke_generation_agent = Agent(
    'gemini-1.5-flash',
    deps_type=ClientAndKey,  # (4)!
    result_type=list[str],
    system_prompt=(
        'Use the "get_jokes" tool to get some jokes on the given subject, '
        'then extract each joke into a list.'
    ),
)


@joke_selection_agent.tool
async def joke_factory(ctx: RunContext[ClientAndKey], count: int) -> list[str]:
    r = await joke_generation_agent.run(
        f'Please generate {count} jokes.',
        deps=ctx.deps,  # (3)!
        usage=ctx.usage,
    )
    return r.data


@joke_generation_agent.tool  # (5)!
async def get_jokes(ctx: RunContext[ClientAndKey], count: int) -> str:
    response = await ctx.deps.http_client.get(
        'https://example.com',
        params={'count': count},
        headers={'Authorization': f'Bearer {ctx.deps.api_key}'},
    )
    response.raise_for_status()
    return response.text


async def main():
    async with httpx.AsyncClient() as client:
        deps = ClientAndKey(client, 'foobar')
        result = await joke_selection_agent.run('Tell me a joke.', deps=deps)
        print(result.data)
        #> Did you hear about the toothpaste scandal? They called it Colgate.
        print(result.usage())  # (6)!
        """
        Usage(
            requests=4,
            request_tokens=309,
            response_tokens=32,
            total_tokens=341,
            details=None,
        )
        """
  1. 定义一个数据类以保存客户端和API密钥依赖关系。
  2. 设置调用代理的 deps_type — 这里是 joke_selection_agent
  3. 在工具调用中将依赖项传递给委托代理的运行方法。
  4. 还要设置代理代理的 deps_typejoke_generation_agent 在这里。
  5. 在代理代理上定义一个工具,该工具使用依赖关系进行HTTP请求。
  6. 当前用法包括4个请求——来自调用代理的2个请求和来自委托代理的2个请求。

(此示例是完整的,可以“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)

这个例子展示了即使是一个相当简单的代理委托也可以导致复杂的控制流程:

graph TD
  START --> joke_selection_agent
  joke_selection_agent --> joke_factory["joke_factory (tool)"]
  joke_factory --> joke_generation_agent
  joke_generation_agent --> get_jokes["get_jokes (tool)"]
  get_jokes --> http_request["HTTP request"]
  http_request --> get_jokes
  get_jokes --> joke_generation_agent
  joke_generation_agent --> joke_factory
  joke_factory --> joke_selection_agent
  joke_selection_agent --> END

程序化代理交接

“程序化代理交接”指的是在多个代理连续调用的场景中,由应用代码和/或人为介入负责决定下一个要调用的代理。

在这里,代理不需要使用相同的依赖。

这里我们展示了两个依次使用的代理,第一个用于查找航班,第二个用于提取用户的座位偏好。

programmatic_handoff.py
from typing import Literal, Union

from pydantic import BaseModel, Field
from rich.prompt import Prompt

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import ModelMessage
from pydantic_ai.usage import Usage, UsageLimits


class FlightDetails(BaseModel):
    flight_number: str


class Failed(BaseModel):
    """Unable to find a satisfactory choice."""


flight_search_agent = Agent[None, Union[FlightDetails, Failed]](  # (1)!
    'openai:gpt-4o',
    result_type=Union[FlightDetails, Failed],  # type: ignore
    system_prompt=(
        'Use the "flight_search" tool to find a flight '
        'from the given origin to the given destination.'
    ),
)


@flight_search_agent.tool  # (2)!
async def flight_search(
    ctx: RunContext[None], origin: str, destination: str
) -> Union[FlightDetails, None]:
    # in reality, this would call a flight search API or
    # use a browser to scrape a flight search website
    return FlightDetails(flight_number='AK456')


usage_limits = UsageLimits(request_limit=15)  # (3)!


async def find_flight(usage: Usage) -> Union[FlightDetails, None]:  # (4)!
    message_history: Union[list[ModelMessage], None] = None
    for _ in range(3):
        prompt = Prompt.ask(
            'Where would you like to fly from and to?',
        )
        result = await flight_search_agent.run(
            prompt,
            message_history=message_history,
            usage=usage,
            usage_limits=usage_limits,
        )
        if isinstance(result.data, FlightDetails):
            return result.data
        else:
            message_history = result.all_messages(
                result_tool_return_content='Please try again.'
            )


class SeatPreference(BaseModel):
    row: int = Field(ge=1, le=30)
    seat: Literal['A', 'B', 'C', 'D', 'E', 'F']


# This agent is responsible for extracting the user's seat selection
seat_preference_agent = Agent[None, Union[SeatPreference, Failed]](  # (5)!
    'openai:gpt-4o',
    result_type=Union[SeatPreference, Failed],  # type: ignore
    system_prompt=(
        "Extract the user's seat preference. "
        'Seats A and F are window seats. '
        'Row 1 is the front row and has extra leg room. '
        'Rows 14, and 20 also have extra leg room. '
    ),
)


async def find_seat(usage: Usage) -> SeatPreference:  # (6)!
    message_history: Union[list[ModelMessage], None] = None
    while True:
        answer = Prompt.ask('What seat would you like?')

        result = await seat_preference_agent.run(
            answer,
            message_history=message_history,
            usage=usage,
            usage_limits=usage_limits,
        )
        if isinstance(result.data, SeatPreference):
            return result.data
        else:
            print('Could not understand seat preference. Please try again.')
            message_history = result.all_messages()


async def main():  # (7)!
    usage: Usage = Usage()

    opt_flight_details = await find_flight(usage)
    if opt_flight_details is not None:
        print(f'Flight found: {opt_flight_details.flight_number}')
        #> Flight found: AK456
        seat_preference = await find_seat(usage)
        print(f'Seat preference: {seat_preference}')
        #> Seat preference: row=1 seat='A'
  1. 定义第一个代理,用于查找航班。在 PEP-747 发布之前,我们使用显式类型注解,参见 结构化结果。我们使用联合体作为结果类型,以便模型可以在无法找到满意的选择时进行通信;在内部,联合体的每个成员将作为一个单独的工具注册。
  2. 在代理上定义一个工具来查找航班。在这个简单的情况下,我们可以省去工具,只需定义代理返回结构化数据,然后搜索航班,但在更复杂的场景中,工具将是必要的。
  3. 为整个应用定义使用限制。
  4. 定义一个函数以查找航班,该函数询问用户的偏好,然后调用代理来查找航班。
  5. 与上面的 flight_search_agent 一样,我们使用显式类型注释来定义代理。
  6. 定义一个函数来查找用户的座位偏好,该函数询问用户的座位偏好,然后调用代理提取座位偏好。
  7. 现在我们已经将运行每个代理的逻辑放入单独的函数中,我们的主应用程序变得非常简单。

(这个例子是完整的,可以“直接运行”——你需要添加 asyncio.run(main()) 来运行 main)

本例的控制流程可以总结如下:

graph TB
  START --> ask_user_flight["ask user for flight"]

  subgraph find_flight
    flight_search_agent --> ask_user_flight
    ask_user_flight --> flight_search_agent
  end

  flight_search_agent --> ask_user_seat["ask user for seat"]
  flight_search_agent --> END

  subgraph find_seat
    seat_preference_agent --> ask_user_seat
    ask_user_seat --> seat_preference_agent
  end

  seat_preference_agent --> END

Pydantic 图形

查看graph文档,以了解何时以及如何使用图表。

示例

以下示例演示了如何在PydanticAI中使用依赖项: