跳过内容

图形

除非你需要钉枪,否则不要使用钉枪

如果 PydanticAI agents 是一把锤子,而 multi-agent workflows 是一把大锤,那么图形就是一个钉枪:

  • 当然,钉枪看起来比锤子更酷
  • 但是钉枪的设置比锤子复杂得多
  • 而钉枪并不会让你成为更好的建筑工,而是让你成为一个拥有钉枪的建筑工
  • 最后,(冒着扭曲这个隐喻的风险),如果你是喜欢像锤子和未类型化Python这样的中世纪工具的粉丝,你可能不会喜欢钉枪或我们处理图表的方法。(但话又说回来,如果你不喜欢Python中的类型提示,你可能已经放弃了PydanticAI而去使用其中一个玩具代理框架——祝你好运,如果你意识到需要它,随时可以借用我的大锤)

总之,图是一个强大的工具,但并不是每个工作都合适的工具。请在继续之前考虑其他 多智能体方法

如果你不确定基于图的方式是否是个好主意,那可能就没有必要。

图形和有限状态机(FSMs)是一种强大的抽象,用于建模、执行、控制和可视化复杂的工作流程。

除了PydanticAI,我们还开发了 pydantic-graph — 一个用于Python的异步图和状态机库,其中节点和边是使用类型提示定义的。

虽然这个库是作为PydanticAI的一部分开发的;但它并不依赖于 pydantic-ai,可以被认为是一个纯粹的基于图的状态机库。无论你是否在使用PydanticAI,甚至是在使用GenAI进行构建,你都可能会发现它很有用。

pydantic-graph 针对高级用户设计,广泛使用 Python 泛型和类型提示。它并不像 PydanticAI 那样友好于初学者。

非常早期的beta版本

图形支持在 v0.0.19 中引入,并且处于非常早期的测试阶段。API 可能会有所更改。文档尚不完整。实现也尚不完整。

安装

pydantic-graphpydantic-ai 的必要依赖,也是 pydantic-ai-slim 的可选依赖,更多信息请参见 安装说明。你也可以直接安装它:

pip install pydantic-graph
uv add pydantic-graph

图形类型

pydantic-graph由几个关键组件组成:

图形运行上下文

GraphRunContext — 图形运行的上下文,类似于 PydanticAI 的 RunContext。它保存了图形的状态和依赖关系,并在节点运行时传递给它们。

GraphRunContext 是图中使用的状态类型的通用类型, StateT

结束

End — 返回值表示图形运行应结束。

End 在它所使用的图的图形返回类型中是通用的, RunEndT

节点

BaseNode 的子类定义了图中执行的节点。

节点,通常是 dataclasses,一般由以下组成:

  • 包含调用节点时所需/可选的任何参数的字段
  • run 方法中执行节点的业务逻辑
  • 返回run方法的注释,这些注释由pydantic-graph读取以确定节点的输出边

节点是通用的:

  • state,必须与它们包含的图的状态具有相同类型, StateT 的默认值为 None,因此如果您不使用状态,可以省略此泛型参数,更多信息请参见 stateful graphs
  • deps,必须与它们所包含的图的deps具有相同类型,DepsT的默认值为None,因此如果您不使用deps,可以省略此泛型参数,更多信息请参见依赖注入
  • 图返回类型 — 这仅适用于节点返回 End 的情况。 RunEndT 的默认值为 Never,因此如果节点不返回 End,可以省略此泛型参数,但如果返回,则必须包含。

这是图中起始或中间节点的示例——它无法结束运行,因为它不返回 End

intermediate_node.py
from dataclasses import dataclass

from pydantic_graph import BaseNode, GraphRunContext


@dataclass
class MyNode(BaseNode[MyState]):  # (1)!
    foo: int  # (2)!

    async def run(
        self,
        ctx: GraphRunContext[MyState],  # (3)!
    ) -> AnotherNode:  # (4)!
        ...
        return AnotherNode()
  1. 在这个例子中,状态是 MyState(未显示),因此 BaseNode 被参数化为 MyState。这个节点无法结束运行,因此 RunEndT 的泛型参数被省略,默认为 Never
  2. MyNode 是一个数据类,具有一个字段 foo,类型为 int
  3. run 方法接受一个 GraphRunContext 参数,同样以状态 MyState 作为参数化。
  4. run 方法的返回类型是 AnotherNode(未显示),这用于确定节点的输出边。

