图形
除非你需要钉枪,否则不要使用钉枪
如果 PydanticAI agents 是一把锤子,而 multi-agent workflows 是一把大锤,那么图形就是一个钉枪:
- 当然,钉枪看起来比锤子更酷
- 但是钉枪的设置比锤子复杂得多
- 而钉枪并不会让你成为更好的建筑工,而是让你成为一个拥有钉枪的建筑工
- 最后,(冒着扭曲这个隐喻的风险),如果你是喜欢像锤子和未类型化Python这样的中世纪工具的粉丝,你可能不会喜欢钉枪或我们处理图表的方法。(但话又说回来,如果你不喜欢Python中的类型提示,你可能已经放弃了PydanticAI而去使用其中一个玩具代理框架——祝你好运,如果你意识到需要它,随时可以借用我的大锤)
总之,图是一个强大的工具,但并不是每个工作都合适的工具。请在继续之前考虑其他 多智能体方法。
如果你不确定基于图的方式是否是个好主意,那可能就没有必要。
图形和有限状态机(FSMs)是一种强大的抽象,用于建模、执行、控制和可视化复杂的工作流程。
除了PydanticAI,我们还开发了 pydantic-graph — 一个用于Python的异步图和状态机库,其中节点和边是使用类型提示定义的。
虽然这个库是作为PydanticAI的一部分开发的;但它并不依赖于 pydantic-ai,可以被认为是一个纯粹的基于图的状态机库。无论你是否在使用PydanticAI,甚至是在使用GenAI进行构建,你都可能会发现它很有用。
pydantic-graph 针对高级用户设计,广泛使用 Python 泛型和类型提示。它并不像 PydanticAI 那样友好于初学者。
非常早期的beta版本
图形支持在 v0.0.19 中引入,并且处于非常早期的测试阶段。API 可能会有所更改。文档尚不完整。实现也尚不完整。
安装
pydantic-graph 是 pydantic-ai 的必要依赖,也是 pydantic-ai-slim 的可选依赖,更多信息请参见 安装说明。你也可以直接安装它:
pip install pydantic-graph
uv add pydantic-graph
图形类型
pydantic-graph由几个关键组件组成:
图形运行上下文
GraphRunContext — 图形运行的上下文,类似于 PydanticAI 的 RunContext。它保存了图形的状态和依赖关系,并在节点运行时传递给它们。
GraphRunContext 是图中使用的状态类型的通用类型, StateT。
结束
End — 返回值表示图形运行应结束。
End 在它所使用的图的图形返回类型中是通用的, RunEndT。
节点
BaseNode 的子类定义了图中执行的节点。
节点,通常是 dataclasses,一般由以下组成:
节点是通用的:
- state,必须与它们包含的图的状态具有相同类型,
StateT的默认值为None,因此如果您不使用状态,可以省略此泛型参数,更多信息请参见 stateful graphs - deps,必须与它们所包含的图的deps具有相同类型,
DepsT的默认值为None,因此如果您不使用deps,可以省略此泛型参数,更多信息请参见依赖注入 - 图返回类型 — 这仅适用于节点返回
End的情况。RunEndT的默认值为 Never,因此如果节点不返回End,可以省略此泛型参数,但如果返回,则必须包含。
这是图中起始或中间节点的示例——它无法结束运行,因为它不返回 End:
from dataclasses import dataclass
from pydantic_graph import BaseNode, GraphRunContext
@dataclass
class MyNode(BaseNode[MyState]): # (1)!
foo: int # (2)!
async def run(
self,
ctx: GraphRunContext[MyState], # (3)!
) -> AnotherNode: # (4)!
...
return AnotherNode()
- 在这个例子中,状态是
MyState(未显示),因此BaseNode被参数化为MyState。这个节点无法结束运行,因此RunEndT的泛型参数被省略,默认为Never。 MyNode是一个数据类,具有一个字段foo,类型为int。- 该
run方法接受一个GraphRunContext参数,同样以状态MyState作为参数化。 run方法的返回类型是AnotherNode(未显示),这用于确定节点的输出边。
我们可以扩展 MyNode 以在 foo 能被 5 整除时可选地结束运行:
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, GraphRunContext
@dataclass
class MyNode(BaseNode[MyState, None, int]): # (1)!
foo: int
async def run(
self,
ctx: GraphRunContext[MyState],
) -> AnotherNode | End[int]: # (2)!
if self.foo % 5 == 0:
return End(self.foo)
else:
return AnotherNode()
- 我们用返回类型(
int)以及状态对节点进行参数化。由于通用参数只能是位置参数,我们必须将None作为第二个参数来表示deps。 run方法的返回类型现在是AnotherNode和End[int]的联合,这允许节点在foo能被 5 整除时结束运行。
图形
Graph — 这是执行图本身,由一组 节点类(即 BaseNode 子类)组成。
Graph 是通用的:
这是一个简单图形的例子:
from __future__ import annotations
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class DivisibleBy5(BaseNode[None, None, int]): # (1)!
foo: int
async def run(
self,
ctx: GraphRunContext,
) -> Increment | End[int]:
if self.foo % 5 == 0:
return End(self.foo)
else:
return Increment(self.foo)
@dataclass
class Increment(BaseNode): # (2)!
foo: int
async def run(self, ctx: GraphRunContext) -> DivisibleBy5:
return DivisibleBy5(self.foo + 1)
fives_graph = Graph(nodes=[DivisibleBy5, Increment]) # (3)!
result = fives_graph.run_sync(DivisibleBy5(4)) # (4)!
print(result.output)
#> 5
# the full history is quite verbose (see below), so we'll just print the summary
print([item.data_snapshot() for item in result.history])
#> [DivisibleBy5(foo=4), Increment(foo=4), DivisibleBy5(foo=5), End(data=5)]
- 这个
DivisibleBy5节点的状态参数被设置为None,依赖参数也被设置为None,因为这个图不使用状态或依赖,使用int作为它可以结束运行。 - 该
Increment节点不返回End,因此RunEndT泛型参数被省略,状态也可以被省略,因为该图形不使用状态。 - 图形是通过一系列节点创建的。
- 图是在
run_sync中同步运行的。初始节点是DivisibleBy5(4)。由于图不使用外部状态或依赖关系,因此我们不传递state或deps。
(这个示例是完整的,可以在Python 3.10+中“原样”运行)
可以使用以下代码生成该图的mermaid 图表:
from graph_example import DivisibleBy5, fives_graph
fives_graph.mermaid_code(start_node=DivisibleBy5)
---
title: fives_graph
---
stateDiagram-v2
[*] --> DivisibleBy5
DivisibleBy5 --> Increment
DivisibleBy5 --> [*]
Increment --> DivisibleBy5
为了在jupyter-notebook中可视化图形,需要使用IPython.display:
from graph_example import DivisibleBy5, fives_graph
from IPython.display import Image, display
display(Image(fives_graph.mermaid_image(start_node=DivisibleBy5)))
有状态图
在 pydantic-graph 中,“状态”概念提供了一种可选的方式来访问和修改对象(通常是 dataclass 或 Pydantic 模型),因为节点在图中运行。如果你把图看作是一条生产线,那么你的状态就是沿着生产线传递并由每个节点构建的引擎。
在未来,我们打算扩展 pydantic-graph 以提供状态持久性,在每个节点运行后记录状态,见 #695。
这是一个图的示例,它表示一个自动售货机,用户可以在其中投币并选择要购买的产品。
from __future__ import annotations
from dataclasses import dataclass
from rich.prompt import Prompt
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class MachineState: # (1)!
user_balance: float = 0.0
product: str | None = None
@dataclass
class InsertCoin(BaseNode[MachineState]): # (3)!
async def run(self, ctx: GraphRunContext[MachineState]) -> CoinsInserted: # (16)!
return CoinsInserted(float(Prompt.ask('Insert coins'))) # (4)!
@dataclass
class CoinsInserted(BaseNode[MachineState]):
amount: float # (5)!
async def run(
self, ctx: GraphRunContext[MachineState]
) -> SelectProduct | Purchase: # (17)!
ctx.state.user_balance += self.amount # (6)!
if ctx.state.product is not None: # (7)!
return Purchase(ctx.state.product)
else:
return SelectProduct()
@dataclass
class SelectProduct(BaseNode[MachineState]):
async def run(self, ctx: GraphRunContext[MachineState]) -> Purchase:
return Purchase(Prompt.ask('Select product'))
PRODUCT_PRICES = { # (2)!
'water': 1.25,
'soda': 1.50,
'crisps': 1.75,
'chocolate': 2.00,
}
@dataclass
class Purchase(BaseNode[MachineState, None, None]): # (18)!
product: str
async def run(
self, ctx: GraphRunContext[MachineState]
) -> End | InsertCoin | SelectProduct:
if price := PRODUCT_PRICES.get(self.product): # (8)!
ctx.state.product = self.product # (9)!
if ctx.state.user_balance >= price: # (10)!
ctx.state.user_balance -= price
return End(None)
else:
diff = price - ctx.state.user_balance
print(f'Not enough money for {self.product}, need {diff:0.2f} more')
#> Not enough money for crisps, need 0.75 more
return InsertCoin() # (11)!
else:
print(f'No such product: {self.product}, try again')
return SelectProduct() # (12)!
vending_machine_graph = Graph( # (13)!
nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)
async def main():
state = MachineState() # (14)!
await vending_machine_graph.run(InsertCoin(), state=state) # (15)!
print(f'purchase successful item={state.product} change={state.user_balance:0.2f}')
#> purchase successful item=crisps change=0.25
- 自动售货机的状态被定义为一个数据类,包含用户的余额和他们选择的产品(如果有的话)。
- 一个将产品映射到价格的字典。
InsertCoin节点,BaseNode被参数化为MachineState,因为这是在此图中使用的状态。- 这个
InsertCoin节点提示用户投币。我们通过简单地输入一个浮动的货币金额来保持简单。在你开始觉得这也是一个玩具,因为它在节点中使用了 rich的Prompt.ask,请查看 下面,了解当节点需要外部输入时如何管理控制流。 - 这个
CoinsInserted节点;再次强调这是一个dataclass,包含一个字段amount。 - 使用插入的金额更新用户的余额。
- 如果用户已经选择了一个产品,则转到
Purchase,否则转到SelectProduct。 - 在
Purchase节点中,如果用户输入了有效的产品,则查找该产品的价格。 - 如果用户输入了有效的产品,则在状态中设置该产品,以便我们不再访问
SelectProduct。 - 如果余额足够购买产品,请调整余额以反映购买,并返回
End以结束图形。我们不使用运行返回类型,因此我们用None调用End。 - 如果余额不足,去
InsertCoin提示用户插入更多硬币。 - 如果产品无效,请去
SelectProduct提示用户重新选择产品。 - 通过将节点列表传递给
Graph来创建图形。节点的顺序并不重要,但它可能会影响 图表 的显示方式。 - 初始化状态。这将被传递给图运行,并在图运行时被修改。
- 运行图形,使用初始状态。由于图形可以从任何节点运行,我们必须传递起始节点——在这种情况下,
InsertCoin。Graph.run返回一个GraphRunResult,提供最终数据和运行历史。 - 节点的
run方法的返回类型很重要,因为它用于确定节点的出边。这个信息反过来用于渲染 mermaid diagrams,并在运行时强制执行,以尽早检测不当行为。 CoinsInserted的run方法的返回类型是一个联合类型,这意味着可能有多个输出边。- 与其他节点不同,
Purchase可以结束运行,因此必须设置RunEndT泛型参数。在这种情况下,它是None,因为图运行返回类型是None。
(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)
可以使用以下代码生成该图的mermaid 图表:
from vending_machine import InsertCoin, vending_machine_graph
vending_machine_graph.mermaid_code(start_node=InsertCoin)
上面的代码生成的图是:
---
title: vending_machine_graph
---
stateDiagram-v2
[*] --> InsertCoin
InsertCoin --> CoinsInserted
CoinsInserted --> SelectProduct
CoinsInserted --> Purchase
SelectProduct --> Purchase
Purchase --> InsertCoin
Purchase --> SelectProduct
Purchase --> [*]
有关生成图表的更多信息,请参见下方。
GenAI 示例
到目前为止,我们还没有展示一个实际上使用PydanticAI或GenAI的图的例子。
在这个例子中,一个代理生成了欢迎电子邮件给用户,另一个代理对电子邮件提供反馈。
这个图有一个非常简单的结构:
---
title: feedback_graph
---
stateDiagram-v2
[*] --> WriteEmail
WriteEmail --> Feedback
Feedback --> WriteEmail
Feedback --> [*]
from __future__ import annotations as _annotations
from dataclasses import dataclass, field
from pydantic import BaseModel, EmailStr
from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class User:
name: str
email: EmailStr
interests: list[str]
@dataclass
class Email:
subject: str
body: str
@dataclass
class State:
user: User
write_agent_messages: list[ModelMessage] = field(default_factory=list)
email_writer_agent = Agent(
'google-vertex:gemini-1.5-pro',
result_type=Email,
system_prompt='Write a welcome email to our tech blog.',
)
@dataclass
class WriteEmail(BaseNode[State]):
email_feedback: str | None = None
async def run(self, ctx: GraphRunContext[State]) -> Feedback:
if self.email_feedback:
prompt = (
f'Rewrite the email for the user:\n'
f'{format_as_xml(ctx.state.user)}\n'
f'Feedback: {self.email_feedback}'
)
else:
prompt = (
f'Write a welcome email for the user:\n'
f'{format_as_xml(ctx.state.user)}'
)
result = await email_writer_agent.run(
prompt,
message_history=ctx.state.write_agent_messages,
)
ctx.state.write_agent_messages += result.all_messages()
return Feedback(result.data)
class EmailRequiresWrite(BaseModel):
feedback: str
class EmailOk(BaseModel):
pass
feedback_agent = Agent[None, EmailRequiresWrite | EmailOk](
'openai:gpt-4o',
result_type=EmailRequiresWrite | EmailOk, # type: ignore
system_prompt=(
'Review the email and provide feedback, email must reference the users specific interests.'
),
)
@dataclass
class Feedback(BaseNode[State, None, Email]):
email: Email
async def run(
self,
ctx: GraphRunContext[State],
) -> WriteEmail | End[Email]:
prompt = format_as_xml({'user': ctx.state.user, 'email': self.email})
result = await feedback_agent.run(prompt)
if isinstance(result.data, EmailRequiresWrite):
return WriteEmail(email_feedback=result.data.feedback)
else:
return End(self.email)
async def main():
user = User(
name='John Doe',
email='john.joe@example.com',
interests=['Haskel', 'Lisp', 'Fortran'],
)
state = State(user)
feedback_graph = Graph(nodes=(WriteEmail, Feedback))
result = await feedback_graph.run(WriteEmail(), state=state)
print(result.output)
"""
Email(
subject='Welcome to our tech blog!',
body='Hello John, Welcome to our tech blog! ...',
)
"""
(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)
自定义控制流
在许多实际应用中,图形无法从头到尾不间断地运行——它们可能需要外部输入,或者在较长时间内运行,以至于单个进程无法从头到尾执行整个图形运行而不被中断。
在这些场景中,next 方法可以用于一次运行图中的一个节点。
在这个例子中,一个AI向用户提问,用户提供答案,AI评估答案,如果用户回答正确则结束,如果回答错误则提出另一个问题。
ai_q_and_a_graph.py — question_graph definition
from __future__ import annotations as _annotations
from dataclasses import dataclass, field
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
ask_agent = Agent('openai:gpt-4o', result_type=str)
@dataclass
class QuestionState:
question: str | None = None
ask_agent_messages: list[ModelMessage] = field(default_factory=list)
evaluate_agent_messages: list[ModelMessage] = field(default_factory=list)
@dataclass
class Ask(BaseNode[QuestionState]):
async def run(self, ctx: GraphRunContext[QuestionState]) -> Answer:
result = await ask_agent.run(
'Ask a simple question with a single correct answer.',
message_history=ctx.state.ask_agent_messages,
)
ctx.state.ask_agent_messages += result.all_messages()
ctx.state.question = result.data
return Answer(result.data)
@dataclass
class Answer(BaseNode[QuestionState]):
question: str
answer: str | None = None
async def run(self, ctx: GraphRunContext[QuestionState]) -> Evaluate:
assert self.answer is not None
return Evaluate(self.answer)
@dataclass
class EvaluationResult:
correct: bool
comment: str
evaluate_agent = Agent(
'openai:gpt-4o',
result_type=EvaluationResult,
system_prompt='Given a question and answer, evaluate if the answer is correct.',
)
@dataclass
class Evaluate(BaseNode[QuestionState]):
answer: str
async def run(
self,
ctx: GraphRunContext[QuestionState],
) -> End[str] | Reprimand:
assert ctx.state.question is not None
result = await evaluate_agent.run(
format_as_xml({'question': ctx.state.question, 'answer': self.answer}),
message_history=ctx.state.evaluate_agent_messages,
)
ctx.state.evaluate_agent_messages += result.all_messages()
if result.data.correct:
return End(result.data.comment)
else:
return Reprimand(result.data.comment)
@dataclass
class Reprimand(BaseNode[QuestionState]):
comment: str
async def run(self, ctx: GraphRunContext[QuestionState]) -> Ask:
print(f'Comment: {self.comment}')
ctx.state.question = None
return Ask()
question_graph = Graph(nodes=(Ask, Answer, Evaluate, Reprimand))
(这个示例是完整的,可以在Python 3.10+中“原样”运行)
from rich.prompt import Prompt
from pydantic_graph import End, HistoryStep
from ai_q_and_a_graph import Ask, question_graph, QuestionState, Answer
async def main():
state = QuestionState() # (1)!
node = Ask() # (2)!
history: list[HistoryStep[QuestionState]] = [] # (3)!
while True:
node = await question_graph.next(node, history, state=state) # (4)!
if isinstance(node, Answer):
node.answer = Prompt.ask(node.question) # (5)!
elif isinstance(node, End): # (6)!
print(f'Correct answer! {node.data}')
#> Correct answer! Well done, 1 + 1 = 2
print([e.data_snapshot() for e in history])
"""
[
Ask(),
Answer(question='What is the capital of France?', answer='Vichy'),
Evaluate(answer='Vichy'),
Reprimand(comment='Vichy is no longer the capital of France.'),
Ask(),
Answer(question='what is 1 + 1?', answer='2'),
Evaluate(answer='2'),
End(data='Well done, 1 + 1 = 2'),
]
"""
return
# otherwise just continue
- 创建将被
next改变的状态对象。 - 起始节点是
Ask,但将在图形运行时由next更新。 - 图形运行的历史记录存储在一个
HistoryStep对象的列表中。同样,next将在原地更新此列表。 - 运行 该图形,每次一个节点,在图形运行时更新状态、当前节点和历史记录。
- 如果当前节点是一个
Answer节点,提示用户输入答案。 - 由于我们使用了
next,我们必须手动检查End,如果我们得到一个,就退出循环。
(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)
可以使用以下代码生成该图的mermaid 图表:
from ai_q_and_a_graph import Ask, question_graph
question_graph.mermaid_code(start_node=Ask)
---
title: question_graph
---
stateDiagram-v2
[*] --> Ask
Ask --> Answer
Answer --> Evaluate
Evaluate --> Reprimand
Evaluate --> [*]
Reprimand --> Ask
你可能已经注意到,尽管这个例子将控制流转移出了图形运行,我们仍然在使用 rich的 Prompt.ask 来获取用户输入,在等待用户输入响应时过程处于挂起状态。关于真正的外部控制流的例子,请参见 问题图示例。
遍历图
使用 Graph.iter 进行 async for 迭代
有时您希望在图形执行时直接控制或了解每个节点。最简单的方法是使用 Graph.iter 方法,该方法返回一个 上下文管理器 ,它生成一个 GraphRun 对象。 GraphRun 是一个异步可迭代的图形节点,允许您在它们执行时记录或修改它们。
这是一个示例:
from __future__ import annotations as _annotations
from dataclasses import dataclass
from pydantic_graph import Graph, BaseNode, End, GraphRunContext
@dataclass
class CountDownState:
counter: int
@dataclass
class CountDown(BaseNode[CountDownState]):
async def run(self, ctx: GraphRunContext[CountDownState]) -> CountDown | End[int]:
if ctx.state.counter <= 0:
return End(ctx.state.counter)
ctx.state.counter -= 1
return CountDown()
count_down_graph = Graph(nodes=[CountDown])
async def main():
state = CountDownState(counter=3)
with count_down_graph.iter(CountDown(), state=state) as run: # (1)!
async for node in run: # (2)!
print('Node:', node)
#> Node: CountDown()
#> Node: CountDown()
#> Node: CountDown()
#> Node: End(data=0)
print('Final result:', run.result.output) # (3)!
#> Final result: 0
print('History snapshots:', [step.data_snapshot() for step in run.history])
"""
History snapshots:
[CountDown(), CountDown(), CountDown(), CountDown(), End(data=0)]
"""
Graph.iter(...)返回一个GraphRun。- 在这里,我们逐步执行每个节点。
- 一旦图形返回一个
End,循环结束,run.final_result变为一个GraphRunResult,包含最终结果(这里是0)。
手动使用 GraphRun.next(node)
或者,您可以使用 GraphRun.next 方法手动驱动迭代,该方法允许您传入想要下一个运行的节点。您可以通过这种方式修改或选择性跳过节点。
下面是一个人为的示例,当计数器为2时停止,忽略任何超过该值的节点运行:
from pydantic_graph import End
from count_down import CountDown, CountDownState, count_down_graph
async def main():
state = CountDownState(counter=5)
with count_down_graph.iter(CountDown(), state=state) as run:
node = run.next_node # (1)!
while not isinstance(node, End): # (2)!
print('Node:', node)
#> Node: CountDown()
#> Node: CountDown()
#> Node: CountDown()
#> Node: CountDown()
if state.counter == 2:
break # (3)!
node = await run.next(node) # (4)!
print(run.result) # (5)!
#> None
for step in run.history: # (6)!
print('History Step:', step.data_snapshot(), step.state)
#> History Step: CountDown() CountDownState(counter=4)
#> History Step: CountDown() CountDownState(counter=3)
#> History Step: CountDown() CountDownState(counter=2)
- 我们首先获取将在代理图中运行的第一个节点。
- 一旦生成了一个
End节点,代理运行就结束了;End的实例不能传递给next。 - 如果用户决定提前停止,我们将跳出循环。在这种情况下,图形运行将没有实际的最终结果(
run.final_result仍然是None)。 - 在每一步中,我们调用
await run.next(node)来运行它并获取下一个节点(或一个End)。 - 因为我们没有继续运行直到完成,所以
result没有被设置。 - 运行历史仍然记录着我们迄今为止执行的步骤。
依赖注入
与 PydanticAI 一样, pydantic-graph 通过 Graph 和 BaseNode 上的泛型参数,以及 GraphRunContext.deps 字段支持依赖注入。
作为依赖注入的一个例子,让我们修改DivisibleBy5的例子上面,使用ProcessPoolExecutor在单独的进程中运行计算负载(这是一个虚构的例子,ProcessPoolExecutor在这个例子中并不会实际提高性能):
from __future__ import annotations
import asyncio
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
@dataclass
class GraphDeps:
executor: ProcessPoolExecutor
@dataclass
class DivisibleBy5(BaseNode[None, GraphDeps, int]):
foo: int
async def run(
self,
ctx: GraphRunContext[None, GraphDeps],
) -> Increment | End[int]:
if self.foo % 5 == 0:
return End(self.foo)
else:
return Increment(self.foo)
@dataclass
class Increment(BaseNode[None, GraphDeps]):
foo: int
async def run(self, ctx: GraphRunContext[None, GraphDeps]) -> DivisibleBy5:
loop = asyncio.get_running_loop()
compute_result = await loop.run_in_executor(
ctx.deps.executor,
self.compute,
)
return DivisibleBy5(compute_result)
def compute(self) -> int:
return self.foo + 1
fives_graph = Graph(nodes=[DivisibleBy5, Increment])
async def main():
with ProcessPoolExecutor() as executor:
deps = GraphDeps(executor)
result = await fives_graph.run(DivisibleBy5(3), deps=deps)
print(result.output)
#> 5
# the full history is quite verbose (see below), so we'll just print the summary
print([item.data_snapshot() for item in result.history])
"""
[
DivisibleBy5(foo=3),
Increment(foo=3),
DivisibleBy5(foo=4),
Increment(foo=4),
DivisibleBy5(foo=5),
End(data=5),
]
"""
(此示例是完整的,可以在 Python 3.10+ 上“直接运行” — 你需要添加 asyncio.run(main()) 来运行 main)
美人鱼图
Pydantic Graph 可以为图生成 mermaid stateDiagram-v2 图,如上所示。
这些图表可以通过以下方式生成:
Graph.mermaid_code生成图形的mermaid代码Graph.mermaid_image使用 mermaid.ink 生成图形的图像Graph.mermaid_save使用 mermaid.ink 生成图形的图像并保存到文件中
除了上面显示的图表,您还可以通过以下选项自定义mermaid图表:
Edge允许您给边缘应用一个标签BaseNode.docstring_notes和BaseNode.get_note允许你向节点添加备注highlighted_nodes参数允许您在图表中突出显示特定节点
结合这些,我们可以编辑最后一个 ai_q_and_a_graph.py 示例为:
- 为某些边添加标签
- 向
Ask节点添加备注 - 突出显示
Answer节点 - 将图表保存为
PNG格式的图像文件
...
from typing import Annotated
from pydantic_graph import BaseNode, End, Graph, GraphRunContext, Edge
...
@dataclass
class Ask(BaseNode[QuestionState]):
"""Generate question using GPT-4o."""
docstring_notes = True
async def run(
self, ctx: GraphRunContext[QuestionState]
) -> Annotated[Answer, Edge(label='Ask the question')]:
...
...
@dataclass
class Evaluate(BaseNode[QuestionState]):
answer: str
async def run(
self,
ctx: GraphRunContext[QuestionState],
) -> Annotated[End[str], Edge(label='success')] | Reprimand:
...
...
question_graph.mermaid_save('image.png', highlighted_nodes=[Answer])
(此示例不完整,无法直接运行)
这将生成一个看起来像这样的图像:
---
title: question_graph
---
stateDiagram-v2
Ask --> Answer: Ask the question
note right of Ask
Judge the answer.
Decide on next step.
end note
Answer --> Evaluate
Evaluate --> Reprimand
Evaluate --> [*]: success
Reprimand --> Ask
classDef highlighted fill:#fdff32
class Answer highlighted
设置状态图的方向
您可以使用以下值指定状态图的方向:
'TB': 从上到下,图表从上到下垂直流动。'LR': 从左到右,图表从左向右水平流动。'RL': 从右到左,图表水平从右向左流动。'BT': 从下到上,图表从下到上垂直流动。
这是一个如何使用“从左到右”(LR)而不是默认的“从上到下”(TB)的示例:
from vending_machine import InsertCoin, vending_machine_graph
vending_machine_graph.mermaid_code(start_node=InsertCoin, direction='LR')
---
title: vending_machine_graph
---
stateDiagram-v2
direction LR
[*] --> InsertCoin
InsertCoin --> CoinsInserted
CoinsInserted --> SelectProduct
CoinsInserted --> Purchase
SelectProduct --> Purchase
Purchase --> InsertCoin
Purchase --> SelectProduct
Purchase --> [*]