dspy.streamify
dspy.streamify(program: Module, status_message_provider: StatusMessageProvider | None = None, stream_listeners: list[StreamListener] | None = None, include_final_prediction_in_output_stream: bool = True, is_async_program: bool = False, async_streaming: bool = True) -> Callable[[Any, Any], Awaitable[Any]]
包装一个DSPy程序,使其能够增量式流式传输输出,而不是一次性返回所有输出。它还会向用户提供状态消息以指示程序进度,并且用户可以自行实现状态消息提供器来自定义状态消息以及指定生成状态消息的模块。
参数:
| 名称 | 类型 | 描述 | 默认值 |
|---|---|---|---|
program
|
Module
|
要包装为流式功能的DSPy程序。 |
必填 |
status_message_provider
|
StatusMessageProvider | None
|
一个自定义状态消息生成器,用于替代默认生成器。用户可以 实现自己的状态消息生成器,以自定义状态消息以及为哪些模块生成 状态消息。 |
None
|
stream_listeners
|
list[StreamListener] | None
|
一个流监听器列表,用于捕获程序中子预测特定字段的流式输出。当提供时,只有目标预测中的目标字段会被流式传输给用户。 |
None
|
include_final_prediction_in_output_stream
|
bool
|
是否在输出流中包含最终预测,仅在提供 |
True
|
is_async_program
|
bool
|
程序是否为异步。如果 |
False
|
async_streaming
|
bool
|
是否返回异步生成器或同步生成器。如果为 |
True
|
返回:
| 类型 | 描述 |
|---|---|
Callable[[Any, Any], Awaitable[Any]]
|
一个函数,接收与原程序相同的参数,但返回一个异步生成器,该生成器会逐步产生程序的输出。 |
示例:
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"))
# Use the program with streaming output
async def use_streaming():
output = program(q="Why did a chicken cross the kitchen?")
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
Example with custom status message provider:
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
class MyStatusMessageProvider(StatusMessageProvider):
def module_start_status_message(self, instance, inputs):
return f"Predicting..."
def tool_end_status_message(self, outputs):
return f"Tool calling finished with output: {outputs}!"
# Create the program and wrap it with streaming functionality
program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider())
# Use the program with streaming output
async def use_streaming():
output = program(q="Why did a chicken cross the kitchen?")
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
使用流监听器的示例:
import asyncio
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False))
# Create the program and wrap it with streaming functionality
predict = dspy.Predict("question->answer, reasoning")
stream_listeners = [
dspy.streaming.StreamListener(signature_field_name="answer"),
dspy.streaming.StreamListener(signature_field_name="reasoning"),
]
stream_predict = dspy.streamify(predict, stream_listeners=stream_listeners)
async def use_streaming():
output = stream_predict(
question="why did a chicken cross the kitchen?",
include_final_prediction_in_output_stream=False,
)
return_value = None
async for value in output:
if isinstance(value, dspy.Prediction):
return_value = value
else:
print(value)
return return_value
output = asyncio.run(use_streaming())
print(output)
你应该会在控制台输出中看到流式传输的数据块(格式为dspy.streaming.StreamResponse)。
Source code in dspy/streaming/streamify.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 | |
:::