Parallel Execution of Same Event Example

In this example, we'll demonstrate how to use the workflow functionality to execute multiple events of the same type in parallel.
By setting the num_workers parameter in the @step decorator, we can control how many instances of a step run at the same time, enabling efficient parallel processing.

First, we need to install the necessary dependencies:

  • LlamaIndex core provides most of the functionality
  • llama-index-utils-workflow provides the workflow utilities
# %pip install llama-index-core llama-index-utils-workflow -q

With the dependencies installed, we can import the required libraries:

import asyncio
from llama_index.core.workflow import (
    step,
    Context,
    Workflow,
    Event,
    StartEvent,
    StopEvent,
)

We will create two workflows: one that processes multiple data items in parallel by using the @step(num_workers=N) decorator, and one without num_workers set, for comparison.
By passing num_workers to the @step decorator, we limit how many instances of that step may execute at the same time, which gives us control over the degree of parallelism. This approach is particularly useful when you need to process many similar tasks while keeping resource usage in check.
For example, you can execute multiple sub-queries at the same time, but num_workers cannot be set arbitrarily high; the right value depends on your workload and on any token or rate limits you are subject to.
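
Conceptually, num_workers acts as a concurrency cap on a single step. As a rough plain-asyncio analogy (this is only a sketch of the idea, not the LlamaIndex API), it behaves like a semaphore that bounds how many events are processed at once:

import asyncio

# Plain-asyncio sketch (not LlamaIndex code): a Semaphore(3) plays the same role
# as @step(num_workers=3): at most 3 items are being processed at any moment.
async def process(item: str, sem: asyncio.Semaphore) -> str:
    async with sem:
        await asyncio.sleep(1)  # stand-in for real work
        return f"Processed: {item}"

async def main() -> None:
    sem = asyncio.Semaphore(3)
    results = await asyncio.gather(*(process(x, sem) for x in ["A", "B", "C"]))
    print(results)

asyncio.run(main())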

We will define two event types: one for the input items waiting to be processed, and one for the processing results:

class ProcessEvent(Event):
    data: str


class ResultEvent(Event):
    result: str
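
These event classes behave like Pydantic models, so a quick sanity check (assuming the two classes above have been defined) is simply:

# Events are plain data containers passed between steps.
ev = ProcessEvent(data="A")
print(ev.data)  # -> A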

Now, we will create SequentialWorkflow and ParallelWorkflow classes, each with three main steps:

  • start: initialize and send out multiple events
  • process_data: process a single data item
  • combine_results: collect and combine all processing results

import random


class SequentialWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=1)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)


class ParallelWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> ProcessEvent:
        data_list = ["A", "B", "C"]
        await ctx.store.set("num_to_collect", len(data_list))
        for item in data_list:
            ctx.send_event(ProcessEvent(data=item))
        return None

    @step(num_workers=3)
    async def process_data(self, ev: ProcessEvent) -> ResultEvent:
        # Simulate some time-consuming processing
        processing_time = 2 + random.random()
        await asyncio.sleep(processing_time)
        result = f"Processed: {ev.data}"
        print(f"Completed processing: {ev.data}")
        return ResultEvent(result=result)

    @step
    async def combine_results(
        self, ctx: Context, ev: ResultEvent
    ) -> StopEvent | None:
        num_to_collect = await ctx.store.get("num_to_collect")
        results = ctx.collect_events(ev, [ResultEvent] * num_to_collect)
        if results is None:
            return None
        combined_result = ", ".join([event.result for event in results])
        return StopEvent(result=combined_result)

In both of these workflows:

  • The start method initializes the run and sends out multiple ProcessEvent events.
  • The process_data method:
    • uses just the @step decorator in SequentialWorkflow
    • uses the @step(num_workers=3) decorator in ParallelWorkflow, limiting the number of concurrent executions of this step to 3.
  • The combine_results method collects all processing results and combines them; ctx.collect_events returns None until the expected number of ResultEvents has arrived, so the step simply keeps waiting until every result is in.

Finally, we can run both workflows and compare their execution times:

import time

sequential_workflow = SequentialWorkflow()

print(
    "Start a sequential workflow without setting num_workers in the step of process_data"
)
start_time = time.time()
result = await sequential_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
print("-" * 30)

parallel_workflow = ParallelWorkflow()

print(
    "Start a parallel workflow with setting num_workers in the step of process_data"
)
start_time = time.time()
result = await parallel_workflow.run()
end_time = time.time()
print(f"Workflow result: {result}")
print(f"Time taken: {end_time - start_time} seconds")
Start a sequential workflow without setting num_workers in the step of process_data
Completed processing: A
Completed processing: B
Completed processing: C
Workflow result: Processed: A, Processed: B, Processed: C
Time taken: 7.439495086669922 seconds
------------------------------
Start a parallel workflow with setting num_workers in the step of process_data
Completed processing: C
Completed processing: A
Completed processing: B
Workflow result: Processed: C, Processed: A, Processed: B
Time taken: 2.5881590843200684 seconds
  • With num_workers=1 the three items are processed one after another, so the run takes roughly 6-9 seconds in total. With num_workers=3 the items are processed in parallel, three at a time, and the run takes only about 2-3 seconds (see the rough estimate below).
  • In ParallelWorkflow, the order of the completed results may differ from the input order, depending on when each task happens to finish.
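
A back-of-the-envelope check of those numbers, assuming each process_data call sleeps for 2 + random.random() seconds (i.e. between 2 and 3 seconds) as defined above:

# Sequential (num_workers=1): the three sleeps happen one after another,
# so the total time is the sum of the individual durations.
sequential_total = 3 * 2.5  # ~7.5 s on average, 6-9 s overall range

# Parallel (num_workers=3): all three sleeps overlap, so the total time is
# roughly the duration of the slowest single item.
parallel_total = 2.5  # ~2.5 s on average, 2-3 s overall range

print(sequential_total, parallel_total)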

This example shows the difference in execution speed with and without num_workers, and how to implement parallel processing within a workflow. By setting num_workers we control the degree of parallelism, which is very useful when you need to balance performance against resource usage.

Checkpointing a parallel-execution workflow like the one defined above is also possible. To do so, we wrap the Workflow in a WorkflowCheckpointer object and perform runs through these instances. During workflow execution, checkpoints are stored in this wrapper object and can be used both for inspection and as starting points for new runs.

from llama_index.core.workflow.checkpointer import WorkflowCheckpointer
wflow_ckptr = WorkflowCheckpointer(workflow=parallel_workflow)
handler = wflow_ckptr.run()
await handler
Completed processing: C
Completed processing: A
Completed processing: B
'Processed: C, Processed: A, Processed: B'

The checkpoints for the run above are stored in the WorkflowCheckpointer.checkpoints dict attribute.

for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']

We can start a new run from any of the stored checkpoints using the WorkflowCheckpointer.run_from(checkpoint=...) method. Let's take the first checkpoint, stored after the first completion of "process_data", and run from there.

ckpt = wflow_ckptr.checkpoints[run_id][0]
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
Completed processing: A
'Processed: C, Processed: B, Processed: A'

Invoking run_from or run creates a new run entry in the checkpoints attribute. In the latest run, started from the specified checkpoint, we can see that only two more "process_data" steps and the final "combine_results" step were left to complete.

for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']

Now, if we instead start from the checkpoint associated with the second completion of "process_data" in the same initial run, we should see a new entry that contains only two steps: "process_data" and "combine_results".

# get the run_id of the first initial run
first_run_id = next(iter(wflow_ckptr.checkpoints.keys()))
first_run_id
'90812bec-b571-4513-8ad5-aa957ad7d4fb'
ckpt = wflow_ckptr.checkpoints[first_run_id][
    1
]  # checkpoint after the second "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
Completed processing: B
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']

Similarly, if we start from the checkpoint associated with the third completion of "process_data" in the initial run, we should see only the final "combine_results" step.

ckpt = wflow_ckptr.checkpoints[first_run_id][
    2
]  # checkpoint after the third "process_data" step
handler = wflow_ckptr.run_from(ckpt)
await handler
'Processed: C, Processed: A, Processed: B'
for run_id, ckpts in wflow_ckptr.checkpoints.items():
    print(f"Run: {run_id} has {[c.last_completed_step for c in ckpts]}")
Run: 90812bec-b571-4513-8ad5-aa957ad7d4fb has ['process_data', 'process_data', 'process_data', 'combine_results']
Run: 4e1d24cd-c672-4ed1-bb5b-b9f1a252abed has ['process_data', 'process_data', 'combine_results']
Run: e4f94fcd-9b78-4e28-8981-e0232d068f6e has ['process_data', 'combine_results']
Run: c498a1a0-cf4c-4d80-a1e2-a175bb90b66d has ['combine_results']