
dspy.streamify

dspy.streamify(program: Module, status_message_provider: StatusMessageProvider | None = None, stream_listeners: list[StreamListener] | None = None, include_final_prediction_in_output_stream: bool = True, is_async_program: bool = False, async_streaming: bool = True) -> Callable[[Any, Any], Awaitable[Any]]

Wrap a DSPy program so that it streams its outputs incrementally rather than returning them all at once. It also emits status messages to indicate the program's progress, and users can implement their own status message provider to customize the messages and choose which modules generate them.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `program` | `Module` | The DSPy program to wrap with streaming functionality. | *required* |
| `status_message_provider` | `StatusMessageProvider \| None` | A custom status message generator to use instead of the default one. Users can implement their own generator to customize the status messages and choose which modules generate them. | `None` |
| `stream_listeners` | `list[StreamListener] \| None` | A list of stream listeners that capture the streaming output of specific fields of sub-predictors in the program. When provided, only the target fields of the target predictors are streamed to the user. | `None` |
| `include_final_prediction_in_output_stream` | `bool` | Whether to include the final prediction in the output stream; only useful when `stream_listeners` is provided. If `False`, the final prediction is not included in the output stream, except when the program hits the cache or no listener captures anything, in which case it is still included. | `True` |
| `is_async_program` | `bool` | Whether the program is async. If `False`, the program is wrapped with `asyncify`; otherwise it is called via `acall`. | `False` |
| `async_streaming` | `bool` | Whether to return an async generator or a sync generator. If `False`, the streaming output is converted to a sync generator. | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Callable[[Any, Any], Awaitable[Any]]` | A function that takes the same arguments as the original program but returns an async generator that yields the program's outputs incrementally. |
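As a quick illustration of the `is_async_program` flag described above, here is a minimal sketch; it assumes the wrapped `dspy.Predict` module exposes an async `acall` method, which is what `streamify` invokes when this flag is set:

```python
import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# Assumption: dspy.Predict supports `acall`, so we can skip the `asyncify`
# wrapper by declaring the program async.
program = dspy.streamify(dspy.Predict("q->a"), is_async_program=True)

async def main():
    async for value in program(q="Why did a chicken cross the kitchen?"):
        print(value)

asyncio.run(main())
```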

Example:

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"))

# Use the program with streaming output
async def use_streaming():
    output = program(q="Why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)
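The same wrapper can also be consumed without `asyncio`. A minimal sketch, assuming the same setup as above, with `async_streaming=False` so that `streamify` returns a regular synchronous generator:

```python
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# With async_streaming=False the output stream is converted to a sync generator.
sync_program = dspy.streamify(dspy.Predict("q->a"), async_streaming=False)

for value in sync_program(q="Why did a chicken cross the kitchen?"):
    if isinstance(value, dspy.Prediction):
        print("Final prediction:", value)
    else:
        print(value)
```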

Example with custom status message provider:

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def module_start_status_message(self, instance, inputs):
        return "Predicting..."

    def tool_end_status_message(self, outputs):
        return f"Tool calling finished with output: {outputs}!"

# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())

# Use the program with streaming output
async def use_streaming():
    output = program(q="Why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)
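Status updates arrive in the stream as `dspy.streaming.StatusMessage` objects. A minimal sketch of filtering them explicitly, reusing `program` and the imports from the example above, and assuming `StatusMessage` exposes the text via a `message` attribute:

```python
async def print_status_messages():
    async for value in program(q="Why did a chicken cross the kitchen?"):
        if isinstance(value, dspy.streaming.StatusMessage):
            # Assumed attribute: the status text lives in `.message`.
            print("STATUS:", value.message)
        elif isinstance(value, dspy.Prediction):
            print("FINAL:", value)

asyncio.run(print_status_messages())
```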

Example with stream listeners:

import asyncio
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))

# Create the program and wrap it with streaming functionality
predict = dspy.Predict("question->answer, reasoning")
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    include_final_prediction_in_output_stream=False,
)

async def use_streaming():
    output = stream_predict(question="why did a chicken cross the kitchen?")
    return_value = None
    async for value in output:
        if isinstance(value, dspy.Prediction):
            return_value = value
        else:
            print(value)
    return return_value

output = asyncio.run(use_streaming())
print(output)

You should see the streaming chunks (as `dspy.streaming.StreamResponse` objects) in the console output.
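A minimal sketch of inspecting those chunks, reusing `stream_predict` and the imports from the example above, and assuming `StreamResponse` carries `predict_name`, `signature_field_name`, and `chunk` attributes:

```python
async def print_chunks():
    async for value in stream_predict(question="why did a chicken cross the kitchen?"):
        if isinstance(value, dspy.streaming.StreamResponse):
            # Assumed attributes: which predictor and output field the chunk
            # belongs to, plus the text of the chunk itself.
            print(f"[{value.predict_name}.{value.signature_field_name}] {value.chunk}")

asyncio.run(print_chunks())
```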

Source code in dspy/streaming/streamify.py
def streamify(
    program: "Module",
    status_message_provider: StatusMessageProvider | None = None,
    stream_listeners: list[StreamListener] | None = None,
    include_final_prediction_in_output_stream: bool = True,
    is_async_program: bool = False,
    async_streaming: bool = True,
) -> Callable[[Any, Any], Awaitable[Any]]:
    """
    Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them
    all at once. It also provides status messages to the user to indicate the progress of the program, and users
    can implement their own status message provider to customize the status messages and choose which modules
    generate them.

    Args:
        program: The DSPy program to wrap with streaming functionality.
        status_message_provider: A custom status message generator to use instead of the default one. Users can
            implement their own status message generator to customize the status messages and choose which modules
            generate them.
        stream_listeners: A list of stream listeners to capture the streaming output of specific fields of
            sub-predictors in the program. When provided, only the target fields of the target predictors will be
            streamed to the user.
        include_final_prediction_in_output_stream: Whether to include the final prediction in the output stream, only
            useful when `stream_listeners` is provided. If `False`, the final prediction will not be included in the
            output stream. When the program hits the cache, or no listener captured anything, the final prediction
            will still be included in the output stream even if this is `False`.
        is_async_program: Whether the program is async. If `False`, the program will be wrapped with `asyncify`,
            otherwise the program will be called with `acall`.
        async_streaming: Whether to return an async generator or a sync generator. If `False`, the streaming will be
            converted to a sync generator.

    Returns:
        A function that takes the same arguments as the original program, but returns an async
            generator that yields the program's outputs incrementally.

    Example:

    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
    # Create the program and wrap it with streaming functionality
    program = dspy.streamify(dspy.Predict("q->a"))

    # Use the program with streaming output
    async def use_streaming():
        output = program(q="Why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    Example with custom status message provider:
    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

    class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
        def module_start_status_message(self, instance, inputs):
            return "Predicting..."

        def tool_end_status_message(self, outputs):
            return f"Tool calling finished with output: {outputs}!"

    # Create the program and wrap it with streaming functionality
    program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())

    # Use the program with streaming output
    async def use_streaming():
        output = program(q="Why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    Example with stream listeners:

    ```python
    import asyncio
    import dspy

    dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))

    # Create the program and wrap it with streaming functionality
    predict = dspy.Predict("question->answer, reasoning")
    stream_listeners = [
        dspy.streaming.StreamListener(signature_field_name="answer"),
        dspy.streaming.StreamListener(signature_field_name="reasoning"),
    ]
    stream_predict = dspy.streamify(
        predict,
        stream_listeners=stream_listeners,
        include_final_prediction_in_output_stream=False,
    )

    async def use_streaming():
        output = stream_predict(question="why did a chicken cross the kitchen?")
        return_value = None
        async for value in output:
            if isinstance(value, dspy.Prediction):
                return_value = value
            else:
                print(value)
        return return_value

    output = asyncio.run(use_streaming())
    print(output)
    ```

    You should see the streaming chunks (in the format of `dspy.streaming.StreamResponse`) in the console output.
    """
    stream_listeners = stream_listeners or []
    if len(stream_listeners) > 0:
        predict_id_to_listener = find_predictor_for_stream_listeners(program, stream_listeners)
    else:
        predict_id_to_listener = {}

    if is_async_program:
        program = program.acall
    elif not iscoroutinefunction(program):
        program = asyncify(program)

    callbacks = settings.callbacks
    status_streaming_callback = StatusStreamingCallback(status_message_provider)
    if not any(isinstance(c, StatusStreamingCallback) for c in callbacks):
        callbacks.append(status_streaming_callback)

    async def generator(args, kwargs, stream: MemoryObjectSendStream):
        with settings.context(send_stream=stream, callbacks=callbacks, stream_listeners=stream_listeners):
            prediction = await program(*args, **kwargs)

        await stream.send(prediction)

    async def async_streamer(*args, **kwargs):
        send_stream, receive_stream = create_memory_object_stream(16)
        async with create_task_group() as tg, send_stream, receive_stream:
            tg.start_soon(generator, args, kwargs, send_stream)

            async for value in receive_stream:
                if isinstance(value, ModelResponseStream):
                    if len(predict_id_to_listener) == 0:
                        # No listeners are configured, yield the chunk directly for backwards compatibility.
                        yield value
                    else:
                        # We are receiving a chunk from the LM's response stream, delegate it to the listeners to
                        # determine if we should yield a value to the user.
                        output = None
                        for listener in predict_id_to_listener[value.predict_id]:
                            # At most one listener should provide a return value.
                            output = listener.receive(value) or output
                        if output:
                            yield output
                elif isinstance(value, StatusMessage):
                    yield value
                elif isinstance(value, Prediction):
                    if include_final_prediction_in_output_stream:
                        yield value
                    elif (
                        len(stream_listeners) == 0
                        or any(listener.cache_hit for listener in stream_listeners)
                        or not any(listener.stream_start for listener in stream_listeners)
                    ):
                        yield value
                    return
                else:
                    # This wildcard case allows for customized streaming behavior.
                    # It is useful when a user has a custom LM that returns stream chunks in a custom format.
                    # We let those chunks pass through to the user to handle them as needed.
                    yield value

    if async_streaming:
        return async_streamer
    else:

        def sync_streamer(*args, **kwargs):
            output = async_streamer(*args, **kwargs)
            return apply_sync_streaming(output)

        return sync_streamer
