Workflow #

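Event-driven orchestrator to define and run application flows using typed steps.

A Workflow is composed of @step-decorated callables that accept and emit typed events. Steps can be declared as instance methods or as free functions registered via the decorator.

Key features:
- Validation of step signatures and event graph before running
- Typed start/stop events
- Streaming of intermediate events
- Optional human-in-the-loop events
- Retry policies per step
- Resource injection

Examples:

Basic usage: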

from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent

class MyFlow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result="done")

result = await MyFlow(timeout=60).run(topic="Pirates")

Custom start/stop events and streaming:

handler = MyFlow().run()
async for ev in handler.stream_events():
    ...
result = await handler
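
The handler in the example above is used in two ways: it is iterated for intermediate events and then awaited for the final result. The sketch below illustrates that pattern with plain asyncio. `ToyHandler`, `emit`, and `finish` are hypothetical names invented for this illustration; this is not the library's WorkflowHandler, only a minimal model of an object that is both streamable and awaitable.

```python
import asyncio


class ToyHandler:
    """Hypothetical stand-in for a future-like handler (not the library class)."""

    _DONE = object()  # sentinel marking the end of the event stream

    def __init__(self):
        # Must be created while an event loop is running.
        self._queue = asyncio.Queue()
        self._result = asyncio.get_running_loop().create_future()

    def emit(self, event):
        """Publish an intermediate event to whoever is streaming."""
        self._queue.put_nowait(event)

    def finish(self, result):
        """Close the stream and resolve the final result."""
        self._queue.put_nowait(self._DONE)
        self._result.set_result(result)

    async def stream_events(self):
        """Yield events until the sentinel arrives."""
        while True:
            ev = await self._queue.get()
            if ev is self._DONE:
                return
            yield ev

    def __await__(self):
        # Awaiting the handler waits for the final result.
        return self._result.__await__()


async def demo():
    handler = ToyHandler()

    async def produce():
        for i in range(3):
            handler.emit(f"progress-{i}")
            await asyncio.sleep(0)
        handler.finish("done")

    task = asyncio.create_task(produce())
    seen = [ev async for ev in handler.stream_events()]
    result = await handler
    await task
    return seen, result
```

Running `asyncio.run(demo())` drains the stream first and then reads the result, which is the same order used in the example above.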
See Also:
- step
- Event
- Context
- WorkflowHandler
- RetryPolicy

Source code in workflows/workflow.py
class Workflow(metaclass=WorkflowMeta):
    """
    Event-driven orchestrator to define and run application flows using typed steps.

    A `Workflow` is composed of `@step`-decorated callables that accept and emit
    typed [Event][workflows.events.Event]s. Steps can be declared as instance
    methods or as free functions registered via the decorator.

    Key features:
    - Validation of step signatures and event graph before running
    - Typed start/stop events
    - Streaming of intermediate events
    - Optional human-in-the-loop events
    - Retry policies per step
    - Resource injection

    Examples:
        Basic usage:

        ```python
        from workflows import Workflow, step
        from workflows.events import StartEvent, StopEvent

        class MyFlow(Workflow):
            @step
            async def start(self, ev: StartEvent) -> StopEvent:
                return StopEvent(result="done")

        result = await MyFlow(timeout=60).run(topic="Pirates")
        ```

        Custom start/stop events and streaming:

        ```python
        handler = MyFlow().run()
        async for ev in handler.stream_events():
            ...
        result = await handler
        ```

    See Also:
        - [step][workflows.decorators.step]
        - [Event][workflows.events.Event]
        - [Context][workflows.context.context.Context]
        - [WorkflowHandler][workflows.handler.WorkflowHandler]
        - [RetryPolicy][workflows.retry_policy.RetryPolicy]
    """

    # Populated by the metaclass; declared here for type checkers.
    _step_functions: dict[str, StepFunction]

    def __init__(
        self,
        timeout: float | None = 45.0,
        disable_validation: bool = False,
        verbose: bool = False,
        resource_manager: ResourceManager | None = None,
        num_concurrent_runs: int | None = None,
    ) -> None:
        """
        Initialize a workflow instance.

        Args:
            timeout (float | None): Max seconds to wait for completion. `None`
                disables the timeout.
            disable_validation (bool): Skip pre-run validation of the event graph
                (not recommended).
            verbose (bool): If True, print step activity.
            resource_manager (ResourceManager | None): Custom resource manager
                for dependency injection.
            num_concurrent_runs (int | None): Limit on concurrent `run()` calls.
        """
        # Configuration
        self._timeout = timeout
        self._verbose = verbose
        self._disable_validation = disable_validation
        self._num_concurrent_runs = num_concurrent_runs
        # Detect StartEvent issues before StopEvent for clearer guidance
        self._start_event_class = self._ensure_start_event_class()
        self._stop_event_class = self._ensure_stop_event_class()
        self._events = self._ensure_events_collected()
        self._sem = (
            asyncio.Semaphore(num_concurrent_runs) if num_concurrent_runs else None
        )
        # Resource management
        self._resource_manager = resource_manager or ResourceManager()
        # Instrumentation
        self._dispatcher = dispatcher

    def _ensure_start_event_class(self) -> type[StartEvent]:
        """
        Returns the StartEvent type used in this workflow.

        It works by inspecting the events received by the step methods.
        """
        start_events_found: set[type[StartEvent]] = set()
        for step_func in self._get_steps().values():
            step_config: StepConfig = step_func._step_config
            for event_type in step_config.accepted_events:
                if issubclass(event_type, StartEvent):
                    start_events_found.add(event_type)

        num_found = len(start_events_found)
        if num_found == 0:
            cls_name = self.__class__.__name__
            msg = (
                "At least one Event of type StartEvent must be received by any step. "
                f"(Workflow '{cls_name}' has no @step that accepts StartEvent.)"
            )
            raise WorkflowConfigurationError(msg)
        elif num_found > 1:
            cls_name = self.__class__.__name__
            msg = (
                f"Only one type of StartEvent is allowed per workflow, found {num_found}: {start_events_found} "
                f"in workflow '{cls_name}'."
            )
            raise WorkflowConfigurationError(msg)
        else:
            return start_events_found.pop()

    @property
    def start_event_class(self) -> type[StartEvent]:
        """The `StartEvent` subclass accepted by this workflow.

        Determined by inspecting step input types.
        """
        return self._start_event_class

    @property
    def events(self) -> list[type[Event]]:
        """Returns all known events emitted by this workflow.

        Determined by inspecting step input/output types.
        """
        return self._events

    def _ensure_events_collected(self) -> list[type[Event]]:
        """Returns all known events emitted by this workflow.

        Determined by inspecting step input/output types.
        """
        events_found: set[type[Event]] = set()
        for step_func in self._get_steps().values():
            step_config: StepConfig = step_func._step_config

            # Do not collect events from the done step
            if step_func.__name__ == "_done":
                continue

            for event_type in step_config.return_types:
                if issubclass(event_type, Event):
                    events_found.add(event_type)
            for event_type in step_config.accepted_events:
                if issubclass(event_type, Event):
                    events_found.add(event_type)

        return list(events_found)

    def _ensure_stop_event_class(self) -> type[RunResultT]:
        """
        Returns the StopEvent type used in this workflow.

        It works by inspecting the events returned.
        """
        stop_events_found: set[type[StopEvent]] = set()
        for step_func in self._get_steps().values():
            step_config: StepConfig = step_func._step_config
            for event_type in step_config.return_types:
                if issubclass(event_type, StopEvent):
                    stop_events_found.add(event_type)

        num_found = len(stop_events_found)
        if num_found == 0:
            cls_name = self.__class__.__name__
            msg = (
                "At least one Event of type StopEvent must be returned by any step. "
                f"(Workflow '{cls_name}' has no @step that returns StopEvent.)"
            )
            raise WorkflowConfigurationError(msg)
        elif num_found > 1:
            cls_name = self.__class__.__name__
            msg = (
                f"Only one type of StopEvent is allowed per workflow, found {num_found}: {stop_events_found} "
                f"in workflow '{cls_name}'."
            )
            raise WorkflowConfigurationError(msg)
        else:
            return stop_events_found.pop()

    @property
    def stop_event_class(self) -> type[RunResultT]:
        """The `StopEvent` subclass produced by this workflow.

        Determined by inspecting step return annotations.
        """
        return self._stop_event_class

    @classmethod
    def add_step(cls, func: StepFunction) -> None:
        """
        Adds a free function as step for this workflow instance.

        It raises an exception if a step with the same name was already added to the workflow.
        """
        step_config: StepConfig | None = getattr(func, "_step_config", None)
        if not step_config:
            msg = f"Step function {func.__name__} is missing the `@step` decorator."
            raise WorkflowValidationError(msg)

        if func.__name__ in {**get_steps_from_class(cls), **cls._step_functions}:
            msg = f"A step {func.__name__} is already part of this workflow, please choose another name."
            raise WorkflowValidationError(msg)

        cls._step_functions[func.__name__] = func

    def _get_steps(self) -> dict[str, StepFunction]:
        """Returns all the steps, whether defined as methods or free functions."""
        return {**get_steps_from_instance(self), **self.__class__._step_functions}

    def _get_start_event_instance(
        self, start_event: StartEvent | None, **kwargs: Any
    ) -> StartEvent:
        if start_event is not None:
            # start_event was used wrong
            if not isinstance(start_event, StartEvent):
                msg = "The 'start_event' argument must be an instance of 'StartEvent'."
                raise ValueError(msg)

            # start_event is ok but point out that additional kwargs will be ignored in this case
            if kwargs:
                msg = (
                    "Keyword arguments are not supported when 'run()' is invoked with the 'start_event' parameter."
                    f" These keyword arguments will be ignored: {kwargs}"
                )
                logger.warning(msg)
            return start_event

        # Old style start event creation, with kwargs used to create an instance of self._start_event_class
        try:
            return self._start_event_class(**kwargs)
        except ValidationError as e:
            ev_name = self._start_event_class.__name__
            msg = f"Failed creating a start event of type '{ev_name}' with the keyword arguments: {kwargs}"
            logger.debug(e)
            raise WorkflowRuntimeError(msg)

    @dispatcher.span
    def run(
        self,
        ctx: Context | None = None,
        start_event: StartEvent | None = None,
        **kwargs: Any,
    ) -> WorkflowHandler:
        """Run the workflow and return a handler for results and streaming.

        This schedules the workflow execution in the background and returns a
        [WorkflowHandler][workflows.handler.WorkflowHandler] that can be awaited
        for the final result or used to stream intermediate events.

        You may pass either a concrete `start_event` instance or keyword
        arguments that will be used to construct the inferred
        [StartEvent][workflows.events.StartEvent] subclass.

        Args:
            ctx (Context | None): Optional context to resume or share state
                across runs. If omitted, a fresh context is created.
            start_event (StartEvent | None): Optional explicit start event.
            **kwargs (Any): Keyword args to initialize the start event when
                `start_event` is not provided.

        Returns:
            WorkflowHandler: A future-like object to await the final result and
            stream events.

        Raises:
            WorkflowValidationError: If validation fails and validation is
                enabled.
            WorkflowRuntimeError: If the start event cannot be created from kwargs.
            WorkflowTimeoutError: If execution exceeds the configured timeout.

        Examples:
            ```python
            # Create and run with kwargs
            handler = MyFlow().run(topic="Pirates")

            # Stream events
            async for ev in handler.stream_events():
                ...

            # Await final result
            result = await handler
            ```

            If you subclassed the start event, you can also directly pass it in:

            ```python
            result = await my_workflow.run(start_event=MyStartEvent(topic="Pirates"))
            ```
        """
        from workflows.context import Context

        # Validate the workflow
        self._validate()

        # If a previous context is provided, pass its serialized form
        ctx = ctx if ctx is not None else Context(self)
        start_event_instance: StartEvent | None = (
            None
            if ctx.is_running
            else self._get_start_event_instance(start_event, **kwargs)
        )
        return ctx._workflow_run(
            workflow=self, start_event=start_event_instance, semaphore=self._sem
        )

    def validate(self) -> bool:
        """
        Validate the workflow to ensure it's well-formed.

        Returns True if the workflow uses human-in-the-loop, False otherwise.
        """
        return self._validate()

    def _validate(self) -> bool:
        if self._disable_validation:
            return False

        # Ensure at least one step is configured before inspecting events
        if not self._get_steps():
            cls_name = self.__class__.__name__
            msg = (
                f"Workflow '{cls_name}' has no configured steps. "
                "Did you forget to annotate methods with @step or to register "
                "free-function steps via @step(workflow=...)?"
            )
            raise WorkflowConfigurationError(msg)

        # Recompute StartEvent and StopEvent classes here to support dynamic changes
        # and to surface StartEvent errors before StopEvent during validation.
        self._start_event_class = self._ensure_start_event_class()
        self._stop_event_class = self._ensure_stop_event_class()

        produced_events: set[type] = {self._start_event_class}
        consumed_events: set[type] = set()

        # Collect steps that incorrectly accept StopEvent
        steps_accepting_stop_event: list[str] = []

        for name, step_func in self._get_steps().items():
            step_config: StepConfig = step_func._step_config

            # Check that no user-defined step accepts StopEvent (only _done step should)
            if name != "_done":
                for event_type in step_config.accepted_events:
                    if issubclass(event_type, StopEvent):
                        steps_accepting_stop_event.append(name)
                        break

            for event_type in step_config.accepted_events:
                consumed_events.add(event_type)

            for event_type in step_config.return_types:
                if event_type is type(None):
                    # some events may not trigger other events
                    continue

                produced_events.add(event_type)

        # Raise error if any steps incorrectly accept StopEvent
        if steps_accepting_stop_event:
            step_names = "', '".join(steps_accepting_stop_event)
            plural = "" if len(steps_accepting_stop_event) == 1 else "s"
            msg = f"Step{plural} '{step_names}' cannot accept StopEvent. StopEvent signals the end of the workflow. Use a different Event type instead."
            raise WorkflowValidationError(msg)

        # Check if no StopEvent is produced
        stop_ok = False
        for ev in produced_events:
            if issubclass(ev, StopEvent):
                stop_ok = True
                break
        if not stop_ok:
            msg = "No event of type StopEvent is produced."
            raise WorkflowValidationError(msg)

        # Check if all consumed events are produced (except specific built-in events)
        unconsumed_events = consumed_events - produced_events
        unconsumed_events = {
            x
            for x in unconsumed_events
            if not issubclass(x, (InputRequiredEvent, HumanResponseEvent, StopEvent))
        }
        if unconsumed_events:
            names = ", ".join(ev.__name__ for ev in unconsumed_events)
            raise WorkflowValidationError(
                f"The following events are consumed but never produced: {names}"
            )

        # Check if there are any unused produced events (except specific built-in events)
        unused_events = produced_events - consumed_events
        unused_events = {
            x
            for x in unused_events
            if not issubclass(
                x, (InputRequiredEvent, HumanResponseEvent, self._stop_event_class)
            )
        }
        if unused_events:
            names = ", ".join(ev.__name__ for ev in unused_events)
            raise WorkflowValidationError(
                f"The following events are produced but never consumed: {names}"
            )

        # Check if the workflow uses human-in-the-loop
        return (
            InputRequiredEvent in produced_events
            or HumanResponseEvent in consumed_events
        )

start_event_class property #

start_event_class: type[StartEvent]

The StartEvent subclass accepted by this workflow.

Determined by inspecting step input types.
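
To make "inspecting step input types" concrete, here is a minimal sketch of the idea using only the standard library. The classes `StartEvent`, `TopicStart`, and `Progress` and the function `infer_start_event` are hypothetical stand-ins defined for this illustration. The library itself reads a step config attached by the decorator rather than raw annotations, so treat this as an approximation of the rule "exactly one StartEvent subclass per workflow".

```python
import inspect
from typing import get_type_hints


class StartEvent:  # hypothetical stand-in, not workflows.events.StartEvent
    pass


class TopicStart(StartEvent):
    pass


class Progress:
    pass


def infer_start_event(steps):
    """Return the single StartEvent subclass accepted by any of the steps."""
    found = set()
    for fn in steps:
        hints = get_type_hints(fn)
        # Only look at parameters, not at the return annotation.
        for name in inspect.signature(fn).parameters:
            hint = hints.get(name)
            if isinstance(hint, type) and issubclass(hint, StartEvent):
                found.add(hint)
    if len(found) != 1:
        raise ValueError(f"expected exactly one StartEvent type, found {len(found)}")
    return found.pop()


def begin(ev: TopicStart) -> Progress:
    return Progress()


def finish(ev: Progress) -> None:
    return None
```

Here `infer_start_event([begin, finish])` returns `TopicStart`, while a list of steps with no start event, or with two different ones, raises an error.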

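events property #

events: list[type[Event]]

Returns all known event types used by this workflow, both accepted and emitted by its steps.

Determined by inspecting step input/output types.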
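
A step may return more than one event type, so collecting events means expanding union annotations as well as reading parameters. The following sketch shows one way to do that with `typing` helpers. All class and function names (`Event`, `Started`, `Chunk`, `Finished`, `flatten`, `collect_events`) are hypothetical and defined only for this example; it does not reproduce the library's internal step config.

```python
import types
from typing import Union, get_args, get_origin, get_type_hints


class Event:  # hypothetical stand-in for the library's base event
    pass


class Started(Event):
    pass


class Chunk(Event):
    pass


class Finished(Event):
    pass


# typing.Union always; types.UnionType (the `A | B` form) where available.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def flatten(hint):
    """Expand Optional/Union annotations into their member types."""
    if get_origin(hint) in _UNION_ORIGINS:
        return list(get_args(hint))
    return [hint]


def collect_events(steps):
    """Collect every Event subclass named in a parameter or return annotation."""
    found = set()
    for fn in steps:
        for hint in get_type_hints(fn).values():  # includes the "return" entry
            for member in flatten(hint):
                if isinstance(member, type) and issubclass(member, Event):
                    found.add(member)
    return found


def produce(ev: Started) -> Union[Chunk, Finished, None]:
    return None


def consume(ev: Chunk) -> Finished:
    return Finished()
```

With these two steps, `collect_events([produce, consume])` yields the set containing `Started`, `Chunk`, and `Finished`; the `None` member of the union is ignored because it is not an event type.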

stop_event_class property #

stop_event_class: type[RunResultT]

The StopEvent subclass produced by this workflow.

Determined by inspecting step return annotations.

__init__ #

__init__(timeout: float | None = 45.0, disable_validation: bool = False, verbose: bool = False, resource_manager: ResourceManager | None = None, num_concurrent_runs: int | None = None) -> None

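Parameters:

- timeout (float | None, default 45.0): Max seconds to wait for completion. None disables the timeout.
- disable_validation (bool, default False): Skip pre-run validation of the event graph (not recommended).
- verbose (bool, default False): If True, print step activity.
- resource_manager (ResourceManager | None, default None): Custom resource manager for dependency injection.
- num_concurrent_runs (int | None, default None): Limit on concurrent run() calls.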
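
The constructor turns num_concurrent_runs into an `asyncio.Semaphore` only when the value is truthy, so both None and 0 leave runs unlimited. The sketch below reproduces that one expression and measures its effect on a handful of dummy jobs. `make_semaphore` and `measure_peak` are hypothetical helpers written for this illustration; how the library applies the semaphore inside a run is not shown in this section and is not modeled here.

```python
import asyncio


def make_semaphore(num_concurrent_runs):
    # Same expression as in __init__: None and 0 are falsy, so no semaphore is created.
    return asyncio.Semaphore(num_concurrent_runs) if num_concurrent_runs else None


async def measure_peak(limit, jobs=5):
    """Start `jobs` dummy runs and report the highest number active at once."""
    sem = make_semaphore(limit)
    active = 0
    peak = 0

    async def one_run():
        nonlocal active, peak
        if sem is not None:
            await sem.acquire()
        try:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated work
            active -= 1
        finally:
            if sem is not None:
                sem.release()

    await asyncio.gather(*(one_run() for _ in range(jobs)))
    return peak
```

With a limit of 2 the peak concurrency is 2; with None or 0 all five dummy runs overlap.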
Source code in workflows/workflow.py
def __init__(
    self,
    timeout: float | None = 45.0,
    disable_validation: bool = False,
    verbose: bool = False,
    resource_manager: ResourceManager | None = None,
    num_concurrent_runs: int | None = None,
) -> None:
    """
    Initialize a workflow instance.

    Args:
        timeout (float | None): Max seconds to wait for completion. `None`
            disables the timeout.
        disable_validation (bool): Skip pre-run validation of the event graph
            (not recommended).
        verbose (bool): If True, print step activity.
        resource_manager (ResourceManager | None): Custom resource manager
            for dependency injection.
        num_concurrent_runs (int | None): Limit on concurrent `run()` calls.
    """
    # Configuration
    self._timeout = timeout
    self._verbose = verbose
    self._disable_validation = disable_validation
    self._num_concurrent_runs = num_concurrent_runs
    # Detect StartEvent issues before StopEvent for clearer guidance
    self._start_event_class = self._ensure_start_event_class()
    self._stop_event_class = self._ensure_stop_event_class()
    self._events = self._ensure_events_collected()
    self._sem = (
        asyncio.Semaphore(num_concurrent_runs) if num_concurrent_runs else None
    )
    # Resource management
    self._resource_manager = resource_manager or ResourceManager()
    # Instrumentation
    self._dispatcher = dispatcher

add_step classmethod #

add_step(func: StepFunction) -> None

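Adds a free function as a step for this workflow. Because add_step is a classmethod, the step is registered on the workflow class rather than on a single instance.

Raises WorkflowValidationError if the function is missing the @step decorator or if a step with the same name is already part of the workflow.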
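
The two checks performed by add_step (the function must carry the decorator's marker, and its name must be unique) can be sketched without the library as follows. The `step` decorator and `ToyWorkflow` class here are hypothetical simplifications: the marker is a plain dict, the errors are `ValueError`, and the registry is a single class-level dict, whereas the real class has its registry populated by a metaclass.

```python
def step(func):
    """Hypothetical marker decorator: tags the function the way a step config would."""
    func._step_config = {"name": func.__name__}
    return func


class ToyWorkflow:
    # Simplified class-level registry of free-function steps.
    _step_functions = {}

    @classmethod
    def add_step(cls, func):
        # Reject functions that were never decorated.
        if not getattr(func, "_step_config", None):
            raise ValueError(f"Step function {func.__name__} is missing the @step decorator.")
        # Reject duplicate step names.
        if func.__name__ in cls._step_functions:
            raise ValueError(f"A step {func.__name__} is already part of this workflow.")
        cls._step_functions[func.__name__] = func


@step
def summarize(ev):
    return ev


def undecorated(ev):
    return ev
```

Calling `ToyWorkflow.add_step(summarize)` once succeeds; calling it a second time, or passing `undecorated`, raises an error.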

Source code in workflows/workflow.py
@classmethod
def add_step(cls, func: StepFunction) -> None:
    """
    Adds a free function as step for this workflow instance.

    It raises an exception if a step with the same name was already added to the workflow.
    """
    step_config: StepConfig | None = getattr(func, "_step_config", None)
    if not step_config:
        msg = f"Step function {func.__name__} is missing the `@step` decorator."
        raise WorkflowValidationError(msg)

    if func.__name__ in {**get_steps_from_class(cls), **cls._step_functions}:
        msg = f"A step {func.__name__} is already part of this workflow, please choose another name."
        raise WorkflowValidationError(msg)

    cls._step_functions[func.__name__] = func

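run #

run(ctx: Context | None = None, start_event: StartEvent | None = None, **kwargs: Any) -> WorkflowHandler

Run the workflow and return a handler for results and streaming.

This schedules the workflow execution in the background and returns a WorkflowHandler that can be awaited for the final result or used to stream intermediate events.

You may pass either a concrete start_event instance or keyword arguments that will be used to construct the inferred StartEvent subclass. If both are given, start_event is used and the keyword arguments are ignored with a logged warning.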

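Parameters:

- ctx (Context | None, default None): Optional context to resume or share state across runs. If omitted, a fresh context is created.
- start_event (StartEvent | None, default None): Optional explicit start event.
- **kwargs (Any, default {}): Keyword args to initialize the start event when start_event is not provided.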
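
The choice between an explicit start_event and keyword arguments can be sketched as a small resolver. `TopicStart` and `resolve_start_event` are hypothetical names for this illustration. The sketch uses a dataclass and catches `TypeError`, whereas the source shown on this page catches a `ValidationError` and raises `WorkflowRuntimeError`, so the error types here are stand-ins.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class TopicStart:  # hypothetical start event with one required field
    topic: str


def resolve_start_event(start_event_class, start_event=None, **kwargs):
    """Pick the explicit event if given, otherwise build one from kwargs."""
    if start_event is not None:
        if not isinstance(start_event, start_event_class):
            raise ValueError("'start_event' must be an instance of the start event class.")
        if kwargs:
            # The explicit event wins; extra keyword arguments are only reported.
            logger.warning("Ignoring keyword arguments: %s", kwargs)
        return start_event
    try:
        return start_event_class(**kwargs)
    except TypeError as exc:
        name = start_event_class.__name__
        raise RuntimeError(f"Failed creating '{name}' from {kwargs}") from exc
```

For example, `resolve_start_event(TopicStart, topic="Pirates")` builds a new event, while passing `start_event=TopicStart("Ships")` together with `topic="Pirates"` returns the explicit event unchanged.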

Returns:

- WorkflowHandler: A future-like object to await the final result and stream events.

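Raises:

- WorkflowValidationError: If validation fails and validation is enabled.
- WorkflowRuntimeError: If the start event cannot be created from kwargs.
- WorkflowTimeoutError: If execution exceeds the configured timeout.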

Examples:

# Create and run with kwargs
handler = MyFlow().run(topic="Pirates")

# Stream events
async for ev in handler.stream_events():
    ...

# Await final result
result = await handler

If you subclassed the start event, you can also directly pass it in:

result = await my_workflow.run(start_event=MyStartEvent(topic="Pirates"))
Source code in workflows/workflow.py
@dispatcher.span
def run(
    self,
    ctx: Context | None = None,
    start_event: StartEvent | None = None,
    **kwargs: Any,
) -> WorkflowHandler:
    """Run the workflow and return a handler for results and streaming.

    This schedules the workflow execution in the background and returns a
    [WorkflowHandler][workflows.handler.WorkflowHandler] that can be awaited
    for the final result or used to stream intermediate events.

    You may pass either a concrete `start_event` instance or keyword
    arguments that will be used to construct the inferred
    [StartEvent][workflows.events.StartEvent] subclass.

    Args:
        ctx (Context | None): Optional context to resume or share state
            across runs. If omitted, a fresh context is created.
        start_event (StartEvent | None): Optional explicit start event.
        **kwargs (Any): Keyword args to initialize the start event when
            `start_event` is not provided.

    Returns:
        WorkflowHandler: A future-like object to await the final result and
        stream events.

    Raises:
        WorkflowValidationError: If validation fails and validation is
            enabled.
        WorkflowRuntimeError: If the start event cannot be created from kwargs.
        WorkflowTimeoutError: If execution exceeds the configured timeout.

    Examples:
        ```python
        # Create and run with kwargs
        handler = MyFlow().run(topic="Pirates")

        # Stream events
        async for ev in handler.stream_events():
            ...

        # Await final result
        result = await handler
        ```

        If you subclassed the start event, you can also directly pass it in:

        ```python
        result = await my_workflow.run(start_event=MyStartEvent(topic="Pirates"))
        ```
    """
    from workflows.context import Context

    # Validate the workflow
    self._validate()

    # If a previous context is provided, pass its serialized form
    ctx = ctx if ctx is not None else Context(self)
    start_event_instance: StartEvent | None = (
        None
        if ctx.is_running
        else self._get_start_event_instance(start_event, **kwargs)
    )
    return ctx._workflow_run(
        workflow=self, start_event=start_event_instance, semaphore=self._sem
    )

validate #

validate() -> bool

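Validate the workflow to ensure it's well-formed.

Returns True if the workflow uses human-in-the-loop, False otherwise. If the workflow was created with disable_validation=True, validation is skipped and the method returns False.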
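
At its core, the well-formedness check compares the set of event types that steps produce with the set they consume. The sketch below reduces that idea to set differences over a plain dict. The event classes and `check_graph` are hypothetical stand-ins defined for this example, errors are plain `ValueError`, and several checks from the real method (empty workflows, steps that accept a stop event) are left out.

```python
class Event:
    pass


class StartEvent(Event):
    pass


class StopEvent(Event):
    pass


class InputRequiredEvent(Event):
    pass


class HumanResponseEvent(Event):
    pass


class Draft(Event):
    pass


def check_graph(steps, start_cls=StartEvent):
    """steps maps a step name to (accepted event types, returned event types)."""
    produced = {start_cls}  # the start event is supplied by the caller of run()
    consumed = set()
    for accepted, returned in steps.values():
        consumed.update(accepted)
        produced.update(returned)

    if not any(issubclass(ev, StopEvent) for ev in produced):
        raise ValueError("No event of type StopEvent is produced.")

    # Built-in events may legitimately enter or leave the graph from outside.
    builtin = (InputRequiredEvent, HumanResponseEvent, StopEvent)
    never_produced = {ev for ev in consumed - produced if not issubclass(ev, builtin)}
    if never_produced:
        raise ValueError(f"Consumed but never produced: {never_produced}")
    never_consumed = {ev for ev in produced - consumed if not issubclass(ev, builtin)}
    if never_consumed:
        raise ValueError(f"Produced but never consumed: {never_consumed}")

    # True when the graph involves human-in-the-loop events.
    return InputRequiredEvent in produced or HumanResponseEvent in consumed
```

A two-step chain from `StartEvent` through `Draft` to `StopEvent` passes and returns False, a graph that emits `InputRequiredEvent` and accepts `HumanResponseEvent` returns True, and a step that emits `Draft` with nothing consuming it fails.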

Source code in workflows/workflow.py
def validate(self) -> bool:
    """
    Validate the workflow to ensure it's well-formed.

    Returns True if the workflow uses human-in-the-loop, False otherwise.
    """
    return self._validate()