
Handler

WorkflowHandler #

Bases: Future[RunResultT]

Handle a running workflow: await results, stream events, access context, or cancel.

Instances are returned by Workflow.run. They can be awaited for the final result and support streaming intermediate events via stream_events.
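A minimal usage sketch (assuming `MyWorkflow` is a user-defined `Workflow` subclass; the names here are illustrative, not part of this API):

```python
workflow = MyWorkflow()
handler = workflow.run()

# Stream intermediate events while the run is in flight
async for event in handler.stream_events():
    print(event)

# Await the handler itself to get the final result
result = await handler
```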

See Also

- Context
- StopEvent

Source code in workflows/handler.py
class WorkflowHandler(asyncio.Future[RunResultT]):
    """
    Handle a running workflow: await results, stream events, access context, or cancel.

    Instances are returned by [Workflow.run][workflows.workflow.Workflow.run].
    They can be awaited for the final result and support streaming intermediate
    events via [stream_events][workflows.handler.WorkflowHandler.stream_events].

    See Also:
        - [Context][workflows.context.context.Context]
        - [StopEvent][workflows.events.StopEvent]
    """

    _ctx: Context
    _run_task: asyncio.Task[None] | None
    _all_events_consumed: bool
    _stop_event: StopEvent | None

    def __init__(
        self,
        *args: Any,
        ctx: Context,
        run_id: str | None = None,
        run_task: asyncio.Task[None] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.run_id = run_id
        self._ctx = ctx
        self._run_task = run_task
        self._all_events_consumed = False

    @property
    def ctx(self) -> Context:
        """The workflow [Context][workflows.context.context.Context] for this run."""
        return self._ctx

    def get_stop_event(self) -> StopEvent | None:
        """The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
        return self._stop_event

    async def stop_event_result(self) -> StopEvent:
        """Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
        await self.result()
        assert self._stop_event is not None, (
            "Stop event must be defined once the future is done."
        )
        return self._stop_event

    def _set_stop_event(self, stop_event: StopEvent) -> None:
        self._stop_event = stop_event
        # sad but necessary legacy behavior:
        # set the result to the stop event result. To be removed in a future major release,
        # and just use the stop event directly.
        self.set_result(
            stop_event.result if type(stop_event) is StopEvent else stop_event
        )

    def __str__(self) -> str:
        return str(self.result())

    def is_done(self) -> bool:
        """Return True when the workflow has completed."""
        return self.done()

    async def stream_events(
        self, expose_internal: bool = False
    ) -> AsyncGenerator[Event, None]:
        """
        Stream events from the workflow execution as they occur.

        This method provides real-time access to events generated during workflow
        execution, allowing for monitoring and processing of intermediate results.
        Events are yielded in the order they are generated by the workflow.

        The stream includes all events written to the context's streaming queue,
        and terminates when a [StopEvent][workflows.events.StopEvent] is
        encountered, indicating the workflow has completed.

        Args:
            expose_internal (bool): Whether to expose internal events.

        Returns:
            AsyncGenerator[Event, None]: An async generator that yields Event objects
                as they are produced by the workflow.

        Raises:
            ValueError: If the context is not set on the handler.
            WorkflowRuntimeError: If all events have already been consumed by a
                previous call to `stream_events()` on the same handler instance.

        Examples:
            ```python
            handler = workflow.run()

            # Stream and process events in real-time
            async for event in handler.stream_events():
                if isinstance(event, StopEvent):
                    print(f"Workflow completed with result: {event.result}")
                else:
                    print(f"Received event: {event}")

            # Get final result
            result = await handler
            ```

        Note:
            Events can only be streamed once per handler instance. Subsequent
            calls to `stream_events()` will raise a WorkflowRuntimeError.
        """

        # Check if we already consumed all the streamed events
        if self._all_events_consumed:
            msg = "All the streamed events have already been consumed."
            raise WorkflowRuntimeError(msg)

        async for ev in self.ctx.stream_events():
            if isinstance(ev, InternalDispatchEvent) and not expose_internal:
                continue
            yield ev

            if isinstance(ev, StopEvent):
                self._all_events_consumed = True
                break

    async def cancel_run(self) -> None:
        """Cancel the running workflow.

        Signals the underlying context to raise
        [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
        which will be caught by the workflow and gracefully end the run.

        Examples:
            ```python
            handler = workflow.run()
            await handler.cancel_run()
            ```
        """
        if self.ctx:
            self.ctx._workflow_cancel_run()
            if self._run_task is not None:
                try:
                    await self._run_task
                except Exception:
                    pass

ctx property #

ctx: Context

The workflow Context for this run.
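A hedged sketch of reusing the run's Context afterwards, e.g. to carry its state into a follow-up run (whether `Workflow.run` accepts a `ctx` argument depends on your release; treat it as an assumption):

```python
handler = workflow.run()
result = await handler

# Reuse the same Context (and any state stored in it) for another run.
# Passing ctx to run() is assumed here, not guaranteed by this page.
handler2 = workflow.run(ctx=handler.ctx)
result2 = await handler2
```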

get_stop_event #

get_stop_event() -> StopEvent | None

The stop event for this run. Always defined once the future is done. In a future major release, this will be removed and the result will be the stop event itself.
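A short sketch of inspecting the raw stop event; the handler must already be done, otherwise no stop event has been set yet:

```python
handler = workflow.run()
result = await handler  # ensure the run is complete

stop_event = handler.get_stop_event()
if stop_event is not None:
    print(f"Stop event type: {type(stop_event).__name__}")
```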

Source code in workflows/handler.py
def get_stop_event(self) -> StopEvent | None:
    """The stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
    return self._stop_event

stop_event_result async #

stop_event_result() -> StopEvent

Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed and the result will be the stop event itself.
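Unlike get_stop_event, this coroutine waits for the run to finish itself, so it can be awaited directly; a minimal sketch:

```python
handler = workflow.run()

# Waits until the workflow completes, then returns the StopEvent
stop_event = await handler.stop_event_result()
print(stop_event)
```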

Source code in workflows/handler.py
async def stop_event_result(self) -> StopEvent:
    """Get the stop event for this run. Always defined once the future is done. In a future major release, this will be removed, and the result will be the stop event itself."""
    await self.result()
    assert self._stop_event is not None, (
        "Stop event must be defined once the future is done."
    )
    return self._stop_event

is_done #

is_done() -> bool

Return True when the workflow has completed.
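A small sketch of checking completion without awaiting the handler:

```python
handler = workflow.run()

if not handler.is_done():
    print("workflow is still running")

result = await handler
assert handler.is_done()
```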

Source code in workflows/handler.py
def is_done(self) -> bool:
    """Return True when the workflow has completed."""
    return self.done()

stream_events async #

stream_events(expose_internal: bool = False) -> AsyncGenerator[Event, None]

Stream events from the workflow execution as they occur.

This method provides real-time access to events generated during workflow execution, allowing for monitoring and processing of intermediate results. Events are yielded in the order they are generated by the workflow.

The stream includes all events written to the context's streaming queue, and terminates when a StopEvent is encountered, indicating the workflow has completed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `expose_internal` | `bool` | Whether to expose internal events. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `AsyncGenerator[Event, None]` | An async generator that yields Event objects as they are produced by the workflow. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the context is not set on the handler. |
| `WorkflowRuntimeError` | If all events have already been consumed by a previous call to `stream_events()` on the same handler instance. |
Examples:

```python
handler = workflow.run()

# Stream and process events in real-time
async for event in handler.stream_events():
    if isinstance(event, StopEvent):
        print(f"Workflow completed with result: {event.result}")
    else:
        print(f"Received event: {event}")

# Get final result
result = await handler
```
Note

Events can only be streamed once per handler instance. Subsequent calls to `stream_events()` will raise a WorkflowRuntimeError.
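A hedged sketch of both behaviors described above; note that the second call only fails after the first stream has consumed the terminating StopEvent:

```python
# WorkflowRuntimeError is assumed to be importable from workflows.errors
handler = workflow.run()

# Opt in to internal dispatch events, which are hidden by default
async for event in handler.stream_events(expose_internal=True):
    print(event)

result = await handler

# A second call on the same handler raises WorkflowRuntimeError
try:
    async for event in handler.stream_events():
        pass
except WorkflowRuntimeError:
    print("events were already consumed")
```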

Source code in workflows/handler.py
async def stream_events(
    self, expose_internal: bool = False
) -> AsyncGenerator[Event, None]:
    """
    Stream events from the workflow execution as they occur.

    This method provides real-time access to events generated during workflow
    execution, allowing for monitoring and processing of intermediate results.
    Events are yielded in the order they are generated by the workflow.

    The stream includes all events written to the context's streaming queue,
    and terminates when a [StopEvent][workflows.events.StopEvent] is
    encountered, indicating the workflow has completed.

    Args:
        expose_internal (bool): Whether to expose internal events.

    Returns:
        AsyncGenerator[Event, None]: An async generator that yields Event objects
            as they are produced by the workflow.

    Raises:
        ValueError: If the context is not set on the handler.
        WorkflowRuntimeError: If all events have already been consumed by a
            previous call to `stream_events()` on the same handler instance.

    Examples:
        ```python
        handler = workflow.run()

        # Stream and process events in real-time
        async for event in handler.stream_events():
            if isinstance(event, StopEvent):
                print(f"Workflow completed with result: {event.result}")
            else:
                print(f"Received event: {event}")

        # Get final result
        result = await handler
        ```

    Note:
        Events can only be streamed once per handler instance. Subsequent
        calls to `stream_events()` will raise a WorkflowRuntimeError.
    """

    # Check if we already consumed all the streamed events
    if self._all_events_consumed:
        msg = "All the streamed events have already been consumed."
        raise WorkflowRuntimeError(msg)

    async for ev in self.ctx.stream_events():
        if isinstance(ev, InternalDispatchEvent) and not expose_internal:
            continue
        yield ev

        if isinstance(ev, StopEvent):
            self._all_events_consumed = True
            break

cancel_run async #

cancel_run() -> None

Cancel the running workflow.

Signals the underlying context to raise WorkflowCancelledByUser, which will be caught by the workflow and gracefully end the run.

Examples:

```python
handler = workflow.run()
await handler.cancel_run()
```
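A sketch of cancelling a run that exceeds a deadline; asyncio.shield keeps the timeout from cancelling the handler future itself, so cancel_run can still end the run gracefully:

```python
import asyncio

handler = workflow.run()
try:
    result = await asyncio.wait_for(asyncio.shield(handler), timeout=30.0)
except asyncio.TimeoutError:
    # Deadline exceeded: ask the workflow to stop gracefully
    await handler.cancel_run()
```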
Source code in workflows/handler.py
async def cancel_run(self) -> None:
    """Cancel the running workflow.

    Signals the underlying context to raise
    [WorkflowCancelledByUser][workflows.errors.WorkflowCancelledByUser],
    which will be caught by the workflow and gracefully end the run.

    Examples:
        ```python
        handler = workflow.run()
        await handler.cancel_run()
        ```
    """
    if self.ctx:
        self.ctx._workflow_cancel_run()
        if self._run_task is not None:
            try:
                await self._run_task
            except Exception:
                pass