工作流
在LlamaIndex中,Workflow是一个轻量级、事件驱动的抽象概念,用于将多个事件链接在一起。工作流由handlers组成,每个组件负责处理特定的事件类型并发出新事件。
工作流设计灵活,可用于构建智能体、RAG流程、提取流程或任何您想要实现的其他功能。
npm i @llamaindex/workflow @llamaindex/openai让我们探索一个简单的工作流示例:先生成一个笑话,然后进行批评并迭代改进:
这里涉及几个关键部分,让我们逐步来梳理。
const startEvent = workflowEvent<string>(); // Input topic for jokeconst jokeEvent = workflowEvent<{ joke: string }>(); // Intermediate jokeconst critiqueEvent = workflowEvent<{ joke: string; critique: string }>(); // Intermediate critiqueconst resultEvent = workflowEvent<{ joke: string; critique: string }>(); // Final joke + critique事件使用 workflowEvent 函数定义,并包含作为泛型类型提供的任意数据。在此示例中,我们有四个事件:
startEvent: 接收字符串输入(笑话主题)jokeEvent: 包含一个具有笑话属性的对象critiqueEvent: 包含笑话及其评论,用于反馈循环resultEvent: 包含经过任何迭代后的最终笑话和评价
const { withState, getContext } = createStatefulMiddleware(() => ({ numIterations: 0, maxIterations: 3,}));const jokeFlow = withState(createWorkflow());我们的工作流程通过createWorkflow()函数实现,并通过withState中间件进行增强。该中间件为所有处理器提供共享状态,在本例中用于追踪:
numIterations: 统计我们进行了多少次笑话改进的迭代maxIterations: 设置限制以防止无限循环
此状态将可通过使用 getContext().state 函数在工作流中访问。
我们的工作流中有三个关键处理器:
- 第一个处理器处理
startEvent,生成初始笑话,并发出jokeEvent:
jokeFlow.handle([startEvent], async (event) => { // Prompt the LLM to write a joke const prompt = `Write your best joke about ${event.data}. Write the joke between <joke> and </joke> tags.`; const response = await llm.complete({ prompt });
// Parse the joke from the response const joke = response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ?? response.text; return jokeEvent.with({ joke: joke });});- 第二个处理器处理
jokeEvent,评估笑话质量,并执行以下操作之一:- 如果笑话需要改进,则发出
critiqueEvent - 如果笑话足够好,则发出
resultEvent
- 如果笑话需要改进,则发出
jokeFlow.handle([jokeEvent], async (event) => { // Prompt the LLM to critique the joke const prompt = `Give a thorough critique of the following joke. If the joke needs improvement, put "IMPROVE" somewhere in the critique: ${event.data.joke}`; const response = await llm.complete({ prompt });
// If the critique includes "IMPROVE", keep iterating, else, return the result if (response.text.includes("IMPROVE")) { return critiqueEvent.with({ joke: event.data.joke, critique: response.text, }); }
return resultEvent.with({ joke: event.data.joke, critique: response.text });});- 第三个处理器处理
critiqueEvent,根据评价生成改进后的笑话,并执行以下操作之一:- 若未达到迭代限制,则循环回笑话评估环节
- 若达到迭代限制,则输出最终的
resultEvent
jokeFlow.handle([critiqueEvent], async (event) => { // Keep track of the number of iterations const state = getContext().state; state.numIterations++;
// Write a new joke based on the previous joke and critique const prompt = `Write a new joke based on the following critique and the original joke. Write the joke between <joke> and </joke> tags.\n\nJoke: ${event.data.joke}\n\nCritique: ${event.data.critique}`; const response = await llm.complete({ prompt });
// Parse the joke from the response const joke = response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ?? response.text;
// If we've done less than the max number of iterations, keep iterating // else, return the result if (state.numIterations < state.maxIterations) { return jokeEvent.with({ joke: joke }); }
return resultEvent.with({ joke: joke, critique: event.data.critique });});async function main() { const { stream, sendEvent } = jokeFlow.createContext(); sendEvent(startEvent.with("pirates"));
let result: { joke: string, critique: string } | undefined;
for await (const event of stream) { // console.log(event.data); optionally log the event data if (resultEvent.include(event)) { result = event.data; break; // Stop when we get the final result } }
console.log(result);}要运行工作流,我们需要:
- 使用
createContext()创建工作流上下文 - 使用
sendEvent()触发初始事件 - 监听事件流并在事件到达时进行处理
- 使用
include()检查事件是否为特定类型 - 当我们收到最终结果时跳出循环
由 createContext 返回的 stream 包含实用函数,使处理事件流更加便捷:
// Create a workflow context and send the initial eventconst { stream, sendEvent } = jokeFlow.createContext();sendEvent(startEvent.with("pirates"));
// Collect all events until we get a resultEventconst allEvents = await stream.until(resultEvent).toArray();
// The last event will be the resultEventconst finalEvent = allEvents.at(-1);console.log(finalEvent.data); // Output the joke and critique流式工具使得处理异步事件流变得更加容易。在这个示例中,我们使用:
toArray: 将所有事件聚合到一个数组中until: 创建一个持续发送事件的流,直到满足某个条件(在本例中,直到接收到resultEvent)
您可以将这些实用工具与其他流操作符(如 filter 和 map)结合使用,以创建强大的处理流水线。
要了解更多关于工作流的信息,请查阅工作流文档。