跳至内容

工作流程#

LlamaIndex中的Workflow是一种事件驱动的抽象概念,用于将多个事件串联起来。工作流由steps组成,每个步骤负责处理特定类型的事件并发出新事件。

Workflow在LlamaIndex中通过使用@step装饰器来装饰函数工作。这用于推断每个工作流的输入和输出类型以进行验证,并确保每个步骤仅在准备好接受事件时运行。

你可以创建一个Workflow来实现任何功能!构建一个智能体、RAG流程、提取流程,或任何你想要的流程。

工作流也会自动进行监测,因此您可以使用Arize Pheonix等工具观察每个步骤的运行情况。(注意:监测功能适用于利用了新监测系统的集成方案。具体使用情况可能有所不同。)

提示

工作流将异步作为一等公民,本页面假设您正在异步环境中运行。这意味着您需要正确设置代码以支持异步。如果您已经在像FastAPI这样的服务器中运行,或者在笔记本环境中,您已经可以自由使用await了!

如果您正在运行自己的Python脚本,最佳实践是设置一个单一的异步入口点。

async def main():
    w = MyWorkflow(...)
    result = await w.run(...)
    print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

入门指南#

作为一个示例,让我们考虑一个简单的工作流程:先生成一个笑话,然后对其进行点评。

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)

# `pip install llama-index-llms-openai` if you don't already have it
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


class JokeFlow(Workflow):
    llm = OpenAI()

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))


w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

这里涉及几个关键部分,让我们逐一解析。

定义工作流事件#

class JokeEvent(Event):
    joke: str

事件是用户自定义的pydantic对象。您可以控制属性和任何其他辅助方法。在本例中,我们的工作流依赖于一个用户定义的事件JokeEvent

设置工作流类#

class JokeFlow(Workflow):
    llm = OpenAI(model="gpt-4o-mini")
    ...

我们的工作流程通过继承Workflow类来实现。为简化起见,我们附加了一个静态的OpenAI llm实例。

工作流入口点#

class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke(self, ev: StartEvent) -> JokeEvent:
        topic = ev.topic

        prompt = f"Write your best joke about {topic}."
        response = await self.llm.acomplete(prompt)
        return JokeEvent(joke=str(response))

    ...

这里,我们来到了工作流的入口点。虽然大多数事件是由用户定义的,但框架提供了两个特殊事件:StartEventStopEvent。其中,StartEvent表示初始工作流输入的发送位置。

StartEvent是一个比较特殊的对象,因为它可以包含任意属性。这里我们通过ev.topic访问主题,如果该属性不存在则会引发错误。你也可以使用ev.get("topic")来处理属性可能不存在的情况,这样就不会引发错误。

此时,你可能已经注意到我们并未明确告知工作流哪些步骤处理哪些事件。 相反,@step装饰器被用来推断每个步骤的输入和输出类型。此外,这些推断出的输入和输出类型还会在运行前帮你验证工作流是否有效!

工作流退出点#

class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> StopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return StopEvent(result=str(response))

    ...

这里是我们工作流程中的第二步,也是最后一步。我们知道这是最后一步,因为返回了特殊的StopEvent。当工作流程遇到返回的StopEvent时,它会立即停止工作流程,并返回我们在result参数中传递的任何内容。

在这种情况下,结果是一个字符串,但也可能是字典、列表或其他任何对象。

运行工作流#

w = JokeFlow(timeout=60, verbose=False)
result = await w.run(topic="pirates")
print(str(result))

最后,我们创建并运行工作流。有一些设置如超时时间(以秒为单位)和详细程度,有助于调试。

.run() 方法是异步的,因此我们在这里使用 await 来等待结果。传递给 run() 的关键字参数将成为特殊 StartEvent 的字段,该事件将自动触发并启动工作流。正如我们所看到的,在这种情况下,topic 将通过 ev.topic 从步骤中访问。

自定义入口和出口点#

大多数情况下,依靠我们在[入门指南]部分看到的默认入口和出口点就足够了。 不过,工作流也支持自定义事件,通常你可以在预期StartEventStopEvent的地方使用它们,让我们看看具体方法。

使用自定义的StartEvent#

当我们调用工作流实例上的run()方法时,传递的关键字参数会成为在底层自动创建的StartEvent实例的字段。如果我们想传递复杂数据来启动工作流,这种方法可能会变得繁琐,这时我们可以引入自定义启动事件。

要使用自定义启动事件,第一步是创建一个继承自StartEvent的自定义类:

from pathlib import Path

from llama_index.core.workflow import StartEvent
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
from llama_index.llms.openai import OpenAI


class MyCustomStartEvent(StartEvent):
    a_string_field: str
    a_path_to_somewhere: Path
    an_index: LlamaCloudIndex
    an_llm: OpenAI

