流式事件
工作流可能很复杂——它们被设计用于处理复杂、分支、并发的逻辑——这意味着它们可能需要较长时间才能完全执行。为了给用户提供良好的体验,你可能希望通过实时流式传输事件来显示进度指示。工作流在 Context 对象中内置了对这一功能的支持。
要完成这个任务,让我们引入所有需要的依赖项:
import asynciofrom llama_index.llms.openai import OpenAI
from workflows import ( Workflow, Context, step,)from workflows.events import ( StartEvent, StopEvent, Event,)让我们为一个简单的三步工作流程设置一些事件,再加上一个用于在处理过程中流式传输进度的事件:
class FirstEvent(Event): first_output: str
class SecondEvent(Event): second_output: str response: str
class ProgressEvent(Event): msg: str并定义一个发送事件的工作流类:
class MyWorkflow(Workflow): @step async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening")) return FirstEvent(first_output="First step complete.")
@step async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent: llm = OpenAI(model="gpt-4o-mini") generator = await llm.astream_complete( "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain." ) full_resp = "" async for response in generator: # Allow the workflow to stream this piece of response ctx.write_event_to_stream(ProgressEvent(msg=response.delta)) full_resp += response.delta
return SecondEvent( second_output="Second step complete, full response attached", response=response, )
@step async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening")) return StopEvent(result="Workflow complete.")在 step_one 和 step_three 中,我们将单个事件写入事件流。在 step_two 中,我们使用 astream_complete 来生成LLM响应的可迭代生成器,然后为LLM返回给我们的每个数据块(大约每个单词一个)生成一个事件,最后将最终响应返回给 step_three。
要实际获取此输出,我们需要异步运行工作流并监听事件,如下所示:
async def main(): w = MyWorkflow(timeout=30, verbose=True) handler = w.run(first_input="Start the workflow.")
async for ev in handler.stream_events(): if isinstance(ev, ProgressEvent): print(ev.msg)
final_result = await handler print("Final result", final_result)
if __name__ == "__main__": asyncio.run(main())run 在后台运行工作流,而 stream_events 将提供写入流的任何事件。当流传递 StopEvent 时它会停止,之后您可以像通常那样获取工作流的最终结果。