WorkflowStream
定义于:core/src/core/stream.ts:106
用于处理工作流事件的响应式流。
WorkflowStream 扩展了标准 ReadableStream,为过滤、转换和消费工作流事件提供专门方法。 它支持响应式模式,可用于构建复杂的事件处理流水线。
// Get stream from workflow contextconst stream = context.stream;
// Filter for specific eventsconst userEvents = stream.filter(UserEvent);
// Transform eventsconst processed = stream.map(event => (\{ type: event.constructor.name, timestamp: Date.now(), data: event.data\}));
// Consume eventsfor await (const event of stream.take(10)) \{ console.log('Received:', event);\}ReadableStream<R>
R = any
流经数据流的类型
AsyncIterable<R>
新工作流流<
R>(subscribable,rootStream):WorkflowStream<R>
定义于:core/src/core/stream.ts:141
Subscribable<[R], void>
rootStream
Section titled “rootStream”ReadableStream<R>
WorkflowStream<R>
ReadableStream<R>.constructor
新工作流流<
R>(subscribable,rootStream):WorkflowStream<R>
定义于:core/src/core/stream.ts:145
Subscribable<[R], void>
rootStream
Section titled “rootStream”null
WorkflowStream<R>
ReadableStream<R>.constructor
新工作流流<
R>(subscribable,rootStream):WorkflowStream<R>
定义于:core/src/core/stream.ts:146
null
rootStream
Section titled “rootStream”null | ReadableStream<R>
WorkflowStream<R>
ReadableStream<R>.constructor
开启<
T>(event,handler): () =>void
定义于:core/src/core/stream.ts:130
订阅特定的工作流事件。
T
要监听的事件类型
(event) => void
处理事件的函数
取消订阅功能
():
void
void
const unsubscribe = stream.on(UserEvent, (event) => \{ console.log('User event:', event.data);\});
// Later...unsubscribe();fromReadableStream()
Section titled “fromReadableStream()”
staticfromReadableStream<T>(stream):WorkflowStream<T>
定义于:core/src/core/stream.ts:192
从标准可读流创建一个工作流流。
T = any
ReadableStream<WorkflowEventData<any>>
要包装的可读流
WorkflowStream<T>
一个新的 WorkflowStream 实例
fromResponse()
Section titled “fromResponse()”
staticfromResponse(response,eventMap):WorkflowStream<WorkflowEventData<any>>
定义于:core/src/core/stream.ts:214
从HTTP响应创建一个工作流流。
Response
包含工作流事件的HTTP响应
eventMap
Section titled “eventMap”Record<string, WorkflowEvent<any>>
事件唯一标识符到事件构造函数的映射
WorkflowStream<WorkflowEventData<any>>
一个新的 WorkflowStream 实例
toResponse()
Section titled “toResponse()”toResponse(
init?,transformer?):R继承WorkflowEventData<any> ?Response:never
定义于:core/src/core/stream.ts:237
将数据流转换为HTTP响应。
ResponseInit
可选的ResponseInit参数
transformer?
Section titled “transformer?”JsonEncodeTransform = ...
可选的自定义转换器(默认为JSON编码)
R 继承 WorkflowEventData<any> ? Response : never
包含流数据的HTTP响应
forEach()
Section titled “forEach()”forEach(
callback):Promise<void>
定义于:core/src/core/stream.ts:314
使用回调函数处理流中的每个项目。
(item) => void
为每个项目调用的函数
Promise<void>
当所有项目处理完成时解决的Promise
await stream.forEach(event => \{ console.log('Processing:', event);\});map<
T>(callback):WorkflowStream<T>
定义于:core/src/core/stream.ts:338
转换流中的每个项目。
T
(item) => T
用于转换每个项目的函数
WorkflowStream<T>
一个包含转换后项目的新工作流流
const timestamps = stream.map(event => (\{ ...event, timestamp: Date.now()\}));take()
Section titled “take()”take(
limit):WorkflowStream<R>
定义于:core/src/core/stream.ts:369
仅从流中获取前N个项目。
number
最大可获取项目数量
WorkflowStream<R>
一个限制为指定项目数量的新 WorkflowStream
const firstTen = stream.take(10);for await (const event of firstTen) \{ console.log(event);\}filter()
Section titled “filter()”filter(
predicate):WorkflowStream<R>
定义于:core/src/core/stream.ts:404
过滤数据流,仅包含符合谓词条件的条目。
R 扩展 WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R<R>>> : never
按事件类型、函数或值进行筛选
WorkflowStream<R>
一个仅包含匹配项的新 WorkflowStream
// Filter by event typeconst userEvents = stream.filter(UserEvent);
// Filter by functionconst importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific valueconst specificEvent = stream.filter(myEventInstance);filter(
predicate):WorkflowStream<R>
定义于:core/src/core/stream.ts:409
过滤数据流,仅包含符合谓词条件的项目。
R
按事件类型、函数或值进行筛选
WorkflowStream<R>
一个仅包含匹配项的新 WorkflowStream
// Filter by event typeconst userEvents = stream.filter(UserEvent);
// Filter by functionconst importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific valueconst specificEvent = stream.filter(myEventInstance);filter(
predicate):WorkflowStream<R>
定义于:core/src/core/stream.ts:410
过滤数据流,仅包含符合谓词条件的项目。
(event) => boolean
按事件类型、函数或值进行筛选
WorkflowStream<R>
一个仅包含匹配项的新 WorkflowStream
// Filter by event typeconst userEvents = stream.filter(UserEvent);
// Filter by functionconst importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific valueconst specificEvent = stream.filter(myEventInstance);until()
Section titled “until()”until(
predicate):WorkflowStream<R>
定义于:core/src/core/stream.ts:452
持续流式处理直到满足谓词条件,然后终止。
R 扩展 WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R<R>>> : never
事件类型、函数或停止值
WorkflowStream<R>
当满足条件时终止的新工作流流
// Stop at completion eventconst processingEvents = stream.until(CompletionEvent);
// Stop when condition is metconst beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instanceconst beforeSpecific = stream.until(myEventInstance);until(
predicate):WorkflowStream<R>
定义于:core/src/core/stream.ts:457
继续流式处理直到满足谓词条件,然后终止。
(item) => boolean
事件类型、函数或停止值
WorkflowStream<R>
当满足条件时终止的新工作流流
// Stop at completion eventconst processingEvents = stream.until(CompletionEvent);
// Stop when condition is metconst beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instanceconst beforeSpecific = stream.until(myEventInstance);until(
item):WorkflowStream<R>
定义于:core/src/core/stream.ts:458
持续流式处理直到满足谓词条件,然后终止。
R
WorkflowStream<R>
当满足条件时终止的新工作流流
// Stop at completion eventconst processingEvents = stream.until(CompletionEvent);
// Stop when condition is metconst beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instanceconst beforeSpecific = stream.until(myEventInstance);toArray()
Section titled “toArray()”toArray():
Promise<R[]>
定义于:core/src/core/stream.ts:494
将流中的所有项目收集到一个数组中。
Promise<R[]>
Promise 解析为所有流项目的数组
const events = await stream.take(5).toArray();console.log('Collected events:', events);