Skip to content

教程:在DSPy中使用MCP工具

MCP,全称为模型上下文协议(Model Context Protocol),是一种开放协议,用于标准化应用程序如何向大语言模型提供上下文。尽管存在一些开发开销,但MCP提供了一个宝贵的机会,无论您使用何种技术栈,都可以与其他开发者共享工具、资源和提示。同样,您也可以使用其他开发者构建的工具,而无需重写代码。

在本指南中,我们将引导您了解如何在DSPy中使用MCP工具。出于演示目的, 我们将构建一个航空服务智能体,帮助用户预订航班以及修改或取消 现有预订。这将依赖于一个带有自定义工具的MCP服务器,但应该很容易推广到 社区构建的MCP服务器

如何运行本教程

本教程无法在托管的IPython笔记本(如Google Colab或Databricks笔记本)中运行。 要运行代码,您需要按照指南在本地设备上编写代码。代码 已在macOS上测试,在Linux环境中应该以相同的方式工作。

安装依赖项

开始之前,让我们先安装所需的依赖项:

pip install -U "dspy[mcp]"

MCP 服务器设置

首先,让我们为航空公司智能体设置MCP服务器,其中包含:

  • 一组数据库
  • 用户数据库,存储用户信息。
  • 航班数据库,存储航班信息。
  • 工单数据库,存储客户工单。
  • 一组工具
  • fetch_flight_info: 获取特定日期的航班信息。
  • fetch_itinerary: 获取已预订行程的信息。
  • book_itinerary: 代表用户预订航班。
  • modify_itinerary: 修改行程,可通过航班变更或取消实现。
  • get_user_info: 获取用户信息。
  • file_ticket: 提交待办工单以获取人工协助。

在你的工作目录中,创建一个文件 mcp_server.py,并将以下内容粘贴到其中:

import random
import string

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

# Create an MCP server
mcp = FastMCP("Airline Agent")


class Date(BaseModel):
    # Somehow LLM is bad at specifying `datetime.datetime`
    year: int
    month: int
    day: int
    hour: int


class UserProfile(BaseModel):
    user_id: str
    name: str
    email: str


class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float


class Itinerary(BaseModel):
    confirmation_number: str
    user_profile: UserProfile
    flight: Flight


class Ticket(BaseModel):
    user_request: str
    user_profile: UserProfile


user_database = {
    "Adam": UserProfile(user_id="1", name="Adam", email="adam@gmail.com"),
    "Bob": UserProfile(user_id="2", name="Bob", email="bob@gmail.com"),
    "Chelsie": UserProfile(user_id="3", name="Chelsie", email="chelsie@gmail.com"),
    "David": UserProfile(user_id="4", name="David", email="david@gmail.com"),
}

flight_database = {
    "DA123": Flight(
        flight_id="DA123",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=1),
        duration=3,
        price=200,
    ),
    "DA125": Flight(
        flight_id="DA125",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=7),
        duration=9,
        price=500,
    ),
    "DA456": Flight(
        flight_id="DA456",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=1),
        duration=2,
        price=100,
    ),
    "DA460": Flight(
        flight_id="DA460",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=9),
        duration=2,
        price=120,
    ),
}

itinery_database = {}
ticket_database = {}


@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
    """Fetch flight information from origin to destination on the given date"""
    flights = []

    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    return flights


@mcp.tool()
def fetch_itinerary(confirmation_number: str):
    """Fetch a booked itinerary information from database"""
    return itinery_database.get(confirmation_number)


@mcp.tool()
def pick_flight(flights: list[Flight]):
    """Pick up the best flight that matches users' request."""
    sorted_flights = sorted(
        flights,
        key=lambda x: (
            x.get("duration") if isinstance(x, dict) else x.duration,
            x.get("price") if isinstance(x, dict) else x.price,
        ),
    )
    return sorted_flights[0]


def generate_id(length=8):
    chars = string.ascii_lowercase + string.digits
    return "".join(random.choices(chars, k=length))


@mcp.tool()
def book_itinerary(flight: Flight, user_profile: UserProfile):
    """Book a flight on behalf of the user."""
    confirmation_number = generate_id()
    while confirmation_number in itinery_database:
        confirmation_number = generate_id()
    itinery_database[confirmation_number] = Itinerary(
        confirmation_number=confirmation_number,
        user_profile=user_profile,
        flight=flight,
    )
    return confirmation_number, itinery_database[confirmation_number]


@mcp.tool()
def cancel_itinerary(confirmation_number: str, user_profile: UserProfile):
    """Cancel an itinerary on behalf of the user."""
    if confirmation_number in itinery_database:
        del itinery_database[confirmation_number]
        return
    raise ValueError("Cannot find the itinerary, please check your confirmation number.")


@mcp.tool()
def get_user_info(name: str):
    """Fetch the user profile from database with given name."""
    return user_database.get(name)


@mcp.tool()
def file_ticket(user_request: str, user_profile: UserProfile):
    """File a customer support ticket if this is something the agent cannot handle."""
    ticket_id = generate_id(length=6)
    ticket_database[ticket_id] = Ticket(
        user_request=user_request,
        user_profile=user_profile,
    )
    return ticket_id


if __name__ == "__main__":
    mcp.run()

在启动服务器之前,我们先来看一下代码。

我们首先创建一个FastMCP实例,这是一个帮助快速构建MCP服务器的实用工具:

mcp = FastMCP("Airline Agent")

然后我们定义我们的数据结构,在实际应用中这将是数据库模式,例如:

class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float

随后,我们初始化数据库实例。在实际应用中,这些应该是连接到真实数据库的连接器,但为简化起见,我们仅使用字典:

