Back

如何将内部 REST API 暴露给 MCP 服务器

如何将内部 REST API 暴露给 MCP 服务器

如果您已经有正在运行的 API,您不需要为 MCP 重建所有内容。您可以通过创建简单的工具和资源,将 REST 端点连接到 MCP 服务器。本指南将通过可运行的 Python 示例,向您展示如何正确地实现这一点。

关键要点

  • 您可以将 REST API 连接到 MCP 服务器,无需重写业务逻辑
  • 使用资源(resources)进行安全的数据获取,使用工具(tools)处理基于操作的端点
  • 始终验证输入并妥善处理 REST 失败情况

为什么连接现有的 REST API

许多公司已经拥有:用户服务、订单管理 API、CRM 或支持票务系统、库存和库存 API。MCP 允许您的 LLM 读取操作这些现有系统,而无需直接访问数据库。这保护了您的内部系统,并允许快速、可控的 AI 集成。

您需要的准备

  • 一个运行中的 REST API(公共或内部)
  • 已安装 MCP 服务器 SDK
  • HTTPX(或其他适用于 Python 的异步 HTTP 客户端)

安装 HTTPX:

pip install httpx mcp-server

您应该对在 Python 中发起 API 请求有基本的了解。

步骤 1:安装 HTTP 客户端库

您不需要重写逻辑,而是从 MCP 处理程序内部调用 REST API。

基本设置示例:

import os
import httpx

# 从环境变量加载以确保安全
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

为了提高性能,使用共享异步客户端

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # 设置合理的超时时间
)

这个客户端将在所有 MCP 处理程序中重复使用。

步骤 2:将 API 端点作为资源暴露

资源获取数据而不产生副作用。示例:获取活跃用户列表。

from mcp_server import MCPServer
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="获取活跃用户列表。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"检索到 {len(users)} 个活跃用户")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

这个实现正确地处理了错误,并在 API 调用失败时提供信息丰富的消息。

步骤 3:将 API 操作作为工具暴露

工具执行可以修改状态的操作。示例:创建新的支持工单。

@server.tool(name="create_support_ticket", description="为用户创建新的支持工单。")
async def create_ticket(data: dict):
    # 输入验证
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
    
    # 验证优先级
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
    
    # 准备有效载荷
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
        return {
            "status": "success",
            "message": "工单创建成功",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
        return {"error": "创建工单失败", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

此示例中的良好实践:

  • 在访问 API 之前进行输入验证
  • 验证允许的值(优先级)
  • 结构化的错误处理,提供信息丰富的消息
  • 返回创建的工单 ID 以供参考

步骤 4:完整的工作服务器示例

以下是将所有内容整合在一起的完整版本:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# 从环境加载配置
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# 创建服务器
server = MCPServer(
    name="REST API MCP 服务器",
    version="1.0.0",
    description="将 LLM 连接到内部 REST API"
)

# 创建共享 HTTP 客户端
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# 资源:安全的数据获取
@server.resource(name="active_users", description="从 CRM 系统获取活跃用户。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"检索到 {len(users)} 个活跃用户")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

@server.resource(name="user_details", description="获取特定用户的详细信息。")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "缺少 user_id 参数"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"未找到 ID 为 {user_id} 的用户"}
        return {"error": f"API 错误: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

# 工具:可能修改状态的操作
@server.tool(name="create_support_ticket", description="创建支持工单。")
async def create_ticket(data: dict):
    # 输入验证
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
    
    # 验证优先级
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
    
    # 准备有效载荷
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
        return {
            "status": "success",
            "message": "工单创建成功",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
        return {"error": "创建工单失败", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

@server.tool(name="update_ticket_status", description="更新现有支持工单的状态。")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "缺少 ticket_id 或 status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"无效的状态。必须是以下之一:{', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"将工单 #{ticket_id} 状态更新为 {new_status}")
        return {"status": "success", "message": f"工单状态已更新为 {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"未找到工单 {ticket_id}"}
        logger.error(f"API 错误: {status_code} - {e.response.text}")
        return {"error": f"更新工单失败: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

# 服务器生命周期管理
async def startup():
    logger.info("启动 MCP 服务器...")
    # 任何初始化代码

async def shutdown():
    logger.info("关闭 MCP 服务器...")
    await client.aclose()  # 关闭 HTTP 客户端连接

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

改进的功能:

  • 使用 aclose() 正确关闭客户端
  • 全面的结构化日志记录
  • 环境变量配置
  • 服务器生命周期管理
  • 额外的示例端点
  • 全面的错误处理

实际设置的最佳实践

  • 重试失败的请求:为暂时性故障实现指数退避:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • 尊重速率限制:实现速率限制器以避免 API 节流:

    from aiolimiter import AsyncLimiter
    
    # 每秒最多 10 个请求
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • 安全凭证:使用适当的密钥管理解决方案:

    from dotenv import load_dotenv
    
    # 从 .env 文件加载环境变量
    load_dotenv()
    
    # 或使用基于云的解决方案,如 AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • 处理超时:适当配置客户端超时:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # 连接超时
            read=30.0,        # 读取超时
            write=30.0,       # 写入超时
            pool=60.0         # 池超时
        )
    )
    
  • 审计访问:实现带上下文的结构化日志记录:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # 生成请求 ID
        request_id = str(uuid.uuid4())
        
        # 记录传入请求
        logger.info(f"请求 {request_id}: {request.method} {request.path}")
        
        # 向日志记录器添加上下文
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # 记录结果
        logger.info(f"请求 {request_id} 已完成,状态码 {response.status_code}")
        return response
    

结论

将现有的 REST API 连接到 MCP 服务器,让您无需重写后端即可解锁真实世界的数据和操作。通过精心设计和验证,您可以安全快速地构建强大的、可用于生产环境的 AI 代理。

在下一个指南中,我们将展示如何通过异步工具执行,通过 MCP 服务器触发后台作业

常见问题

是的。如果需要,每个资源或工具可以调用不同的 API。

只有在严格验证输入并仔细控制哪些端点可访问的情况下才安全。假设 LLM 会尝试一切。

在 MCP 服务器内部,始终优先使用异步请求(httpx.AsyncClient)以避免阻塞事件循环。

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers