自定义入口点和出口点
大多数情况下,依赖我们在入门指南章节中看到的默认入口点和出口点就足够了。
然而,工作流支持自定义事件,您通常会在这些位置看到 StartEvent 和 StopEvent,让我们来看看具体实现方式。
使用自定义 StartEvent
Section titled “Using a custom StartEvent”当我们在工作流实例上调用 run() 方法时,传入的关键字参数会成为 StartEvent 实例的字段,该实例会在底层自动创建。如果我们想要传递复杂数据来启动工作流,这种方法可能会变得繁琐,这时我们可以引入自定义启动事件。
要能够使用自定义起始事件,第一步是创建一个继承自 StartEvent 的自定义类:
from pathlib import Path
from workflows.events import StartEventfrom llama_index.indices.managed.llama_cloud import LlamaCloudIndexfrom llama_index.llms.openai import OpenAI
class MyCustomStartEvent(StartEvent): a_string_field: str a_path_to_somewhere: Path an_index: LlamaCloudIndex an_llm: OpenAI现在我们需要做的就是在作为入口点的步骤中使用 MyCustomStartEvent 作为事件类型。
以这个人为复杂化的步骤为例:
class JokeFlow(Workflow): ...
@step async def generate_joke_from_index( self, ev: MyCustomStartEvent ) -> JokeEvent: # Build a query engine using the index and the llm from the start event query_engine = ev.an_index.as_query_engine(llm=ev.an_llm) topic = await query_engine.aquery( f"What is the closest topic to {a_string_field}" ) # Use the llm attached to the start event to instruct the model prompt = f"Write your best joke about {topic}." response = await ev.an_llm.acomplete(prompt) # Dump the response on disk using the Path object from the event ev.a_path_to_somewhere.write_text(str(response)) # Finally, pass the JokeEvent along return JokeEvent(joke=str(response))我们仍然可以将 MyCustomStartEvent 的字段作为关键字参数传递给工作流的 run 方法,但
这样做仍然很繁琐。更好的方法是通过 start_event 关键字参数传递事件实例,如下所示:
custom_start_event = MyCustomStartEvent(...)w = JokeFlow(timeout=60, verbose=False)result = await w.run(start_event=custom_start_event)print(str(result))这种方法使代码更清晰、更明确,并允许IDE中的自动补全功能正常工作。
使用自定义 StopEvent
Section titled “Using a custom StopEvent”与StartEvent类似,依赖内置的StopEvent在大多数情况下有效但并非总是如此。实际上,当我们
使用StopEvent时,工作流的结果必须设置到事件实例的result字段。由于结果可以是
任何Python对象,StopEvent的result字段被类型化为Any,从而失去了类型系统的所有优势。
此外,返回多个对象非常麻烦:我们通常会将一堆不相关的对象塞入
字典,然后将其赋值给StopEvent.result。
支持自定义停止事件的第一步,我们需要创建一个 StopEvent 的子类:
from workflows.events import StopEvent
class MyStopEvent(StopEvent): critique: CompletionResponse我们现在可以在工作流中将 StopEvent 替换为 MyStopEvent:
class JokeFlow(Workflow): ...
@step async def critique_joke(self, ev: JokeEvent) -> MyStopEvent: joke = ev.joke
prompt = f"Give a thorough analysis and critique of the following joke: {joke}" response = await self.llm.acomplete(prompt) return MyStopEvent(response)
...使用自定义停止事件时,我们需要记住的重要一点是:工作流运行的结果将是该事件的实例:
w = JokeFlow(timeout=60, verbose=False)# Warning! `result` now contains an instance of MyStopEvent!result = await w.run(topic="pirates")# We can now access the event fields as any normal Eventprint(result.critique.text)这种方法利用了Python的类型系统,对IDE中的自动补全非常友好,并允许外部应用程序进行内省,从而准确了解工作流运行将返回什么结果。