
Streaming

In this guide, we will walk you through how to enable streaming in your DSPy program. DSPy streaming consists of two parts:

  • Output Token Streaming: Stream individual tokens as they are generated, instead of waiting for the complete response.
  • Intermediate Status Streaming: Provide real-time updates about the program's execution state (e.g., "Calling web search...", "Processing results...").

Output Token Streaming

DSPy's token streaming feature works with any module in your pipeline, not just the final output. The only requirement is that the streamed field must be of type str. To enable token streaming:

  1. Wrap your program with dspy.streamify
  2. Create one or more dspy.streaming.StreamListener objects to specify which fields to stream

Here is a basic example:

import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

To consume the streamed output:

import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    async for chunk in output_stream:
        print(chunk)

asyncio.run(read_output_stream())

This will produce output like:

StreamResponse(predict_name='self', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' other')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' side of the frying pan!')
Prediction(
    answer='To get to the other side of the frying pan!'
)

Note: Because dspy.streamify returns an async generator, you must use it within an async context. If you are in an environment that already has a running event loop (async context), such as Jupyter or Google Colab, you can use the generator directly.
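As a minimal sketch of that note, a small helper can drain the stream into a list; in a notebook with a running event loop you would simply await it, while in a plain script you wrap it with asyncio.run(). The fake_stream generator below is a stand-in for a real streamified program, not part of DSPy:

```python
import asyncio

async def collect(stream):
    """Drain an async generator into a list.

    In an environment that already has a running event loop (Jupyter,
    Google Colab), you can `await collect(...)` directly instead of
    calling asyncio.run().
    """
    return [chunk async for chunk in stream]

# Stand-in async generator, used here in place of a real streamified program:
async def fake_stream():
    for token in ["To", " get", " to"]:
        yield token

chunks = asyncio.run(collect(fake_stream()))
print(chunks)  # ['To', ' get', ' to']
```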

You may have noticed that the stream above contains two distinct entities: StreamResponse and Prediction. StreamResponse is a wrapper around the streamed tokens of a listened field, which in this example is the answer field. Prediction is the program's final output. In DSPy, streaming is implemented in a sidecar fashion: we enable streaming on the LM so that it emits a stream of tokens. We send these tokens to a side channel, which is continuously read by the user-defined listeners. Each listener keeps interpreting the stream and judges whether the signature_field_name it is listening to has started appearing and has finished. Once it decides the field is present, the listener begins forwarding tokens to the async generator that users can read from. A listener's internal mechanics vary with the adapter behind it, and because it is usually impossible to tell whether a field has finished until the next field is seen, the listener buffers output tokens before sending them to the final generator. This is why the last chunk of type StreamResponse often contains multiple tokens. The program's output is also written to the stream, which is the Prediction chunk in the example output above.

To handle these different types and implement custom logic:

import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output_stream:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(f"Output token of field {chunk.signature_field_name}: {chunk.chunk}")
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

Understanding StreamResponse

StreamResponse (dspy.streaming.StreamResponse) is the wrapper class for streamed tokens. It contains 3 fields:

  • predict_name: the name of the predict that holds the signature_field_name. This name matches the keys returned by your_program.named_predictors(). In the code above, because answer is from the predict itself, predict_name shows up as self, which is the only key returned by predict.named_predictors().
  • signature_field_name: the output field these tokens map to. predict_name and signature_field_name together form the unique identifier of a field. We will demonstrate how to handle multi-field streaming and duplicated field names later in this guide.
  • chunk: the value of the stream chunk.
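Since predict_name and signature_field_name jointly identify a field, one common consumer pattern is to concatenate chunks per identifier. A minimal sketch, using a stand-in dataclass in place of the real dspy.streaming.StreamResponse so it runs without an LM:

```python
from collections import defaultdict
from dataclasses import dataclass

@dataclass
class StreamResponse:
    # Stand-in for dspy.streaming.StreamResponse, with the same 3 fields.
    predict_name: str
    signature_field_name: str
    chunk: str

def accumulate_by_field(chunks):
    """Concatenate streamed tokens per (predict_name, signature_field_name)."""
    fields = defaultdict(str)
    for c in chunks:
        fields[(c.predict_name, c.signature_field_name)] += c.chunk
    return dict(fields)

chunks = [
    StreamResponse("self", "answer", "To"),
    StreamResponse("self", "answer", " get"),
    StreamResponse("self", "answer", " to the other side!"),
]
print(accumulate_by_field(chunks))
# {('self', 'answer'): 'To get to the other side!'}
```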

Streaming with Caching

When a cached result is found, the stream skips the individual tokens and only yields the final Prediction. For example:

Prediction(
    answer='To get to the other side of the dinner plate!'
)
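Consumers should therefore not assume token chunks will arrive: on a cache hit, the only item in the stream may be the final prediction. A defensive loop, sketched here with plain stand-in objects rather than real dspy types, handles both cases:

```python
class PredictionStub:
    # Stand-in for dspy.Prediction in this sketch.
    def __init__(self, answer):
        self.answer = answer

def consume(stream):
    """Collect streamed text and the final result.

    Works whether the stream carries token chunks (cache miss) or only
    the final prediction (cache hit).
    """
    text, final = "", None
    for chunk in stream:
        if isinstance(chunk, str):          # token chunk in this sketch
            text += chunk
        elif isinstance(chunk, PredictionStub):
            final = chunk
    # Fall back to the prediction's field when no tokens were streamed:
    return text or (final.answer if final else None), final

# Cache hit: only the final prediction appears in the stream.
text, final = consume([PredictionStub("To get to the other side!")])
print(text)  # To get to the other side!
```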

Streaming Multiple Fields

You can monitor multiple fields by creating a StreamListener for each one. Here is an example with a multi-module program:

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("answer->simplified_answer")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="simplified_answer"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

The output will look like:

StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk='To')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' reach')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' other side of the recipe!')
Final output:  Prediction(
    simplified_answer='To reach the other side of the recipe!'
)

Streaming the Same Field Multiple Times (as in dspy.ReAct)

By default, a StreamListener automatically closes itself after completing a single streaming session. This design helps prevent performance issues, because every token is broadcast to all configured stream listeners, and having too many active listeners can introduce significant overhead.

However, in scenarios where a DSPy module is used repeatedly in a loop, such as with dspy.ReAct, you may want to stream the same field from every prediction, every time it is used. To enable this behavior, set allow_reuse=True when creating the StreamListener. See the example below:

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


def fetch_user_info(user_name: str):
    """Get user information like name, birthday, etc."""
    return {
        "name": user_name,
        "birthday": "2009-05-16",
    }


def get_sports_news(year: int):
    """Get sports news for a given year."""
    if year == 2009:
        return "Usain Bolt broke the world record in the 100m race."
    return None


react = dspy.ReAct("question->answer", tools=[fetch_user_info, get_sports_news])

stream_listeners = [
    # dspy.ReAct has a built-in output field called "next_thought".
    dspy.streaming.StreamListener(signature_field_name="next_thought", allow_reuse=True),
]
stream_react = dspy.streamify(react, stream_listeners=stream_listeners)


async def read_output_stream():
    output = stream_react(question="What sports news happened in the year Adam was born?")
    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value


print(asyncio.run(read_output_stream()))

In this example, setting allow_reuse=True in the StreamListener ensures that streaming for "next_thought" is available on every iteration, not just the first. When you run this code, you will see the streamed tokens of next_thought printed every time the field is generated.

Handling Duplicated Field Names

When streaming fields with the same name from different modules, specify both predict and predict_name in the StreamListener:

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("question, answer->answer, score")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict1,
        predict_name="predict1"
    ),
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict2,
        predict_name="predict2"
    ),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)


async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

The output will be similar to:

StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk="I'm")
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' ready')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' assist')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' you')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk='! Please provide a question.')
Final output:  Prediction(
    answer="I'm ready to assist you! Please provide a question.",
    score='N/A'
)

Intermediate Status Streaming

Status streaming keeps users informed about the program's progress, and is especially useful for long-running operations like tool calls or complex AI pipelines. To implement status streaming:

  1. Create a custom status message provider by subclassing dspy.streaming.StatusMessageProvider
  2. Override the desired hook methods to provide custom status messages
  3. Pass your provider to dspy.streamify

Example:

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def lm_start_status_message(self, instance, inputs):
        return f"Calling LM with inputs {inputs}..."

    def lm_end_status_message(self, outputs):
        return f"LM finished with outputs {outputs}!"

Available hooks:

  • lm_start_status_message: status message at the start of calling dspy.LM.
  • lm_end_status_message: status message at the end of calling dspy.LM.
  • module_start_status_message: status message at the start of calling a dspy.Module.
  • module_end_status_message: status message at the end of calling a dspy.Module.
  • tool_start_status_message: status message at the start of calling dspy.Tool.
  • tool_end_status_message: status message at the end of calling dspy.Tool.

Each hook should return a string containing the status message.

After creating the message provider, just pass it to dspy.streamify to enable both status message streaming and output token streaming. See the example below. Intermediate status messages are represented by the class dspy.streaming.StatusMessage, so we need another condition check to capture them.

import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.tool = dspy.Tool(lambda x: 2 * x, name="double_the_number")
        self.predict = dspy.ChainOfThought("num1, num2->sum")

    def forward(self, num, **kwargs):
        num2 = self.tool(x=num)
        return self.predict(num1=num, num2=num2)


class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def tool_start_status_message(self, instance, inputs):
        return f"Calling Tool {instance.name} with inputs {inputs}..."

    def tool_end_status_message(self, outputs):
        return f"Tool finished with output: {outputs}!"


predict = MyModule()
stream_listeners = [
    # dspy.ChainOfThought has a built-in output field called "reasoning".
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    status_message_provider=MyStatusMessageProvider(),
)


async def read_output_stream():
    output = stream_predict(num=3)

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
        elif isinstance(chunk, dspy.streaming.StatusMessage):
            print(chunk)
    return return_value


program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)

Example output:

StatusMessage(message='Calling tool double_the_number...')
StatusMessage(message='Tool calling finished! Querying the LLM with tool calling results...')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='To')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' find')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' sum')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' two')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' numbers')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' we')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' simply')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' add')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' them')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' together')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='.')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' Here')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' ')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='3')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' plus')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' 6 equals 9.')
Final output:  Prediction(
    reasoning='To find the sum of the two numbers, we simply add them together. Here, 3 plus 6 equals 9.',
    sum='9'
)

Synchronous Streaming

By default, calling a streamified DSPy program produces an async generator. To get a sync generator instead, set the flag async_streaming=False:

import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    async_streaming=False,
)

output = stream_predict(question="why did a chicken cross the kitchen?")

program_output = None
for chunk in output:
    if isinstance(chunk, dspy.streaming.StreamResponse):
        print(chunk)
    elif isinstance(chunk, dspy.Prediction):
        program_output = chunk
print(f"Program output: {program_output}")