跳转到内容

编写持久化工作流

工作流默认是临时的,这意味着一旦 run() 方法返回结果,工作流状态就会丢失。对同一工作流实例的后续调用 run() 将从全新状态开始。

如果用例需要在多次运行甚至不同进程之间保持工作流状态持久化,有几种策略可以使工作流更具持久性。

工作流是常规的Python类,数据可以存储在类或实例变量中,以便后续的run()调用可以访问它。

class DbWorkflow(Workflow):
def __init__(self, db: Client, *args, **kwargs):
self.db = db
super().__init__(*args, **kwargs)
@step
def count(self, ev: StartEvent) -> StopEvent:
num_rows = self.db.exec("select COUNT(*) from t;")
return StopEvent(result=num_rows)

在这种情况下,多次调用 run() 将复用同一个数据库客户端。

run 次调用中持续存在
在进程重启后保持持久化
在运行时错误中存活

每个工作流都附带一个负责其运行时操作的特殊对象,称为Context。上下文实例可供工作流的任何步骤使用,并带有一个store属性,可用于存储和加载状态数据。与类和实例变量相比,使用状态存储有两个主要优势:

  • 它是异步安全的,并支持并发访问
  • 它可以被序列化
w = MyWorkflow()
handler = w.run()
context = handler.ctx
# Save the context to a database
db.save("id", context.to_dict())
#
# Restart the Python process...
#
w = MyWorkflow()
# Load the context from the database
context = Context.from_dict(w, db.load("id"))
# Pass the context containing the state to the workflow
result = await w.run(ctx=context)
run 次调用中持续存在
在进程重启后保持持久化
在运行时错误中存活

使用外部资源对执行进行检查点记录

Section titled “Using external resources to checkpoint execution”

为避免任何开销,工作流不会自动拍摄当前状态的快照,因此它们无法自行在致命错误中存活。然而,任何步骤都可以依赖某些外部数据库(如Redis),并在代码的敏感部分对当前上下文进行快照。

例如,对于一个处理数百份文档的长时间运行工作流,我们可以在状态存储中保存最后成功处理的文档ID:

class DurableWorkflow(Workflow):
def __init__(self, r: Redis):
self.redis = r
@step
def convert_documents(self, ev: StartEvent, ctx: Context) -> StopEvent:
# Get the workflow input
document_ids = ev.ids
# Get the list of processed documents from the state store
converted_ids = await ctx.store.get("converted_ids", default=[])
for doc_id in document_ids:
# Ignore documents that were alredy processed
if doc_id in converted_ids:
continue
convert()
# Update the state store
converted_id.append(doc_id)
await ctx.store.set("converted_ids", converted_ids)
# Create a snapshot of the current context
self.redis.hset("ctx", mapping=ctx.to_dict())

工作流将使用一个Redis集合来存储每次转换后当前上下文的快照。如果运行工作流的进程崩溃,可以使用相同的输入安全地重新启动该进程。实际上,ctx.store 将包含已处理的文档列表,而 for 循环将能够跳过它们并继续处理剩余的工作。

额外功能:向工作流中注入依赖项以减少样板代码

Section titled “Bonus: inject dependencies into the workflow to reduce boilerplate”

使用工作流的资源特性,Redis客户端可以直接注入到步骤中:

def get_redis_client(*args, **kwargs):
"""This can be reused across several workflows to reduce boilerplate"""
return Redis(host='localhost', port=6379, decode_responses=True)
class DurableWorkflow(Workflow):
@step
def convert_documents(
self,
ev: StartEvent,
ctx: Context,
redis: Annotated[Redis, Resource(get_redis_client)]
) -> StopEvent:
# Get the workflow input
document_ids = ev.ids
# Get the list of processed documents from the state store
converted_ids = await ctx.store.get("converted_ids", default=[])
for doc_id in document_ids:
# Ignore documents that were alredy processed
if doc_id in converted_ids:
continue
convert()
# Update the state store
converted_id.append(doc_id)
await ctx.store.set("converted_ids", converted_ids)
# Create a snapshot of the current context
redis.hset("ctx", mapping=ctx.to_dict())
run 次调用期间持续存在
在进程重启后保持持久化
在运行时错误中存活