
Context

Bases: Generic[MODEL_T]

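Global, per-run context for a Workflow. Provides an interface into the underlying broker run, both for external consumers (workflow run observers) and for internal consumption by workflow steps.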

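The Context coordinates event delivery between steps, tracks in-flight work, exposes a global state store, and provides utilities for streaming and synchronization. It is created by a Workflow at run time and can be persisted and restored.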

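Parameters:

- workflow (Workflow): The owning workflow instance. Used to infer step configuration and instrumentation. Required.
- previous_context (dict[str, Any] | None): A previous context snapshot to resume from. Default: None.
- serializer (BaseSerializer | None): Serializer used to serialize and deserialize the current and previous context snapshots. Default: None (a JsonSerializer is used).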

Attributes:

- is_running (bool): Whether the workflow is currently running.
- store (InMemoryStateStore[MODEL_T]): Type-safe, async state store shared across steps. See also InMemoryStateStore.
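The constructor (see the source listing below) infers the store's state type from the `Context[...]` annotations on the workflow's steps. At most one custom type may appear, DictState is the fallback, and the chosen type must be constructible with no arguments. The stdlib-only sketch below reproduces that decision logic. `DictState` here is a local stand-in, `infer_state_type` is a hypothetical helper, and plain dataclasses replace the pydantic `BaseModel` subclasses that the real code checks for.

```python
from dataclasses import dataclass
from typing import Optional


class DictState(dict):
    """Hypothetical stand-in for the library's default DictState."""


def infer_state_type(step_state_types: list[Optional[type]]) -> type:
    """Pick the single custom state type declared across steps, else DictState."""
    # Steps without a typed Context (None) or using the default DictState don't count.
    custom = {t for t in step_state_types if t is not None and t is not DictState}
    if len(custom) > 1:
        names = ", ".join(sorted(t.__name__ for t in custom))
        raise ValueError(f"Multiple state types are not supported. Found: {names}")
    state_type = custom.pop() if custom else DictState
    try:
        state_type()  # the state must be constructible without arguments
    except Exception as e:
        raise RuntimeError(
            f"Failed to initialize state of type {state_type.__name__}. "
            "Does your state define defaults for all fields?"
        ) from e
    return state_type


@dataclass
class CounterState:
    count: int = 0  # every field has a default, so CounterState() works


@dataclass
class ProfileState:
    name: str  # no default: ProfileState() raises TypeError
```

If two steps annotate different state types, the sketch raises an error, and so does the source. Giving every field a default is what makes a custom state usable as a fresh store.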

Examples:

Basic usage inside a step:

from workflows import Context, Workflow, step
from workflows.events import StartEvent, StopEvent

class MyWorkflow(Workflow):  # hypothetical workflow name
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
        await ctx.store.set("query", ev.topic)
        ctx.write_event_to_stream(ev)  # surface progress to UI
        return StopEvent(result="ok")

Persisting the state of a workflow across runs:

from workflows import Context

# Create a context and run the workflow with the same context
ctx = Context(my_workflow)
result_1 = await my_workflow.run(..., ctx=ctx)
result_2 = await my_workflow.run(..., ctx=ctx)

# Serialize the context and restore it
ctx_dict = ctx.to_dict()
restored_ctx = Context.from_dict(my_workflow, ctx_dict)
result_3 = await my_workflow.run(..., ctx=restored_ctx)
See also: Workflow, Event, InMemoryStateStore
Source code in workflows/context/context.py
class Context(Generic[MODEL_T]):
    """
    Global, per-run context for a `Workflow`. Provides an interface into the
    underlying broker run, for both external (workflow run observers) and
    internal consumption by workflow steps.

    The `Context` coordinates event delivery between steps, tracks in-flight work,
    exposes a global state store, and provides utilities for streaming and
    synchronization. It is created by a `Workflow` at run time and can be
    persisted and restored.

    Args:
        workflow (Workflow): The owning workflow instance. Used to infer
            step configuration and instrumentation.
        previous_context: A previous context snapshot to resume from.
        serializer: A serializer to use for serializing and deserializing the current and previous context snapshots.

    Attributes:
        is_running (bool): Whether the workflow is currently running.
        store (InMemoryStateStore[MODEL_T]): Type-safe, async state store shared
            across steps. See also
            [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore].

    Examples:
        Basic usage inside a step:

        ```python
        from workflows import step
        from workflows.events import StartEvent, StopEvent

        @step
        async def start(self, ctx: Context, ev: StartEvent) -> StopEvent:
            await ctx.store.set("query", ev.topic)
            ctx.write_event_to_stream(ev)  # surface progress to UI
            return StopEvent(result="ok")
        ```

        Persisting the state of a workflow across runs:

        ```python
        from workflows import Context

        # Create a context and run the workflow with the same context
        ctx = Context(my_workflow)
        result_1 = await my_workflow.run(..., ctx=ctx)
        result_2 = await my_workflow.run(..., ctx=ctx)

        # Serialize the context and restore it
        ctx_dict = ctx.to_dict()
        restored_ctx = Context.from_dict(my_workflow, ctx_dict)
        result_3 = await my_workflow.run(..., ctx=restored_ctx)
        ```


    See Also:
        - [Workflow][workflows.Workflow]
        - [Event][workflows.events.Event]
        - [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore]
    """

    # These keys are set by pre-built workflows and
    # are known to be unserializable in some cases.
    known_unserializable_keys = ("memory",)

    # Backing state store; serialized as `state`
    _state_store: InMemoryStateStore[MODEL_T]
    _broker_run: WorkflowBroker[MODEL_T] | None
    _plugin: Plugin
    _workflow: Workflow

    def __init__(
        self,
        workflow: Workflow,
        previous_context: dict[str, Any] | None = None,
        serializer: BaseSerializer | None = None,
        plugin: Plugin = basic_runtime,
    ) -> None:
        self._serializer = serializer or JsonSerializer()
        self._broker_run = None
        self._plugin = plugin
        self._workflow = workflow

        # parse the serialized context
        serializer = serializer or JsonSerializer()
        if previous_context is not None:
            try:
                # Auto-detect and convert V0 to V1 if needed
                previous_context_parsed = SerializedContext.from_dict_auto(
                    previous_context
                )
                # validate it fully parses synchronously to avoid delayed validation errors
                BrokerState.from_serialized(
                    previous_context_parsed, workflow, serializer
                )
            except ValidationError as e:
                raise ContextSerdeError(
                    f"Context dict specified in an invalid format: {e}"
                ) from e
        else:
            previous_context_parsed = SerializedContext()

        self._init_snapshot = previous_context_parsed

        # initialization of the state store is a bit complex, due to inferring and validating its type from the
        # provided workflow context args

        state_types: set[Type[BaseModel]] = set()
        for _, step_func in workflow._get_steps().items():
            step_config: StepConfig = step_func._step_config
            if (
                step_config.context_state_type is not None
                and step_config.context_state_type != DictState
                and issubclass(step_config.context_state_type, BaseModel)
            ):
                state_type = step_config.context_state_type
                state_types.add(state_type)

        if len(state_types) > 1:
            raise ValueError(
                "Multiple state types are not supported. Make sure that each Context[...] has the same generic state type. Found: "
                + ", ".join([state_type.__name__ for state_type in state_types])
            )
        state_type = state_types.pop() if state_types else DictState
        if previous_context_parsed.state:
            # perhaps offer a way to clear on invalid
            store_state = InMemoryStateStore.from_dict(
                previous_context_parsed.state, serializer
            )
            if store_state.state_type != state_type:
                raise ValueError(
                    f"State type mismatch. Workflow context expected {state_type.__name__}, got {store_state.state_type.__name__}"
                )
            self._state_store = cast(InMemoryStateStore[MODEL_T], store_state)
        else:
            try:
                state_instance = cast(MODEL_T, state_type())
                self._state_store = InMemoryStateStore(state_instance)
            except Exception as e:
                raise WorkflowRuntimeError(
                    f"Failed to initialize state of type {state_type}. Does your state define defaults for all fields? Original error:\n{e}"
                ) from e

    @property
    def is_running(self) -> bool:
        """Whether the workflow is currently running."""
        if self._broker_run is None:
            return self._init_snapshot.is_running
        else:
            return self._broker_run.is_running

    def _init_broker(
        self, workflow: Workflow, plugin: WorkflowRuntime | None = None
    ) -> WorkflowBroker[MODEL_T]:
        if self._broker_run is not None:
            raise WorkflowRuntimeError("Broker already initialized")
        # Initialize a runtime plugin (asyncio-based by default)
        runtime: WorkflowRuntime = plugin or self._plugin.new_runtime(str(uuid.uuid4()))
        # Initialize the new broker implementation (broker2)
        broker: WorkflowBroker[MODEL_T] = WorkflowBroker(
            workflow=workflow,
            context=cast("Context[MODEL_T]", self),
            runtime=runtime,
            plugin=self._plugin,
        )
        self._broker_run = broker
        return broker

    def _workflow_run(
        self,
        workflow: Workflow,
        start_event: StartEvent | None = None,
        semaphore: asyncio.Semaphore | None = None,
    ) -> WorkflowHandler:
        """
        called by package internally from the workflow to run it
        """
        prev_broker: WorkflowBroker[MODEL_T] | None = None
        if self._broker_run is not None:
            prev_broker = self._broker_run
            self._broker_run = None

        self._broker_run = self._init_broker(workflow)

        async def before_start() -> None:
            if prev_broker is not None:
                try:
                    await prev_broker.shutdown()
                except Exception:
                    pass
            if semaphore is not None:
                await semaphore.acquire()

        async def after_complete() -> None:
            if semaphore is not None:
                semaphore.release()

        state = BrokerState.from_serialized(
            self._init_snapshot, workflow, self._serializer
        )
        return self._broker_run.start(
            workflow=workflow,
            previous=state,
            start_event=start_event,
            before_start=before_start,
            after_complete=after_complete,
        )

    def _workflow_cancel_run(self) -> None:
        """
        Called internally from the handler to cancel a context's run
        """
        self._running_broker.cancel_run()

    @property
    def _running_broker(self) -> WorkflowBroker[MODEL_T]:
        if self._broker_run is None:
            raise WorkflowRuntimeError(
                "Workflow run is not yet running. Make sure to only call this method after the context has been passed to a workflow.run call."
            )
        return self._broker_run

    @property
    def store(self) -> InMemoryStateStore[MODEL_T]:
        """Typed, process-local state store shared across steps.

        If no state was initialized yet, a default
        [DictState][workflows.context.state_store.DictState] store is created.

        Returns:
            InMemoryStateStore[MODEL_T]: The state store instance.
        """
        return self._state_store

    def to_dict(self, serializer: BaseSerializer | None = None) -> dict[str, Any]:
        """Serialize the context to a JSON-serializable dict.

        Persists the global state store, event queues, buffers, accepted events,
        broker log, and running flag. This payload can be fed to
        [from_dict][workflows.context.context.Context.from_dict] to resume a run
        or carry state across runs.

        Args:
            serializer (BaseSerializer | None): Value serializer used for state
                and event payloads. Defaults to
                [JsonSerializer][workflows.context.serializers.JsonSerializer].

        Returns:
            dict[str, Any]: A dict suitable for JSON encoding and later
            restoration via `from_dict`.

        See Also:
            - [InMemoryStateStore.to_dict][workflows.context.state_store.InMemoryStateStore.to_dict]

        Examples:
            ```python
            ctx_dict = ctx.to_dict()
            my_db.set("key", json.dumps(ctx_dict))

            ctx_dict = my_db.get("key")
            restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
            result = await my_workflow.run(..., ctx=restored_ctx)
            ```
        """
        serializer = serializer or self._serializer

        # Serialize state using the state manager's method
        state_data = {}
        if self._state_store is not None:
            state_data = self._state_store.to_dict(serializer)

        # Get the broker state - either from the running broker or from the init snapshot
        if self._broker_run is not None:
            broker_state = self._broker_run._state
        else:
            # Deserialize the init snapshot to get a BrokerState, then re-serialize it
            # This ensures we always output the current format
            broker_state = BrokerState.from_serialized(
                self._init_snapshot, self._workflow, serializer
            )

        context = broker_state.to_serialized(serializer)
        context.state = state_data
        # mode="python" to support pickling over json if one so chooses. This should perhaps be moved into the serializers
        return context.model_dump(mode="python")

    @classmethod
    def from_dict(
        cls,
        workflow: "Workflow",
        data: dict[str, Any],
        serializer: BaseSerializer | None = None,
    ) -> "Context[MODEL_T]":
        """Reconstruct a `Context` from a serialized payload.

        Args:
            workflow (Workflow): The workflow instance that will own this
                context.
            data (dict[str, Any]): Payload produced by
                [to_dict][workflows.context.context.Context.to_dict].
            serializer (BaseSerializer | None): Serializer used to decode state
                and events. Defaults to JSON.

        Returns:
            Context[MODEL_T]: A context instance initialized with the persisted
                state and queues.

        Raises:
            ContextSerdeError: If the payload is missing required fields or is
                in an incompatible format.

        Examples:
            ```python
            ctx_dict = ctx.to_dict()
            my_db.set("key", json.dumps(ctx_dict))

            ctx_dict = my_db.get("key")
            restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
            result = await my_workflow.run(..., ctx=restored_ctx)
            ```
        """
        try:
            return cls(workflow, previous_context=data, serializer=serializer)
        except KeyError as e:
            msg = "Error creating a Context instance: the provided payload has a wrong or old format."
            raise ContextSerdeError(msg) from e

    async def running_steps(self) -> list[str]:
        """Return the list of currently running step names.

        Returns:
            list[str]: Names of steps that have at least one active worker.
        """
        return await self._running_broker.running_steps()

    def collect_events(
        self, ev: Event, expected: list[Type[Event]], buffer_id: str | None = None
    ) -> list[Event] | None:
        """
        Buffer events until all expected types are available, then return them.

        This utility is helpful when a step can receive multiple event types
        and needs to proceed only when it has a full set. The returned list is
        ordered according to `expected`.

        Args:
            ev (Event): The incoming event to add to the buffer.
            expected (list[Type[Event]]): Event types to collect, in order.
            buffer_id (str | None): Optional stable key to isolate buffers across
                steps or workers. Defaults to an internal key derived from the
                task name or expected types.

        Returns:
            list[Event] | None: The events in the requested order when complete,
            otherwise `None`.

        Examples:
            ```python
            @step
            async def synthesize(
                self, ctx: Context, ev: QueryEvent | RetrieveEvent
            ) -> StopEvent | None:
                events = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
                if events is None:
                    return None
                query_ev, retrieve_ev = events
                # ... proceed with both inputs present ...
            ```

        See Also:
            - [Event][workflows.events.Event]
        """
        return self._running_broker.collect_events(ev, expected, buffer_id)

    def send_event(self, message: Event, step: str | None = None) -> None:
        """Dispatch an event to one or all workflow steps.

        If `step` is omitted, the event is broadcast to all step queues and
        non-matching steps will ignore it. When `step` is provided, the target
        step must accept the event type or a
        [WorkflowRuntimeError][workflows.errors.WorkflowRuntimeError] is raised.

        Args:
            message (Event): The event to enqueue.
            step (str | None): Optional step name to target.

        Raises:
            WorkflowRuntimeError: If the target step does not exist or does not
                accept the event type.

        Examples:
            It's common to use this method to fan-out events:

            ```python
            @step
            async def my_step(self, ctx: Context, ev: StartEvent) -> WorkerEvent | GatherEvent:
                for i in range(10):
                    ctx.send_event(WorkerEvent(msg=i))
                return GatherEvent()
            ```

            You may also see this method used from the caller side to send events into the workflow:

            ```python
            handler = my_workflow.run(...)
            async for ev in handler.stream_events():
                if isinstance(ev, SomeEvent):
                    handler.ctx.send_event(SomeOtherEvent(msg="Hello!"))

            result = await handler
            ```
        """
        return self._running_broker.send_event(message, step)

    async def wait_for_event(
        self,
        event_type: Type[T],
        waiter_event: Event | None = None,
        waiter_id: str | None = None,
        requirements: dict[str, Any] | None = None,
        timeout: float | None = 2000,
    ) -> T:
        """Wait for the next matching event of type `event_type`.

        Optionally emits a `waiter_event` to the event stream once per `waiter_id` to
        inform callers that the workflow is waiting for external input.
        This helps to prevent duplicate waiter events from being sent to the event stream.

        Args:
            event_type (type[T]): Concrete event class to wait for.
            waiter_event (Event | None): Optional event to write to the stream
                once when the wait begins.
            waiter_id (str | None): Stable identifier to avoid emitting multiple
                waiter events for the same logical wait.
            requirements (dict[str, Any] | None): Key/value filters that must be
                satisfied by the event via `event.get(key) == value`.
            timeout (float | None): Max seconds to wait. `None` means no
                timeout. Defaults to 2000 seconds.

        Returns:
            T: The received event instance of the requested type.

        Raises:
            asyncio.TimeoutError: If the timeout elapses.

        Examples:
            ```python
            @step
            async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
                response = await ctx.wait_for_event(
                    HumanResponseEvent,
                    waiter_event=InputRequiredEvent(msg="What's your name?"),
                    waiter_id="user_name",
                    timeout=60,
                )
                return StopEvent(result=response.response)
            ```
        """
        return await self._running_broker.wait_for_event(
            event_type, waiter_event, waiter_id, requirements, timeout
        )

    def write_event_to_stream(self, ev: Event | None) -> None:
        """Enqueue an event for streaming to [WorkflowHandler][workflows.handler.WorkflowHandler].

        Args:
            ev (Event | None): The event to stream. `None` can be used as a
                sentinel in some streaming modes.

        Examples:
            ```python
            @step
            async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
                ctx.write_event_to_stream(ev)
                return StopEvent(result="ok")
            ```
        """
        self._running_broker.write_event_to_stream(ev)

    def get_result(self) -> RunResultT:
        """Return the final result of the workflow run.

        Deprecated:
            This method is deprecated and will be removed in a future release.
            Prefer awaiting the handler returned by `Workflow.run`, e.g.:
            `result = await workflow.run(..., ctx=ctx)`.

        Examples:
            ```python
            # Preferred
            result = await my_workflow.run(..., ctx=ctx)

            # Deprecated
            result_agent = ctx.get_result()
            ```

        Returns:
            RunResultT: The value provided via a `StopEvent`.
        """
        _warn_get_result()
        if self._running_broker._handler is None:
            raise WorkflowRuntimeError("Workflow handler is not set")
        return self._running_broker._handler.result()

    def stream_events(self) -> AsyncGenerator[Event, None]:
        """Return an async generator over the events published to callers."""
        return self._running_broker.stream_published_events()

    @property
    def streaming_queue(self) -> asyncio.Queue:
        """Deprecated queue-based event stream.

        Returns an asyncio.Queue that is populated by iterating this context's
        stream_events(). A deprecation warning is emitted once per process.
        """
        _warn_streaming_queue()
        q: asyncio.Queue[Event] = asyncio.Queue()

        async def _pump() -> None:
            async for ev in self.stream_events():
                await q.put(ev)
                if isinstance(ev, StopEvent):
                    break

        try:
            asyncio.create_task(_pump())
        except RuntimeError:
            loop = asyncio.get_event_loop()
            loop.create_task(_pump())
        return q
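The wait_for_event docstring in the source above describes several behaviors. Requirements are matched key by key, the waiter event is emitted once per waiter_id, and a timeout raises asyncio.TimeoutError. The hypothetical model below illustrates those rules with plain asyncio futures. `MiniWaiter` and the two event dataclasses are illustrative names, and requirements are checked here with `getattr` instead of the library's `event.get(key)`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


class MiniWaiter:
    """Hypothetical model of wait_for_event semantics, not the library's broker."""

    def __init__(self) -> None:
        self._waiters: list[tuple[type, dict[str, Any], asyncio.Future]] = []
        self._announced: set[str] = set()  # waiter_ids whose waiter_event was emitted
        self.stream: list[Any] = []  # stands in for the caller-facing event stream

    def send_event(self, ev: Any) -> None:
        # Resolve the first pending waiter whose type and requirements both match.
        for entry in self._waiters:
            event_type, requirements, fut = entry
            if fut.done() or not isinstance(ev, event_type):
                continue
            if all(getattr(ev, k, None) == v for k, v in requirements.items()):
                fut.set_result(ev)
                self._waiters.remove(entry)
                return

    async def wait_for_event(
        self,
        event_type: type,
        waiter_event: Any = None,
        waiter_id: Optional[str] = None,
        requirements: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = 2000,
    ) -> Any:
        # Emit the waiter event once per waiter_id so callers aren't prompted twice.
        if waiter_event is not None and waiter_id not in self._announced:
            self.stream.append(waiter_event)
            if waiter_id is not None:
                self._announced.add(waiter_id)
        fut = asyncio.get_running_loop().create_future()
        entry = (event_type, requirements or {}, fut)
        self._waiters.append(entry)
        try:
            # timeout=None waits forever; otherwise a timeout raises TimeoutError.
            return await asyncio.wait_for(fut, timeout)
        finally:
            if entry in self._waiters:
                self._waiters.remove(entry)


@dataclass
class InputRequiredEvent:
    msg: str


@dataclass
class HumanResponseEvent:
    user: str
    response: str
```

Requirements let several waiters share one event type without stealing each other's responses. The per-waiter_id bookkeeping keeps a retried wait from prompting the user again.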

is_running property

is_running: bool

Whether the workflow is currently running.
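According to the source above, is_running does not need a live run. Until the context is attached to a broker, it reports the running flag stored in the snapshot the context was created or restored from. After that, the broker's own flag takes over. The sketch below models that lookup order with hypothetical `Snapshot` and `Broker` stand-ins. It assumes that a fresh snapshot defaults to not running.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Snapshot:
    """Hypothetical stand-in for the serialized snapshot's running flag."""
    is_running: bool = False  # assumed default for a fresh context


@dataclass
class Broker:
    """Hypothetical stand-in for a live broker run."""
    is_running: bool = True


class ContextModel:
    """Mirrors the lookup order of Context.is_running in the source above."""

    def __init__(self, snapshot: Snapshot) -> None:
        self._init_snapshot = snapshot
        self._broker_run: Optional[Broker] = None  # set once the context is run

    @property
    def is_running(self) -> bool:
        if self._broker_run is None:
            # Not attached to a run yet: report the flag persisted in the snapshot.
            return self._init_snapshot.is_running
        # Attached: the live broker is the source of truth.
        return self._broker_run.is_running


# A context restored from a snapshot taken mid-run reports True before any run starts.
restored = ContextModel(Snapshot(is_running=True))
```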

store property

store: InMemoryStateStore[MODEL_T]

Typed, process-local state store shared across steps.

If no state was initialized yet, a default DictState store is created.

Returns:

- InMemoryStateStore[MODEL_T]: The state store instance.
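Steps that run concurrently can share the store, and it is async, so any read-modify-write on one key should happen under a lock. Without one, most of the concurrent increments in the usage below would be lost. The following is a hypothetical model: `MiniStateStore` and its `edit_state` helper are illustrative names, not the InMemoryStateStore API. Only `set` mirrors a call used in the step example at the top of this page.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Optional

_MISSING = object()


class MiniStateStore:
    """Hypothetical lock-guarded async store; a model, not InMemoryStateStore."""

    def __init__(self, state: Optional[dict[str, Any]] = None) -> None:
        # With no initial state, start from an empty dict (cf. the default DictState).
        self._state: dict[str, Any] = dict(state or {})
        self._lock = asyncio.Lock()

    async def get(self, key: str, default: Any = _MISSING) -> Any:
        async with self._lock:
            if key in self._state:
                return self._state[key]
        if default is _MISSING:
            raise KeyError(key)
        return default

    async def set(self, key: str, value: Any) -> None:
        async with self._lock:
            self._state[key] = value

    @asynccontextmanager
    async def edit_state(self) -> AsyncIterator[dict[str, Any]]:
        # Hold the lock across a read-modify-write so concurrent steps can't interleave.
        async with self._lock:
            yield self._state


async def bump(store: MiniStateStore) -> None:
    async with store.edit_state() as state:
        current = state.get("count", 0)
        await asyncio.sleep(0)  # a step may await (e.g. an LLM call) mid-update
        state["count"] = current + 1
```

Running `asyncio.gather(*(bump(store) for _ in range(10)))` leaves `count` at 10 because each bump holds the lock across its await.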

collect_events

collect_events(ev: Event, expected: list[Type[Event]], buffer_id: str | None = None) -> list[Event] | None

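Buffer events until all expected types are available, then return them.

This utility is helpful when a step can receive multiple event types and needs to proceed only when it has a full set. The returned list is ordered according to expected.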
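The buffering contract described above can be modeled in a few lines. This is a hypothetical stdlib-only sketch. `EventCollector` and the event dataclasses are illustrative. The default key ignores the task name that the real method may use, and events are matched by exact type. Repeating a type in `expected` makes the sketch wait for that many events of the type.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import Any, Optional


class EventCollector:
    """Hypothetical model of collect_events buffering, not the library's broker."""

    def __init__(self) -> None:
        # buffer key -> event type -> buffered events of that type, in arrival order
        self._buffers: dict[str, dict[type, list[Any]]] = defaultdict(
            lambda: defaultdict(list)
        )

    def collect_events(
        self, ev: Any, expected: list[type], buffer_id: Optional[str] = None
    ) -> Optional[list[Any]]:
        # Assumed default key: derived from the expected types only.
        key = buffer_id or ",".join(t.__name__ for t in expected)
        buffer = self._buffers[key]
        buffer[type(ev)].append(ev)
        # Incomplete until each type is present as often as it appears in `expected`.
        needed = Counter(expected)
        if any(len(buffer[t]) < n for t, n in needed.items()):
            return None
        # Complete: hand back one buffered event per slot, ordered like `expected`.
        return [buffer[t].pop(0) for t in expected]


@dataclass
class QueryEvent:
    query: str


@dataclass
class RetrieveEvent:
    docs: list[str]


@dataclass
class ResultEvent:
    index: int
```

Popping the returned events drains the buffer, so the next call starts collecting a fresh set. A distinct `buffer_id` keeps independent collections from mixing.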

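Parameters:

- ev (Event): The incoming event to add to the buffer. Required.
- expected (list[Type[Event]]): Event types to collect, in order. Required.
- buffer_id (str | None): Optional stable key to isolate buffers across steps or workers. Defaults to an internal key derived from the task name or expected types. Default: None.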

Returns:

- list[Event] | None: The events in the requested order when complete, otherwise None.

Examples:

@step
async def synthesize(
    self, ctx: Context, ev: QueryEvent | RetrieveEvent
) -> StopEvent | None:
    events = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
    if events is None:
        return None
    query_ev, retrieve_ev = events
    # ... proceed with both inputs present ...
See also: Event
Source code in workflows/context/context.py
def collect_events(
    self, ev: Event, expected: list[Type[Event]], buffer_id: str | None = None
) -> list[Event] | None:
    """
    Buffer events until all expected types are available, then return them.

    This utility is helpful when a step can receive multiple event types
    and needs to proceed only when it has a full set. The returned list is
    ordered according to `expected`.

    Args:
        ev (Event): The incoming event to add to the buffer.
        expected (list[Type[Event]]): Event types to collect, in order.
        buffer_id (str | None): Optional stable key to isolate buffers across
            steps or workers. Defaults to an internal key derived from the
            task name or expected types.

    Returns:
        list[Event] | None: The events in the requested order when complete,
        otherwise `None`.

    Examples:
        ```python
        @step
        async def synthesize(
            self, ctx: Context, ev: QueryEvent | RetrieveEvent
        ) -> StopEvent | None:
            events = ctx.collect_events(ev, [QueryEvent, RetrieveEvent])
            if events is None:
                return None
            query_ev, retrieve_ev = events
            # ... proceed with both inputs present ...
        ```

    See Also:
        - [Event][workflows.events.Event]
    """
    return self._running_broker.collect_events(ev, expected, buffer_id)

from_dict classmethod

from_dict(workflow: 'Workflow', data: dict[str, Any], serializer: BaseSerializer | None = None) -> 'Context[MODEL_T]'

Reconstruct a Context from a serialized payload.

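Parameters:

- workflow (Workflow): The workflow instance that will own this context. Required.
- data (dict[str, Any]): Payload produced by to_dict. Required.
- serializer (BaseSerializer | None): Serializer used to decode state and events. When None, a JSON serializer is used. Default: None.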

Returns:

- Context[MODEL_T]: A context instance initialized with the persisted state and queues.

Raises:

- ContextSerdeError: If the payload is missing required fields or is in an incompatible format.
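In the source below, from_dict converts a KeyError raised while constructing the context into ContextSerdeError. The constructor, shown in the class source above, does the same for pydantic's ValidationError, so callers only need to handle one error type. The following minimal sketch shows the pattern with a local `ContextSerdeError` stand-in. `load_snapshot` and `restore_or_fresh` are hypothetical helpers, and their required keys are illustrative, not the real payload schema.

```python
from typing import Any


class ContextSerdeError(Exception):
    """Local stand-in for the library's ContextSerdeError."""


def load_snapshot(data: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical loader; the required keys are illustrative, not the real schema."""
    try:
        return {"state": dict(data["state"]), "is_running": bool(data["is_running"])}
    except KeyError as e:
        # Re-raise as one domain error; `from e` keeps the KeyError as __cause__.
        raise ContextSerdeError(
            "Error creating a Context instance: the provided payload has a wrong or old format."
        ) from e


def restore_or_fresh(data: dict[str, Any]) -> dict[str, Any]:
    # Callers catch a single exception type to fall back to a fresh snapshot.
    try:
        return load_snapshot(data)
    except ContextSerdeError:
        return {"state": {}, "is_running": False}
```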

Examples:

import json

from workflows import Context

ctx_dict = ctx.to_dict()
my_db.set("key", json.dumps(ctx_dict))

ctx_dict = my_db.get("key")
restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
result = await my_workflow.run(..., ctx=restored_ctx)
Source code in workflows/context/context.py
@classmethod
def from_dict(
    cls,
    workflow: "Workflow",
    data: dict[str, Any],
    serializer: BaseSerializer | None = None,
) -> "Context[MODEL_T]":
    """Reconstruct a `Context` from a serialized payload.

    Args:
        workflow (Workflow): The workflow instance that will own this
            context.
        data (dict[str, Any]): Payload produced by
            [to_dict][workflows.context.context.Context.to_dict].
        serializer (BaseSerializer | None): Serializer used to decode state
            and events. Defaults to JSON.

    Returns:
        Context[MODEL_T]: A context instance initialized with the persisted
            state and queues.

    Raises:
        ContextSerdeError: If the payload is missing required fields or is
            in an incompatible format.

    Examples:
        ```python
        ctx_dict = ctx.to_dict()
        my_db.set("key", json.dumps(ctx_dict))

        ctx_dict = my_db.get("key")
        restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
        result = await my_workflow.run(..., ctx=restored_ctx)
        ```
    """
    try:
        return cls(workflow, previous_context=data, serializer=serializer)
    except KeyError as e:
        msg = "Error creating a Context instance: the provided payload has a wrong or old format."
        raise ContextSerdeError(msg) from e

get_result

get_result() -> RunResultT

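Return the final result of the workflow run.

Deprecated

This method is deprecated and will be removed in a future release. Prefer awaiting the handler returned by Workflow.run, e.g.: result = await workflow.run(..., ctx=ctx).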
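The source below implements get_result() by calling result() on the run's handler. If that handler behaves like an asyncio.Future, as the result() call suggests, then result() does not wait. It raises while the run is still in progress, whereas awaiting the handler waits for completion. The sketch below uses a bare asyncio.Future as a hypothetical stand-in for the handler to show the difference. The value 42 is arbitrary.

```python
import asyncio


async def demo() -> tuple[str, int]:
    loop = asyncio.get_running_loop()
    handler = loop.create_future()  # stand-in for the WorkflowHandler of a run

    # Calling result() before the run has finished raises instead of waiting.
    try:
        handler.result()
        early = "ready"
    except asyncio.InvalidStateError:
        early = "not ready"

    # Later, the run finishes and the StopEvent's result is set on the handler.
    loop.call_later(0.01, handler.set_result, 42)
    result = await handler  # awaiting suspends until the result is available
    return early, result
```

This is why `result = await my_workflow.run(..., ctx=ctx)` is the safer pattern: it cannot observe a half-finished run.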

Examples:

# Preferred
result = await my_workflow.run(..., ctx=ctx)

# Deprecated
result_agent = ctx.get_result()

Returns:

- RunResultT: The value provided via a StopEvent.

Source code in workflows/context/context.py
def get_result(self) -> RunResultT:
    """Return the final result of the workflow run.

    Deprecated:
        This method is deprecated and will be removed in a future release.
        Prefer awaiting the handler returned by `Workflow.run`, e.g.:
        `result = await workflow.run(..., ctx=ctx)`.

    Examples:
        ```python
        # Preferred
        result = await my_workflow.run(..., ctx=ctx)

        # Deprecated
        result_agent = ctx.get_result()
        ```

    Returns:
        RunResultT: The value provided via a `StopEvent`.
    """
    _warn_get_result()
    if self._running_broker._handler is None:
        raise WorkflowRuntimeError("Workflow handler is not set")
    return self._running_broker._handler.result()

send_event

send_event(message: Event, step: str | None = None) -> None

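Dispatch an event to one or all workflow steps.

If step is omitted, the event is broadcast to all step queues, and steps that do not accept it ignore it. When step is provided, the target step must accept the event type, or a WorkflowRuntimeError is raised.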
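The two routing modes above can be sketched as follows. `MiniRouter` and its explicit `accepts` registry are hypothetical, since the real broker presumably derives accepted types from the step signatures. `WorkflowRuntimeError` is a local stand-in for the library's exception.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


class WorkflowRuntimeError(Exception):
    """Local stand-in for workflows.errors.WorkflowRuntimeError."""


@dataclass
class MiniRouter:
    """Hypothetical model of send_event routing, not the library's broker."""

    # step name -> event types that step accepts
    accepts: dict[str, tuple[type, ...]]
    queues: dict[str, list[Any]] = field(init=False)

    def __post_init__(self) -> None:
        self.queues = {name: [] for name in self.accepts}

    def send_event(self, message: Any, step: Optional[str] = None) -> None:
        if step is None:
            # Broadcast: every step queue receives the event.
            for queue in self.queues.values():
                queue.append(message)
            return
        if step not in self.accepts:
            raise WorkflowRuntimeError(f"Step {step!r} does not exist")
        if not isinstance(message, self.accepts[step]):
            raise WorkflowRuntimeError(
                f"Step {step!r} does not accept {type(message).__name__}"
            )
        self.queues[step].append(message)

    def accepted(self, step: str) -> list[Any]:
        # What a step actually handles: broadcast events of other types are ignored.
        return [ev for ev in self.queues[step] if isinstance(ev, self.accepts[step])]


@dataclass
class WorkerEvent:
    msg: int


@dataclass
class GatherEvent:
    pass
```

Broadcasting is convenient for fan-out. A targeted send fails fast on a misspelled step name or a wrong event type instead of silently leaving the event unhandled.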

Parameters:

- message (Event): The event to enqueue. Required.
- step (str | None): Optional name of the target step. Default: None.

Raises:

- WorkflowRuntimeError: If the target step does not exist or does not accept the event type.

Examples:

It's common to use this method to fan out events:

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> WorkerEvent | GatherEvent:
    for i in range(10):
        ctx.send_event(WorkerEvent(msg=i))
    return GatherEvent()

You may also see this method used from the caller side to send events into the workflow:

handler = my_workflow.run(...)
async for ev in handler.stream_events():
    if isinstance(ev, SomeEvent):
        handler.ctx.send_event(SomeOtherEvent(msg="Hello!"))

result = await handler
Source code in workflows/context/context.py
def send_event(self, message: Event, step: str | None = None) -> None:
    """Dispatch an event to one or all workflow steps.

    If `step` is omitted, the event is broadcast to all step queues and
    non-matching steps will ignore it. When `step` is provided, the target
    step must accept the event type or a
    [WorkflowRuntimeError][workflows.errors.WorkflowRuntimeError] is raised.

    Args:
        message (Event): The event to enqueue.
        step (str | None): Optional step name to target.

    Raises:
        WorkflowRuntimeError: If the target step does not exist or does not
            accept the event type.

    Examples:
        It's common to use this method to fan-out events:

        ```python
        @step
        async def my_step(self, ctx: Context, ev: StartEvent) -> WorkerEvent | GatherEvent:
            for i in range(10):
                ctx.send_event(WorkerEvent(msg=i))
            return GatherEvent()
        ```

        You may also see this method used from the caller side to send events into the workflow:

        ```python
        handler = my_workflow.run(...)
        async for ev in handler.stream_events():
            if isinstance(ev, SomeEvent):
                handler.ctx.send_event(SomeOtherEvent(msg="Hello!"))

        result = await handler
        ```
    """
    return self._running_broker.send_event(message, step)

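to_dict

to_dict(serializer: BaseSerializer | None = None) -> dict[str, Any]

Serialize the context to a JSON-serializable dict.

Persists the global state store, event queues, buffers, accepted events, broker log, and running flag. This payload can be fed to from_dict to resume a run or to carry state across runs.

Parameters:

Name Type Description Default
serializer BaseSerializer | None

Value serializer used for state and event payloads. When omitted, the context's own serializer is used (JsonSerializer by default).

None

Returns:

Type Description
dict[str, Any]

A dict suitable for JSON encoding and later restoration via from_dict.

See also: InMemoryStateStore.to_dict

Examples: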

import json

ctx_dict = ctx.to_dict()
my_db.set("key", json.dumps(ctx_dict))

ctx_json = my_db.get("key")
restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_json))
result = await my_workflow.run(..., ctx=restored_ctx)
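The example above leaves `my_db` undefined. A minimal sketch of the persist/restore round trip, assuming a hypothetical dict-backed `DictKV` store in place of `my_db` and a hand-written payload in place of real `ctx.to_dict()` output (its keys are placeholders, not the real schema):

```python
from __future__ import annotations

import json
from typing import Any


class DictKV:
    """Hypothetical stand-in for `my_db`: an in-memory, string-valued key-value store."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> str:
        return self._data[key]


def save_payload(db: DictKV, key: str, payload: dict[str, Any]) -> None:
    # json.dumps raises TypeError on non-JSON values, so a bad payload
    # fails when saving rather than later when restoring.
    db.set(key, json.dumps(payload))


def load_payload(db: DictKV, key: str) -> dict[str, Any]:
    return json.loads(db.get(key))
```

Equality after the round trip only holds for JSON-native types; for example, JSON turns tuples into lists.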
Source code in workflows/context/context.py
def to_dict(self, serializer: BaseSerializer | None = None) -> dict[str, Any]:
    """Serialize the context to a JSON-serializable dict.

    Persists the global state store, event queues, buffers, accepted events,
    broker log, and running flag. This payload can be fed to
    [from_dict][workflows.context.context.Context.from_dict] to resume a run
    or carry state across runs.

    Args:
        serializer (BaseSerializer | None): Value serializer used for state
            and event payloads. Defaults to
            [JsonSerializer][workflows.context.serializers.JsonSerializer].

    Returns:
        dict[str, Any]: A dict suitable for JSON encoding and later
        restoration via `from_dict`.

    See Also:
        - [InMemoryStateStore.to_dict][workflows.context.state_store.InMemoryStateStore.to_dict]

    Examples:
        ```python
        ctx_dict = ctx.to_dict()
        my_db.set("key", json.dumps(ctx_dict))

        ctx_dict = my_db.get("key")
        restored_ctx = Context.from_dict(my_workflow, json.loads(ctx_dict))
        result = await my_workflow.run(..., ctx=restored_ctx)
        ```
    """
    serializer = serializer or self._serializer

    # Serialize state using the state manager's method
    state_data = {}
    if self._state_store is not None:
        state_data = self._state_store.to_dict(serializer)

    # Get the broker state - either from the running broker or from the init snapshot
    if self._broker_run is not None:
        broker_state = self._broker_run._state
    else:
        # Deserialize the init snapshot to get a BrokerState, then re-serialize it
        # This ensures we always output the current format
        broker_state = BrokerState.from_serialized(
            self._init_snapshot, self._workflow, serializer
        )

    context = broker_state.to_serialized(serializer)
    context.state = state_data
    # mode="python" to support pickling over json if one so chooses. This should perhaps be moved into the serializers
    return context.model_dump(mode="python")

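wait_for_event async

wait_for_event(event_type: Type[T], waiter_event: Event | None = None, waiter_id: str | None = None, requirements: dict[str, Any] | None = None, timeout: float | None = 2000) -> T

Wait for the next matching event of type event_type.

Optionally emits a waiter_event to the event stream once per waiter_id to inform callers that the workflow is waiting for external input. This prevents duplicate waiter events from being sent to the event stream.

Parameters:

Name Type Description Default
event_type type[T]

Concrete event class to wait for.

required
waiter_event Event | None

Optional event to write to the stream once when the wait begins.

None
waiter_id str | None

Stable identifier to avoid emitting multiple waiter events for the same logical wait.

None
requirements dict[str, Any] | None

Key/value filters that the event must satisfy, checked via event.get(key) == value.

None
timeout float | None

Maximum number of seconds to wait. None means no timeout. Defaults to 2000 seconds.

2000

Returns:

Name Type Description
T T

The received event instance of the requested type.

Raises:

Type Description
TimeoutError

If the timeout elapses.

Examples: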

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
    response = await ctx.wait_for_event(
        HumanResponseEvent,
        waiter_event=InputRequiredEvent(msg="What's your name?"),
        waiter_id="user_name",
        timeout=60,
    )
    return StopEvent(result=response.response)
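The interplay of `waiter_id`, `requirements`, and `timeout` can be modeled in a few lines of plain `asyncio`. This is a hypothetical sketch, not the library's implementation: `ToyWaiter`, `ToyEvent`, its `get` method, and the `skipped` list are assumptions, and how the real broker treats non-matching events is not described in this section.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Any, TypeVar

T = TypeVar("T")


@dataclass
class ToyEvent:
    """Hypothetical event exposing `get`, which the requirements filter relies on."""

    data: dict[str, Any] = field(default_factory=dict)

    def get(self, key: str) -> Any:
        return self.data.get(key)


class HumanResponseEvent(ToyEvent):
    pass


class InputRequiredEvent(ToyEvent):
    pass


class ToyWaiter:
    """Hypothetical model of wait_for_event; not the library's broker."""

    def __init__(self) -> None:
        self.inbox: asyncio.Queue[Any] = asyncio.Queue()  # incoming events
        self.stream: list[Any] = []  # what a handler would see
        self.skipped: list[Any] = []  # non-matching events (simplification)
        self._emitted: set[str] = set()

    async def wait_for_event(
        self,
        event_type: type[T],
        waiter_event: Any = None,
        waiter_id: str | None = None,
        requirements: dict[str, Any] | None = None,
        timeout: float | None = 2000,
    ) -> T:
        # Emit the waiter event at most once per waiter_id.
        if waiter_event is not None and waiter_id not in self._emitted:
            self.stream.append(waiter_event)
            if waiter_id is not None:
                self._emitted.add(waiter_id)
        reqs = requirements or {}

        async def next_match() -> T:
            while True:
                ev = await self.inbox.get()
                if isinstance(ev, event_type) and all(
                    ev.get(k) == v for k, v in reqs.items()
                ):
                    return ev
                self.skipped.append(ev)

        # timeout=None waits indefinitely; otherwise asyncio raises TimeoutError.
        return await asyncio.wait_for(next_match(), timeout)
```

Reusing the same `waiter_id` on a retried wait is what keeps the prompt from being shown to the user twice.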
Source code in workflows/context/context.py
async def wait_for_event(
    self,
    event_type: Type[T],
    waiter_event: Event | None = None,
    waiter_id: str | None = None,
    requirements: dict[str, Any] | None = None,
    timeout: float | None = 2000,
) -> T:
    """Wait for the next matching event of type `event_type`.

    Optionally emits a `waiter_event` to the event stream once per `waiter_id` to
    inform callers that the workflow is waiting for external input.
    This helps to prevent duplicate waiter events from being sent to the event stream.

    Args:
        event_type (type[T]): Concrete event class to wait for.
        waiter_event (Event | None): Optional event to write to the stream
            once when the wait begins.
        waiter_id (str | None): Stable identifier to avoid emitting multiple
            waiter events for the same logical wait.
        requirements (dict[str, Any] | None): Key/value filters that must be
            satisfied by the event via `event.get(key) == value`.
        timeout (float | None): Max seconds to wait. `None` means no
            timeout. Defaults to 2000 seconds.

    Returns:
        T: The received event instance of the requested type.

    Raises:
        asyncio.TimeoutError: If the timeout elapses.

    Examples:
        ```python
        @step
        async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
            response = await ctx.wait_for_event(
                HumanResponseEvent,
                waiter_event=InputRequiredEvent(msg="What's your name?"),
                waiter_id="user_name",
                timeout=60,
            )
            return StopEvent(result=response.response)
        ```
    """
    return await self._running_broker.wait_for_event(
        event_type, waiter_event, waiter_id, requirements, timeout
    )

write_event_to_stream

write_event_to_stream(ev: Event | None) -> None

Enqueue an event for streaming to the WorkflowHandler.

Parameters:

Name Type Description Default
ev Event | None

The event to stream. None can be used as a sentinel in some streaming modes.

required

Examples:

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
    ctx.write_event_to_stream(ev)
    return StopEvent(result="ok")
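The note about `None` as a sentinel refers to a common streaming pattern. A minimal sketch, assuming a plain `asyncio.Queue` between producer and consumer; `stream_events`, `write_all`, and `collect` are hypothetical and are not the handler's internals:

```python
from __future__ import annotations

import asyncio
from typing import Any, AsyncIterator


async def stream_events(queue: asyncio.Queue) -> AsyncIterator[Any]:
    """Hypothetical consumer: yield queued events until the None sentinel arrives."""
    while True:
        ev = await queue.get()
        if ev is None:  # sentinel: the producer has finished
            return
        yield ev


def write_all(queue: asyncio.Queue, events: list[Any]) -> None:
    """Hypothetical producer: enqueue each event, then the sentinel."""
    for ev in events:
        queue.put_nowait(ev)  # plays the role of ctx.write_event_to_stream(ev)
    queue.put_nowait(None)


async def collect(events: list[Any]) -> list[Any]:
    queue: asyncio.Queue = asyncio.Queue()
    write_all(queue, events)
    return [ev async for ev in stream_events(queue)]
```

The cost of an in-band sentinel is that `None` can no longer travel through the stream as an ordinary event.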
Source code in workflows/context/context.py
def write_event_to_stream(self, ev: Event | None) -> None:
    """Enqueue an event for streaming to [WorkflowHandler][workflows.handler.WorkflowHandler].

    Args:
        ev (Event | None): The event to stream. `None` can be used as a
            sentinel in some streaming modes.

    Examples:
        ```python
        @step
        async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
            ctx.write_event_to_stream(ev)
            return StopEvent(result="ok")
        ```
    """
    self._running_broker.write_event_to_stream(ev)

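DictState

Bases: DictLikeModel

Dynamic, dict-like Pydantic model for workflow state.

Used as the default state model when no typed state is provided. Behaves like a mapping while retaining Pydantic validation and serialization.

Examples: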

from workflows.context.state_store import DictState

state = DictState()
state["foo"] = 1
state.bar = 2  # attribute-style access works for nested structures
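A model that supports both `state["foo"]` and `state.bar` typically routes both kinds of access to one backing dict. The sketch below is hypothetical and pydantic-free; only the `_data` attribute name and the `items()` method echo what the `InMemoryStateStore` source relies on, and the real `DictLikeModel` may work differently.

```python
from __future__ import annotations

from typing import Any, Iterator


class ToyDictState:
    """Hypothetical dict-like state: item and attribute access share `_data`."""

    def __init__(self, **params: Any) -> None:
        # Bypass our own __setattr__ so `_data` is a real instance attribute.
        object.__setattr__(self, "_data", dict(params))

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = value

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails.
        try:
            return self._data[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name: str, value: Any) -> None:
        self._data[name] = value

    def items(self) -> Iterator[tuple[str, Any]]:
        return iter(self._data.items())


state = ToyDictState()
state["foo"] = 1
state.bar = 2  # lands in the same `_data` dict as item access
```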
See also: InMemoryStateStore

Source code in workflows/context/state_store.py
class DictState(DictLikeModel):
    """
    Dynamic, dict-like Pydantic model for workflow state.

    Used as the default state model when no typed state is provided. Behaves
    like a mapping while retaining Pydantic validation and serialization.

    Examples:
        ```python
        from workflows.context.state_store import DictState

        state = DictState()
        state["foo"] = 1
        state.bar = 2  # attribute-style access works for nested structures
        ```

    See Also:
        - [InMemoryStateStore][workflows.context.state_store.InMemoryStateStore]
    """

    def __init__(self, **params: Any):
        super().__init__(**params)

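InMemoryStateStore

Bases: Generic[MODEL_T]

Async, in-memory, type-safe state manager for workflows.

This store holds a single Pydantic model instance representing global workflow state. When the generic parameter is omitted, it defaults to DictState for flexible, dictionary-like usage.

Concurrent access from coroutines is serialized by an internal asyncio.Lock. This protects against interleaving tasks on one event loop; asyncio.Lock is not a thread lock. Consumers can either perform atomic reads/writes via get_state and set_state, or make in-place edits under the lock via the edit_state context manager.

Examples:

Typed state model: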

from pydantic import BaseModel
from workflows.context.state_store import InMemoryStateStore

class MyState(BaseModel):
    count: int = 0

store = InMemoryStateStore(MyState())
async with store.edit_state() as state:
    state.count += 1

Dynamic state with DictState:

from workflows.context.state_store import InMemoryStateStore, DictState

store = InMemoryStateStore(DictState())
await store.set("user.profile.name", "Ada")
name = await store.get("user.profile.name")
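Why the lock matters even in single-threaded `asyncio`: any `await` inside a read-modify-write lets another task interleave. A minimal sketch with a hypothetical `ToyStore` that, like `edit_state`, holds an `asyncio.Lock` for the whole edit; it yields the live object instead of writing it back, which is equivalent here because the caller mutates it in place:

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Counter:
    count: int = 0


class ToyStore:
    """Hypothetical store that guards edits with an asyncio.Lock."""

    def __init__(self, initial: Counter) -> None:
        self._state = initial
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def edit_state(self) -> AsyncIterator[Counter]:
        async with self._lock:
            yield self._state  # mutated in place by the caller


async def bump_locked(store: ToyStore) -> None:
    async with store.edit_state() as state:
        current = state.count
        await asyncio.sleep(0)  # let other tasks run mid-update
        state.count = current + 1


async def bump_unlocked(state: Counter) -> None:
    current = state.count
    await asyncio.sleep(0)  # other tasks read the same stale value here
    state.count = current + 1


async def demo(n: int) -> tuple[int, int]:
    store = ToyStore(Counter())
    await asyncio.gather(*(bump_locked(store) for _ in range(n)))
    unlocked = Counter()
    await asyncio.gather(*(bump_unlocked(unlocked) for _ in range(n)))
    return store._state.count, unlocked.count
```

With the lock every increment survives; without it, the tasks read the same starting value and most updates are lost.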
See also: Context.store

Source code in workflows/context/state_store.py
class InMemoryStateStore(Generic[MODEL_T]):
    """
    Async, in-memory, type-safe state manager for workflows.

    This store holds a single Pydantic model instance representing global
    workflow state. When the generic parameter is omitted, it defaults to
    [DictState][workflows.context.state_store.DictState] for flexible,
    dictionary-like usage.

    Thread-safety is ensured with an internal `asyncio.Lock`. Consumers can
    either perform atomic reads/writes via `get_state` and `set_state`, or make
    in-place, transactional edits via the `edit_state` context manager.

    Examples:
        Typed state model:

        ```python
        from pydantic import BaseModel
        from workflows.context.state_store import InMemoryStateStore

        class MyState(BaseModel):
            count: int = 0

        store = InMemoryStateStore(MyState())
        async with store.edit_state() as state:
            state.count += 1
        ```

        Dynamic state with `DictState`:

        ```python
        from workflows.context.state_store import InMemoryStateStore, DictState

        store = InMemoryStateStore(DictState())
        await store.set("user.profile.name", "Ada")
        name = await store.get("user.profile.name")
        ```

    See Also:
        - [Context.store][workflows.context.context.Context.store]
    """

    # These keys are set by pre-built workflows and
    # are known to be unserializable in some cases.
    known_unserializable_keys = ("memory",)

    state_type: Type[MODEL_T]

    def __init__(self, initial_state: MODEL_T):
        self._state = initial_state
        self._lock = asyncio.Lock()
        self.state_type = type(initial_state)

    async def get_state(self) -> MODEL_T:
        """Return a shallow copy of the current state model.

        Returns:
            MODEL_T: A `.model_copy()` of the internal Pydantic model.
        """
        return self._state.model_copy()

    async def set_state(self, state: MODEL_T) -> None:
        """Replace the current state model.

        Args:
            state (MODEL_T): New state of the same type as the existing model.

        Raises:
            ValueError: If the type differs from the existing state type.
        """
        if not isinstance(state, type(self._state)):
            raise ValueError(f"State must be of type {type(self._state)}")

        async with self._lock:
            self._state = state

    def to_dict(self, serializer: "BaseSerializer") -> dict[str, Any]:
        """Serialize the state and model metadata for persistence.

        For `DictState`, each individual item is serialized using the provided
        serializer since values can be arbitrary Python objects. For other
        Pydantic models, defers to the serializer (e.g. JSON) which can leverage
        model-aware encoding.

        Args:
            serializer (BaseSerializer): Strategy used to encode values.

        Returns:
            dict[str, Any]: A payload suitable for
            [from_dict][workflows.context.state_store.InMemoryStateStore.from_dict].
        """
        # Special handling for DictState - serialize each item in _data
        if isinstance(self._state, DictState):
            serialized_data = {}
            for key, value in self._state.items():
                try:
                    serialized_data[key] = serializer.serialize(value)
                except Exception as e:
                    if key in self.known_unserializable_keys:
                        warnings.warn(
                            f"Skipping serialization of known unserializable key: {key} -- "
                            "This is expected but will require this item to be set manually after deserialization.",
                            category=UnserializableKeyWarning,
                        )
                        continue
                    raise ValueError(
                        f"Failed to serialize state value for key {key}: {e}"
                    )

            return {
                "state_data": {"_data": serialized_data},
                "state_type": type(self._state).__name__,
                "state_module": type(self._state).__module__,
            }
        else:
            # For regular Pydantic models, rely on pydantic's serialization
            serialized_state = serializer.serialize(self._state)

            return {
                "state_data": serialized_state,
                "state_type": type(self._state).__name__,
                "state_module": type(self._state).__module__,
            }

    @classmethod
    def from_dict(
        cls, serialized_state: dict[str, Any], serializer: "BaseSerializer"
    ) -> "InMemoryStateStore[MODEL_T]":
        """Restore a state store from a serialized payload.

        Args:
            serialized_state (dict[str, Any]): The payload produced by
                [to_dict][workflows.context.state_store.InMemoryStateStore.to_dict].
            serializer (BaseSerializer): Strategy to decode stored values.

        Returns:
            InMemoryStateStore[MODEL_T]: A store with the reconstructed model.
        """
        if not serialized_state:
            # Return a default DictState manager
            return cls(DictState())  # type: ignore

        state_data = serialized_state.get("state_data", {})
        state_type = serialized_state.get("state_type", "DictState")

        # Deserialize the state data
        if state_type == "DictState":
            # Special handling for DictState - deserialize each item in _data
            _data_serialized = state_data.get("_data", {})
            deserialized_data = {}
            for key, value in _data_serialized.items():
                try:
                    deserialized_data[key] = serializer.deserialize(value)
                except Exception as e:
                    raise ValueError(
                        f"Failed to deserialize state value for key {key}: {e}"
                    )

            state_instance = DictState(_data=deserialized_data)
        else:
            state_instance = serializer.deserialize(state_data)

        return cls(state_instance)  # type: ignore

    @asynccontextmanager
    async def edit_state(self) -> AsyncGenerator[MODEL_T, None]:
        """Edit state transactionally under a lock.

        Yields the mutable model and writes it back on exit. This pattern avoids
        read-modify-write races and keeps updates atomic.

        Yields:
            MODEL_T: The current state model for in-place mutation.
        """
        async with self._lock:
            state = self._state

            yield state

            self._state = state

    async def get(self, path: str, default: Optional[Any] = Ellipsis) -> Any:
        """Get a nested value using dot-separated paths.

        Supports dict keys, list indices, and attribute access transparently at
        each segment.

        Args:
            path (str): Dot-separated path, e.g. "user.profile.name".
            default (Any): If provided, return this when the path does not
                exist; otherwise, raise `ValueError`.

        Returns:
            Any: The resolved value.

        Raises:
            ValueError: If the path is invalid and no default is provided or if
                the path depth exceeds limits.
        """
        segments = path.split(".") if path else []
        if len(segments) > MAX_DEPTH:
            raise ValueError(f"Path length exceeds {MAX_DEPTH} segments")

        async with self._lock:
            try:
                value: Any = self._state
                for segment in segments:
                    value = self._traverse_step(value, segment)
            except Exception:
                if default is not Ellipsis:
                    return default

                msg = f"Path '{path}' not found in state"
                raise ValueError(msg)

        return value

    async def set(self, path: str, value: Any) -> None:
        """Set a nested value using dot-separated paths.

        Intermediate containers are created as needed. Dicts, lists, tuples, and
        Pydantic models are supported where appropriate.

        Args:
            path (str): Dot-separated path to write.
            value (Any): Value to assign.

        Raises:
            ValueError: If the path is empty or exceeds the maximum depth.
        """
        if not path:
            raise ValueError("Path cannot be empty")

        segments = path.split(".")
        if len(segments) > MAX_DEPTH:
            raise ValueError(f"Path length exceeds {MAX_DEPTH} segments")

        async with self._lock:
            current = self._state

            # Navigate/create intermediate segments
            for segment in segments[:-1]:
                try:
                    current = self._traverse_step(current, segment)
                except (KeyError, AttributeError, IndexError, TypeError):
                    # Create intermediate object and assign it
                    intermediate: Any = {}
                    self._assign_step(current, segment, intermediate)
                    current = intermediate

            # Assign the final value
            self._assign_step(current, segments[-1], value)

    async def clear(self) -> None:
        """Reset the state to its type defaults.

        Raises:
            ValueError: If the model type cannot be instantiated from defaults
                (i.e., fields missing default values).
        """
        try:
            await self.set_state(self._state.__class__())
        except ValidationError:
            raise ValueError("State must have defaults for all fields")

    def _traverse_step(self, obj: Any, segment: str) -> Any:
        """Follow one segment into *obj* (dict key, list index, or attribute)."""
        if isinstance(obj, dict):
            return obj[segment]

        # attempt list/tuple index
        try:
            idx = int(segment)
            return obj[idx]
        except (ValueError, TypeError, IndexError):
            pass

        # fallback to attribute access (Pydantic models, normal objects)
        return getattr(obj, segment)

    def _assign_step(self, obj: Any, segment: str, value: Any) -> None:
        """Assign *value* to *segment* of *obj* (dict key, list index, or attribute)."""
        if isinstance(obj, dict):
            obj[segment] = value
            return

        # attempt list/tuple index assignment
        try:
            idx = int(segment)
            obj[idx] = value
            return
        except (ValueError, TypeError, IndexError):
            pass

        # fallback to attribute assignment
        setattr(obj, segment, value)

get_state async

get_state() -> MODEL_T

Return a shallow copy of the current state model.

Returns:

Name Type Description
MODEL_T MODEL_T

A .model_copy() of the internal Pydantic model.
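`.model_copy()` is shallow by default in Pydantic (`model_copy(deep=True)` makes a deep copy), so the returned snapshot shares nested mutable objects with the live state. The same behavior, shown with the standard library's `copy.copy` and `copy.deepcopy` on a hypothetical dataclass as an analogy:

```python
from __future__ import annotations

import copy
from dataclasses import dataclass, field


@dataclass
class State:
    name: str = "a"
    tags: list[str] = field(default_factory=list)


original = State(tags=["x"])
snapshot = copy.copy(original)  # shallow copy, analogous to model_copy()

snapshot.name = "b"        # rebinding a field affects only the copy
snapshot.tags.append("y")  # mutating a shared nested list affects both

independent = copy.deepcopy(original)  # analogous to model_copy(deep=True)
independent.tags.append("z")
```

Rebinding a top-level field on the copy is safe, but mutating a nested list or dict through it also changes the stored state, without going through the lock.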


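set_state async

set_state(state: MODEL_T) -> None

Replace the current state model.

Parameters:

Name Type Description Default
state MODEL_T

New state of the same type as the existing model.

required

Raises:

Type Description
ValueError

If the type differs from the existing state type.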
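The type check in the source uses `isinstance`, so "same type" in practice means the same class or a subclass. A short sketch that lifts that check into a hypothetical standalone function (`check_replacement` and the three classes are illustrative):

```python
class BaseState:
    pass


class SubState(BaseState):
    pass


class OtherState:
    pass


def check_replacement(current: object, new: object) -> None:
    """Same isinstance check as set_state's source, lifted out for illustration."""
    if not isinstance(new, type(current)):
        raise ValueError(f"State must be of type {type(current)}")


check_replacement(BaseState(), SubState())  # passes: a subclass instance is accepted
```

One consequence: once a subclass instance has been stored, later replacements are checked against that subclass, so an instance of the original parent class is rejected.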


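to_dict

to_dict(serializer: BaseSerializer) -> dict[str, Any]

Serialize the state and model metadata for persistence.

For DictState, each item is serialized individually with the provided serializer, since values can be arbitrary Python objects. For other Pydantic models, serialization is delegated to the serializer (e.g. JSON), which can use model-aware encoding.

Parameters:

Name Type Description Default
serializer BaseSerializer

Strategy used to encode values.

required

Returns:

Type Description
dict[str, Any]

A payload suitable for from_dict.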
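The DictState branch can be sketched without the library. This hypothetical `dict_state_payload` function follows the class source listed earlier: each value goes through the serializer individually, a failure on a key listed in `known_unserializable_keys` (only `"memory"`) is skipped with a warning, and any other failure becomes a `ValueError`. `json.dumps` stands in for a `BaseSerializer`, `UserWarning` for the library's warning category, and `state_module` is omitted for brevity:

```python
from __future__ import annotations

import json
import warnings
from typing import Any, Callable

# Mirrors `known_unserializable_keys` in the source.
KNOWN_UNSERIALIZABLE = ("memory",)


def dict_state_payload(
    items: dict[str, Any], serialize: Callable[[Any], str]
) -> dict[str, Any]:
    """Hypothetical re-creation of the DictState branch of to_dict."""
    serialized: dict[str, str] = {}
    for key, value in items.items():
        try:
            serialized[key] = serialize(value)
        except Exception as e:
            if key in KNOWN_UNSERIALIZABLE:
                # The library uses its own warning category; UserWarning is a stand-in.
                warnings.warn(f"Skipping known unserializable key: {key}")
                continue
            raise ValueError(f"Failed to serialize state value for key {key}: {e}") from e
    return {"state_data": {"_data": serialized}, "state_type": "DictState"}


simple = dict_state_payload({"query": "hi"}, json.dumps)
```

A skipped key is simply absent from the payload, so it has to be set again after restoring.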

from_dict classmethod

from_dict(serialized_state: dict[str, Any], serializer: BaseSerializer) -> InMemoryStateStore[MODEL_T]

Restore a state store from a serialized payload.

Parameters:

Name Type Description Default
serialized_state dict[str, Any]

The payload produced by to_dict.

required
serializer BaseSerializer

Strategy to decode stored values.

required

Returns:

Type Description
InMemoryStateStore[MODEL_T]

A store with the reconstructed model.
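The inverse direction, sketched the same way: a hypothetical `restore_state` function with `json.loads` standing in for the serializer. It mirrors the branches in the class source listed earlier (an empty payload yields an empty default state, DictState payloads are decoded item by item, anything else is handed to the serializer whole), but it returns plain data rather than model instances:

```python
from __future__ import annotations

import json
from typing import Any, Callable


def restore_state(payload: dict[str, Any], deserialize: Callable[[str], Any]) -> Any:
    """Hypothetical re-creation of the branches in from_dict, returning plain data."""
    if not payload:
        return {}  # the source returns a default, empty DictState here
    state_data = payload.get("state_data", {})
    state_type = payload.get("state_type", "DictState")
    if state_type == "DictState":
        restored: dict[str, Any] = {}
        for key, value in state_data.get("_data", {}).items():
            try:
                restored[key] = deserialize(value)
            except Exception as e:
                raise ValueError(f"Failed to deserialize state value for key {key}: {e}") from e
        return restored
    # Typed models: the whole payload goes to the serializer in one call.
    return deserialize(state_data)


example = {"state_data": {"_data": {"query": '"hi"', "n": "3"}}, "state_type": "DictState"}
restored_example = restore_state(example, json.loads)  # {"query": "hi", "n": 3}
```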


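edit_state async

edit_state() -> AsyncGenerator[MODEL_T, None]

Edit state under the lock.

Yields the mutable model and writes it back on exit. Holding the lock for the whole edit avoids read-modify-write races between concurrent steps. Mutations are applied to the live object in place, so if the body raises, changes made before the exception are not rolled back.

Yields:

Name Type Description
MODEL_T AsyncGenerator[MODEL_T, None]

The current state model, for in-place mutation.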
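The yielded object is the live state, which determines what happens when the body raises. A minimal sketch with a hypothetical `ToyStore` whose `edit_state` has the same lock, yield, and write-back shape as the source; the body mutates the state and then raises:

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Counter:
    count: int = 0


class ToyStore:
    """Hypothetical store using the same lock / yield / write-back shape as edit_state."""

    def __init__(self) -> None:
        self._state = Counter()
        self._lock = asyncio.Lock()

    @asynccontextmanager
    async def edit_state(self) -> AsyncIterator[Counter]:
        async with self._lock:
            state = self._state
            yield state
            self._state = state  # skipped if the body raises


async def demo() -> tuple[int, bool]:
    store = ToyStore()
    try:
        async with store.edit_state() as state:
            state.count += 1
            raise RuntimeError("fail after mutating")
    except RuntimeError:
        pass
    return store._state.count, store._lock.locked()
```

The lock is released, but the increment made before the exception remains. For all-or-nothing updates, validate inputs before mutating, or compute new values first and assign them together at the end of the block.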


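get async

get(path: str, default: Optional[Any] = Ellipsis) -> Any

Get a nested value using dot-separated paths.

Each segment is resolved transparently as a dict key, a list index, or an attribute.

Parameters:

Name Type Description Default
path str

Dot-separated path, e.g. "user.profile.name".

required
default Any

If provided, return this when the path does not exist; otherwise, raise ValueError.

Ellipsis

Returns:

Name Type Description
Any Any

The resolved value.

Raises:

Type Description
ValueError

If the path is invalid and no default is provided, or if the path depth exceeds the limit.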
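The segment-resolution order (dict key first, then integer index, then attribute) is easiest to see in isolation. The sketch copies the logic of `_traverse_step` and `get` from the class source into standalone functions; `MAX_DEPTH = 32` is a placeholder, since the real limit is not shown in this section:

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any

MAX_DEPTH = 32  # hypothetical value; the real MAX_DEPTH is not shown here


def traverse_step(obj: Any, segment: str) -> Any:
    """Follow one segment: dict key, then integer index, then attribute."""
    if isinstance(obj, dict):
        return obj[segment]
    try:
        return obj[int(segment)]
    except (ValueError, TypeError, IndexError):
        pass
    return getattr(obj, segment)


def get_path(root: Any, path: str, default: Any = Ellipsis) -> Any:
    segments = path.split(".") if path else []
    if len(segments) > MAX_DEPTH:
        raise ValueError(f"Path length exceeds {MAX_DEPTH} segments")
    try:
        value = root
        for segment in segments:
            value = traverse_step(value, segment)
    except Exception:
        if default is not Ellipsis:
            return default
        raise ValueError(f"Path '{path}' not found in state") from None
    return value


root = {"user": SimpleNamespace(profile={"name": "Ada", "langs": ["py", "rs"]})}
```

Because `Ellipsis` is the "no default" sentinel, `...` itself cannot be passed as a meaningful default.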


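set async

set(path: str, value: Any) -> None

Set a nested value using dot-separated paths.

Missing intermediate segments are created as plain dicts. Dicts, lists, and Pydantic models are supported where appropriate; list indices must already exist, and tuples can be traversed but not assigned into because they are immutable.

Parameters:

Name Type Description Default
path str

Dot-separated path to write.

required
value Any

Value to assign.

required

Raises:

Type Description
ValueError

If the path is empty or exceeds the maximum depth.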
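The write path, lifted from the class source into hypothetical standalone functions (the depth check is omitted for brevity). It shows missing segments being created as plain dicts, assignment into an existing list slot, and the failure mode for tuples and out-of-range list indices:

```python
from __future__ import annotations

from typing import Any


def traverse_step(obj: Any, segment: str) -> Any:
    if isinstance(obj, dict):
        return obj[segment]
    try:
        return obj[int(segment)]
    except (ValueError, TypeError, IndexError):
        pass
    return getattr(obj, segment)


def assign_step(obj: Any, segment: str, value: Any) -> None:
    if isinstance(obj, dict):
        obj[segment] = value
        return
    try:
        obj[int(segment)] = value  # existing list index
        return
    except (ValueError, TypeError, IndexError):
        pass
    setattr(obj, segment, value)  # attribute; fails for tuples and missing list slots


def set_path(root: Any, path: str, value: Any) -> None:
    if not path:
        raise ValueError("Path cannot be empty")
    segments = path.split(".")
    current = root
    for segment in segments[:-1]:
        try:
            current = traverse_step(current, segment)
        except (KeyError, AttributeError, IndexError, TypeError):
            intermediate: dict[str, Any] = {}  # missing segments become plain dicts
            assign_step(current, segment, intermediate)
            current = intermediate
    assign_step(current, segments[-1], value)
```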


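clear async

clear() -> None

Reset the state to its type defaults.

Raises:

Type Description
ValueError

If the model type cannot be instantiated from defaults (i.e., some fields have no default value).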
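`clear()` rebuilds the state by calling the model class with no arguments. The same idea with standard-library dataclasses, as a hypothetical analogy: a dataclass with a required field raises `TypeError` where a Pydantic model would raise `ValidationError`, and the sketch converts it into the same `ValueError`:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class WithDefaults:
    count: int = 0


@dataclass
class WithoutDefaults:
    name: str


def reset(state: Any) -> Any:
    """Hypothetical analogue of clear(): rebuild the state from its class defaults."""
    try:
        return type(state)()
    except TypeError as e:  # dataclasses raise TypeError; pydantic raises ValidationError
        raise ValueError("State must have defaults for all fields") from e
```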


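BaseSerializer

Bases: ABC

Interface for value serialization used by the workflow context and state store.

Implementations must encode arbitrary Python values into a string and be able to reconstruct the original values from that string.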
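A custom serializer only needs the two abstract methods. The sketch below defines a local stand-in for `BaseSerializer` so it runs without the library (in real code you would subclass `workflows.context.serializers.BaseSerializer`); `Base64JsonSerializer` is a hypothetical example that wraps JSON text in base64:

```python
from __future__ import annotations

import base64
import json
from abc import ABC, abstractmethod
from typing import Any


class BaseSerializer(ABC):
    """Local stand-in mirroring the interface shown in this section."""

    @abstractmethod
    def serialize(self, value: Any) -> str: ...

    @abstractmethod
    def deserialize(self, value: str) -> Any: ...


class Base64JsonSerializer(BaseSerializer):
    """Hypothetical serializer: JSON text, then base64 so the payload is opaque ASCII."""

    def serialize(self, value: Any) -> str:
        raw = json.dumps(value).encode("utf-8")
        return base64.b64encode(raw).decode("ascii")

    def deserialize(self, value: str) -> Any:
        return json.loads(base64.b64decode(value).decode("utf-8"))
```

Whatever encoding is chosen, the one hard requirement stated above is that `deserialize(serialize(v))` reconstructs `v`.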

See also: JsonSerializer, PickleSerializer

Source code in workflows/context/serializers.py
class BaseSerializer(ABC):
    """
    Interface for value serialization used by the workflow context and state store.

    Implementations must encode arbitrary Python values into a string and be able
    to reconstruct the original values from that string.

    See Also:
        - [JsonSerializer][workflows.context.serializers.JsonSerializer]
        - [PickleSerializer][workflows.context.serializers.PickleSerializer]
    """

    @abstractmethod
    def serialize(self, value: Any) -> str: ...

    @abstractmethod
    def deserialize(self, value: str) -> Any: ...

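JsonSerializer #

Bases: BaseSerializer

JSON-first serializer that understands Pydantic models and LlamaIndex components.

Behavior:
- Pydantic models are encoded as JSON together with their qualified class name, so they can be faithfully reconstructed.
- LlamaIndex components (objects exposing class_name and to_dict) are serialized to their dict form alongside the qualified class name.
- Dicts and lists are handled recursively.

For unsupported objects, the fallback is to attempt JSON encoding directly; if that fails, a ValueError is raised.

Examples: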

from workflows.context.serializers import JsonSerializer

s = JsonSerializer()
payload = s.serialize({"x": 1, "y": [2, 3]})
data = s.deserialize(payload)
assert data == {"x": 1, "y": [2, 3]}
See Also: BaseSerializer, PickleSerializer
Source code in workflows/context/serializers.py
class JsonSerializer(BaseSerializer):
    """
    JSON-first serializer that understands Pydantic models and LlamaIndex components.

    Behavior:
    - Pydantic models are encoded as JSON with their qualified class name so they
      can be faithfully reconstructed.
    - LlamaIndex components (objects exposing `class_name` and `to_dict`) are
      serialized to their dict form alongside the qualified class name.
    - Dicts and lists are handled recursively.

    Fallback for unsupported objects is to attempt JSON encoding directly; if it
    fails, a `ValueError` is raised.

    Examples:
        ```python
        s = JsonSerializer()
        payload = s.serialize({"x": 1, "y": [2, 3]})
        data = s.deserialize(payload)
        assert data == {"x": 1, "y": [2, 3]}
        ```

    See Also:
        - [BaseSerializer][workflows.context.serializers.BaseSerializer]
        - [PickleSerializer][workflows.context.serializers.PickleSerializer]
    """

    def serialize_value(self, value: Any) -> Any:
        """
        Events with a wrapper type that includes type metadata, so that they can be reserialized into the original Event type.
        Traverses dicts and lists recursively.

        Args:
            value (Any): The value to serialize.

        Returns:
            Any: The serialized value. A dict, list, string, number, or boolean.
        """
        # This has something to do with BaseComponent from llama_index.core. Is it still needed?
        if hasattr(value, "class_name"):
            retval = {
                "__is_component": True,
                "value": value.to_dict(),
                "qualified_name": get_qualified_name(value),
            }
            return retval

        if isinstance(value, BaseModel):
            return {
                "__is_pydantic": True,
                "value": value.model_dump(mode="json"),
                "qualified_name": get_qualified_name(value),
            }

        if isinstance(value, dict):
            return {k: self.serialize_value(v) for k, v in value.items()}

        if isinstance(value, list):
            return [self.serialize_value(item) for item in value]

        return value

    def serialize(self, value: Any) -> str:
        """Serialize an arbitrary value to a JSON string.

        Args:
            value (Any): The value to encode.

        Returns:
            str: JSON string.

        Raises:
            ValueError: If the value cannot be encoded to JSON.
        """
        try:
            serialized_value = self.serialize_value(value)
            return json.dumps(serialized_value)
        except Exception:
            raise ValueError(f"Failed to serialize value: {type(value)}: {value!s}")

    def deserialize_value(self, data: Any) -> Any:
        """Helper to deserialize a single dict or other json value from its discriminator fields back into a python class.

        Args:
            data (Any): a dict, list, string, number, or boolean

        Returns:
            Any: The deserialized value.
        """
        if isinstance(data, dict):
            if data.get("__is_pydantic") and data.get("qualified_name"):
                module_class = import_module_from_qualified_name(data["qualified_name"])
                return module_class.model_validate(data["value"])
            elif data.get("__is_component") and data.get("qualified_name"):
                module_class = import_module_from_qualified_name(data["qualified_name"])
                return module_class.from_dict(data["value"])
            return {k: self.deserialize_value(v) for k, v in data.items()}
        elif isinstance(data, list):
            return [self.deserialize_value(item) for item in data]
        return data

    def deserialize(self, value: str) -> Any:
        """Deserialize a JSON string into Python objects.

        Args:
            value (str): JSON string.

        Returns:
            Any: The reconstructed value.
        """
        data = json.loads(value)
        return self.deserialize_value(data)

serialize_value #

serialize_value(value: Any) -> Any

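Wrap Pydantic models (such as Events) and LlamaIndex components in a dict that records type metadata, so that they can be deserialized back into their original type. Dicts and lists are traversed recursively.

Parameters:

Name Type Description Default
value Any

The value to serialize.

required

Returns:

Name Type Description
Any Any

The serialized value: a dict, list, string, number, or boolean.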

Source code in workflows/context/serializers.py
def serialize_value(self, value: Any) -> Any:
    """
    Events with a wrapper type that includes type metadata, so that they can be reserialized into the original Event type.
    Traverses dicts and lists recursively.

    Args:
        value (Any): The value to serialize.

    Returns:
        Any: The serialized value. A dict, list, string, number, or boolean.
    """
    # This has something to do with BaseComponent from llama_index.core. Is it still needed?
    if hasattr(value, "class_name"):
        retval = {
            "__is_component": True,
            "value": value.to_dict(),
            "qualified_name": get_qualified_name(value),
        }
        return retval

    if isinstance(value, BaseModel):
        return {
            "__is_pydantic": True,
            "value": value.model_dump(mode="json"),
            "qualified_name": get_qualified_name(value),
        }

    if isinstance(value, dict):
        return {k: self.serialize_value(v) for k, v in value.items()}

    if isinstance(value, list):
        return [self.serialize_value(item) for item in value]

    return value
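The envelope that serialize_value builds can be reproduced with the standard library alone. The sketch below is hypothetical and makes three substitutions. It uses dataclasses in place of Pydantic models, with dataclasses.asdict standing in for model_dump(mode="json"). It uses a made-up marker key __is_model where the library uses __is_pydantic. It assumes get_qualified_name returns "module.ClassName", since that helper's body is not shown on this page.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any


def qualified_name(obj: Any) -> str:
    # Assumption: "module.ClassName", analogous to get_qualified_name.
    cls = type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


def wrap(value: Any) -> Any:
    """Hypothetical analogue of serialize_value, with dataclasses standing in for BaseModel."""
    if is_dataclass(value) and not isinstance(value, type):
        return {
            "__is_model": True,                      # marker, like "__is_pydantic"
            "value": asdict(value),                  # like model_dump(mode="json")
            "qualified_name": qualified_name(value),
        }
    if isinstance(value, dict):
        return {k: wrap(v) for k, v in value.items()}
    if isinstance(value, list):
        return [wrap(item) for item in value]
    return value  # strings, numbers, booleans and None pass through unchanged


@dataclass
class Point:
    x: int
    y: int


payload = wrap({"origin": Point(0, 0), "path": [Point(1, 2)], "label": "demo"})
print(json.dumps(payload, indent=2))
```

Only the outermost model gets an envelope. Nested fields are dumped to plain dicts and rely on the class's own field types to be rebuilt on the way back.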

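serialize #

serialize(value: Any) -> str

Serialize an arbitrary value to a JSON string.

Parameters:

Name Type Description Default
value Any

The value to encode.

required

Returns:

Name Type Description
str str

JSON string.

Raises:

Type Description
ValueError

If the value cannot be encoded to JSON.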

Source code in workflows/context/serializers.py
def serialize(self, value: Any) -> str:
    """Serialize an arbitrary value to a JSON string.

    Args:
        value (Any): The value to encode.

    Returns:
        str: JSON string.

    Raises:
        ValueError: If the value cannot be encoded to JSON.
    """
    try:
        serialized_value = self.serialize_value(value)
        return json.dumps(serialized_value)
    except Exception:
        raise ValueError(f"Failed to serialize value: {type(value)}: {value!s}")
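For plain data, serialize_value returns values it does not recognize unchanged, so the final encoding is whatever json.dumps does with them. The stdlib-only sketch below is a hypothetical helper, plain_roundtrip, not part of the library. It illustrates three consequences for data without models or components. Tuples come back as lists. Non-string dict keys come back as strings. Types JSON cannot represent, such as sets, make json.dumps fail, and serialize turns that failure into a ValueError.

```python
import json
from typing import Any


def plain_roundtrip(value: Any) -> Any:
    # Mirrors JsonSerializer for data without Pydantic models or components:
    # json.dumps on the way out, json.loads on the way back.
    try:
        text = json.dumps(value)
    except Exception:
        # serialize() re-raises any encoding failure as ValueError.
        raise ValueError(f"Failed to serialize value: {type(value)}: {value!s}")
    return json.loads(text)


print(plain_roundtrip({"pair": (1, 2)}))  # {'pair': [1, 2]}
print(plain_roundtrip({1: "a"}))          # {'1': 'a'}
try:
    plain_roundtrip({"tags": {"a", "b"}})
except ValueError as err:
    print("rejected:", err)
```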

deserialize_value #

deserialize_value(data: Any) -> Any

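Helper that deserializes a single dict or other JSON value back into a Python object, using its discriminator fields (__is_pydantic or __is_component, plus qualified_name) to pick the class.

Parameters:

Name Type Description Default
data Any

A dict, list, string, number, or boolean.

required

Returns:

Name Type Description
Any Any

The deserialized value.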

Source code in workflows/context/serializers.py
def deserialize_value(self, data: Any) -> Any:
    """Helper to deserialize a single dict or other json value from its discriminator fields back into a python class.

    Args:
        data (Any): a dict, list, string, number, or boolean

    Returns:
        Any: The deserialized value.
    """
    if isinstance(data, dict):
        if data.get("__is_pydantic") and data.get("qualified_name"):
            module_class = import_module_from_qualified_name(data["qualified_name"])
            return module_class.model_validate(data["value"])
        elif data.get("__is_component") and data.get("qualified_name"):
            module_class = import_module_from_qualified_name(data["qualified_name"])
            return module_class.from_dict(data["value"])
        return {k: self.deserialize_value(v) for k, v in data.items()}
    elif isinstance(data, list):
        return [self.deserialize_value(item) for item in data]
    return data
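The reverse direction resolves qualified_name to a class and rebuilds the object from the stored value. The sketch below is a hypothetical stdlib analogue of deserialize_value and makes three assumptions. It assumes import_module_from_qualified_name splits the name at the last dot and imports the module, since that helper's body is not shown on this page. It uses the made-up marker __is_model. It calls cls(**value) where the library calls model_validate or from_dict. Because of the last-dot split, it cannot resolve nested classes such as module.Outer.Inner.

```python
import importlib
import json
from typing import Any


def import_from_qualified_name(name: str) -> Any:
    # Assumption: split "package.module.ClassName" at the last dot, import the
    # module, then look the class up on it.
    module_name, _, attr = name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def unwrap(data: Any) -> Any:
    """Hypothetical analogue of deserialize_value (not the library code)."""
    if isinstance(data, dict):
        if data.get("__is_model") and data.get("qualified_name"):
            cls = import_from_qualified_name(data["qualified_name"])
            return cls(**data["value"])  # stands in for model_validate / from_dict
        return {k: unwrap(v) for k, v in data.items()}
    if isinstance(data, list):
        return [unwrap(item) for item in data]
    return data


payload = json.dumps({
    "ns": {"__is_model": True, "qualified_name": "types.SimpleNamespace", "value": {"a": 1}},
    "items": [1, 2, {"b": "c"}],
})
restored = unwrap(json.loads(payload))
print(restored["ns"].a)  # 1
```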

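deserialize #

deserialize(value: str) -> Any

Deserialize a JSON string into Python objects.

Parameters:

Name Type Description Default
value str

JSON string.

required

Returns:

Name Type Description
Any Any

The reconstructed value.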

Source code in workflows/context/serializers.py
def deserialize(self, value: str) -> Any:
    """Deserialize a JSON string into Python objects.

    Args:
        value (str): JSON string.

    Returns:
        Any: The reconstructed value.
    """
    data = json.loads(value)
    return self.deserialize_value(data)

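PickleSerializer #

Bases: JsonSerializer

Hybrid serializer: JSON when possible, Pickle as a catch-all fallback.

This serializer attempts JSON first for readability and portability, and transparently falls back to Pickle for objects that cannot be represented in JSON. Deserialization tries Pickle first and falls back to JSON.

Warning

Pickle can execute arbitrary code during deserialization. Only deserialize trusted payloads.

Note: This class used to be called JsonPickleSerializer; it was renamed to PickleSerializer.

Examples: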

from workflows.context.serializers import PickleSerializer

s = PickleSerializer()
class Foo:
    def __init__(self, x):
        self.x = x
payload = s.serialize(Foo(1))  # Foo is not JSON-encodable, so this falls back to Pickle
obj = s.deserialize(payload)
assert isinstance(obj, Foo)
Source code in workflows/context/serializers.py
class PickleSerializer(JsonSerializer):
    """
    Hybrid serializer: JSON when possible, Pickle as a safe fallback.

    This serializer attempts JSON first for readability and portability, and
    transparently falls back to Pickle for objects that cannot be represented in
    JSON. Deserialization prioritizes Pickle and falls back to JSON.

    Warning:
        Pickle can execute arbitrary code during deserialization. Only
        deserialize trusted payloads.

    Note: Used to be called `JsonPickleSerializer` but it was renamed to `PickleSerializer`.

    Examples:
        ```python
        s = PickleSerializer()
        class Foo:
            def __init__(self, x):
                self.x = x
        payload = s.serialize(Foo(1))  # will likely use Pickle
        obj = s.deserialize(payload)
        assert isinstance(obj, Foo)
        ```
    """

    def serialize(self, value: Any) -> str:
        """Serialize with JSON preference and Pickle fallback.

        Args:
            value (Any): The value to encode.

        Returns:
            str: Encoded string (JSON or base64-encoded Pickle bytes).
        """
        try:
            return super().serialize(value)
        except Exception:
            return base64.b64encode(pickle.dumps(value)).decode("utf-8")

    def deserialize(self, value: str) -> Any:
        """Deserialize with Pickle preference and JSON fallback.

        Args:
            value (str): Encoded string.

        Returns:
            Any: The reconstructed value.

        Notes:
            Use only with trusted payloads due to Pickle security implications.
        """
        try:
            return pickle.loads(base64.b64decode(value))
        except Exception:
            return super().deserialize(value)

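serialize #

serialize(value: Any) -> str

Serialize as JSON when possible, falling back to Pickle.

Parameters:

Name Type Description Default
value Any

The value to encode.

required

Returns:

Name Type Description
str str

Encoded string (JSON, or base64-encoded Pickle bytes).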

Source code in workflows/context/serializers.py
def serialize(self, value: Any) -> str:
    """Serialize with JSON preference and Pickle fallback.

    Args:
        value (Any): The value to encode.

    Returns:
        str: Encoded string (JSON or base64-encoded Pickle bytes).
    """
    try:
        return super().serialize(value)
    except Exception:
        return base64.b64encode(pickle.dumps(value)).decode("utf-8")
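The JSON-then-Pickle choice above can be sketched with the standard library. hybrid_serialize is a hypothetical helper, not the library method. It skips the model and component envelope step that the real serialize inherits from JsonSerializer and only shows how the output format is chosen. A set is used as the non-JSON value because any picklable object works there.

```python
import base64
import json
import pickle
from typing import Any


def hybrid_serialize(value: Any) -> str:
    """Hypothetical stdlib-only analogue of PickleSerializer.serialize."""
    try:
        # Readable, portable JSON when the value allows it.
        return json.dumps(value)
    except Exception:
        # Anything else: pickle it, then base64 so the result is still a str.
        return base64.b64encode(pickle.dumps(value)).decode("utf-8")


print(hybrid_serialize({"x": 1}))       # {"x": 1}
print(hybrid_serialize({1, 2, 3})[:16])  # start of a base64 string, not JSON
```

A consequence of this design is that the output alone does not say which branch produced it. Decoding therefore has to probe, which is why deserialize tries one format and falls back to the other.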

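deserialize #

deserialize(value: str) -> Any

Deserialize by trying Pickle first and falling back to JSON.

Parameters:

Name Type Description Default
value str

Encoded string.

required

Returns:

Name Type Description
Any Any

The reconstructed value.

Notes

Use only with trusted payloads, because of Pickle's security implications.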

Source code in workflows/context/serializers.py
def deserialize(self, value: str) -> Any:
    """Deserialize with Pickle preference and JSON fallback.

    Args:
        value (str): Encoded string.

    Returns:
        Any: The reconstructed value.

    Notes:
        Use only with trusted payloads due to Pickle security implications.
    """
    try:
        return pickle.loads(base64.b64decode(value))
    except Exception:
        return super().deserialize(value)
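The Pickle-first decoding order can be sketched the same way. hybrid_deserialize below is a hypothetical stdlib-only helper. It falls back to plain json.loads, whereas the real method falls back to JsonSerializer.deserialize and so also restores model and component envelopes. A JSON payload normally fails the first branch because it is not valid base64 or does not decode to a pickle, and then it is parsed as JSON.

```python
import base64
import json
import pickle
from typing import Any


def hybrid_deserialize(value: str) -> Any:
    """Hypothetical stdlib-only analogue of PickleSerializer.deserialize.

    Only call this on payloads you trust: pickle.loads can execute arbitrary code.
    """
    try:
        # Pickle first: base64-decode, then unpickle.
        return pickle.loads(base64.b64decode(value))
    except Exception:
        # Not valid base64, or not a pickle: treat the string as JSON instead.
        return json.loads(value)


pickled = base64.b64encode(pickle.dumps({1, 2, 3})).decode("utf-8")
print(hybrid_deserialize(pickled))          # {1, 2, 3}
print(hybrid_deserialize('{"x": [1, 2]}'))  # {'x': [1, 2]}
```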