Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера

Если у вас уже есть работающие API, вам не нужно перестраивать всё для MCP. Вы можете связать ваши REST-конечные точки с вашим MCP сервером, создав простые инструменты и ресурсы. Это руководство покажет вам, как сделать это правильно, с рабочими примерами на Python.
Ключевые моменты
- Вы можете подключить ваш REST API к серверам MCP без переписывания бизнес-логики
- Используйте ресурсы для безопасного получения данных и инструменты для конечных точек, основанных на действиях
- Всегда проверяйте входные данные и корректно обрабатывайте сбои REST
Зачем подключать существующий REST API
У многих компаний уже есть: Сервисы пользователей, API управления заказами, CRM или системы тикетов поддержки, API инвентаризации и складского учета. MCP позволяет вашей LLM читать и действовать с этими существующими системами без необходимости прямого доступа к базе данных. Это защищает ваши внутренние системы и обеспечивает быструю, контролируемую интеграцию с ИИ.
Что вам понадобится
- Работающий REST API (публичный или внутренний)
- Установленный MCP server SDK
- HTTPX (или другой асинхронный HTTP-клиент для Python)
Установите HTTPX:
pip install httpx mcp-server
Вам следует иметь базовое представление о выполнении API-запросов в Python.
Шаг 1: Установите библиотеки HTTP-клиента
Вместо переписывания логики, вы вызываете ваш REST API изнутри обработчиков MCP.
Пример базовой настройки:
import os
import httpx
# Загрузка из переменных окружения для безопасности
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Используйте общий асинхронный клиент для производительности:
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0 # Установите разумный тайм-аут
)
Этот клиент будет повторно использоваться во всех обработчиках MCP.
Шаг 2: Предоставьте конечные точки API как ресурсы
Ресурсы получают данные без побочных эффектов. Пример: получение списка активных пользователей.
from mcp_server import MCPServer
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Retrieved {len(users)} active users")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
Эта реализация правильно обрабатывает ошибки и предоставляет информативные сообщения при сбое вызова API.
Шаг 3: Предоставьте действия API как инструменты
Инструменты выполняют действия, которые изменяют состояние. Пример: создание нового тикета поддержки.
@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
# Проверка входных данных
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Missing required fields: user_id, subject, and description are required"}
# Проверка приоритета
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
# Подготовка полезной нагрузки
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
return {
"status": "success",
"message": "Ticket created successfully",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Failed to create ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
Хорошие практики в этом примере:
- Проверка входных данных перед обращением к API
- Проверка допустимых значений (приоритетов)
- Структурированная обработка ошибок с информативными сообщениями
- Возвращение ID созданного тикета для ссылки
Шаг 4: Полный рабочий пример сервера
Вот полная версия, объединяющая всё вместе:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Загрузка конфигурации из окружения
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Создание сервера
server = MCPServer(
name="REST API MCP Server",
version="1.0.0",
description="Connects LLMs to internal REST APIs"
)
# Создание общего HTTP-клиента
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
},
timeout=30.0
)
# Ресурсы: Безопасное получение данных
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Retrieved {len(users)} active users")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
user_id = data.get("user_id")
if not user_id:
return {"error": "Missing user_id parameter"}
try:
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 404:
return {"error": f"User with ID {user_id} not found"}
return {"error": f"API error: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
# Инструменты: Действия, которые могут изменять состояние
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
# Проверка входных данных
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Missing required fields: user_id, subject, and description are required"}
# Проверка приоритета
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
# Подготовка полезной нагрузки
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
return {
"status": "success",
"message": "Ticket created successfully",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Failed to create ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
ticket_id = data.get("ticket_id")
new_status = data.get("status")
if not ticket_id or not new_status:
return {"error": "Missing ticket_id or status"}
valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
if new_status not in valid_statuses:
return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
try:
response = await client.patch(
f"/tickets/{ticket_id}",
json={"status": new_status}
)
response.raise_for_status()
logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
return {"status": "success", "message": f"Ticket status updated to {new_status}"}
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 404:
return {"error": f"Ticket {ticket_id} not found"}
logger.error(f"API error: {status_code} - {e.response.text}")
return {"error": f"Failed to update ticket: {e.response.text}"}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
# Управление жизненным циклом сервера
async def startup():
logger.info("Starting MCP server...")
# Любой код инициализации
async def shutdown():
logger.info("Shutting down MCP server...")
await client.aclose() # Закрытие соединений HTTP-клиента
async def main():
try:
await startup()
await server.start()
finally:
await shutdown()
if __name__ == "__main__":
asyncio.run(main())
Улучшенные функции:
- Правильное завершение работы клиента с использованием
aclose()
- Структурированное логирование повсюду
- Конфигурация через переменные окружения
- Управление жизненным циклом сервера
- Дополнительные примеры конечных точек
- Комплексная обработка ошибок
Лучшие практики для реальных настроек
-
Повторные попытки при неудачных запросах: Реализуйте экспоненциальное отступление для временных сбоев:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json()
-
Соблюдайте ограничения скорости: Реализуйте ограничитель скорости, чтобы избежать дросселирования API:
from aiolimiter import AsyncLimiter # Максимум 10 запросов в секунду rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs)
-
Защита учетных данных: Используйте правильное решение для управления секретами:
from dotenv import load_dotenv # Загрузка переменных окружения из файла .env load_dotenv() # Или используйте облачное решение, такое как AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials')
-
Обработка тайм-аутов: Настройте тайм-ауты клиента соответствующим образом:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # Тайм-аут соединения read=30.0, # Тайм-аут чтения write=30.0, # Тайм-аут записи pool=60.0 # Тайм-аут пула ) )
-
Аудит доступа: Реализуйте структурированное логирование с контекстом:
@server.middleware async def audit_middleware(request, next_handler): # Генерация ID запроса request_id = str(uuid.uuid4()) # Логирование входящего запроса logger.info(f"Request {request_id}: {request.method} {request.path}") # Добавление контекста в логгер with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # Логирование результата logger.info(f"Request {request_id} completed with status {response.status_code}") return response
Заключение
Подключение ваших существующих REST API к серверам MCP позволяет разблокировать реальные данные и действия без переписывания бэкендов. С тщательным проектированием и проверкой вы можете создавать мощных, готовых к производству ИИ-агентов — безопасно и быстро.
В следующем руководстве мы покажем, как запускать фоновые задачи через ваш MCP сервер с асинхронным выполнением инструментов.
Часто задаваемые вопросы
Да. Каждый ресурс или инструмент может вызывать разные API при необходимости.
Только если вы строго проверяете входные данные и тщательно контролируете, какие конечные точки доступны. Предполагайте, что LLM попробует всё.
Всегда предпочитайте асинхронные (httpx.AsyncClient) внутри серверов MCP, чтобы избежать блокировки цикла событий.