跳转到内容

WorkflowStream

定义于:core/src/core/stream.ts:106

用于处理工作流事件的响应式流。

WorkflowStream 扩展了标准 ReadableStream,为过滤、转换和消费工作流事件提供专门方法。 它支持响应式模式,可用于构建复杂的事件处理流水线。

// Get stream from workflow context
const stream = context.stream;
// Filter for specific events
const userEvents = stream.filter(UserEvent);
// Transform events
const processed = stream.map(event => (\{
type: event.constructor.name,
timestamp: Date.now(),
data: event.data
\}));
// Consume events
for await (const event of stream.take(10)) \{
console.log('Received:', event);
\}
  • ReadableStream<R>

R = any

流经数据流的类型

  • AsyncIterable<R>

新工作流流<R>(subscribable, rootStream): WorkflowStream<R>

定义于:core/src/core/stream.ts:141

Subscribable<[R], void>

ReadableStream<R>

WorkflowStream<R>

ReadableStream<R>.constructor

新工作流流<R>(subscribable, rootStream): WorkflowStream<R>

定义于:core/src/core/stream.ts:145

Subscribable<[R], void>

null

WorkflowStream<R>

ReadableStream<R>.constructor

新工作流流<R>(subscribable, rootStream): WorkflowStream<R>

定义于:core/src/core/stream.ts:146

null

null | ReadableStream<R>

WorkflowStream<R>

ReadableStream<R>.constructor

开启<T>(event, handler): () => void

定义于:core/src/core/stream.ts:130

订阅特定的工作流事件。

T

WorkflowEvent<T>

要监听的事件类型

(event) => void

处理事件的函数

取消订阅功能

(): void

void

const unsubscribe = stream.on(UserEvent, (event) => \{
console.log('User event:', event.data);
\});
// Later...
unsubscribe();

static fromReadableStream<T>(stream): WorkflowStream<T>

定义于:core/src/core/stream.ts:192

从标准可读流创建一个工作流流。

T = any

ReadableStream<WorkflowEventData<any>>

要包装的可读流

WorkflowStream<T>

一个新的 WorkflowStream 实例


static fromResponse(response, eventMap): WorkflowStream<WorkflowEventData<any>>

定义于:core/src/core/stream.ts:214

从HTTP响应创建一个工作流流。

Response

包含工作流事件的HTTP响应

Record<string, WorkflowEvent<any>>

事件唯一标识符到事件构造函数的映射

WorkflowStream<WorkflowEventData<any>>

一个新的 WorkflowStream 实例


toResponse(init?, transformer?): R 继承 WorkflowEventData<any> ? Response : never

定义于:core/src/core/stream.ts:237

将数据流转换为HTTP响应。

ResponseInit

可选的ResponseInit参数

JsonEncodeTransform = ...

可选的自定义转换器(默认为JSON编码)

R 继承 WorkflowEventData<any> ? Response : never

包含流数据的HTTP响应


forEach(callback): Promise<void>

定义于:core/src/core/stream.ts:314

使用回调函数处理流中的每个项目。

(item) => void

为每个项目调用的函数

Promise<void>

当所有项目处理完成时解决的Promise

await stream.forEach(event => \{
console.log('Processing:', event);
\});

map<T>(callback): WorkflowStream<T>

定义于:core/src/core/stream.ts:338

转换流中的每个项目。

T

(item) => T

用于转换每个项目的函数

WorkflowStream<T>

一个包含转换后项目的新工作流流

const timestamps = stream.map(event => (\{
...event,
timestamp: Date.now()
\}));

take(limit): WorkflowStream<R>

定义于:core/src/core/stream.ts:369

仅从流中获取前N个项目。

number

最大可获取项目数量

WorkflowStream<R>

一个限制为指定项目数量的新 WorkflowStream

const firstTen = stream.take(10);
for await (const event of firstTen) \{
console.log(event);
\}

filter(predicate): WorkflowStream<R>

定义于:core/src/core/stream.ts:404

过滤数据流,仅包含符合谓词条件的条目。

R 扩展 WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R<R>>> : never

按事件类型、函数或值进行筛选

WorkflowStream<R>

一个仅包含匹配项的新 WorkflowStream

// Filter by event type
const userEvents = stream.filter(UserEvent);
// Filter by function
const importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific value
const specificEvent = stream.filter(myEventInstance);

filter(predicate): WorkflowStream<R>

定义于:core/src/core/stream.ts:409

过滤数据流,仅包含符合谓词条件的项目。

R

按事件类型、函数或值进行筛选

WorkflowStream<R>

一个仅包含匹配项的新 WorkflowStream

// Filter by event type
const userEvents = stream.filter(UserEvent);
// Filter by function
const importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific value
const specificEvent = stream.filter(myEventInstance);

filter(predicate): WorkflowStream<R>

定义于:core/src/core/stream.ts:410

过滤数据流,仅包含符合谓词条件的项目。

(event) => boolean

按事件类型、函数或值进行筛选

WorkflowStream<R>

一个仅包含匹配项的新 WorkflowStream

// Filter by event type
const userEvents = stream.filter(UserEvent);
// Filter by function
const importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific value
const specificEvent = stream.filter(myEventInstance);

until(predicate): WorkflowStream<R>

定义于:core/src/core/stream.ts:452

持续流式处理直到满足谓词条件,然后终止。

R 扩展 WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R<R>>> : never

事件类型、函数或停止值

WorkflowStream<R>

当满足条件时终止的新工作流流

// Stop at completion event
const processingEvents = stream.until(CompletionEvent);
// Stop when condition is met
const beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instance
const beforeSpecific = stream.until(myEventInstance);

until(predicate): WorkflowStream<R>

定义于:core/src/core/stream.ts:457

继续流式处理直到满足谓词条件,然后终止。

(item) => boolean

事件类型、函数或停止值

WorkflowStream<R>

当满足条件时终止的新工作流流

// Stop at completion event
const processingEvents = stream.until(CompletionEvent);
// Stop when condition is met
const beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instance
const beforeSpecific = stream.until(myEventInstance);

until(item): WorkflowStream<R>

定义于:core/src/core/stream.ts:458

持续流式处理直到满足谓词条件,然后终止。

R

WorkflowStream<R>

当满足条件时终止的新工作流流

// Stop at completion event
const processingEvents = stream.until(CompletionEvent);
// Stop when condition is met
const beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instance
const beforeSpecific = stream.until(myEventInstance);

toArray(): Promise<R[]>

定义于:core/src/core/stream.ts:494

将流中的所有项目收集到一个数组中。

Promise<R[]>

Promise 解析为所有流项目的数组

const events = await stream.take(5).toArray();
console.log('Collected events:', events);