教程:部署你的DSPy程序
本指南演示了两种在生产环境中部署DSPy程序的潜在方式:使用FastAPI进行轻量级部署,以及使用MLflow进行更具生产级水平的部署,包括程序版本控制和管理。
下面,我们假设您拥有以下简单的DSPY程序,并希望部署它。您可以将其替换为更复杂的程序。
import dspy
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
dspy_program = dspy.ChainOfThought("question -> answer")
使用FastAPI进行部署
FastAPI 提供了一种直接的方式,将您的 DSPy 程序作为 REST API 提供服务。当您可以直接访问程序代码并需要轻量级部署解决方案时,这是理想的选择。
让我们创建一个FastAPI应用程序来提供上面定义的dspy_program。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import dspy
app = FastAPI(
title="DSPy Program API",
description="A simple API serving a DSPy Chain of Thought program",
version="1.0.0"
)
# Define request model for better documentation and validation
class Question(BaseModel):
text: str
# Configure your language model and 'asyncify' your DSPy program.
lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm, async_max_workers=4) # default is 8
dspy_program = dspy.ChainOfThought("question -> answer")
dspy_program = dspy.asyncify(dspy_program)
@app.post("/predict")
async def predict(question: Question):
try:
result = await dspy_program(question=question.text)
return {
"status": "success",
"data": result.toDict()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
在上面的代码中,我们调用dspy.asyncify将dspy程序转换为异步模式运行,以实现高吞吐量的FastAPI部署。目前,这会在一个单独的线程中运行dspy程序并等待其结果。
默认情况下,生成的线程限制为8。可以将其视为一个工作池。
如果你有8个正在运行的程序,并再次调用它,第9次调用将等待直到8个中的一个返回。
你可以使用新的async_max_workers设置来配置异步容量。
流式处理,在DSPY 2.6.0+版本中
DSPy 2.6.0+ 版本也支持流式传输,可以通过 pip install -U dspy 安装。
我们可以使用dspy.streamify将dspy程序转换为流式模式。当您希望在最终预测准备好之前向客户端流式传输中间输出(即O1-style推理)时,这非常有用。它在底层使用asyncify并继承了执行语义。
dspy_program = dspy.asyncify(dspy.ChainOfThought("question -> answer"))
streaming_dspy_program = dspy.streamify(dspy_program)
@app.post("/predict/stream")
async def stream(question: Question):
async def generate():
async for value in streaming_dspy_program(question=question.text):
if isinstance(value, dspy.Prediction):
data = {"prediction": value.labels().toDict()}
elif isinstance(value, litellm.ModelResponse):
data = {"chunk": value.json()}
yield f"data: {ujson.dumps(data)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
# Since you're often going to want to stream the result of a DSPy program as server-sent events,
# we've included a helper function for that, which is equivalent to the code above.
from dspy.utils.streaming import streaming_response
@app.post("/predict/stream")
async def stream(question: Question):
stream = streaming_dspy_program(question=question.text)
return StreamingResponse(streaming_response(stream), media_type="text/event-stream")
将你的代码写入文件,例如 fastapi_dspy.py。然后你可以通过以下方式启动应用:
它将在 http://127.0.0.1:8000/ 启动一个本地服务器。你可以使用下面的 Python 代码进行测试:
import requests
response = requests.post(
"http://127.0.0.1:8000/predict",
json={"text": "What is the capital of France?"}
)
print(response.json())
您应该会看到如下响应:
{
"status": "success",
"data": {
"reasoning": "The capital of France is a well-known fact, commonly taught in geography classes and referenced in various contexts. Paris is recognized globally as the capital city, serving as the political, cultural, and economic center of the country.",
"answer": "The capital of France is Paris."
}
}
使用MLflow部署
如果您希望打包您的DSPy程序并在隔离环境中部署,我们推荐使用MLflow进行部署。 MLflow是一个流行的机器学习工作流管理平台,包含版本控制、追踪和部署功能。
让我们启动MLflow追踪服务器,我们将在此存储我们的DSPy程序。以下命令将在http://127.0.0.1:5000/启动一个本地服务器。
然后我们可以定义DSPy程序并将其记录到MLflow服务器中。"log"在MLflow中是一个重载术语,基本上意味着
我们将程序信息以及环境需求存储在MLflow服务器中。这是通过mlflow.dspy.log_model()
函数完成的,请查看以下代码:
[!注意] 截至 MLflow 2.22.0 版本,存在一个注意事项:在使用 MLflow 部署时,必须将您的 DSPy 程序封装在一个自定义的 DSPy 模块类中。 这是因为 MLflow 要求使用位置参数,而 DSPy 预构建模块不允许使用位置参数,例如
dspy.Predict或dspy.ChainOfThought。为了解决这个问题,请创建一个继承自dspy.Module的包装类,并在forward()方法中实现您的程序逻辑,如下例所示。
import dspy
import mlflow
mlflow.set_tracking_uri("http://127.0.0.1:5000/")
mlflow.set_experiment("deploy_dspy_program")
lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm)
class MyProgram(dspy.Module):
def __init__(self):
super().__init__()
self.cot = dspy.ChainOfThought("question -> answer")
def forward(self, messages):
return self.cot(question=messages[0]["content"])
dspy_program = MyProgram()
with mlflow.start_run():
mlflow.dspy.log_model(
dspy_program,
"dspy_program",
input_example={"messages": [{"role": "user", "content": "What is LLM agent?"}]},
task="llm/v1/chat",
)
我们建议您设置task="llm/v1/chat",这样部署的程序会自动以与OpenAI聊天API相同的格式接收输入并生成输出,这是LM应用程序的通用接口。将上述代码写入一个文件,例如mlflow_dspy.py,然后运行它。
在您记录程序后,您可以在MLflow UI中查看保存的信息。打开http://127.0.0.1:5000/并选择
deploy_dspy_program实验,然后选择您刚刚创建的运行,在Artifacts标签下,您应该看到
记录的程序信息,类似于以下截图:

从用户界面获取您的运行ID(或者执行mlflow_dspy.py时控制台打印的信息),现在您可以使用以下命令部署已记录的程序:
程序部署完成后,您可以使用以下命令进行测试:
> curl http://127.0.0.1:6000/invocations -H "Content-Type:application/json" --data '{"messages": [{"content": "what is 2 + 2?", "role": "user"}]}'
你应该看到类似以下的响应:
{
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "{\"reasoning\": \"The question asks for the sum of 2 and 2. To find the answer, we simply add the two numbers together: 2 + 2 = 4.\", \"answer\": \"4\"}"
},
"finish_reason": "stop"
}
]
}
关于如何用MLflow部署DSPy程序以及如何自定义部署的完整指南,请参阅 MLflow文档。
MLflow 部署最佳实践
- 环境管理: 始终在
conda.yaml或requirements.txt文件中指定您的Python依赖项。 - 版本控制: 为您的模型版本使用有意义的标签和描述。
- 输入验证: 定义清晰的输入模式和示例。
- 监控: 为生产部署设置适当的日志记录和监控。
对于生产部署,请考虑使用MLflow与容器化:
> mlflow models build-docker -m "runs:/{run_id}/model" -n "dspy-program"
> docker run -p 6000:8080 dspy-program
有关生产部署选项和最佳实践的完整指南,请参阅 MLflow 文档。