现在我们需要做的就是在作为入口点的步骤中使用MyCustomStartEvent作为事件类型。 以这个人为设计的复杂步骤为例:

class JokeFlow(Workflow):
    ...

    @step
    async def generate_joke_from_index(
        self, ev: MyCustomStartEvent
    ) -> JokeEvent:
        # Build a query engine using the index and the llm from the start event
        query_engine = ev.an_index.as_query_engine(llm=ev.an_llm)
        topic = query_engine.query(
            f"What is the closest topic to {a_string_field}"
        )
        # Use the llm attached to the start event to instruct the model
        prompt = f"Write your best joke about {topic}."
        response = await ev.an_llm.acomplete(prompt)
        # Dump the response on disk using the Path object from the event
        ev.a_path_to_somewhere.write_text(str(response))
        # Finally, pass the JokeEvent along
        return JokeEvent(joke=str(response))

我们仍然可以将MyCustomStartEvent的字段作为关键字参数传递给工作流的run方法,但这样做仍然很繁琐。更好的方法是像这样通过start_event关键字参数传递事件实例:

custom_start_event = MyCustomStartEvent(...)
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(start_event=custom_start_event)
print(str(result))

这种方法使代码更加清晰明确,并能让IDE中的自动补全功能正常工作。

使用自定义的StopEvent#

StartEvent类似,大多数情况下依赖内置的StopEvent可以正常工作,但并非总是如此。实际上,当我们使用StopEvent时,工作流的结果必须设置为事件实例的result字段。由于结果可以是任何Python对象,StopEventresult字段被类型化为Any,从而失去了类型系统的所有优势。此外,返回多个对象会很麻烦:我们通常会将一堆不相关的对象塞入字典,然后将其赋值给StopEvent.result

支持自定义停止事件的第一步,我们需要创建一个StopEvent的子类:

from llama_index.core.workflow import StopEvent


class MyStopEvent(StopEvent):
    critique: CompletionResponse

我们现在可以在工作流中用MyStopEvent替换StopEvent

class JokeFlow(Workflow):
    ...

    @step
    async def critique_joke(self, ev: JokeEvent) -> MyStopEvent:
        joke = ev.joke

        prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
        response = await self.llm.acomplete(prompt)
        return MyStopEvent(response)

    ...

使用自定义停止事件时,我们需要记住一个重要事项:工作流运行的结果将是该事件的实例:

w = JokeFlow(timeout=60, verbose=False)
# Warning! `result` now contains an instance of MyStopEvent!
result = await w.run(topic="pirates")
# We can now access the event fields as any normal Event
print(result.critique.text)

这种方法利用了Python的类型系统,对IDE中的自动补全非常友好,并且允许外部应用程序进行内省,从而确切知道工作流运行将返回什么。

绘制工作流程#

工作流可以通过在步骤定义中使用类型注解的功能进行可视化。您可以绘制工作流中所有可能的路径,或者最近一次执行的路径,以帮助调试。

首先安装:

pip install llama-index-utils-workflow

然后导入并使用:

from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)

# Draw all
draw_all_possible_flows(JokeFlow, filename="joke_flow_all.html")

# Draw an execution
w = JokeFlow()
await w.run(topic="Pirates")
draw_most_recent_execution(w, filename="joke_flow_recent.html")

处理全局上下文/状态#

可选地,您可以选择在步骤之间使用全局上下文。例如,可能有多个步骤需要访问用户原始的query输入。您可以将其存储在全局上下文中,以便每个步骤都能访问。

from llama_index.core.workflow import Context


@step
async def query(self, ctx: Context, ev: MyEvent) -> StopEvent:
    # retrieve from context
    query = await ctx.get("query")

    # do something with context and event
    val = ...
    result = ...

    # store in context
    await ctx.set("key", val)

    return StopEvent(result=result)

等待多个事件#

上下文不仅仅保存数据,它还提供了缓冲和等待多个事件的实用功能。

例如,您可能有一个步骤需要在合成响应之前等待查询并检索节点:

from llama_index.core import get_response_synthesizer


