12k
All articles

如何将内部 REST API 暴露给 MCP 服务器

使用 Python、HTTPX 和结构化工具将现有 REST API 连接到 MCP 服务器,安全、可控地暴露端点以实现 AI 集成。

OpenReplay Team
OpenReplay Team
如何将内部 REST API 暴露给 MCP 服务器

如果您已经有正在运行的 API,您不需要为 MCP 重建所有内容。您可以通过创建简单的工具和资源,将 REST 端点连接到 MCP 服务器。本指南将通过可运行的 Python 示例,向您展示如何正确地实现这一点。

关键要点

  • 您可以将 REST API 连接到 MCP 服务器,无需重写业务逻辑
  • 使用资源(resources)进行安全的数据获取,使用工具(tools)处理基于操作的端点
  • 始终验证输入并妥善处理 REST 失败情况

为什么连接现有的 REST API

许多公司已经拥有:用户服务、订单管理 API、CRM 或支持票务系统、库存和库存 API。MCP 允许您的 LLM 读取操作这些现有系统,而无需直接访问数据库。这保护了您的内部系统,并允许快速、可控的 AI 集成。

您需要的准备

  • 一个运行中的 REST API(公共或内部)
  • 已安装 MCP 服务器 SDK
  • HTTPX(或其他适用于 Python 的异步 HTTP 客户端)

安装 HTTPX:

pip install httpx mcp-server

您应该对在 Python 中发起 API 请求有基本的了解。

步骤 1:安装 HTTP 客户端库

您不需要重写逻辑,而是从 MCP 处理程序内部调用 REST API。

基本设置示例:

import os
import httpx

# 从环境变量加载以确保安全
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

为了提高性能,使用共享异步客户端

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # 设置合理的超时时间
)

这个客户端将在所有 MCP 处理程序中重复使用。

步骤 2:将 API 端点作为资源暴露

资源获取数据而不产生副作用。示例:获取活跃用户列表。

from mcp_server import MCPServer
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="获取活跃用户列表。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"检索到 {len(users)} 个活跃用户")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

这个实现正确地处理了错误,并在 API 调用失败时提供信息丰富的消息。

步骤 3:将 API 操作作为工具暴露

工具执行可以修改状态的操作。示例:创建新的支持工单。

@server.tool(name="create_support_ticket", description="为用户创建新的支持工单。")
async def create_ticket(data: dict):
    # 输入验证
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
    
    # 验证优先级
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
    
    # 准备有效载荷
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
        return {
            "status": "success",
            "message": "工单创建成功",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
        return {"error": "创建工单失败", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

此示例中的良好实践:

  • 在访问 API 之前进行输入验证
  • 验证允许的值(优先级)
  • 结构化的错误处理,提供信息丰富的消息
  • 返回创建的工单 ID 以供参考

步骤 4:完整的工作服务器示例

以下是将所有内容整合在一起的完整版本:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# 从环境加载配置
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# 创建服务器
server = MCPServer(
    name="REST API MCP 服务器",
    version="1.0.0",
    description="将 LLM 连接到内部 REST API"
)

# 创建共享 HTTP 客户端
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# 资源:安全的数据获取
@server.resource(name="active_users", description="从 CRM 系统获取活跃用户。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"检索到 {len(users)} 个活跃用户")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

@server.resource(name="user_details", description="获取特定用户的详细信息。")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "缺少 user_id 参数"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"未找到 ID 为 {user_id} 的用户"}
        return {"error": f"API 错误: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "无法连接到 API"}

# 工具:可能修改状态的操作
@server.tool(name="create_support_ticket", description="创建支持工单。")
async def create_ticket(data: dict):
    # 输入验证
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
    
    # 验证优先级
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
    
    # 准备有效载荷
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
        return {
            "status": "success",
            "message": "工单创建成功",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
        return {"error": "创建工单失败", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

@server.tool(name="update_ticket_status", description="更新现有支持工单的状态。")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "缺少 ticket_id 或 status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"无效的状态。必须是以下之一:{', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"将工单 #{ticket_id} 状态更新为 {new_status}")
        return {"status": "success", "message": f"工单状态已更新为 {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"未找到工单 {ticket_id}"}
        logger.error(f"API 错误: {status_code} - {e.response.text}")
        return {"error": f"更新工单失败: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"请求错误: {str(e)}")
        return {"error": "连接错误", "details": str(e)}

# 服务器生命周期管理
async def startup():
    logger.info("启动 MCP 服务器...")
    # 任何初始化代码

async def shutdown():
    logger.info("关闭 MCP 服务器...")
    await client.aclose()  # 关闭 HTTP 客户端连接

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

改进的功能:

  • 使用 aclose() 正确关闭客户端
  • 全面的结构化日志记录
  • 环境变量配置
  • 服务器生命周期管理
  • 额外的示例端点
  • 全面的错误处理

实际设置的最佳实践

  • 重试失败的请求:为暂时性故障实现指数退避:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • 尊重速率限制:实现速率限制器以避免 API 节流:

    from aiolimiter import AsyncLimiter
    
    # 每秒最多 10 个请求
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • 安全凭证:使用适当的密钥管理解决方案:

    from dotenv import load_dotenv
    
    # 从 .env 文件加载环境变量
    load_dotenv()
    
    # 或使用基于云的解决方案,如 AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • 处理超时:适当配置客户端超时:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # 连接超时
            read=30.0,        # 读取超时
            write=30.0,       # 写入超时
            pool=60.0         # 池超时
        )
    )
    
  • 审计访问:实现带上下文的结构化日志记录:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # 生成请求 ID
        request_id = str(uuid.uuid4())
        
        # 记录传入请求
        logger.info(f"请求 {request_id}: {request.method} {request.path}")
        
        # 向日志记录器添加上下文
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # 记录结果
        logger.info(f"请求 {request_id} 已完成,状态码 {response.status_code}")
        return response
    

结论

将现有的 REST API 连接到 MCP 服务器,让您无需重写后端即可解锁真实世界的数据和操作。通过精心设计和验证,您可以安全快速地构建强大的、可用于生产环境的 AI 代理。

在下一个指南中,我们将展示如何通过异步工具执行,通过 MCP 服务器触发后台作业

常见问题

我可以在同一个 MCP 服务器中使用不同的 API 吗?

是的。如果需要,每个资源或工具可以调用不同的 API。

将敏感的 API 端点暴露给 LLM 安全吗?

只有在严格验证输入并仔细控制哪些端点可访问的情况下才安全。假设 LLM 会尝试一切。

我应该使用同步还是异步请求?

在 MCP 服务器内部,始终优先使用异步请求(httpx.AsyncClient)以避免阻塞事件循环。

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

We use cookies to improve your experience. By using our site, you accept cookies.