我们可以扩展 MyNode 以在 foo 能被 5 整除时可选地结束运行:

intermediate_or_end_node.py
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, GraphRunContext


@dataclass
class MyNode(BaseNode[MyState, None, int]):  # (1)!
    foo: int

    async def run(
        self,
        ctx: GraphRunContext[MyState],
    ) -> AnotherNode | End[int]:  # (2)!
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return AnotherNode()
  1. 我们用返回类型(int)以及状态对节点进行参数化。由于通用参数只能是位置参数,我们必须将None作为第二个参数来表示deps。
  2. run 方法的返回类型现在是 AnotherNodeEnd[int] 的联合,这允许节点在 foo 能被 5 整除时结束运行。

图形

Graph — 这是执行图本身,由一组 节点类(即 BaseNode 子类)组成。

Graph 是通用的:

  • state 图的状态类型, StateT
  • deps 图的deps类型, DepsT
  • 图形返回类型 图形运行的返回类型, RunEndT

这是一个简单图形的例子:

graph_example.py
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class DivisibleBy5(BaseNode[None, None, int]):  # (1)!
    foo: int

    async def run(
        self,
        ctx: GraphRunContext,
    ) -> Increment | End[int]:
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return Increment(self.foo)


@dataclass
class Increment(BaseNode):  # (2)!
    foo: int

    async def run(self, ctx: GraphRunContext) -> DivisibleBy5:
        return DivisibleBy5(self.foo + 1)


fives_graph = Graph(nodes=[DivisibleBy5, Increment])  # (3)!
result = fives_graph.run_sync(DivisibleBy5(4))  # (4)!
print(result.output)
#> 5
# the full history is quite verbose (see below), so we'll just print the summary
print([item.data_snapshot() for item in result.history])
#> [DivisibleBy5(foo=4), Increment(foo=4), DivisibleBy5(foo=5), End(data=5)]
  1. 这个 DivisibleBy5 节点的状态参数被设置为 None,依赖参数也被设置为 None,因为这个图不使用状态或依赖,使用 int 作为它可以结束运行。
  2. Increment 节点不返回 End,因此 RunEndT 泛型参数被省略,状态也可以被省略,因为该图形不使用状态。
  3. 图形是通过一系列节点创建的。
  4. 图是在 run_sync 中同步运行的。初始节点是 DivisibleBy5(4)。由于图不使用外部状态或依赖关系,因此我们不传递 statedeps

(这个示例是完整的,可以在Python 3.10+中“原样”运行)

可以使用以下代码生成该图的mermaid 图表

graph_example_diagram.py
from graph_example import DivisibleBy5, fives_graph

fives_graph.mermaid_code(start_node=DivisibleBy5)
---
title: fives_graph
---
stateDiagram-v2
  [*] --> DivisibleBy5
  DivisibleBy5 --> Increment
  DivisibleBy5 --> [*]
  Increment --> DivisibleBy5

为了在jupyter-notebook中可视化图形,需要使用IPython.display

jupyter_display_mermaid.py
from graph_example import DivisibleBy5, fives_graph
from IPython.display import Image, display

display(Image(fives_graph.mermaid_image(start_node=DivisibleBy5)))

有状态图

pydantic-graph 中,“状态”概念提供了一种可选的方式来访问和修改对象(通常是 dataclass 或 Pydantic 模型),因为节点在图中运行。如果你把图看作是一条生产线,那么你的状态就是沿着生产线传递并由每个节点构建的引擎。

在未来,我们打算扩展 pydantic-graph 以提供状态持久性,在每个节点运行后记录状态,见 #695

这是一个图的示例,它表示一个自动售货机,用户可以在其中投币并选择要购买的产品。

vending_machine.py
from __future__ import annotations

from dataclasses import dataclass

from rich.prompt import Prompt

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class MachineState:  # (1)!
    user_balance: float = 0.0
    product: str | None = None


@dataclass
class InsertCoin(BaseNode[MachineState]):  # (3)!
    async def run(self, ctx: GraphRunContext[MachineState]) -> CoinsInserted:  # (16)!
        return CoinsInserted(float(Prompt.ask('Insert coins')))  # (4)!