@step
async def synthesize(
    self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:
    data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
    # check if we can run
    if data is None:
        return None

    # unpack -- data is returned in order
    query_event, retrieve_event = data

    # run response synthesis
    synthesizer = get_response_synthesizer()
    response = synthesizer.synthesize(
        query_event.query, nodes=retrieve_event.nodes
    )

    return StopEvent(result=response)

使用ctx.collect_events()我们可以缓冲并等待所有预期事件到达。该函数只有在所有事件都到达后才会返回数据(按请求的顺序)。

手动触发事件#

通常情况下,事件是通过在步骤中返回另一个事件来触发的。但是,也可以在工作流中使用ctx.send_event(event)方法手动派发事件。

这是一个简短的示例,展示如何使用该功能:

from llama_index.core.workflow import step, Context, Event, Workflow


class MyEvent(Event):
    pass


class MyEventResult(Event):
    result: str


class GatherEvent(Event):
    pass


class MyWorkflow(Workflow):
    @step
    async def dispatch_step(
        self, ctx: Context, ev: StartEvent
    ) -> MyEvent | GatherEvent:
        ctx.send_event(MyEvent())
        ctx.send_event(MyEvent())

        return GatherEvent()

    @step
    async def handle_my_event(self, ev: MyEvent) -> MyEventResult:
        return MyEventResult(result="result")

    @step
    async def gather(
        self, ctx: Context, ev: GatherEvent | MyEventResult
    ) -> StopEvent | None:
        # wait for events to finish
        events = ctx.collect_events(ev, [MyEventResult, MyEventResult])
        if not events:
            return None

        return StopEvent(result=events)

流式事件#

您还可以在事件传入时进行迭代处理。这对于流式传输目的、显示进度或调试非常有用。智能体对象将发出使用ctx.write_event_to_stream()显式写入流的事件:

class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

然后你可以像这样获取事件:

w = MyWorkflow(...)

handler = w.run(topic="Pirates")

async for event in handler.stream_events():
    print(event)

result = await handler

失败时重试执行步骤#

执行失败的步骤可能导致整个工作流失败,但通常情况下错误是可预期的,可以安全地重试执行。例如由于网络暂时拥堵导致HTTP请求超时,或调用外部API时触发了速率限制器。

在所有需要步骤重试的情况下,您可以使用"重试策略"。重试策略是一个对象,它指示工作流多次执行某个步骤,并规定在下次尝试前需要等待多长时间。策略会考虑自首次失败以来经过的时间、连续发生的失败次数以及最后出现的错误类型。

要为特定步骤设置策略,您只需将一个策略对象传递给@step装饰器:

from llama_index.core.workflow.retry_policy import ConstantDelayRetryPolicy


class MyWorkflow(Workflow):
    # ...more workflow definition...

    # This policy will retry this step on failure every 5 seconds for at most 10 times
    @step(retry_policy=ConstantDelayRetryPolicy(delay=5, maximum_attempts=10))
    async def flaky_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
        result = flaky_call()  # this might raise
        return StopEvent(result=result)

您可以查看API文档了解框架中可用策略的详细说明。如果找不到适合您用例的策略,您可以轻松编写自定义策略。自定义策略的唯一要求是编写一个遵循RetryPolicy协议的Python类。换句话说,您的自定义策略类必须包含具有以下签名的方法:

def next(
    self, elapsed_time: float, attempts: int, error: Exception
) -> Optional[float]:
    ...

例如,这是一个关于周末的激动重试策略,仅在周五时重试步骤:

from datetime import datetime


class RetryOnFridayPolicy:
    def next(
        self, elapsed_time: float, attempts: int, error: Exception
    ) -> Optional[float]:
        if datetime.today().strftime("%A") == "Friday":
            # retry in 5 seconds
            return 5
        # tell the workflow we don't want to retry
        return None

人在回路#

由于工作流程非常灵活,因此有许多可能的方式来实现人在回路模式。

实现人机交互的最简单方法是在事件流期间使用InputRequiredEventHumanResponseEvent事件。

from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent


class HumanInTheLoopWorkflow(Workflow):
    @step
    async def step1(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="Enter a number: ")

    @step
    async def step2(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)


# workflow should work with streaming
workflow = HumanInTheLoopWorkflow()

handler = workflow.run()
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        # here, we can handle human input however you want
        # this means using input(), websockets, accessing async state, etc.
        # here, we just use input()
        response = input(event.prefix)
        handler.ctx.send_event(HumanResponseEvent(response=response))

final_result = await handler

在这里,工作流将等待直到HumanResponseEvent被触发。

另外请注意,您可以跳出循环并在稍后恢复它。如果您想暂停工作流以等待人工响应,但稍后继续工作流,这将非常有用。

handler = workflow.run()
async for event in handler.stream_events():
    if isinstance(event, InputRequiredEvent):
        break

# now we handle the human response
response = input(event.prefix)
handler.ctx.send_event(HumanResponseEvent(response=response))

# now we resume the workflow streaming
async for event in handler.stream_events():
    continue

final_result = await handler

逐步执行#

工作流程内置了逐步执行的实用工具,让您能够控制执行过程并在进展中调试状态。

# Create a workflow, same as usual
workflow = JokeFlow()
# Get the handler. Passing `stepwise=True` will block execution, waiting for manual intervention
handler = workflow.run(stepwise=True)
# Each time we call `run_step`, the workflow will advance and return all the events
# that were produced in the last step. This events need to be manually propagated
# for the workflow to keep going (we assign them to `produced_events` with the := operator).
while produced_events := await handler.run_step():
    # If we're here, it means there's at least an event we need to propagate,
    # let's do it with `send_event`
    for ev in produced_events:
        handler.ctx.send_event(ev)

# If we're here, it means the workflow execution completed, and
# we can now access the final result.
result = await handler

装饰非类函数#

你也可以在不继承工作流的情况下装饰并附加步骤到工作流中。

以下是之前的JokeFlow,但没有通过子类化定义。

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI


class JokeEvent(Event):
    joke: str


joke_flow = Workflow(timeout=60, verbose=True)


@step(workflow=joke_flow)
async def generate_joke(ev: StartEvent) -> JokeEvent:
    topic = ev.topic

    prompt = f"Write your best joke about {topic}."

    llm = OpenAI()
    response = await llm.acomplete(prompt)
    return JokeEvent(joke=str(response))


@step(workflow=joke_flow)
async def critique_joke(ev: JokeEvent) -> StopEvent:
    joke = ev.joke

    prompt = (
        f"Give a thorough analysis and critique of the following joke: {joke}"
    )
    response = await llm.acomplete(prompt)
    return StopEvent(result=str(response))

跨运行会话保持上下文#

如你所见,工作流有一个Context对象,可用于在步骤间维护状态。

如果你想在工作流程的多次运行中保持状态,可以将之前的上下文传入.run()方法。

handler = w.run()
result = await handler

# continue with next run
handler = w.run(ctx=handler.ctx)
result = await handler

工作流检查点#

工作流运行还可以通过WorfklowCheckpointer对象在每一步完成后创建并存储检查点。这些检查点随后可用作未来运行的起点,这在开发(和调试)工作流时是一个有用的功能。

from llama_index.core.workflow import WorkflowCheckpointer

w = JokeFlow(...)
w_cptr = WorkflowCheckpointer(workflow=w)

# to checkpoint a run, use the `run` method from w_cptr
handler = w_cptr.run(topic="Pirates")
await handler

# to view the stored checkpoints of this run
w_cptr.checkpoints[handler.run_id]

# to run from one of the checkpoints, use `run_from` method
ckpt = w_cptr.checkpoints[handler.run_id][0]
handler = w_cptr.run_from(topic="Ships", checkpoint=ckpt)
await handler

部署工作流#

您可以通过llama_deployrepo)将工作流部署为多智能体服务。每个智能体服务通过控制平面进行编排,并通过消息队列进行通信。支持本地部署或Kubernetes部署。

