教程:在DSPy中使用MCP工具
MCP,全称为模型上下文协议(Model Context Protocol),是一种开放协议,用于标准化应用程序如何向大语言模型提供上下文。尽管存在一些开发开销,但MCP提供了一个宝贵的机会,无论您使用何种技术栈,都可以与其他开发者共享工具、资源和提示。同样,您也可以使用其他开发者构建的工具,而无需重写代码。
在本指南中,我们将引导您了解如何在DSPy中使用MCP工具。出于演示目的, 我们将构建一个航空服务智能体,帮助用户预订航班以及修改或取消 现有预订。这将依赖于一个带有自定义工具的MCP服务器,但应该很容易推广到 社区构建的MCP服务器。
如何运行本教程
本教程无法在托管的IPython笔记本(如Google Colab或Databricks笔记本)中运行。 要运行代码,您需要按照指南在本地设备上编写代码。代码 已在macOS上测试,在Linux环境中应该以相同的方式工作。
安装依赖项
开始之前,让我们先安装所需的依赖项:
MCP 服务器设置
首先,让我们为航空公司智能体设置MCP服务器,其中包含:
- 一组数据库
- 用户数据库,存储用户信息。
- 航班数据库,存储航班信息。
- 工单数据库,存储客户工单。
- 一组工具
- fetch_flight_info: 获取特定日期的航班信息。
- fetch_itinerary: 获取已预订行程的信息。
- book_itinerary: 代表用户预订航班。
- modify_itinerary: 修改行程,可通过航班变更或取消实现。
- get_user_info: 获取用户信息。
- file_ticket: 提交待办工单以获取人工协助。
在你的工作目录中,创建一个文件 mcp_server.py,并将以下内容粘贴到其中:
import random
import string
from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
# Create an MCP server
mcp = FastMCP("Airline Agent")
class Date(BaseModel):
# Somehow LLM is bad at specifying `datetime.datetime`
year: int
month: int
day: int
hour: int
class UserProfile(BaseModel):
user_id: str
name: str
email: str
class Flight(BaseModel):
flight_id: str
date_time: Date
origin: str
destination: str
duration: float
price: float
class Itinerary(BaseModel):
confirmation_number: str
user_profile: UserProfile
flight: Flight
class Ticket(BaseModel):
user_request: str
user_profile: UserProfile
user_database = {
"Adam": UserProfile(user_id="1", name="Adam", email="adam@gmail.com"),
"Bob": UserProfile(user_id="2", name="Bob", email="bob@gmail.com"),
"Chelsie": UserProfile(user_id="3", name="Chelsie", email="chelsie@gmail.com"),
"David": UserProfile(user_id="4", name="David", email="david@gmail.com"),
}
flight_database = {
"DA123": Flight(
flight_id="DA123",
origin="SFO",
destination="JFK",
date_time=Date(year=2025, month=9, day=1, hour=1),
duration=3,
price=200,
),
"DA125": Flight(
flight_id="DA125",
origin="SFO",
destination="JFK",
date_time=Date(year=2025, month=9, day=1, hour=7),
duration=9,
price=500,
),
"DA456": Flight(
flight_id="DA456",
origin="SFO",
destination="SNA",
date_time=Date(year=2025, month=10, day=1, hour=1),
duration=2,
price=100,
),
"DA460": Flight(
flight_id="DA460",
origin="SFO",
destination="SNA",
date_time=Date(year=2025, month=10, day=1, hour=9),
duration=2,
price=120,
),
}
itinery_database = {}
ticket_database = {}
@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
"""Fetch flight information from origin to destination on the given date"""
flights = []
for flight_id, flight in flight_database.items():
if (
flight.date_time.year == date.year
and flight.date_time.month == date.month
and flight.date_time.day == date.day
and flight.origin == origin
and flight.destination == destination
):
flights.append(flight)
return flights
@mcp.tool()
def fetch_itinerary(confirmation_number: str):
"""Fetch a booked itinerary information from database"""
return itinery_database.get(confirmation_number)
@mcp.tool()
def pick_flight(flights: list[Flight]):
"""Pick up the best flight that matches users' request."""
sorted_flights = sorted(
flights,
key=lambda x: (
x.get("duration") if isinstance(x, dict) else x.duration,
x.get("price") if isinstance(x, dict) else x.price,
),
)
return sorted_flights[0]
def generate_id(length=8):
chars = string.ascii_lowercase + string.digits
return "".join(random.choices(chars, k=length))
@mcp.tool()
def book_itinerary(flight: Flight, user_profile: UserProfile):
"""Book a flight on behalf of the user."""
confirmation_number = generate_id()
while confirmation_number in itinery_database:
confirmation_number = generate_id()
itinery_database[confirmation_number] = Itinerary(
confirmation_number=confirmation_number,
user_profile=user_profile,
flight=flight,
)
return confirmation_number, itinery_database[confirmation_number]
@mcp.tool()
def cancel_itinerary(confirmation_number: str, user_profile: UserProfile):
"""Cancel an itinerary on behalf of the user."""
if confirmation_number in itinery_database:
del itinery_database[confirmation_number]
return
raise ValueError("Cannot find the itinerary, please check your confirmation number.")
@mcp.tool()
def get_user_info(name: str):
"""Fetch the user profile from database with given name."""
return user_database.get(name)
@mcp.tool()
def file_ticket(user_request: str, user_profile: UserProfile):
"""File a customer support ticket if this is something the agent cannot handle."""
ticket_id = generate_id(length=6)
ticket_database[ticket_id] = Ticket(
user_request=user_request,
user_profile=user_profile,
)
return ticket_id
if __name__ == "__main__":
mcp.run()
在启动服务器之前,我们先来看一下代码。
我们首先创建一个FastMCP实例,这是一个帮助快速构建MCP服务器的实用工具:
然后我们定义我们的数据结构,在实际应用中这将是数据库模式,例如:
class Flight(BaseModel):
flight_id: str
date_time: Date
origin: str
destination: str
duration: float
price: float
随后,我们初始化数据库实例。在实际应用中,这些应该是连接到真实数据库的连接器,但为简化起见,我们仅使用字典:
user_database = {
"Adam": UserProfile(user_id="1", name="Adam", email="adam@gmail.com"),
"Bob": UserProfile(user_id="2", name="Bob", email="bob@gmail.com"),
"Chelsie": UserProfile(user_id="3", name="Chelsie", email="chelsie@gmail.com"),
"David": UserProfile(user_id="4", name="David", email="david@gmail.com"),
}
下一步是定义工具并用 @mcp.tool() 标记它们,以便MCP客户端能够发现这些MCP工具:
@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
"""Fetch flight information from origin to destination on the given date"""
flights = []
for flight_id, flight in flight_database.items():
if (
flight.date_time.year == date.year
and flight.date_time.month == date.month
and flight.date_time.day == date.day
and flight.origin == origin
and flight.destination == destination
):
flights.append(flight)
return flights
最后一步是启动服务器:
现在我们已经完成了服务器的编写!让我们启动它:
编写一个利用MCP服务器中工具的DSPy程序
现在服务器已运行,让我们构建实际的航空公司服务智能体,它利用我们服务器中的MCP工具来协助用户。在你的工作目录中,创建一个名为dspy_mcp_agent.py的文件,并按照指南向其中添加代码。
从MCP服务器收集工具
我们首先需要从MCP服务器收集所有可用工具,并使它们能够被DSPy使用。DSPy提供了一个API dspy.Tool作为标准工具接口。让我们将所有MCP工具转换为dspy.Tool。
我们需要创建一个MCP客户端实例来与MCP服务器通信,获取所有可用工具,并使用静态方法from_mcp_tool将它们转换为dspy.Tool:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Create server parameters for stdio connection
server_params = StdioServerParameters(
command="python", # Executable
args=["path_to_your_working_directory/mcp_server.py"],
env=None,
)
async def run():
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
# List available tools
tools = await session.list_tools()
# Convert MCP tools to DSPy tools
dspy_tools = []
for tool in tools.tools:
dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))
print(len(dspy_tools))
print(dspy_tools[0].args)
if __name__ == "__main__":
import asyncio
asyncio.run(run())
通过上述代码,我们已成功收集所有可用的MCP工具并将其转换为DSPy工具。
构建一个DSPY智能体来处理客户请求
现在我们将使用dspy.ReAct来构建处理客户请求的智能体。ReAct代表
"推理与行动",它要求大语言模型决定是调用工具还是结束流程。
如果需要工具,大语言模型负责决定调用哪个工具并提供
适当的参数。
与往常一样,我们需要创建一个 dspy.Signature 来定义智能体的输入和输出:
import dspy
class DSPyAirlineCustomerService(dspy.Signature):
"""You are an airline customer service agent. You are given a list of tools to handle user requests. You should decide the right tool to use in order to fulfill users' requests."""
user_request: str = dspy.InputField()
process_result: str = dspy.OutputField(
desc=(
"Message that summarizes the process result, and the information users need, "
"e.g., the confirmation_number if it's a flight booking request."
)
)
并为我们的智能体选择一个LM:
然后我们通过将工具和签名传入dspy.ReAct API来创建ReAct智能体。现在我们可以整合完整的代码脚本:
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import dspy
# Create server parameters for stdio connection
server_params = StdioServerParameters(
command="python", # Executable
args=["script_tmp/mcp_server.py"], # Optional command line arguments
env=None, # Optional environment variables
)
class DSPyAirlineCustomerService(dspy.Signature):
"""You are an airline customer service agent. You are given a list of tools to handle user requests.
You should decide the right tool to use in order to fulfill users' requests."""
user_request: str = dspy.InputField()
process_result: str = dspy.OutputField(
desc=(
"Message that summarizes the process result, and the information users need, "
"e.g., the confirmation_number if it's a flight booking request."
)
)
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
async def run(user_request):
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()
# List available tools
tools = await session.list_tools()
# Convert MCP tools to DSPy tools
dspy_tools = []
for tool in tools.tools:
dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))
# Create the agent
react = dspy.ReAct(DSPyAirlineCustomerService, tools=dspy_tools)
result = await react.acall(user_request=user_request)
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(run("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam"))
请注意我们必须调用react.acall,因为MCP工具默认是异步的。让我们执行脚本:
您应该会看到类似这样的输出:
Prediction(
trajectory={'thought_0': 'I need to fetch flight information for Adam from SFO to JFK on 09/01/2025 to find available flights for booking.', 'tool_name_0': 'fetch_flight_info', 'tool_args_0': {'date': {'year': 2025, 'month': 9, 'day': 1, 'hour': 0}, 'origin': 'SFO', 'destination': 'JFK'}, 'observation_0': ['{"flight_id": "DA123", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 1}, "origin": "SFO", "destination": "JFK", "duration": 3.0, "price": 200.0}', '{"flight_id": "DA125", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 7}, "origin": "SFO", "destination": "JFK", "duration": 9.0, "price": 500.0}'], ..., 'tool_name_4': 'finish', 'tool_args_4': {}, 'observation_4': 'Completed.'},
reasoning="I successfully booked a flight for Adam from SFO to JFK on 09/01/2025. I found two available flights, selected the more economical option (flight DA123 at 1 AM for $200), retrieved Adam's user profile, and completed the booking process. The confirmation number for the flight is 8h7clk3q.",
process_result='Your flight from SFO to JFK on 09/01/2025 has been successfully booked. Your confirmation number is 8h7clk3q.'
)
trajectory 字段包含完整的思考与行动过程。如果您想了解底层运行机制,请查看可观测性指南来设置 MLflow,它可以可视化展示 dspy.ReAct 内部的每一步操作!
结论
在本指南中,我们构建了一个航空公司服务智能体,它利用了一个自定义的MCP服务器和dspy.ReAct模块。在MCP支持的背景下,DSPy提供了一个简单的接口用于与MCP工具交互,让您能够灵活实现所需的任何功能。