跳至内容

MCP Servers

MCPServer

基类: ABC

Model Context Protocol 服务器的基础类。

Source code in src/agents/mcp/server.py
class MCPServer(abc.ABC):
    """Base class for Model Context Protocol servers."""

    @abc.abstractmethod
    async def connect(self):
        """Connect to the server. For example, this might mean spawning a subprocess or
        opening a network connection. The server is expected to remain connected until
        `cleanup()` is called.
        """
        pass

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """A readable name for the server."""
        pass

    @abc.abstractmethod
    async def cleanup(self):
        """Cleanup the server. For example, this might mean closing a subprocess or
        closing a network connection.
        """
        pass

    @abc.abstractmethod
    async def list_tools(self) -> list[MCPTool]:
        """List the tools available on the server."""
        pass

    @abc.abstractmethod
    async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
        """Invoke a tool on the server."""
        pass

名称 abstractmethod property

name: str

服务器的可读名称。

连接 abstractmethod async

connect()

连接到服务器。例如,这可能意味着生成一个子进程或建立网络连接。服务器预期将保持连接状态,直到调用cleanup()为止。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def connect(self):
    """Connect to the server. For example, this might mean spawning a subprocess or
    opening a network connection. The server is expected to remain connected until
    `cleanup()` is called.
    """
    pass

清理 abstractmethod async

cleanup()

清理服务器。例如,这可能意味着关闭子进程或关闭网络连接。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def cleanup(self):
    """Cleanup the server. For example, this might mean closing a subprocess or
    closing a network connection.
    """
    pass

列出工具 abstractmethod async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    pass

调用工具 abstractmethod async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用工具。

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    pass

MCPServerStdioParams

基类: TypedDict

镜像mcp.client.stdio.StdioServerParameters的功能,但允许您无需额外导入即可传递参数。

Source code in src/agents/mcp/server.py
class MCPServerStdioParams(TypedDict):
    """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another
    import.
    """

    command: str
    """The executable to run to start the server. For example, `python` or `node`."""

    args: NotRequired[list[str]]
    """Command line args to pass to the `command` executable. For example, `['foo.py']` or
    `['server.js', '--port', '8080']`."""

    env: NotRequired[dict[str, str]]
    """The environment variables to set for the server. ."""

    cwd: NotRequired[str | Path]
    """The working directory to use when spawning the process."""

    encoding: NotRequired[str]
    """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`."""

    encoding_error_handler: NotRequired[Literal["strict", "ignore", "replace"]]
    """The text encoding error handler. Defaults to `strict`.

    See https://docs.python.org/3/library/codecs.html#codec-base-classes for
    explanations of possible values.
    """

命令 instance-attribute

command: str

用于启动服务器的可执行文件。例如,pythonnode

参数 instance-attribute

args: NotRequired[list[str]]

传递给command可执行文件的命令行参数。例如,['foo.py']['server.js', '--port', '8080']

环境变量 instance-attribute

env: NotRequired[dict[str, str]]

为服务器设置的环境变量。

当前工作目录 instance-attribute

cwd: NotRequired[str | Path]

生成进程时使用的工作目录。

编码 instance-attribute

encoding: NotRequired[str]

与服务器发送/接收消息时使用的文本编码。默认为utf-8

编码错误处理器 instance-attribute

encoding_error_handler: NotRequired[
    Literal["strict", "ignore", "replace"]
]

文本编码错误处理程序。默认为strict

有关可能值的解释,请参见 https://docs.python.org/3/library/codecs.html#codec-base-classes。

MCPServerStdio

基类: _MCPServerWithClientSession

使用stdio传输的MCP服务器实现。详情请参阅[规范](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio)。

