跳转到内容

自定义入口点和出口点

大多数情况下,依赖我们在入门指南章节中看到的默认入口点和出口点就足够了。 然而,工作流支持自定义事件,您通常会在这些位置看到 StartEventStopEvent,让我们来看看具体实现方式。

当我们在工作流实例上调用 run() 方法时,传入的关键字参数会成为 StartEvent 实例的字段,该实例会在底层自动创建。如果我们想要传递复杂数据来启动工作流,这种方法可能会变得繁琐,这时我们可以引入自定义启动事件。

要能够使用自定义起始事件,第一步是创建一个继承自 StartEvent 的自定义类:

from pathlib import Path
from workflows.events import StartEvent
from llama_index.indices.managed.llama_cloud import LlamaCloudIndex
from llama_index.llms.openai import OpenAI
class MyCustomStartEvent(StartEvent):
a_string_field: str
a_path_to_somewhere: Path
an_index: LlamaCloudIndex
an_llm: OpenAI

现在我们需要做的就是在作为入口点的步骤中使用 MyCustomStartEvent 作为事件类型。 以这个人为复杂化的步骤为例:

class JokeFlow(Workflow):
...
@step
async def generate_joke_from_index(
self, ev: MyCustomStartEvent
) -> JokeEvent:
# Build a query engine using the index and the llm from the start event
query_engine = ev.an_index.as_query_engine(llm=ev.an_llm)
topic = await query_engine.aquery(
f"What is the closest topic to {a_string_field}"
)
# Use the llm attached to the start event to instruct the model
prompt = f"Write your best joke about {topic}."
response = await ev.an_llm.acomplete(prompt)
# Dump the response on disk using the Path object from the event
ev.a_path_to_somewhere.write_text(str(response))
# Finally, pass the JokeEvent along
return JokeEvent(joke=str(response))

我们仍然可以将 MyCustomStartEvent 的字段作为关键字参数传递给工作流的 run 方法,但 这样做仍然很繁琐。更好的方法是通过 start_event 关键字参数传递事件实例,如下所示:

custom_start_event = MyCustomStartEvent(...)
w = JokeFlow(timeout=60, verbose=False)
result = await w.run(start_event=custom_start_event)
print(str(result))

这种方法使代码更清晰、更明确,并允许IDE中的自动补全功能正常工作。

StartEvent类似,依赖内置的StopEvent在大多数情况下有效但并非总是如此。实际上,当我们 使用StopEvent时,工作流的结果必须设置到事件实例的result字段。由于结果可以是 任何Python对象,StopEventresult字段被类型化为Any,从而失去了类型系统的所有优势。 此外,返回多个对象非常麻烦:我们通常会将一堆不相关的对象塞入 字典,然后将其赋值给StopEvent.result

支持自定义停止事件的第一步,我们需要创建一个 StopEvent 的子类:

from workflows.events import StopEvent
class MyStopEvent(StopEvent):
critique: CompletionResponse

我们现在可以在工作流中将 StopEvent 替换为 MyStopEvent

class JokeFlow(Workflow):
...
@step
async def critique_joke(self, ev: JokeEvent) -> MyStopEvent:
joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}"
response = await self.llm.acomplete(prompt)
return MyStopEvent(response)
...

使用自定义停止事件时,我们需要记住的重要一点是:工作流运行的结果将是该事件的实例:

w = JokeFlow(timeout=60, verbose=False)
# Warning! `result` now contains an instance of MyStopEvent!
result = await w.run(topic="pirates")
# We can now access the event fields as any normal Event
print(result.critique.text)

这种方法利用了Python的类型系统,对IDE中的自动补全非常友好,并允许外部应用程序进行内省,从而准确了解工作流运行将返回什么结果。