@dataclass
class CoinsInserted(BaseNode[MachineState]):
    amount: float  # (5)!

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> SelectProduct | Purchase:  # (17)!
        ctx.state.user_balance += self.amount  # (6)!
        if ctx.state.product is not None:  # (7)!
            return Purchase(ctx.state.product)
        else:
            return SelectProduct()


@dataclass
class SelectProduct(BaseNode[MachineState]):
    async def run(self, ctx: GraphRunContext[MachineState]) -> Purchase:
        return Purchase(Prompt.ask('Select product'))


PRODUCT_PRICES = {  # (2)!
    'water': 1.25,
    'soda': 1.50,
    'crisps': 1.75,
    'chocolate': 2.00,
}


@dataclass
class Purchase(BaseNode[MachineState, None, None]):  # (18)!
    product: str

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> End | InsertCoin | SelectProduct:
        if price := PRODUCT_PRICES.get(self.product):  # (8)!
            ctx.state.product = self.product  # (9)!
            if ctx.state.user_balance >= price:  # (10)!
                ctx.state.user_balance -= price
                return End(None)
            else:
                diff = price - ctx.state.user_balance
                print(f'Not enough money for {self.product}, need {diff:0.2f} more')
                #> Not enough money for crisps, need 0.75 more
                return InsertCoin()  # (11)!
        else:
            print(f'No such product: {self.product}, try again')
            return SelectProduct()  # (12)!


vending_machine_graph = Graph(  # (13)!
    nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)


async def main():
    state = MachineState()  # (14)!
    await vending_machine_graph.run(InsertCoin(), state=state)  # (15)!
    print(f'purchase successful item={state.product} change={state.user_balance:0.2f}')
    #> purchase successful item=crisps change=0.25
  1. 自动售货机的状态被定义为一个数据类,包含用户的余额和他们选择的产品(如果有的话)。
  2. 一个将产品映射到价格的字典。
  3. InsertCoin 节点,BaseNode 被参数化为 MachineState,因为这是在此图中使用的状态。
  4. 这个 InsertCoin 节点提示用户投币。我们通过简单地输入一个浮动的货币金额来保持简单。在你开始觉得这也是一个玩具,因为它在节点中使用了 rich的 Prompt.ask ,请查看 下面,了解当节点需要外部输入时如何管理控制流。
  5. 这个 CoinsInserted 节点;再次强调这是一个 dataclass,包含一个字段 amount
  6. 使用插入的金额更新用户的余额。
  7. 如果用户已经选择了一个产品,则转到 Purchase,否则转到 SelectProduct
  8. Purchase节点中,如果用户输入了有效的产品,则查找该产品的价格。
  9. 如果用户输入了有效的产品,则在状态中设置该产品,以便我们不再访问 SelectProduct
  10. 如果余额足够购买产品,请调整余额以反映购买,并返回 End 以结束图形。我们不使用运行返回类型,因此我们用 None 调用 End
  11. 如果余额不足,去 InsertCoin 提示用户插入更多硬币。
  12. 如果产品无效,请去 SelectProduct 提示用户重新选择产品。
  13. 通过将节点列表传递给 Graph 来创建图形。节点的顺序并不重要,但它可能会影响 图表 的显示方式。
  14. 初始化状态。这将被传递给图运行,并在图运行时被修改。
  15. 运行图形,使用初始状态。由于图形可以从任何节点运行,我们必须传递起始节点——在这种情况下,InsertCoinGraph.run 返回一个 GraphRunResult,提供最终数据和运行历史。
  16. 节点的 run 方法的返回类型很重要,因为它用于确定节点的出边。这个信息反过来用于渲染 mermaid diagrams,并在运行时强制执行,以尽早检测不当行为。
  17. CoinsInsertedrun 方法的返回类型是一个联合类型,这意味着可能有多个输出边。
  18. 与其他节点不同,Purchase 可以结束运行,因此必须设置 RunEndT 泛型参数。在这种情况下,它是 None,因为图运行返回类型是 None

(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)

可以使用以下代码生成该图的mermaid 图表

vending_machine_diagram.py
from vending_machine import InsertCoin, vending_machine_graph

