介绍
工作流是一种基于事件驱动、分步骤控制应用程序执行流程的方式。
您的应用程序被划分为称为步骤的部分,这些步骤由事件触发,并且它们自身会发出事件来触发后续步骤。通过组合步骤和事件,您可以创建任意复杂的流程,这些流程封装了逻辑,使您的应用程序更易于维护和理解。一个步骤可以是从单行代码到复杂智能体的任何内容。它们可以具有任意输入和输出,这些输入和输出通过事件传递。
为什么需要工作流?
Section titled “Why workflows?”随着生成式AI应用变得越来越复杂,管理数据流和控制应用执行变得更加困难。工作流通过将应用拆分为更小、更易管理的部分,提供了一种管理这种复杂性的方法。
其他框架及LlamaIndex本身曾尝试通过有向无环图(DAG)解决此问题,但这些方案存在工作流所不具备的若干局限性:
- 像循环和分支这样的逻辑需要被编码到图的边中,这使得它们难以阅读和理解。
- 在DAG中的节点间传递数据会围绕可选值、默认值以及应传递哪些参数产生复杂性。
- 对于尝试开发复杂、循环、分支AI应用的开发者来说,有向无环图(DAG)的架构感觉不够自然。
工作流基于事件的模式和原生Python方法解决了这些问题。
作为一个示例,让我们考虑一个简单的工作流程:先生成一个笑话,然后对其进行评价。
from workflows import Workflow, stepfrom workflows.events import ( Event, StartEvent, StopEvent,)
# `pip install llama-index-llms-openai` if you don't already have itfrom llama_index.llms.openai import OpenAI
class JokeEvent(Event): joke: str
class JokeFlow(Workflow): llm = OpenAI(model="gpt-4.1")
@step async def generate_joke(self, ev: StartEvent) -> JokeEvent: topic = ev.topic
prompt = f"Write your best joke about {topic}." response = await self.llm.acomplete(prompt) return JokeEvent(joke=str(response))
@step async def critique_joke(self, ev: JokeEvent) -> StopEvent: joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}" response = await self.llm.acomplete(prompt) return StopEvent(result=str(response))
w = JokeFlow(timeout=60, verbose=False)result = await w.run(topic="pirates")print(str(result))
这里涉及几个关键部分,让我们逐一梳理。
class JokeEvent(Event): joke: str事件是用户自定义的pydantic对象。您可以控制其属性和任何其他辅助方法。在本例中,我们的工作流依赖于单个用户定义的事件,即JokeEvent。
class JokeFlow(Workflow): llm = OpenAI(model="gpt-4.1") ...我们的工作流程通过继承 Workflow 类来实现。为简化操作,我们附加了一个静态的 OpenAI llm 实例。
class JokeFlow(Workflow): ...
@step async def generate_joke(self, ev: StartEvent) -> JokeEvent: topic = ev.topic
prompt = f"Write your best joke about {topic}." response = await self.llm.acomplete(prompt) return JokeEvent(joke=str(response))
...这里,我们来到工作流的入口点。虽然大多数事件是用户定义的,但框架提供了两种特殊情况的事件,
即 StartEvent 和 StopEvent。其中,StartEvent 表示
初始工作流输入的发送位置。
StartEvent 是一个比较特殊的对象,因为它可以容纳任意属性。这里我们通过 ev.topic 访问主题,如果该属性不存在则会引发错误。你也可以使用 ev.get("topic") 来处理属性可能不存在的情况,这样就不会引发错误。
为了进一步增强类型安全性,您也可以继承 StartEvent 类。
此时,你可能已经注意到我们并未明确告知工作流哪些事件由哪些步骤处理。
相反,@step 装饰器被用来推断每个步骤的输入和输出类型。此外,这些推断出的
输入和输出类型还会在运行前用于验证工作流的有效性!
class JokeFlow(Workflow): ...
@step async def critique_joke(self, ev: JokeEvent) -> StopEvent: joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}" response = await self.llm.acomplete(prompt) return StopEvent(result=str(response))
...这里,我们进入了工作流程中的第二个也是最后一个步骤。我们知道这是最后一步,因为返回了特殊的 StopEvent。当工作流程遇到返回的 StopEvent 时,它会立即停止工作流程,并返回我们在 result 参数中传递的任何内容。
在这种情况下,结果是一个字符串,但它也可以是字典、列表或任何其他对象。
您也可以继承 StopEvent 类以获得更强的类型安全性。
w = JokeFlow(timeout=60, verbose=False)result = await w.run(topic="pirates")print(str(result))最后,我们创建并运行工作流。有一些设置,如超时时间(以秒为单位)和详细程度,以帮助调试。
.run() 方法是异步的,因此我们在此使用 await 来等待结果。传递给 run() 的关键字参数将
成为特殊 StartEvent 的字段,该字段将自动发出并启动工作流。正如我们所见,
在这种情况下,topic 将通过 ev.topic 从步骤中访问。
为了帮助您更熟悉工作流概念及其功能,LlamaIndex 文档提供了可运行的示例笔记本供您进行实践学习:
- 常见工作流模式 将引导您了解常见的使用模式,例如使用简单工作流实现循环和状态管理。这通常是一个很好的入门起点。
- RAG + 重排序 展示了如何通过一个相当简单的工作流实现现实世界用例,该工作流同时执行数据摄取和查询。
- 引文查询引擎 类似于 RAG + 重排序,该笔记本重点介绍如何在检索和生成之间实现中间步骤。这是一个展示如何在流程中使用
Context对象的优秀示例。 - 纠正性RAG在RAG工作流基础上增加了更多复杂性,展示了在评估步骤后如何查询网络搜索引擎。
- 利用并发性 解释了如何管理工作流中步骤的并行执行,随着工作流复杂度的增加,这一点非常重要。
RAG应用易于理解,为学习工作流基础知识提供了绝佳机会。然而,涉及工具调用、记忆和路由的更复杂智能体场景才是工作流真正发挥优势的领域。
以下示例展示了其中一些应用场景。
- ReAct 智能体显然是展示如何在工作流中实现工具的完美示例。
- 函数调用智能体是一个很好的示例,展示了如何在工作流中使用LlamaIndex框架原语,即使在函数调用等复杂场景中也能保持代码简洁整齐。
- CodeAct 智能体 是一个关于如何从零开始创建 CodeAct 智能体的优秀示例。
- 人在回路:故事创作是一个强大的示例,展示了工作流运行如何实现交互性和状态保持。在这个案例中,用于收集来自人类的输入。
- 可靠的结构化生成展示了如何在流程中实现循环,在本例中通过反思来改进结构化输出。
- 使用工作流进行查询规划是一个工作流示例,它通过将查询分解为更小的项目并执行这些项目来规划查询。它展示了如何从工作流中流式传输事件、并行执行步骤,以及循环直到满足条件。
- 检查点工作流 更详尽地演示了如何充分利用
WorkflowCheckpointer来对工作流运行进行检查点设置。
最后但同样重要的是,还有一些更高级的用例展示了工作流在快速实现原型(例如从文献中提取)时的极大便利性: