编写持久化工作流
工作流默认是临时的,这意味着一旦 run() 方法返回结果,工作流状态就会丢失。对同一工作流实例的后续调用 run() 将从全新状态开始。
如果用例需要在多次运行甚至不同进程之间保持工作流状态持久化,有几种策略可以使工作流更具持久性。
工作流是常规的Python类,数据可以存储在类或实例变量中,以便后续的run()调用可以访问它。
class DbWorkflow(Workflow): def __init__(self, db: Client, *args, **kwargs): self.db = db super().__init__(*args, **kwargs)
@step def count(self, ev: StartEvent) -> StopEvent: num_rows = self.db.exec("select COUNT(*) from t;") return StopEvent(result=num_rows)在这种情况下,多次调用 run() 将复用同一个数据库客户端。
在 run 次调用中持续存在 | ✅ |
|---|---|
| 在进程重启后保持持久化 | ❌ |
| 在运行时错误中存活 | ❌ |
每个工作流都附带一个负责其运行时操作的特殊对象,称为Context。上下文实例可供工作流的任何步骤使用,并带有一个store属性,可用于存储和加载状态数据。与类和实例变量相比,使用状态存储有两个主要优势:
- 它是异步安全的,并支持并发访问
- 它可以被序列化
w = MyWorkflow()handler = w.run()context = handler.ctx# Save the context to a databasedb.save("id", context.to_dict())
## Restart the Python process...#
w = MyWorkflow()# Load the context from the databasecontext = Context.from_dict(w, db.load("id"))# Pass the context containing the state to the workflowresult = await w.run(ctx=context)在 run 次调用中持续存在 | ✅ |
|---|---|
| 在进程重启后保持持久化 | ✅ |
| 在运行时错误中存活 | ❌ |
为避免任何开销,工作流不会自动拍摄当前状态的快照,因此它们无法自行在致命错误中存活。然而,任何步骤都可以依赖某些外部数据库(如Redis),并在代码的敏感部分对当前上下文进行快照。
例如,对于一个处理数百份文档的长时间运行工作流,我们可以在状态存储中保存最后成功处理的文档ID:
class DurableWorkflow(Workflow): def __init__(self, r: Redis): self.redis = r
@step def convert_documents(self, ev: StartEvent, ctx: Context) -> StopEvent: # Get the workflow input document_ids = ev.ids # Get the list of processed documents from the state store converted_ids = await ctx.store.get("converted_ids", default=[]) for doc_id in document_ids: # Ignore documents that were alredy processed if doc_id in converted_ids: continue convert() # Update the state store converted_id.append(doc_id) await ctx.store.set("converted_ids", converted_ids) # Create a snapshot of the current context self.redis.hset("ctx", mapping=ctx.to_dict())工作流将使用一个Redis集合来存储每次转换后当前上下文的快照。如果运行工作流的进程崩溃,可以使用相同的输入安全地重新启动该进程。实际上,ctx.store 将包含已处理的文档列表,而 for 循环将能够跳过它们并继续处理剩余的工作。
额外功能:向工作流中注入依赖项以减少样板代码
Section titled “Bonus: inject dependencies into the workflow to reduce boilerplate”使用工作流的资源特性,Redis客户端可以直接注入到步骤中:
def get_redis_client(*args, **kwargs): """This can be reused across several workflows to reduce boilerplate""" return Redis(host='localhost', port=6379, decode_responses=True)
class DurableWorkflow(Workflow): @step def convert_documents( self, ev: StartEvent, ctx: Context, redis: Annotated[Redis, Resource(get_redis_client)] ) -> StopEvent: # Get the workflow input document_ids = ev.ids # Get the list of processed documents from the state store converted_ids = await ctx.store.get("converted_ids", default=[]) for doc_id in document_ids: # Ignore documents that were alredy processed if doc_id in converted_ids: continue convert() # Update the state store converted_id.append(doc_id) await ctx.store.set("converted_ids", converted_ids) # Create a snapshot of the current context redis.hset("ctx", mapping=ctx.to_dict())在 run 次调用期间持续存在 | ✅ |
|---|---|
| 在进程重启后保持持久化 | ✅ |
| 在运行时错误中存活 | ✅ |