如何将内部 REST API 暴露给 MCP 服务器

如果您已经有正在运行的 API,您不需要为 MCP 重建所有内容。您可以通过创建简单的工具和资源,将 REST 端点连接到 MCP 服务器。本指南将通过可运行的 Python 示例,向您展示如何正确地实现这一点。
关键要点
- 您可以将 REST API 连接到 MCP 服务器,无需重写业务逻辑
- 使用资源(resources)进行安全的数据获取,使用工具(tools)处理基于操作的端点
- 始终验证输入并妥善处理 REST 失败情况
为什么连接现有的 REST API
许多公司已经拥有:用户服务、订单管理 API、CRM 或支持票务系统、库存和库存 API。MCP 允许您的 LLM 读取和操作这些现有系统,而无需直接访问数据库。这保护了您的内部系统,并允许快速、可控的 AI 集成。
您需要的准备
- 一个运行中的 REST API(公共或内部)
- 已安装 MCP 服务器 SDK
- HTTPX(或其他适用于 Python 的异步 HTTP 客户端)
安装 HTTPX:
pip install httpx mcp-server
您应该对在 Python 中发起 API 请求有基本的了解。
步骤 1:安装 HTTP 客户端库
您不需要重写逻辑,而是从 MCP 处理程序内部调用 REST API。
基本设置示例:
import os
import httpx
# 从环境变量加载以确保安全
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
为了提高性能,使用共享异步客户端:
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0 # 设置合理的超时时间
)
这个客户端将在所有 MCP 处理程序中重复使用。
步骤 2:将 API 端点作为资源暴露
资源获取数据而不产生副作用。示例:获取活跃用户列表。
from mcp_server import MCPServer
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="获取活跃用户列表。")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"检索到 {len(users)} 个活跃用户")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "无法连接到 API"}
这个实现正确地处理了错误,并在 API 调用失败时提供信息丰富的消息。
步骤 3:将 API 操作作为工具暴露
工具执行可以修改状态的操作。示例:创建新的支持工单。
@server.tool(name="create_support_ticket", description="为用户创建新的支持工单。")
async def create_ticket(data: dict):
# 输入验证
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
# 验证优先级
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
# 准备有效载荷
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
return {
"status": "success",
"message": "工单创建成功",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
return {"error": "创建工单失败", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "连接错误", "details": str(e)}
此示例中的良好实践:
- 在访问 API 之前进行输入验证
- 验证允许的值(优先级)
- 结构化的错误处理,提供信息丰富的消息
- 返回创建的工单 ID 以供参考
步骤 4:完整的工作服务器示例
以下是将所有内容整合在一起的完整版本:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# 从环境加载配置
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# 创建服务器
server = MCPServer(
name="REST API MCP 服务器",
version="1.0.0",
description="将 LLM 连接到内部 REST API"
)
# 创建共享 HTTP 客户端
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
},
timeout=30.0
)
# 资源:安全的数据获取
@server.resource(name="active_users", description="从 CRM 系统获取活跃用户。")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"检索到 {len(users)} 个活跃用户")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
return {"error": f"API 错误: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "无法连接到 API"}
@server.resource(name="user_details", description="获取特定用户的详细信息。")
async def get_user_details(data: dict):
user_id = data.get("user_id")
if not user_id:
return {"error": "缺少 user_id 参数"}
try:
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"API 错误: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 404:
return {"error": f"未找到 ID 为 {user_id} 的用户"}
return {"error": f"API 错误: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "无法连接到 API"}
# 工具:可能修改状态的操作
@server.tool(name="create_support_ticket", description="创建支持工单。")
async def create_ticket(data: dict):
# 输入验证
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "缺少必填字段:user_id、subject 和 description 是必需的"}
# 验证优先级
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"无效的优先级。必须是以下之一:{', '.join(valid_priorities)}"}
# 准备有效载荷
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"为用户 {user_id} 创建了工单 #{ticket_data.get('id')}")
return {
"status": "success",
"message": "工单创建成功",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"创建工单失败: {e.response.status_code} - {e.response.text}")
return {"error": "创建工单失败", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "连接错误", "details": str(e)}
@server.tool(name="update_ticket_status", description="更新现有支持工单的状态。")
async def update_ticket_status(data: dict):
ticket_id = data.get("ticket_id")
new_status = data.get("status")
if not ticket_id or not new_status:
return {"error": "缺少 ticket_id 或 status"}
valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
if new_status not in valid_statuses:
return {"error": f"无效的状态。必须是以下之一:{', '.join(valid_statuses)}"}
try:
response = await client.patch(
f"/tickets/{ticket_id}",
json={"status": new_status}
)
response.raise_for_status()
logger.info(f"将工单 #{ticket_id} 状态更新为 {new_status}")
return {"status": "success", "message": f"工单状态已更新为 {new_status}"}
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 404:
return {"error": f"未找到工单 {ticket_id}"}
logger.error(f"API 错误: {status_code} - {e.response.text}")
return {"error": f"更新工单失败: {e.response.text}"}
except httpx.RequestError as e:
logger.error(f"请求错误: {str(e)}")
return {"error": "连接错误", "details": str(e)}
# 服务器生命周期管理
async def startup():
logger.info("启动 MCP 服务器...")
# 任何初始化代码
async def shutdown():
logger.info("关闭 MCP 服务器...")
await client.aclose() # 关闭 HTTP 客户端连接
async def main():
try:
await startup()
await server.start()
finally:
await shutdown()
if __name__ == "__main__":
asyncio.run(main())
改进的功能:
- 使用
aclose()
正确关闭客户端 - 全面的结构化日志记录
- 环境变量配置
- 服务器生命周期管理
- 额外的示例端点
- 全面的错误处理
实际设置的最佳实践
-
重试失败的请求:为暂时性故障实现指数退避:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json()
-
尊重速率限制:实现速率限制器以避免 API 节流:
from aiolimiter import AsyncLimiter # 每秒最多 10 个请求 rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs)
-
安全凭证:使用适当的密钥管理解决方案:
from dotenv import load_dotenv # 从 .env 文件加载环境变量 load_dotenv() # 或使用基于云的解决方案,如 AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials')
-
处理超时:适当配置客户端超时:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # 连接超时 read=30.0, # 读取超时 write=30.0, # 写入超时 pool=60.0 # 池超时 ) )
-
审计访问:实现带上下文的结构化日志记录:
@server.middleware async def audit_middleware(request, next_handler): # 生成请求 ID request_id = str(uuid.uuid4()) # 记录传入请求 logger.info(f"请求 {request_id}: {request.method} {request.path}") # 向日志记录器添加上下文 with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # 记录结果 logger.info(f"请求 {request_id} 已完成,状态码 {response.status_code}") return response
结论
将现有的 REST API 连接到 MCP 服务器,让您无需重写后端即可解锁真实世界的数据和操作。通过精心设计和验证,您可以安全快速地构建强大的、可用于生产环境的 AI 代理。
在下一个指南中,我们将展示如何通过异步工具执行,通过 MCP 服务器触发后台作业。
常见问题
是的。如果需要,每个资源或工具可以调用不同的 API。
只有在严格验证输入并仔细控制哪些端点可访问的情况下才安全。假设 LLM 会尝试一切。
在 MCP 服务器内部,始终优先使用异步请求(httpx.AsyncClient)以避免阻塞事件循环。