Source code in src/agents/mcp/server.py
class MCPServerStdio(_MCPServerWithClientSession):
    """MCP server implementation that uses the stdio transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for
    details.
    """

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the stdio transport.

        Args:
            params: The params that configure the server. This includes the command to run to
                start the server, the args to pass to the command, the environment variables to
                set for the server, the working directory to use when spawning the process, and
                the text encoding used when sending/receiving messages to the server.
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).
            name: A readable name for the server. If not provided, we'll create one from the
                command.
        """
        super().__init__(cache_tools_list)

        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get("encoding_error_handler", "strict"),
        )

        self._name = name or f"stdio: {self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

名称 property

name: str

服务器的可读名称。

连接 async

connect()

连接到服务器。

Source code in src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        read, write = transport
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

清理 async

cleanup()

清理服务器。

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
            self.session = None
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")

list_tools async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

调用工具 async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用工具。

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

使工具缓存失效。

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

__init__

__init__(
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
)

基于stdio传输创建一个新的MCP服务器。

参数:

名称 类型 描述 默认值
params MCPServerStdioParams

用于配置服务器的参数。这包括启动服务器时运行的命令、传递给该命令的参数、为服务器设置的环境变量、生成进程时使用的工作目录,以及与服务器发送/接收消息时使用的文本编码。

required
cache_tools_list bool

是否缓存工具列表。如果设为True,工具列表将被缓存且仅从服务器获取一次。如果设为False,每次调用list_tools()时都会从服务器重新获取。可通过调用invalidate_tools_cache()使缓存失效。当您确认服务器不会更改其工具列表时,建议将此设为True,因为能显著降低延迟(避免每次调用时与服务器的往返通信)。

False
name str | None

服务器的可读名称。如果未提供,我们将根据命令创建一个。

None
Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
):
    """Create a new MCP server based on the stdio transport.

    Args:
        params: The params that configure the server. This includes the command to run to
            start the server, the args to pass to the command, the environment variables to
            set for the server, the working directory to use when spawning the process, and
            the text encoding used when sending/receiving messages to the server.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            command.
    """
    super().__init__(cache_tools_list)

    self.params = StdioServerParameters(
        command=params["command"],
        args=params.get("args", []),
        env=params.get("env"),
        cwd=params.get("cwd"),
        encoding=params.get("encoding", "utf-8"),
        encoding_error_handler=params.get("encoding_error_handler", "strict"),
    )

    self._name = name or f"stdio: {self.params.command}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            JSONRPCMessage | Exception
        ],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]

为服务器创建流。

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]:
    """Create the streams for the server."""
    return stdio_client(self.params)

MCPServerSseParams

基类: TypedDict

镜像mcp.client.sse.sse_client中的参数。

Source code in src/agents/mcp/server.py
class MCPServerSseParams(TypedDict):
    """Mirrors the params in`mcp.client.sse.sse_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[float]
    """The timeout for the HTTP request. Defaults to 5 seconds."""

    sse_read_timeout: NotRequired[float]
    """The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""

网址 instance-attribute

url: str

服务器的URL。

请求头 instance-attribute

headers: NotRequired[dict[str, str]]

发送到服务器的头部信息。

超时时间 instance-attribute

timeout: NotRequired[float]

HTTP请求的超时时间。默认为5秒。

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[float]

SSE连接的超时时间,单位为秒。默认为5分钟。

MCPServerSse

基类: _MCPServerWithClientSession

使用HTTP与SSE传输的MCP服务器实现。详情请参阅[规范](https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)。

Source code in src/agents/mcp/server.py
class MCPServerSse(_MCPServerWithClientSession):
    """MCP server implementation that uses the HTTP with SSE transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)
    for details.
    """

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the HTTP with SSE transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, and the
                timeout for the SSE connection.

            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).

            name: A readable name for the server. If not provided, we'll create one from the
                URL.
        """
        super().__init__(cache_tools_list)

        self.params = params
        self._name = name or f"sse: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
        )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

名称 property

name: str

服务器的可读名称。

连接 async

connect()

连接到服务器。

Source code in src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        read, write = transport
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

清理 async

cleanup()

清理服务器。

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
            self.session = None
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")

list_tools async

list_tools() -> list[Tool]

列出服务器上可用的工具。

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

调用工具 async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

在服务器上调用工具。

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

使工具缓存失效。

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

__init__

__init__(
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
)

基于HTTP与SSE传输创建一个新的MCP服务器。

参数:

名称 类型 描述 默认值
params MCPServerSseParams

配置服务器的参数。这包括服务器的URL、发送到服务器的请求头、HTTP请求的超时时间以及SSE连接的超时时间。

required
cache_tools_list bool

是否缓存工具列表。如果设为True,工具列表将被缓存且仅从服务器获取一次。如果设为False,每次调用list_tools()时都会从服务器重新获取。可通过调用invalidate_tools_cache()使缓存失效。当您确认服务器不会更改其工具列表时,建议将此设为True,因为能显著降低延迟(避免每次调用时与服务器的往返通信)。

False
name str | None

服务器的可读名称。如果未提供,我们将根据URL自动生成一个。

None
Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
):
    """Create a new MCP server based on the HTTP with SSE transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, and the
            timeout for the SSE connection.

        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).

        name: A readable name for the server. If not provided, we'll create one from the
            URL.
    """
    super().__init__(cache_tools_list)

    self.params = params
    self._name = name or f"sse: {self.params['url']}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            JSONRPCMessage | Exception
        ],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]

为服务器创建流。

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]:
    """Create the streams for the server."""
    return sse_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
    )