vending_machine_graph.mermaid_code(start_node=InsertCoin)

上面的代码生成的图是:

---
title: vending_machine_graph
---
stateDiagram-v2
  [*] --> InsertCoin
  InsertCoin --> CoinsInserted
  CoinsInserted --> SelectProduct
  CoinsInserted --> Purchase
  SelectProduct --> Purchase
  Purchase --> InsertCoin
  Purchase --> SelectProduct
  Purchase --> [*]

有关生成图表的更多信息,请参见下方

GenAI 示例

到目前为止,我们还没有展示一个实际上使用PydanticAI或GenAI的图的例子。

在这个例子中,一个代理生成了欢迎电子邮件给用户,另一个代理对电子邮件提供反馈。

这个图有一个非常简单的结构:

---
title: feedback_graph
---
stateDiagram-v2
  [*] --> WriteEmail
  WriteEmail --> Feedback
  Feedback --> WriteEmail
  Feedback --> [*]
genai_email_feedback.py
from __future__ import annotations as _annotations

from dataclasses import dataclass, field

from pydantic import BaseModel, EmailStr

from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class User:
    name: str
    email: EmailStr
    interests: list[str]


@dataclass
class Email:
    subject: str
    body: str


@dataclass
class State:
    user: User
    write_agent_messages: list[ModelMessage] = field(default_factory=list)


email_writer_agent = Agent(
    'google-vertex:gemini-1.5-pro',
    result_type=Email,
    system_prompt='Write a welcome email to our tech blog.',
)


@dataclass
class WriteEmail(BaseNode[State]):
    email_feedback: str | None = None

    async def run(self, ctx: GraphRunContext[State]) -> Feedback:
        if self.email_feedback:
            prompt = (
                f'Rewrite the email for the user:\n'
                f'{format_as_xml(ctx.state.user)}\n'
                f'Feedback: {self.email_feedback}'
            )
        else:
            prompt = (
                f'Write a welcome email for the user:\n'
                f'{format_as_xml(ctx.state.user)}'
            )

        result = await email_writer_agent.run(
            prompt,
            message_history=ctx.state.write_agent_messages,
        )
        ctx.state.write_agent_messages += result.all_messages()
        return Feedback(result.data)


class EmailRequiresWrite(BaseModel):
    feedback: str


class EmailOk(BaseModel):
    pass


feedback_agent = Agent[None, EmailRequiresWrite | EmailOk](
    'openai:gpt-4o',
    result_type=EmailRequiresWrite | EmailOk,  # type: ignore
    system_prompt=(
        'Review the email and provide feedback, email must reference the users specific interests.'
    ),
)


@dataclass
class Feedback(BaseNode[State, None, Email]):
    email: Email

    async def run(
        self,
        ctx: GraphRunContext[State],
    ) -> WriteEmail | End[Email]:
        prompt = format_as_xml({'user': ctx.state.user, 'email': self.email})
        result = await feedback_agent.run(prompt)
        if isinstance(result.data, EmailRequiresWrite):
            return WriteEmail(email_feedback=result.data.feedback)
        else:
            return End(self.email)


async def main():
    user = User(
        name='John Doe',
        email='john.joe@example.com',
        interests=['Haskel', 'Lisp', 'Fortran'],
    )
    state = State(user)
    feedback_graph = Graph(nodes=(WriteEmail, Feedback))
    result = await feedback_graph.run(WriteEmail(), state=state)
    print(result.output)
    """
    Email(
        subject='Welcome to our tech blog!',
        body='Hello John, Welcome to our tech blog! ...',
    )
    """

(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)

自定义控制流

在许多实际应用中,图形无法从头到尾不间断地运行——它们可能需要外部输入,或者在较长时间内运行,以至于单个进程无法从头到尾执行整个图形运行而不被中断。

在这些场景中,next 方法可以用于一次运行图中的一个节点。

在这个例子中,一个AI向用户提问,用户提供答案,AI评估答案,如果用户回答正确则结束,如果回答错误则提出另一个问题。

ai_q_and_a_graph.pyquestion_graph definition
ai_q_and_a_graph.py
from __future__ import annotations as _annotations

from dataclasses import dataclass, field

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage

