Workflow Cookbook: Walking Through All the Features of Workflows
First, we install our dependencies. The Core package contains most of what we need; OpenAI handles LLM access, and the utils-workflow package provides the visualization capabilities we'll use later on.
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow
Then we bring in the dependencies we just installed:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
import random
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.llms.openai import OpenAI
Set up our OpenAI key so we can do actual LLM things.
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
Workflow basics
Let's start with the most basic possible workflow: it just starts, does one thing, and stops. There's no real need for a workflow if your task is this simple, but we're just demonstrating how they work.
from llama_index.llms.openai import OpenAI


class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))


w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
LlamaIndex, formerly known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides tools to index various data types, such as documents, databases, and APIs, enabling LLMs to interact with and retrieve information from these sources more effectively. The framework supports the creation of indices that can be queried by LLMs, enhancing their ability to access and utilize external data in a structured manner. This capability is particularly useful for applications requiring the integration of LLMs with specific datasets or knowledge bases.
One of the neat things about workflows is that we can visualize them using pyvis. Let's see what this very simple flow looks like.
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str


class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")
We're using random numbers here to simulate LLM actions so that we get reliably interesting behavior.
answer_query() accepts a start event. It can then do one of two things:

- it can answer the query and emit a StopEvent, which returns the result
- it can decide the query was bad and emit a FailedEvent

improve_query() accepts a FailedEvent. It can also do one of two things:

- it can decide the query can't be improved and emit a StopEvent, which returns failure
- it can present a better query and emit a QueryEvent, which creates a loop back to answer_query()
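Stripped of the event machinery, those two steps reduce to ordinary control flow. Here is a plain-Python sketch of the same loop, with the random "LLM" seeded so runs are repeatable (`run_loop` and the tuple-based return values are illustrative, not part of llama-index):

```python
import random


def answer_query(query: str, rng: random.Random) -> tuple[str, str]:
    # Mirrors the first step: either fail or stop with an answer.
    if rng.randint(0, 1) == 0:
        return ("failed", "Failed to answer the query.")
    return ("stop", "The answer to your query")


def improve_query(rng: random.Random) -> tuple[str, str]:
    # Mirrors the second step: either loop with a better query or give up.
    if rng.randint(0, 1) == 0:
        return ("query", "Here's a better query.")
    return ("stop", "Your query can't be fixed.")


def run_loop(query: str, seed: int = 0) -> str:
    rng = random.Random(seed)  # seeded so the run is deterministic
    while True:
        kind, payload = answer_query(query, rng)
        if kind == "stop":
            return payload
        kind, payload = improve_query(rng)
        if kind == "stop":
            return payload
        query = payload  # QueryEvent: loop back to answer_query


print(run_loop("What's LlamaIndex?"))
```

Either way the loop terminates with one of the two stop messages; the workflow version just lets the runtime route the events instead of a `while` loop.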
We can also visualize this more complicated workflow:
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
We've set verbose=True here so we can see exactly which events are triggered. You can watch it loop (when it does) and then answer.
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event StopEvent
Your query can't be fixed.
Maintaining state between events
There is a global state which allows you to keep arbitrary data or functions around for use by all event handlers.
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.set("some_database", ["value1", "value2", "value3"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
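Conceptually, ctx.set and ctx.get behave like an async key-value store shared by all steps. A dict-backed stand-in makes the pattern concrete (`MiniContext` is a hypothetical sketch for illustration, not the real Context class):

```python
import asyncio


class MiniContext:
    """Dict-backed stand-in for the workflow Context's key-value store."""

    def __init__(self):
        self._store = {}

    async def set(self, key, value):
        self._store[key] = value

    async def get(self, key, default=None):
        return self._store.get(key, default)


async def main():
    ctx = MiniContext()
    # "setup" step: load our data
    await ctx.set("some_database", ["value1", "value2", "value3"])
    # "query" step: use our data with our query
    data = await ctx.get("some_database")
    return f"The answer to your query is {data[1]}"


print(asyncio.run(main()))  # The answer to your query is value2
```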
Of course, this flow is still essentially linear. A more realistic example would be if your start event could either be a query or a data population event, and you needed to wait. Let's set that up to see what it looks like:
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.set("data", ev.data)

        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if hasattr(ev, "query"):
            # do we have any data yet?
            data = await ctx.get("data", default=None)
            if data is not None:
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't
---
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
---
Running step query
Step query produced event StopEvent
Running step setup
Step setup produced event StopEvent
Got the data Yes you can
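Stripped of the workflow machinery, the wait logic is just "return None until both pieces have arrived, in any order". A minimal sketch (the `Waiter` class is illustrative, not a llama-index API):

```python
class Waiter:
    """Returns None until both data and a query have been seen, in any order."""

    def __init__(self):
        self.data = None

    def handle(self, *, data=None, query=None):
        if data is not None:
            # Store the data; this call itself produces no result.
            self.data = data
            return None
        if query is not None and self.data is not None:
            return f"Got the data {self.data}"
        # Either not a query, or there's no data yet.
        return None


w = Waiter()
print(w.handle(query="Can I kick it?"))  # None: no data yet
w.handle(data="Yes you can")
print(w.handle(query="Can I kick it?"))  # Got the data Yes you can
```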
Let's visualize how this flow works:
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
Waiting for one or more events
Because waiting for events is such a common pattern, the context object has a convenience function, collect_events(). It captures events and stores them, returning None until all the required events have been collected. Those events will be attached to the output of collect_events in the order that they were specified. Let's see it in action:
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up
        if not hasattr(self, "setup") or not self.setup:
            self.setup = True
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent | None:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent | None:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)
I got some input
I got a query
Not enough events yet
Not enough events yet
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'
You can see each event getting triggered, as well as collect_events repeatedly returning None until enough events have arrived. Let's see what this looks like in a flow diagram:
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
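For intuition, the buffering semantics of collect_events can be approximated in plain Python: buffer incoming events per step until one of each required type has arrived, then hand them back in the requested order. This `EventBuffer` is a rough stand-in written for illustration, not the actual llama-index implementation, and it uses plain classes rather than real Event subclasses:

```python
class EventBuffer:
    """Buffer events until one of each expected type has arrived,
    then return them in the expected order."""

    def __init__(self, expected):
        self.expected = expected  # list of required event types, in output order
        self.received = []        # events buffered so far

    def collect(self, ev):
        self.received.append(ev)
        picked, pool = [], list(self.received)
        for etype in self.expected:
            match = next((e for e in pool if isinstance(e, etype)), None)
            if match is None:
                return None  # still waiting on at least one event type
            pool.remove(match)
            picked.append(match)
        return picked


class QueryEvent:
    def __init__(self, query):
        self.query = query


class InputEvent:
    def __init__(self, input):
        self.input = input


buf = EventBuffer([QueryEvent, InputEvent])
print(buf.collect(InputEvent("some input")))  # None: no QueryEvent yet
ready = buf.collect(QueryEvent("my question"))
print(ready[0].query, ready[1].input)         # my question some input
```

Note the output order follows the `expected` list, not arrival order, which is why `ready[0]` is the QueryEvent even though the InputEvent arrived first.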