user_database = {
    "Adam": UserProfile(user_id="1", name="Adam", email="adam@gmail.com"),
    "Bob": UserProfile(user_id="2", name="Bob", email="bob@gmail.com"),
    "Chelsie": UserProfile(user_id="3", name="Chelsie", email="chelsie@gmail.com"),
    "David": UserProfile(user_id="4", name="David", email="david@gmail.com"),
}

下一步是定义工具并用 @mcp.tool() 标记它们,以便MCP客户端能够发现这些MCP工具:

@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
    """Fetch flight information from origin to destination on the given date"""
    flights = []

    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    return flights

最后一步是启动服务器:

if __name__ == "__main__":
    mcp.run()

现在我们已经完成了服务器的编写!让我们启动它:

python path_to_your_working_directory/mcp_server.py

编写一个利用MCP服务器中工具的DSPy程序

现在服务器已运行,让我们构建实际的航空公司服务智能体,它利用我们服务器中的MCP工具来协助用户。在你的工作目录中,创建一个名为dspy_mcp_agent.py的文件,并按照指南向其中添加代码。

从MCP服务器收集工具

我们首先需要从MCP服务器收集所有可用工具,并使它们能够被DSPy使用。DSPy提供了一个API dspy.Tool作为标准工具接口。让我们将所有MCP工具转换为dspy.Tool

我们需要创建一个MCP客户端实例来与MCP服务器通信,获取所有可用工具,并使用静态方法from_mcp_tool将它们转换为dspy.Tool

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="python",  # Executable
    args=["path_to_your_working_directory/mcp_server.py"],
    env=None,
)

async def run():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()

            # Convert MCP tools to DSPy tools
            dspy_tools = []
            for tool in tools.tools:
                dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))

            print(len(dspy_tools))
            print(dspy_tools[0].args)

if __name__ == "__main__":
    import asyncio

    asyncio.run(run())

通过上述代码,我们已成功收集所有可用的MCP工具并将其转换为DSPy工具。

构建一个DSPY智能体来处理客户请求

现在我们将使用dspy.ReAct来构建处理客户请求的智能体。ReAct代表 "推理与行动",它要求大语言模型决定是调用工具还是结束流程。 如果需要工具,大语言模型负责决定调用哪个工具并提供 适当的参数。

与往常一样,我们需要创建一个 dspy.Signature 来定义智能体的输入和输出:

import dspy

class DSPyAirlineCustomerService(dspy.Signature):
    """You are an airline customer service agent. You are given a list of tools to handle user requests. You should decide the right tool to use in order to fulfill users' requests."""

    user_request: str = dspy.InputField()
    process_result: str = dspy.OutputField(
        desc=(
            "Message that summarizes the process result, and the information users need, "
            "e.g., the confirmation_number if it's a flight booking request."
        )
    )

并为我们的智能体选择一个LM:

dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

然后我们通过将工具和签名传入dspy.ReAct API来创建ReAct智能体。现在我们可以整合完整的代码脚本:

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

import dspy

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="python",  # Executable
    args=["script_tmp/mcp_server.py"],  # Optional command line arguments
    env=None,  # Optional environment variables
)


class DSPyAirlineCustomerService(dspy.Signature):
    """You are an airline customer service agent. You are given a list of tools to handle user requests.
    You should decide the right tool to use in order to fulfill users' requests."""

    user_request: str = dspy.InputField()
    process_result: str = dspy.OutputField(
        desc=(
            "Message that summarizes the process result, and the information users need, "
            "e.g., the confirmation_number if it's a flight booking request."
        )
    )


dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))


async def run(user_request):
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()

            # Convert MCP tools to DSPy tools
            dspy_tools = []
            for tool in tools.tools:
                dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))

            # Create the agent
            react = dspy.ReAct(DSPyAirlineCustomerService, tools=dspy_tools)

            result = await react.acall(user_request=user_request)
            print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(run("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam"))

请注意我们必须调用react.acall,因为MCP工具默认是异步的。让我们执行脚本:

python path_to_your_working_directory/dspy_mcp_agent.py

您应该会看到类似这样的输出:

Prediction(
    trajectory={'thought_0': 'I need to fetch flight information for Adam from SFO to JFK on 09/01/2025 to find available flights for booking.', 'tool_name_0': 'fetch_flight_info', 'tool_args_0': {'date': {'year': 2025, 'month': 9, 'day': 1, 'hour': 0}, 'origin': 'SFO', 'destination': 'JFK'}, 'observation_0': ['{"flight_id": "DA123", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 1}, "origin": "SFO", "destination": "JFK", "duration": 3.0, "price": 200.0}', '{"flight_id": "DA125", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 7}, "origin": "SFO", "destination": "JFK", "duration": 9.0, "price": 500.0}'], ..., 'tool_name_4': 'finish', 'tool_args_4': {}, 'observation_4': 'Completed.'},
    reasoning="I successfully booked a flight for Adam from SFO to JFK on 09/01/2025. I found two available flights, selected the more economical option (flight DA123 at 1 AM for $200), retrieved Adam's user profile, and completed the booking process. The confirmation number for the flight is 8h7clk3q.",
    process_result='Your flight from SFO to JFK on 09/01/2025 has been successfully booked. Your confirmation number is 8h7clk3q.'
)

trajectory 字段包含完整的思考与行动过程。如果您想了解底层运行机制,请查看可观测性指南来设置 MLflow,它可以可视化展示 dspy.ReAct 内部的每一步操作!

结论

在本指南中,我们构建了一个航空公司服务智能体,它利用了一个自定义的MCP服务器和dspy.ReAct模块。在MCP支持的背景下,DSPy提供了一个简单的接口用于与MCP工具交互,让您能够灵活实现所需的任何功能。

优云智算