Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера
  Если у вас уже есть работающие API, вам не нужно перестраивать всё для MCP. Вы можете связать ваши REST-конечные точки с вашим MCP сервером, создав простые инструменты и ресурсы. Это руководство покажет вам, как сделать это правильно, с рабочими примерами на Python.
Ключевые моменты
- Вы можете подключить ваш REST API к серверам MCP без переписывания бизнес-логики
 - Используйте ресурсы для безопасного получения данных и инструменты для конечных точек, основанных на действиях
 - Всегда проверяйте входные данные и корректно обрабатывайте сбои REST
 
Зачем подключать существующий REST API
У многих компаний уже есть: Сервисы пользователей, API управления заказами, CRM или системы тикетов поддержки, API инвентаризации и складского учета. MCP позволяет вашей LLM читать и действовать с этими существующими системами без необходимости прямого доступа к базе данных. Это защищает ваши внутренние системы и обеспечивает быструю, контролируемую интеграцию с ИИ.
Что вам понадобится
- Работающий REST API (публичный или внутренний)
 - Установленный MCP server SDK
 - HTTPX (или другой асинхронный HTTP-клиент для Python)
 
Установите HTTPX:
pip install httpx mcp-server
Вам следует иметь базовое представление о выполнении API-запросов в Python.
Шаг 1: Установите библиотеки HTTP-клиента
Вместо переписывания логики, вы вызываете ваш REST API изнутри обработчиков MCP.
Пример базовой настройки:
import os
import httpx
# Загрузка из переменных окружения для безопасности
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Используйте общий асинхронный клиент для производительности:
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Установите разумный тайм-аут
)
Этот клиент будет повторно использоваться во всех обработчиках MCP.
Шаг 2: Предоставьте конечные точки API как ресурсы
Ресурсы получают данные без побочных эффектов. Пример: получение списка активных пользователей.
from mcp_server import MCPServer
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
Эта реализация правильно обрабатывает ошибки и предоставляет информативные сообщения при сбое вызова API.
Шаг 3: Предоставьте действия API как инструменты
Инструменты выполняют действия, которые изменяют состояние. Пример: создание нового тикета поддержки.
@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
Хорошие практики в этом примере:
- Проверка входных данных перед обращением к API
 - Проверка допустимых значений (приоритетов)
 - Структурированная обработка ошибок с информативными сообщениями
 - Возвращение ID созданного тикета для ссылки
 
Шаг 4: Полный рабочий пример сервера
Вот полная версия, объединяющая всё вместе:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Загрузка конфигурации из окружения
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Создание сервера
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connects LLMs to internal REST APIs"
)
# Создание общего HTTP-клиента
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)
# Ресурсы: Безопасное получение данных
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Missing user_id parameter"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"User with ID {user_id} not found"}
        return {"error": f"API error: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
# Инструменты: Действия, которые могут изменять состояние
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "Missing ticket_id or status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
        return {"status": "success", "message": f"Ticket status updated to {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} not found"}
        logger.error(f"API error: {status_code} - {e.response.text}")
        return {"error": f"Failed to update ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
# Управление жизненным циклом сервера
async def startup():
    logger.info("Starting MCP server...")
    # Любой код инициализации
async def shutdown():
    logger.info("Shutting down MCP server...")
    await client.aclose()  # Закрытие соединений HTTP-клиента
async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()
if __name__ == "__main__":
    asyncio.run(main())
Улучшенные функции:
- Правильное завершение работы клиента с использованием 
aclose() - Структурированное логирование повсюду
 - Конфигурация через переменные окружения
 - Управление жизненным циклом сервера
 - Дополнительные примеры конечных точек
 - Комплексная обработка ошибок
 
Лучшие практики для реальных настроек
- 
Повторные попытки при неудачных запросах: Реализуйте экспоненциальное отступление для временных сбоев:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json() - 
Соблюдайте ограничения скорости: Реализуйте ограничитель скорости, чтобы избежать дросселирования API:
from aiolimiter import AsyncLimiter # Максимум 10 запросов в секунду rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs) - 
Защита учетных данных: Используйте правильное решение для управления секретами:
from dotenv import load_dotenv # Загрузка переменных окружения из файла .env load_dotenv() # Или используйте облачное решение, такое как AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials') - 
Обработка тайм-аутов: Настройте тайм-ауты клиента соответствующим образом:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # Тайм-аут соединения read=30.0, # Тайм-аут чтения write=30.0, # Тайм-аут записи pool=60.0 # Тайм-аут пула ) ) - 
Аудит доступа: Реализуйте структурированное логирование с контекстом:
@server.middleware async def audit_middleware(request, next_handler): # Генерация ID запроса request_id = str(uuid.uuid4()) # Логирование входящего запроса logger.info(f"Request {request_id}: {request.method} {request.path}") # Добавление контекста в логгер with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # Логирование результата logger.info(f"Request {request_id} completed with status {response.status_code}") return response 
Заключение
Подключение ваших существующих REST API к серверам MCP позволяет разблокировать реальные данные и действия без переписывания бэкендов. С тщательным проектированием и проверкой вы можете создавать мощных, готовых к производству ИИ-агентов — безопасно и быстро.
В следующем руководстве мы покажем, как запускать фоновые задачи через ваш MCP сервер с асинхронным выполнением инструментов.
Часто задаваемые вопросы
Да. Каждый ресурс или инструмент может вызывать разные API при необходимости.
Только если вы строго проверяете входные данные и тщательно контролируете, какие конечные точки доступны. Предполагайте, что LLM попробует всё.
Всегда предпочитайте асинхронные (httpx.AsyncClient) внутри серверов MCP, чтобы избежать блокировки цикла событий.