# Streaming

In this guide, we will walk you through how to enable streaming in your DSPy program. DSPy streaming consists of two parts:

- Output Token Streaming: stream individual tokens as they are generated, instead of waiting for the full response.
- Intermediate Status Streaming: provide real-time updates about the program's execution state (e.g., "Calling web search...", "Processing results...").
## Output Token Streaming

DSPy's token streaming works with any module in your pipeline, not just the final output. The only requirement is that the streamed fields must be of type `str`. To enable token streaming:

- Wrap your program with `dspy.streamify`
- Create one or more `dspy.streaming.StreamListener` objects to specify which fields to stream
Here is a basic example:
```python
import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)
```
To consume the streamed output:
```python
import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    async for chunk in output_stream:
        print(chunk)

asyncio.run(read_output_stream())
```
This will produce output like:
```
StreamResponse(predict_name='self', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' other')
StreamResponse(predict_name='self', signature_field_name='answer', chunk=' side of the frying pan!')
Prediction(
    answer='To get to the other side of the frying pan!'
)
```
Note: because `dspy.streamify` returns an async generator, you must consume it in an async context. If you are working in an environment that already runs an event loop (an async context), such as Jupyter or Google Colab, you can use the generator directly.
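To make the async-context requirement concrete, here is a minimal sketch. The `fake_stream` generator below is a hypothetical stand-in for the one returned by a streamified program; in a script you wrap consumption in `asyncio.run()`, while in a notebook you would `async for` over the generator directly in a cell:

```python
import asyncio

async def consume(agen):
    # In a plain script, wrap this in asyncio.run(); in Jupyter/Colab,
    # where an event loop is already running, use `async for` directly.
    return [chunk async for chunk in agen]

async def fake_stream():
    # Hypothetical stand-in for the async generator returned by
    # stream_predict; real chunks would be StreamResponse/Prediction objects.
    for token in ["To", " get", " going"]:
        yield token

chunks = asyncio.run(consume(fake_stream()))
print(chunks)  # ['To', ' get', ' going']
```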
You may have noticed that the stream above contains two different entities: `StreamResponse` and `Prediction`. `StreamResponse` is the wrapper over streamed tokens on the fields being listened to, in this case the `answer` field. `Prediction` is the program's final output. In DSPy, streaming is implemented in a sidecar fashion: we enable streaming on the LM so that the LM outputs a stream of tokens. We send these tokens to a side channel, which is continuously read by user-defined listeners. The listeners keep interpreting the stream and decide whether the `signature_field_name` they are listening to has started to appear and has finished. Once a listener determines that the field has appeared, it begins streaming tokens to the async generator that users can read from. A listener's internal mechanism depends on the adapter behind the scenes, and because it is usually impossible to be sure a field has finished until the next field is seen, the listener buffers output tokens before sending them to the final generator. This is why you will usually see the last chunk of type `StreamResponse` contain multiple tokens. The program's output is also written to the stream, which is the `Prediction` chunk in the sample output above.
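The buffering behavior described above can be illustrated with a toy sketch (this is not DSPy's actual listener implementation; the `[[next]]` marker and `lookahead` size are invented for illustration). The listener cannot confirm a field has ended until it sees the start of the next field, so it keeps a small buffer and flushes whatever remains as one final multi-token chunk:

```python
def stream_field(tokens, next_field_marker="[[next]]", lookahead=2):
    # Toy sketch of a buffering listener: emit a token only once enough
    # lookahead has accumulated behind it, then flush the remainder as
    # a single multi-token chunk when the field boundary is found.
    buffer = []
    for tok in tokens:
        if tok == next_field_marker:
            break  # the field has ended; stop emitting token-by-token
        buffer.append(tok)
        if len(buffer) > lookahead:
            yield buffer.pop(0)
    if buffer:
        yield "".join(buffer)  # the multi-token final chunk

chunks = list(stream_field(["To", " get", " to", " the", " other side!", "[[next]]"]))
print(chunks)  # ['To', ' get', ' to', ' the other side!']
```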
To handle these different types and implement custom logic:
```python
import asyncio

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output_stream:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(f"Output token of field {chunk.signature_field_name}: {chunk.chunk}")
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
```
## Understanding StreamResponse

`StreamResponse` (`dspy.streaming.StreamResponse`) is the wrapper class for streamed tokens. It contains three fields:

- `predict_name`: the name of the predict that holds the `signature_field_name`. The name is the same as the keys returned by `your_program.named_predictors()`. In the code above, because `answer` comes from `predict` itself, the `predict_name` shows up as `self`, which is the only key when running `predict.named_predictors()`.
- `signature_field_name`: the output field these tokens map to. `predict_name` and `signature_field_name` together form the unique identifier of the field. We will demonstrate how to handle multi-field streaming and duplicated field names later in this guide.
- `chunk`: the value of the stream chunk.
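Because `(predict_name, signature_field_name)` uniquely identifies a field, a natural way to consume chunks is to key on that pair. The sketch below uses a toy dataclass standing in for `dspy.streaming.StreamResponse` (same three fields, but not the real class) to show the routing pattern:

```python
from collections import defaultdict
from dataclasses import dataclass

# Toy stand-in for dspy.streaming.StreamResponse, used only to
# illustrate keying chunks by (predict_name, signature_field_name).
@dataclass
class StreamResponse:
    predict_name: str
    signature_field_name: str
    chunk: str

chunks = [
    StreamResponse("predict1", "answer", "To"),
    StreamResponse("predict1", "answer", " get"),
    StreamResponse("predict2", "answer", "Done"),
]

# Accumulate tokens per unique field identifier.
by_field = defaultdict(str)
for c in chunks:
    by_field[(c.predict_name, c.signature_field_name)] += c.chunk

print(by_field[("predict1", "answer")])  # "To get"
print(by_field[("predict2", "answer")])  # "Done"
```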
## Streaming with Cache

When a cached result is found, the stream will skip the individual tokens and only yield the final `Prediction`.
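This means consumer code should not assume tokens will arrive before the final result. The sketch below uses plain Python values (a `str` standing in for a `StreamResponse` token and a `dict` standing in for a `dspy.Prediction`) to show a consumer that handles a cache-hit stream gracefully:

```python
def consume(stream):
    # Collect token chunks and the final result separately; on a cache
    # hit the stream contains only the final result and no tokens.
    tokens, final = [], None
    for chunk in stream:
        if isinstance(chunk, str):   # token chunk (stand-in for StreamResponse)
            tokens.append(chunk)
        else:                        # final result (stand-in for dspy.Prediction)
            final = chunk
    return tokens, final

# A cache hit: the stream carries only the final result.
cached_stream = [{"answer": "To get to the other side!"}]
tokens, final = consume(cached_stream)
print(tokens, final)  # [] {'answer': 'To get to the other side!'}
```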
## Streaming Multiple Fields

You can monitor multiple fields by creating a `StreamListener` for each one. Here is an example with a multi-module program:
```python
import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("answer->simplified_answer")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer

predict = MyModule()

stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="simplified_answer"),
]

stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
```
The output will look like:
```
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk='To')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' reach')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' other side of the recipe!')
Final output: Prediction(
    simplified_answer='To reach the other side of the recipe!'
)
```
## Streaming the Same Field Multiple Times (as in dspy.ReAct)

By default, a `StreamListener` automatically closes itself after completing a single streaming session. This design helps prevent performance issues, because every token is broadcast to all configured stream listeners, and having too many active listeners can introduce significant overhead.

However, in scenarios where a DSPy module is used repeatedly in a loop, such as with `dspy.ReAct`, you may want to stream the same field from every prediction, every time it is used. To enable this behavior, set `allow_reuse=True` when creating the `StreamListener`. See the example below:
```python
import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

def fetch_user_info(user_name: str):
    """Get user information like name, birthday, etc."""
    return {
        "name": user_name,
        "birthday": "2009-05-16",
    }

def get_sports_news(year: int):
    """Get sports news for a given year."""
    if year == 2009:
        return "Usain Bolt broke the world record in the 100m race."
    return None

react = dspy.ReAct("question->answer", tools=[fetch_user_info, get_sports_news])

stream_listeners = [
    # dspy.ReAct has a built-in output field called "next_thought".
    dspy.streaming.StreamListener(signature_field_name="next_thought", allow_reuse=True),
]

stream_react = dspy.streamify(react, stream_listeners=stream_listeners)

async def read_output_stream():
    output = stream_react(question="What sports news happened in the year Adam was born?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

print(asyncio.run(read_output_stream()))
```
In this example, setting `allow_reuse=True` in the `StreamListener` ensures that streaming for "next_thought" is available in every iteration, not just the first. When you run this code, you will see streamed tokens for `next_thought` every time the field is generated.
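The difference between a one-shot listener and `allow_reuse=True` can be sketched with a toy class (this is not DSPy's internal listener, just an illustration of the close-after-one-session behavior):

```python
class ToyListener:
    # Toy sketch: a listener that closes after its first streaming
    # session unless allow_reuse is set, mirroring why ReAct loops
    # need allow_reuse=True to stream the same field every iteration.
    def __init__(self, allow_reuse=False):
        self.allow_reuse = allow_reuse
        self.closed = False
        self.captured = []

    def consume(self, session_tokens):
        if self.closed:
            return  # a closed listener ignores later sessions
        self.captured.append("".join(session_tokens))
        if not self.allow_reuse:
            self.closed = True

one_shot = ToyListener()
reusable = ToyListener(allow_reuse=True)
for session in [["first", " thought"], ["second", " thought"]]:
    one_shot.consume(session)
    reusable.consume(session)

print(one_shot.captured)  # ['first thought']
print(reusable.captured)  # ['first thought', 'second thought']
```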
## Handling Duplicated Field Names

When streaming fields with the same name from different modules, specify both `predict` and `predict_name` in the `StreamListener`:
```python
import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("question, answer->answer, score")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer

predict = MyModule()

stream_listeners = [
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict1,
        predict_name="predict1",
    ),
    dspy.streaming.StreamListener(
        signature_field_name="answer",
        predict=predict.predict2,
        predict_name="predict2",
    ),
]

stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
)

async def read_output_stream():
    output = stream_predict(question="why did a chicken cross the kitchen?")

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
```
The output will look like:
```
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk="I'm")
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' ready')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' assist')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk=' you')
StreamResponse(predict_name='predict2', signature_field_name='answer', chunk='! Please provide a question.')
Final output: Prediction(
    answer="I'm ready to assist you! Please provide a question.",
    score='N/A'
)
```
## Intermediate Status Streaming

Status streaming keeps users informed about the program's progress, which is especially useful for long-running operations like tool calls or complex AI pipelines. To implement status streaming:

- Create a custom status message provider by subclassing `dspy.streaming.StatusMessageProvider`
- Override the desired hook methods to provide custom status messages
- Pass your provider to `dspy.streamify`
Example:
```python
class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def lm_start_status_message(self, instance, inputs):
        return f"Calling LM with inputs {inputs}..."

    def lm_end_status_message(self, outputs):
        return f"LM finished with outputs {outputs}!"
```
Available hooks:

- `lm_start_status_message`: status message when a `dspy.LM` call starts.
- `lm_end_status_message`: status message when a `dspy.LM` call ends.
- `module_start_status_message`: status message when a `dspy.Module` call starts.
- `module_end_status_message`: status message when a `dspy.Module` call ends.
- `tool_start_status_message`: status message when a `dspy.Tool` call starts.
- `tool_end_status_message`: status message when a `dspy.Tool` call ends.

Each hook should return a string containing the status message.
After creating the message provider, pass it to `dspy.streamify` to enable both status message streaming and output token streaming. See the example below. Intermediate status messages are represented by the class `dspy.streaming.StatusMessage`, so we need another condition check to capture them.
```python
import asyncio

import dspy

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)

class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.tool = dspy.Tool(lambda x: 2 * x, name="double_the_number")
        self.predict = dspy.ChainOfThought("num1, num2->sum")

    def forward(self, num, **kwargs):
        num2 = self.tool(x=num)
        return self.predict(num1=num, num2=num2)

class MyStatusMessageProvider(dspy.streaming.StatusMessageProvider):
    def tool_start_status_message(self, instance, inputs):
        return f"Calling Tool {instance.name} with inputs {inputs}..."

    def tool_end_status_message(self, outputs):
        return f"Tool finished with output: {outputs}!"

predict = MyModule()

stream_listeners = [
    # dspy.ChainOfThought has a built-in output field called "reasoning".
    dspy.streaming.StreamListener(signature_field_name="reasoning"),
]

stream_predict = dspy.streamify(
    predict,
    stream_listeners=stream_listeners,
    status_message_provider=MyStatusMessageProvider(),
)

async def read_output_stream():
    output = stream_predict(num=3)

    return_value = None
    async for chunk in output:
        if isinstance(chunk, dspy.streaming.StreamResponse):
            print(chunk)
        elif isinstance(chunk, dspy.Prediction):
            return_value = chunk
        elif isinstance(chunk, dspy.streaming.StatusMessage):
            print(chunk)
    return return_value

program_output = asyncio.run(read_output_stream())
print("Final output: ", program_output)
```
Sample output:
```
StatusMessage(message='Calling tool double_the_number...')
StatusMessage(message='Tool calling finished! Querying the LLM with tool calling results...')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='To')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' find')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' sum')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' of')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' the')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' two')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' numbers')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' we')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' simply')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' add')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' them')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' together')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='.')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' Here')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=',')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' ')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk='3')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' plus')
StreamResponse(predict_name='predict.predict', signature_field_name='reasoning', chunk=' 6 equals 9.')
Final output: Prediction(
    reasoning='To find the sum of the two numbers, we simply add them together. Here, 3 plus 6 equals 9.',
    sum='9'
)
```
## Synchronous Streaming

By default, calling a streamified DSPy program produces an async generator. To get a synchronous generator instead, set the flag `async_streaming=False`:
```python
import os

import dspy

os.environ["OPENAI_API_KEY"] = "your_api_key"
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

# Enable streaming for the 'answer' field
stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
    async_streaming=False,
)

output = stream_predict(question="why did a chicken cross the kitchen?")

program_output = None
for chunk in output:
    if isinstance(chunk, dspy.streaming.StreamResponse):
        print(chunk)
    elif isinstance(chunk, dspy.Prediction):
        program_output = chunk
print(f"Program output: {program_output}")
```