ask_agent = Agent('openai:gpt-4o', result_type=str)


@dataclass
class QuestionState:
    question: str | None = None
    ask_agent_messages: list[ModelMessage] = field(default_factory=list)
    evaluate_agent_messages: list[ModelMessage] = field(default_factory=list)


@dataclass
class Ask(BaseNode[QuestionState]):
    async def run(self, ctx: GraphRunContext[QuestionState]) -> Answer:
        result = await ask_agent.run(
            'Ask a simple question with a single correct answer.',
            message_history=ctx.state.ask_agent_messages,
        )
        ctx.state.ask_agent_messages += result.all_messages()
        ctx.state.question = result.data
        return Answer(result.data)


@dataclass
class Answer(BaseNode[QuestionState]):
    question: str
    answer: str | None = None

    async def run(self, ctx: GraphRunContext[QuestionState]) -> Evaluate:
        assert self.answer is not None
        return Evaluate(self.answer)


@dataclass
class EvaluationResult:
    correct: bool
    comment: str


evaluate_agent = Agent(
    'openai:gpt-4o',
    result_type=EvaluationResult,
    system_prompt='Given a question and answer, evaluate if the answer is correct.',
)


@dataclass
class Evaluate(BaseNode[QuestionState]):
    answer: str

    async def run(
        self,
        ctx: GraphRunContext[QuestionState],
    ) -> End[str] | Reprimand:
        assert ctx.state.question is not None
        result = await evaluate_agent.run(
            format_as_xml({'question': ctx.state.question, 'answer': self.answer}),
            message_history=ctx.state.evaluate_agent_messages,
        )
        ctx.state.evaluate_agent_messages += result.all_messages()
        if result.data.correct:
            return End(result.data.comment)
        else:
            return Reprimand(result.data.comment)


@dataclass
class Reprimand(BaseNode[QuestionState]):
    comment: str

    async def run(self, ctx: GraphRunContext[QuestionState]) -> Ask:
        print(f'Comment: {self.comment}')
        ctx.state.question = None
        return Ask()


question_graph = Graph(nodes=(Ask, Answer, Evaluate, Reprimand))

(这个示例是完整的,可以在Python 3.10+中“原样”运行)

ai_q_and_a_run.py
from rich.prompt import Prompt

from pydantic_graph import End, HistoryStep

from ai_q_and_a_graph import Ask, question_graph, QuestionState, Answer


async def main():
    state = QuestionState()  # (1)!
    node = Ask()  # (2)!
    history: list[HistoryStep[QuestionState]] = []  # (3)!
    while True:
        node = await question_graph.next(node, history, state=state)  # (4)!
        if isinstance(node, Answer):
            node.answer = Prompt.ask(node.question)  # (5)!
        elif isinstance(node, End):  # (6)!
            print(f'Correct answer! {node.data}')
            #> Correct answer! Well done, 1 + 1 = 2
            print([e.data_snapshot() for e in history])
            """
            [
                Ask(),
                Answer(question='What is the capital of France?', answer='Vichy'),
                Evaluate(answer='Vichy'),
                Reprimand(comment='Vichy is no longer the capital of France.'),
                Ask(),
                Answer(question='what is 1 + 1?', answer='2'),
                Evaluate(answer='2'),
                End(data='Well done, 1 + 1 = 2'),
            ]
            """
            return
        # otherwise just continue
  1. 创建将被next改变的状态对象。
  2. 起始节点是 Ask,但将在图形运行时由 next 更新。
  3. 图形运行的历史记录存储在一个HistoryStep对象的列表中。同样,next将在原地更新此列表。
  4. 运行 该图形,每次一个节点,在图形运行时更新状态、当前节点和历史记录。
  5. 如果当前节点是一个 Answer 节点,提示用户输入答案。
  6. 由于我们使用了 next,我们必须手动检查 End,如果我们得到一个,就退出循环。

(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)

可以使用以下代码生成该图的mermaid 图表

ai_q_and_a_diagram.py
from ai_q_and_a_graph import Ask, question_graph

question_graph.mermaid_code(start_node=Ask)
---
title: question_graph
---
stateDiagram-v2
  [*] --> Ask
  Ask --> Answer
  Answer --> Evaluate
  Evaluate --> Reprimand
  Evaluate --> [*]
  Reprimand --> Ask