示例#

为了帮助您更熟悉工作流概念及其功能,LlamaIndex文档提供了可运行的示例笔记本供您进行实践学习:

  • 常见工作流模式 将引导您了解常见的使用模式,例如使用简单工作流实现循环和状态管理。这通常是一个很好的起点。
  • RAG + Reranking 展示了如何通过一个相当简单的工作流程实现现实世界的用例,该流程同时执行数据摄取和查询。
  • Citation Query Engine 类似于RAG + 重排序,该笔记本重点介绍如何在检索和生成之间实现中间步骤。这是一个很好的示例,展示了如何在工作流中使用Context对象。
  • Corrective RAG 在RAG工作流的基础上增加了一些复杂性,展示了如何在评估步骤后查询网络搜索引擎。
  • Utilizing Concurrency 阐述了如何管理工作流中步骤的并行执行,随着工作流复杂度的提升,掌握这一点非常重要。

RAG应用易于理解,是学习工作流基础知识的绝佳机会。然而,涉及工具调用、记忆和路由等更复杂的智能体场景才是工作流真正大放异彩的地方。

以下示例展示了其中一些用例。

  • ReAct Agent 显然是展示如何在工作流中实现工具的完美示例。
  • Function Calling Agent 是一个很好的示例,展示了如何在workflow中使用LlamaIndex框架的基础功能,即使在像函数调用这样的复杂场景中也能保持代码简洁有序。
  • CodeAct Agent 是一个很好的示例,展示了如何从头开始创建一个CodeAct智能体。
  • Human In The Loop: Story Crafting 是一个强大的示例,展示了工作流运行如何具有交互性和状态性。在这个案例中,用于收集来自人类的输入。
  • 可靠的生成结构化数据展示了如何在工作流中实现循环,在这个案例中通过反思(reflection)来改进结构化输出。
  • Query Planning with Workflows 是一个工作流示例,通过将查询分解为更小的项目并执行这些项目来进行查询规划。它重点展示了如何从工作流中流式传输事件、并行执行步骤,以及循环直到满足条件。
  • Checkpointing Workflows 是一个更详尽的演示,展示如何充分利用 WorkflowCheckpointer 对工作流运行进行检查点保存。

最后但同样重要的是,还有一些更高级的用例展示了工作流在快速实现原型(例如从文献中)时如何变得极为便利:

优云智算