# Parallel Execution of Same Event Example
In this example, we'll demonstrate how to use the workflow functionality to achieve similar capabilities while allowing parallel execution of multiple events of the same type.

By setting the `num_workers` parameter in the `@step` decorator, we can control the number of workers executing a step simultaneously, enabling efficient parallel processing.
First, we need to install the necessary dependencies:
- LlamaIndex core provides most of the functionality
- llama-index-utils-workflow adds the workflow utilities
```python
# %pip install llama-index-core llama-index-utils-workflow -q
```

After installing the dependencies, we can import the required libraries:
```python
import asyncio

from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)
```

We will create two workflows: one that can process multiple data items in parallel by using the `@step(num_workers=N)` decorator, and another without `num_workers` set, for comparison.
By using the `num_workers` parameter in the `@step` decorator, we can limit the number of workers executing a step simultaneously, and thereby control the degree of parallelism. This approach is especially useful when you need to process similar tasks while keeping resource usage in check.

For example, you can execute multiple sub-queries at the same time. Note, however, that `num_workers` cannot be set arbitrarily high; the right value depends on your workload and token limits.
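To build intuition for what bounding the worker count achieves, here is a minimal plain-`asyncio` sketch with no LlamaIndex involved (`fake_subquery` and `bounded_gather` are illustrative names, not library APIs) that caps the number of in-flight tasks with a semaphore:

```python
import asyncio
import time


async def fake_subquery(i: int) -> str:
    # Stand-in for a slow call such as an LLM sub-query
    await asyncio.sleep(1)
    return f"answer {i}"


async def bounded_gather(coros, limit: int):
    # Allow at most `limit` coroutines to run at once -- the same idea
    # as limiting a step to `num_workers` concurrent executions
    sem = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))


async def main():
    start = time.time()
    results = await bounded_gather(
        [fake_subquery(i) for i in range(6)], limit=3
    )
    print(results)
    print(f"took {time.time() - start:.1f}s")  # ~2s: two waves of 3 tasks


asyncio.run(main())
```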
We will define two event types: one for input events that need processing, and another for the processing results:
```python
class ProcessEvent(Event):
    data: str


class ResultEvent(Event):
    result: str
```

Now we will create the `SequentialWorkflow` and `ParallelWorkflow` classes, each with three main steps:
- `start`: initializes and sends out multiple parallel events
- `process_data`: processes a single data item
- `combine_results`: collects and combines all processing results
```python
import random


class SequentialWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=1)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None

        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)


class ParallelWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=3)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None

        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)
```

In both workflows:
- The `start` method initializes and sends out multiple `ProcessEvent` events.
- The `process_data` method:
    - In `SequentialWorkflow`, it uses the plain `@step` decorator.
    - In `ParallelWorkflow`, it uses the `@step(num_workers=3)` decorator, which limits the number of workers executing this step concurrently to 3.
- The `combine_results` method collects all processing results and combines them; `ctx.collect_events` returns `None` until all `num_to_collect` `ResultEvent`s have arrived, so the step does nothing until the last result comes in.
Finally, we can run both workflows and compare their execution times:
```python
import time

sequential_workflow = SequentialWorkflow()

print(
    "Start a sequential workflow without setting num_workers in the step of process_data"
)
start_time = time.time()
result = await sequential_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
print("-" * 30)

parallel_workflow = ParallelWorkflow()

print(
    "Start a parallel workflow with setting num_workers in the step of process_data"
)
start_time = time.time()
result = await parallel_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
```

```
Start a sequential workflow without setting num_workers in the step of process_data
Completed processing: A
Completed processing: B
Completed processing: C
Workflow result: Processed: A, Processed: B, Processed: C
Time taken: 7.439495086669922 seconds
------------------------------
Start a parallel workflow with setting num_workers in the step of process_data
Completed processing: C
Completed processing: A
Completed processing: B
Workflow result: Processed: C, Processed: A, Processed: B
Time taken: 2.5881590843200684 seconds
```

- Without `num_workers` set (or with `num_workers=1`), the three items are processed one after another, which takes about 6-9 seconds in total. With `num_workers=3`, the items are processed in parallel, 3 at a time, and the whole run takes only 2-3 seconds; a quick back-of-the-envelope check follows this list.
- In `ParallelWorkflow`, the order of the completed results may differ from the input order, depending on when each task finishes.
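To see where those bounds come from, here is a small sketch of the timing arithmetic (the durations below are illustrative draws, not the measured values above): each item sleeps for `2 + random.random()` seconds, so a sequential run is bounded by the sum of the durations, while a fully parallel run is bounded by the maximum:

```python
# Each item takes 2 + random.random() seconds, i.e. between 2 and 3 seconds.
durations = [2.5, 2.1, 2.8]  # illustrative draws for items A, B, C

sequential_total = sum(durations)  # one item after another: 6-9s overall
parallel_total = max(durations)  # 3 workers, all items in flight: 2-3s overall

print(f"sequential ~ {sequential_total:.1f}s")
print(f"parallel   ~ {parallel_total:.1f}s")
```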
This example demonstrates the difference in execution speed with and without `num_workers`, and how to implement parallel processing within a workflow. By setting `num_workers`, we can control the degree of parallelism, which is very useful for scenarios that need to balance performance against resource usage.
Checkpointing a parallel-execution workflow like the one defined above is also possible. To do so, we wrap the `Workflow` in a `WorkflowCheckpointer` object and perform runs through this instance. Checkpoints are stored in the wrapper object during workflow execution, and they can be inspected and used as starting points for later runs.
```python
from llama_index.core.workflow.checkpointer import WorkflowCheckpointer

wflow_ckptr = WorkflowCheckpointer(workflow=parallel_workflow)
handler = wflow_ckptr.run()
await handler
```

```
Completed processing: C
Completed processing: A
Completed processing: B
'Processed: C, Processed: A, Processed: B'
```

Checkpoints for the above run are stored in the `WorkflowCheckpointer.checkpoints` dict attribute.
```python
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
```

```
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
```

We can start a run from any stored checkpoint using the `WorkflowCheckpointer.run_from(checkpoint=...)` method. Let's take the first checkpoint, which was stored after the first completion of "process_data", and run from it.
```python
ckpt = wflow_ckptr.checkpoints[run_id][0]
handler = wflow_ckptr.run_from(ckpt)
await handler
```

```
Completed processing: B
Completed processing: A
'Processed: C, Processed: B, Processed: A'
```

Invoking `run_from` or `run` creates a new run entry in the `checkpoints` attribute. In the latest run, started from the specified checkpoint, we can see that only two more "process_data" steps and the final "combine_results" step were left to complete.
```python
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
```

```
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
```

Now, if we use the checkpoint associated with the second completion of "process_data" in the same initial run as the starting point, we should see a new entry with only two steps: "process_data" and "combine_results".
```python
# get the run_id of the first initial run
first_run_id = next(iter(wflow_ckptr.checkpoints.keys()))
first_run_id
```

```
'90812bec-b571-4513-8ad5-aa957ad7d4fb'
```

```python
ckpt = wflow_ckptr.checkpoints[first_run_id][
    1
]  # checkpoint after the second "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
```

```
Completed processing: B
'Processed: C, Processed: A, Processed: B'
```

```python
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
```

```
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
```

Similarly, if we start from the checkpoint taken after the third completion of "process_data" in the initial run, we should see only the final "combine_results" step in the new entry.

```python
ckpt = wflow_ckptr.checkpoints[first_run_id][
    2
]  # checkpoint after the third "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
```

```
'Processed: C, Processed: A, Processed: B'
```

```python
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
```

```
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Run: c498a1a0-cf4c-4d80-a1e2-a175bb90b66d has ['combine_results']
```
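Rather than indexing checkpoints by position, one could look them up by step name. Below is a small hypothetical helper, a sketch that relies only on the `checkpoints` dict and the `last_completed_step` attribute shown above, not on any other `WorkflowCheckpointer` API:

```python
# Hypothetical helper: find the first checkpoint in a run that was taken
# right after a given step completed, instead of indexing by position.
def checkpoint_after_step(ckptr, run_id: str, step_name: str):
    for ckpt in ckptr.checkpoints[run_id]:
        if ckpt.last_completed_step == step_name:
            return ckpt
    return None


# e.g. resume the initial run from its first completed "process_data" step
ckpt = checkpoint_after_step(wflow_ckptr, first_run_id, "process_data")
if ckpt is not None:
    handler = wflow_ckptr.run_from(ckpt)
    result = await handler
```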