你可能已经注意到,尽管这个例子将控制流转移出了图形运行,我们仍然在使用 rich的 Prompt.ask 来获取用户输入,在等待用户输入响应时过程处于挂起状态。关于真正的外部控制流的例子,请参见 问题图示例

遍历图

使用 Graph.iter 进行 async for 迭代

有时您希望在图形执行时直接控制或了解每个节点。最简单的方法是使用 Graph.iter 方法,该方法返回一个 上下文管理器 ,它生成一个 GraphRun 对象。 GraphRun 是一个异步可迭代的图形节点,允许您在它们执行时记录或修改它们。

这是一个示例:

count_down.py
from __future__ import annotations as _annotations

from dataclasses import dataclass
from pydantic_graph import Graph, BaseNode, End, GraphRunContext


@dataclass
class CountDownState:
    counter: int


@dataclass
class CountDown(BaseNode[CountDownState]):
    async def run(self, ctx: GraphRunContext[CountDownState]) -> CountDown | End[int]:
        if ctx.state.counter <= 0:
            return End(ctx.state.counter)
        ctx.state.counter -= 1
        return CountDown()


count_down_graph = Graph(nodes=[CountDown])


async def main():
    state = CountDownState(counter=3)
    with count_down_graph.iter(CountDown(), state=state) as run:  # (1)!
        async for node in run:  # (2)!
            print('Node:', node)
            #> Node: CountDown()
            #> Node: CountDown()
            #> Node: CountDown()
            #> Node: End(data=0)
    print('Final result:', run.result.output)  # (3)!
    #> Final result: 0
    print('History snapshots:', [step.data_snapshot() for step in run.history])
    """
    History snapshots:
    [CountDown(), CountDown(), CountDown(), CountDown(), End(data=0)]
    """
  1. Graph.iter(...) 返回一个 GraphRun
  2. 在这里,我们逐步执行每个节点。
  3. 一旦图形返回一个 End,循环结束,run.final_result 变为一个 GraphRunResult,包含最终结果(这里是 0)。

手动使用 GraphRun.next(node)

或者,您可以使用 GraphRun.next 方法手动驱动迭代,该方法允许您传入想要下一个运行的节点。您可以通过这种方式修改或选择性跳过节点。

下面是一个人为的示例,当计数器为2时停止,忽略任何超过该值的节点运行:

count_down_next.py
from pydantic_graph import End
from count_down import CountDown, CountDownState, count_down_graph


async def main():
    state = CountDownState(counter=5)
    with count_down_graph.iter(CountDown(), state=state) as run:
        node = run.next_node  # (1)!
        while not isinstance(node, End):  # (2)!
            print('Node:', node)
            #> Node: CountDown()
            #> Node: CountDown()
            #> Node: CountDown()
            #> Node: CountDown()
            if state.counter == 2:
                break  # (3)!
            node = await run.next(node)  # (4)!

        print(run.result)  # (5)!
        #> None

        for step in run.history:  # (6)!
            print('History Step:', step.data_snapshot(), step.state)
            #> History Step: CountDown() CountDownState(counter=4)
            #> History Step: CountDown() CountDownState(counter=3)
            #> History Step: CountDown() CountDownState(counter=2)
  1. 我们首先获取将在代理图中运行的第一个节点。
  2. 一旦生成了一个 End 节点,代理运行就结束了;End 的实例不能传递给 next
  3. 如果用户决定提前停止,我们将跳出循环。在这种情况下,图形运行将没有实际的最终结果(run.final_result 仍然是 None)。
  4. 在每一步中,我们调用 await run.next(node) 来运行它并获取下一个节点(或一个 End)。
  5. 因为我们没有继续运行直到完成,所以 result 没有被设置。
  6. 运行历史仍然记录着我们迄今为止执行的步骤。

依赖注入

与 PydanticAI 一样, pydantic-graph 通过 GraphBaseNode 上的泛型参数,以及 GraphRunContext.deps 字段支持依赖注入。

作为依赖注入的一个例子,让我们修改DivisibleBy5的例子上面,使用ProcessPoolExecutor在单独的进程中运行计算负载(这是一个虚构的例子,ProcessPoolExecutor在这个例子中并不会实际提高性能):

