Events

Event

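Bases: DictLikeModel

Base class for all workflow events.

Events are lightweight, serializable payloads passed between steps. They support both attribute and mapping access to dynamic fields.

Examples:

Subclassing with typed fields: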

from pydantic import Field
from workflows.events import Event

class CustomEv(Event):
    score: int = Field(ge=0)

e = CustomEv(score=10)
print(e.score)

See also: StartEvent, StopEvent, InputRequiredEvent, HumanResponseEvent

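The attribute and mapping access to dynamic fields described above can be pictured with a small stand-in class. This is a minimal sketch using only the standard library: `DictLikeSketch` is a hypothetical name, and while the `_data` store borrows its name from the `StopEvent.__repr__` source listing on this page, the implementation here is an assumption and not the library's `DictLikeModel`.

```python
from typing import Any


class DictLikeSketch:
    """Hypothetical stand-in that mimics dual attribute/mapping access."""

    def __init__(self, **params: Any) -> None:
        # Dynamic fields live in a private dict (assumed layout).
        self._data = dict(params)

    def __getattr__(self, name: str) -> Any:
        # Called only when normal attribute lookup fails.
        data = self.__dict__.get("_data", {})
        try:
            return data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)


ev = DictLikeSketch(query="hello", top_k=3)
ev["source"] = "cli"  # mapping-style write
print(ev.query, ev["top_k"], ev.source, ev.get("missing"))
```

Both access styles read from the same underlying store, so a field set through one is visible through the other.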
Source code in workflows/events.py
class Event(DictLikeModel):
    """
    Base class for all workflow events.

    Events are light-weight, serializable payloads passed between steps.
    They support both attribute and mapping access to dynamic fields.

    Examples:
        Subclassing with typed fields:

        ```python
        from pydantic import Field

        class CustomEv(Event):
            score: int = Field(ge=0)

        e = CustomEv(score=10)
        print(e.score)
        ```

    See Also:
        - [StartEvent][workflows.events.StartEvent]
        - [StopEvent][workflows.events.StopEvent]
        - [InputRequiredEvent][workflows.events.InputRequiredEvent]
        - [HumanResponseEvent][workflows.events.HumanResponseEvent]
    """

    def __init__(self, **params: Any):
        super().__init__(**params)

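InputRequiredEvent

Bases: Event

Emitted when human input is required to proceed.

Automatically written to the event stream if returned from a step.

If returned from a step, it does not need to be consumed by other steps and will pass validation. The caller is expected to respond to this event and send back a HumanResponseEvent.

Use this directly or subclass it.

Typical flow: a step returns InputRequiredEvent, callers consume it from the stream and send back a HumanResponseEvent.

Examples: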

from workflows import Workflow, step
from workflows.events import (
    HumanResponseEvent,
    InputRequiredEvent,
    StartEvent,
    StopEvent,
)

class HITLWorkflow(Workflow):
    # Step names must differ; a repeated name would replace the earlier method.
    @step
    async def ask_name(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def finish(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
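
The example above shows only the workflow side of the typical flow. The caller's half (consume the request from the stream, then send a response back) can be simulated with the standard library alone. This is a sketch of the hand-off pattern, not the library's API: `InputRequired`, `HumanResponse`, `workflow`, `caller`, and the two queues are all hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class InputRequired:
    """Stand-in for InputRequiredEvent."""
    prefix: str


@dataclass
class HumanResponse:
    """Stand-in for HumanResponseEvent."""
    response: str


async def workflow(stream: asyncio.Queue, inbox: asyncio.Queue) -> str:
    # First step: publish the request on the event stream.
    await stream.put(InputRequired(prefix="What's your name? "))
    # Second step: resume once the caller sends a response back.
    reply = await inbox.get()
    return reply.response


async def caller() -> str:
    stream: asyncio.Queue = asyncio.Queue()
    inbox: asyncio.Queue = asyncio.Queue()
    run = asyncio.create_task(workflow(stream, inbox))

    ev = await stream.get()  # consume the request from the stream
    assert isinstance(ev, InputRequired)
    # A real caller would prompt the user with ev.prefix here.
    await inbox.put(HumanResponse(response="Ada"))
    return await run


result = asyncio.run(caller())
print(result)
```

The workflow stays suspended between the two steps until the response arrives, which is why the request event does not need a consuming step inside the workflow itself.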
Source code in workflows/events.py
class InputRequiredEvent(Event):
    """Emitted when human input is required to proceed.

    Automatically written to the event stream if returned from a step.

    If returned from a step, it does not need to be consumed by other steps and will pass validation.
    It's expected that the caller will respond to this event and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent].

    Use this directly or subclass it.

    Typical flow: a step returns `InputRequiredEvent`, callers consume it from
    the stream and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent].

    Examples:
        ```python
        from workflows.events import InputRequiredEvent, HumanResponseEvent

        class HITLWorkflow(Workflow):
            @step
            async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
                return InputRequiredEvent(prefix="What's your name? ")

            @step
            async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
                return StopEvent(result=ev.response)
        ```
    """

HumanResponseEvent

Bases: Event

Carries a human's response for a prior input request.

If consumed by a step and not returned by another, it will still pass validation.

Examples:

from workflows import Workflow, step
from workflows.events import (
    HumanResponseEvent,
    InputRequiredEvent,
    StartEvent,
    StopEvent,
)

class HITLWorkflow(Workflow):
    # Step names must differ; a repeated name would replace the earlier method.
    @step
    async def ask_name(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def finish(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
Source code in workflows/events.py
class HumanResponseEvent(Event):
    """Carries a human's response for a prior input request.

    If consumed by a step and not returned by another, it will still pass validation.

    Examples:
        ```python
        from workflows.events import InputRequiredEvent, HumanResponseEvent

        class HITLWorkflow(Workflow):
            @step
            async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
                return InputRequiredEvent(prefix="What's your name? ")

            @step
            async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
                return StopEvent(result=ev.response)
        ```
    """

StartEvent

Bases: Event

Implicit entry event sent to kick off a Workflow.run().

Source code in workflows/events.py
class StartEvent(Event):
    """Implicit entry event sent to kick off a `Workflow.run()`."""

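StopEvent

Bases: Event

Terminal event that signals the workflow has completed.

The result property contains the return value of the workflow run. When a custom stop event subclass is used, the workflow result is that event instance itself.

Examples: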

# default stop event: result holds the value
return StopEvent(result={"answer": 42})

Subclassing to provide a custom result:

class MyStopEv(StopEvent):
    pass

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv:
    return MyStopEv(result={"answer": 42})
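
The source listing for this class routes the `result` property through a `_get_result` method that subclasses may override. That hook can be illustrated with plain Python classes. This is a minimal sketch under stated assumptions: `StopEventSketch`, `SummaryStop`, and the `fields` attribute are hypothetical stand-ins that mirror only the property and hook, not the Pydantic-based `StopEvent`.

```python
from typing import Any


class StopEventSketch:
    """Stand-in mirroring the result property and _get_result hook."""

    def __init__(self, result: Any = None, **fields: Any) -> None:
        self._result = result
        self.fields = fields  # hypothetical holder for extra fields

    def _get_result(self) -> Any:
        # Subclasses can override this to shape the reported result.
        return self._result

    @property
    def result(self) -> Any:
        return self._get_result()


class SummaryStop(StopEventSketch):
    def _get_result(self) -> Any:
        # Combine the raw result with an extra field.
        return {"answer": self._result, "tokens": self.fields.get("tokens", 0)}


plain = StopEventSketch(result=42)
custom = SummaryStop(result=42, tokens=7)
print(plain.result, custom.result)
```

Keeping `result` as a read-only property and placing the customization point in `_get_result` lets subclasses change what is reported without redefining the property.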

Source code in workflows/events.py
class StopEvent(Event):
    """Terminal event that signals the workflow has completed.

    The `result` property contains the return value of the workflow run. When a
    custom stop event subclass is used, the workflow result is that event
    instance itself.

    Examples:
        ```python
        # default stop event: result holds the value
        return StopEvent(result={"answer": 42})
        ```

        Subclassing to provide a custom result:

        ```python
        class MyStopEv(StopEvent):
            pass

        @step
        async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv:
            return MyStopEv(result={"answer": 42})
        ```
    """

    _result: Any = PrivateAttr(default=None)

    def __init__(self, result: Any = None, **kwargs: Any) -> None:
        # forces the user to provide a result
        super().__init__(_result=result, **kwargs)

    def _get_result(self) -> Any:
        """This can be overridden by subclasses to return the desired result."""
        return self._result

    @property
    def result(self) -> Any:
        return self._get_result()

    @model_serializer(mode="wrap")
    def custom_model_dump(self, handler: Any) -> dict[str, Any]:
        data = handler(self)
        # include _result in serialization for base StopEvent
        if self._result is not None:
            data["result"] = self._result
        return data

    def __repr__(self) -> str:
        dict_items = {**self._data, **self.model_dump()}
        # Format as key=value pairs
        parts = [f"{k}={v!r}" for k, v in dict_items.items()]
        dict_str = ", ".join(parts)
        return f"{self.__class__.__name__}({dict_str})"

    def __str__(self) -> str:
        return str(self._result)