跳转到内容

工作流

在LlamaIndex中,Workflow是一个轻量级、事件驱动的抽象概念,用于将多个事件链接在一起。工作流由handlers组成,每个组件负责处理特定的事件类型并发出新事件。

工作流设计灵活,可用于构建智能体、RAG流程、提取流程或任何您想要实现的其他功能。

npm i @llamaindex/workflow @llamaindex/openai

让我们探索一个简单的工作流示例:先生成一个笑话,然后进行批评并迭代改进:

../../examples/agents/workflow/joke.ts

这里涉及几个关键部分,让我们逐步来梳理。

const startEvent = workflowEvent<string>(); // Input topic for joke
const jokeEvent = workflowEvent<{ joke: string }>(); // Intermediate joke
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>(); // Intermediate critique
const resultEvent = workflowEvent<{ joke: string; critique: string }>(); // Final joke + critique

事件使用 workflowEvent 函数定义,并包含作为泛型类型提供的任意数据。在此示例中,我们有四个事件:

  • startEvent: 接收字符串输入(笑话主题)
  • jokeEvent: 包含一个具有笑话属性的对象
  • critiqueEvent: 包含笑话及其评论,用于反馈循环
  • resultEvent: 包含经过任何迭代后的最终笑话和评价
const { withState, getContext } = createStatefulMiddleware(() => ({
numIterations: 0,
maxIterations: 3,
}));
const jokeFlow = withState(createWorkflow());

我们的工作流程通过createWorkflow()函数实现,并通过withState中间件进行增强。该中间件为所有处理器提供共享状态,在本例中用于追踪:

  • numIterations: 统计我们进行了多少次笑话改进的迭代
  • maxIterations: 设置限制以防止无限循环

此状态将可通过使用 getContext().state 函数在工作流中访问。

我们的工作流中有三个关键处理器:

  1. 第一个处理器处理 startEvent,生成初始笑话,并发出 jokeEvent
jokeFlow.handle([startEvent], async (event) => {
// Prompt the LLM to write a joke
const prompt = `Write your best joke about ${event.data}. Write the joke between <joke> and </joke> tags.`;
const response = await llm.complete({ prompt });
// Parse the joke from the response
const joke =
response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
response.text;
return jokeEvent.with({ joke: joke });
});
  1. 第二个处理器处理 jokeEvent,评估笑话质量,并执行以下操作之一:
    • 如果笑话需要改进,则发出 critiqueEvent
    • 如果笑话足够好,则发出 resultEvent
jokeFlow.handle([jokeEvent], async (event) => {
// Prompt the LLM to critique the joke
const prompt = `Give a thorough critique of the following joke. If the joke needs improvement, put "IMPROVE" somewhere in the critique: ${event.data.joke}`;
const response = await llm.complete({ prompt });
// If the critique includes "IMPROVE", keep iterating, else, return the result
if (response.text.includes("IMPROVE")) {
return critiqueEvent.with({
joke: event.data.joke,
critique: response.text,
});
}
return resultEvent.with({ joke: event.data.joke, critique: response.text });
});
  1. 第三个处理器处理 critiqueEvent,根据评价生成改进后的笑话,并执行以下操作之一:
    • 若未达到迭代限制,则循环回笑话评估环节
    • 若达到迭代限制,则输出最终的 resultEvent
jokeFlow.handle([critiqueEvent], async (event) => {
// Keep track of the number of iterations
const state = getContext().state;
state.numIterations++;
// Write a new joke based on the previous joke and critique
const prompt = `Write a new joke based on the following critique and the original joke. Write the joke between <joke> and </joke> tags.\n\nJoke: ${event.data.joke}\n\nCritique: ${event.data.critique}`;
const response = await llm.complete({ prompt });
// Parse the joke from the response
const joke =
response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
response.text;
// If we've done less than the max number of iterations, keep iterating
// else, return the result
if (state.numIterations < state.maxIterations) {
return jokeEvent.with({ joke: joke });
}
return resultEvent.with({ joke: joke, critique: event.data.critique });
});
async function main() {
const { stream, sendEvent } = jokeFlow.createContext();
sendEvent(startEvent.with("pirates"));
let result: { joke: string, critique: string } | undefined;
for await (const event of stream) {
// console.log(event.data); optionally log the event data
if (resultEvent.include(event)) {
result = event.data;
break; // Stop when we get the final result
}
}
console.log(result);
}

要运行工作流,我们需要:

  1. 使用 createContext() 创建工作流上下文
  2. 使用 sendEvent() 触发初始事件
  3. 监听事件流并在事件到达时进行处理
  4. 使用 include() 检查事件是否为特定类型
  5. 当我们收到最终结果时跳出循环

createContext 返回的 stream 包含实用函数,使处理事件流更加便捷:

// Create a workflow context and send the initial event
const { stream, sendEvent } = jokeFlow.createContext();
sendEvent(startEvent.with("pirates"));
// Collect all events until we get a resultEvent
const allEvents = await stream.until(resultEvent).toArray();
// The last event will be the resultEvent
const finalEvent = allEvents.at(-1);
console.log(finalEvent.data); // Output the joke and critique

流式工具使得处理异步事件流变得更加容易。在这个示例中,我们使用:

  • toArray: 将所有事件聚合到一个数组中
  • until: 创建一个持续发送事件的流,直到满足某个条件(在本例中,直到接收到resultEvent)

您可以将这些实用工具与其他流操作符(如 filtermap)结合使用,以创建强大的处理流水线。

要了解更多关于工作流的信息,请查阅工作流文档