deps_example.py
from __future__ import annotations

import asyncio
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext


@dataclass
class GraphDeps:
    executor: ProcessPoolExecutor


@dataclass
class DivisibleBy5(BaseNode[None, GraphDeps, int]):
    foo: int

    async def run(
        self,
        ctx: GraphRunContext[None, GraphDeps],
    ) -> Increment | End[int]:
        if self.foo % 5 == 0:
            return End(self.foo)
        else:
            return Increment(self.foo)


@dataclass
class Increment(BaseNode[None, GraphDeps]):
    foo: int

    async def run(self, ctx: GraphRunContext[None, GraphDeps]) -> DivisibleBy5:
        loop = asyncio.get_running_loop()
        compute_result = await loop.run_in_executor(
            ctx.deps.executor,
            self.compute,
        )
        return DivisibleBy5(compute_result)

    def compute(self) -> int:
        return self.foo + 1


fives_graph = Graph(nodes=[DivisibleBy5, Increment])


async def main():
    with ProcessPoolExecutor() as executor:
        deps = GraphDeps(executor)
        result = await fives_graph.run(DivisibleBy5(3), deps=deps)
    print(result.output)
    #> 5
    # the full history is quite verbose (see below), so we'll just print the summary
    print([item.data_snapshot() for item in result.history])
    """
    [
        DivisibleBy5(foo=3),
        Increment(foo=3),
        DivisibleBy5(foo=4),
        Increment(foo=4),
        DivisibleBy5(foo=5),
        End(data=5),
    ]
    """

(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)

美人鱼图

Pydantic Graph 可以为图生成 mermaid stateDiagram-v2 图,如上所示。

这些图表可以通过以下方式生成:

除了上面显示的图表,您还可以通过以下选项自定义mermaid图表:

结合这些,我们可以编辑最后一个 ai_q_and_a_graph.py 示例为:

  • 为某些边添加标签
  • Ask节点添加备注
  • 突出显示 Answer 节点
  • 将图表保存为PNG格式的图像文件
ai_q_and_a_graph_extra.py
...
from typing import Annotated

from pydantic_graph import BaseNode, End, Graph, GraphRunContext, Edge

...

@dataclass
class Ask(BaseNode[QuestionState]):
    """Generate question using GPT-4o."""
    docstring_notes = True
    async def run(
        self, ctx: GraphRunContext[QuestionState]
    ) -> Annotated[Answer, Edge(label='Ask the question')]:
        ...

...

@dataclass
class Evaluate(BaseNode[QuestionState]):
    answer: str

    async def run(
            self,
            ctx: GraphRunContext[QuestionState],
    ) -> Annotated[End[str], Edge(label='success')] | Reprimand:
        ...

...

question_graph.mermaid_save('image.png', highlighted_nodes=[Answer])

(此示例不完整,无法直接运行)

这将生成一个看起来像这样的图像:

---
title: question_graph
---
stateDiagram-v2
  Ask --> Answer: Ask the question
  note right of Ask
    Judge the answer.
    Decide on next step.
  end note
  Answer --> Evaluate
  Evaluate --> Reprimand
  Evaluate --> [*]: success
  Reprimand --> Ask

classDef highlighted fill:#fdff32
class Answer highlighted

设置状态图的方向

您可以使用以下值指定状态图的方向:

  • 'TB': 从上到下,图表从上到下垂直流动。
  • 'LR': 从左到右,图表从左向右水平流动。
  • 'RL': 从右到左,图表水平从右向左流动。
  • 'BT': 从下到上,图表从下到上垂直流动。

这是一个如何使用“从左到右”(LR)而不是默认的“从上到下”(TB)的示例:

vending_machine_diagram.py
from vending_machine import InsertCoin, vending_machine_graph

vending_machine_graph.mermaid_code(start_node=InsertCoin, direction='LR')
---
title: vending_machine_graph
---
stateDiagram-v2
  direction LR
  [*] --> InsertCoin
  InsertCoin --> CoinsInserted
  CoinsInserted --> SelectProduct
  CoinsInserted --> Purchase
  SelectProduct --> Purchase
  Purchase --> InsertCoin
  Purchase --> SelectProduct
  Purchase --> [*]