Back

Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера

Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера

Если у вас уже есть работающие API, вам не нужно перестраивать всё для MCP. Вы можете связать ваши REST-конечные точки с вашим MCP сервером, создав простые инструменты и ресурсы. Это руководство покажет вам, как сделать это правильно, с рабочими примерами на Python.

Ключевые моменты

  • Вы можете подключить ваш REST API к серверам MCP без переписывания бизнес-логики
  • Используйте ресурсы для безопасного получения данных и инструменты для конечных точек, основанных на действиях
  • Всегда проверяйте входные данные и корректно обрабатывайте сбои REST

Зачем подключать существующий REST API

У многих компаний уже есть: Сервисы пользователей, API управления заказами, CRM или системы тикетов поддержки, API инвентаризации и складского учета. MCP позволяет вашей LLM читать и действовать с этими существующими системами без необходимости прямого доступа к базе данных. Это защищает ваши внутренние системы и обеспечивает быструю, контролируемую интеграцию с ИИ.

Что вам понадобится

  • Работающий REST API (публичный или внутренний)
  • Установленный MCP server SDK
  • HTTPX (или другой асинхронный HTTP-клиент для Python)

Установите HTTPX:

pip install httpx mcp-server

Вам следует иметь базовое представление о выполнении API-запросов в Python.

Шаг 1: Установите библиотеки HTTP-клиента

Вместо переписывания логики, вы вызываете ваш REST API изнутри обработчиков MCP.

Пример базовой настройки:

import os
import httpx

# Загрузка из переменных окружения для безопасности
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

Используйте общий асинхронный клиент для производительности:

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Установите разумный тайм-аут
)

Этот клиент будет повторно использоваться во всех обработчиках MCP.

Шаг 2: Предоставьте конечные точки API как ресурсы

Ресурсы получают данные без побочных эффектов. Пример: получение списка активных пользователей.

from mcp_server import MCPServer
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

Эта реализация правильно обрабатывает ошибки и предоставляет информативные сообщения при сбое вызова API.

Шаг 3: Предоставьте действия API как инструменты

Инструменты выполняют действия, которые изменяют состояние. Пример: создание нового тикета поддержки.

@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

Хорошие практики в этом примере:

  • Проверка входных данных перед обращением к API
  • Проверка допустимых значений (приоритетов)
  • Структурированная обработка ошибок с информативными сообщениями
  • Возвращение ID созданного тикета для ссылки

Шаг 4: Полный рабочий пример сервера

Вот полная версия, объединяющая всё вместе:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# Загрузка конфигурации из окружения
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# Создание сервера
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connects LLMs to internal REST APIs"
)

# Создание общего HTTP-клиента
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# Ресурсы: Безопасное получение данных
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Missing user_id parameter"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"User with ID {user_id} not found"}
        return {"error": f"API error: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

# Инструменты: Действия, которые могут изменять состояние
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "Missing ticket_id or status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
        return {"status": "success", "message": f"Ticket status updated to {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} not found"}
        logger.error(f"API error: {status_code} - {e.response.text}")
        return {"error": f"Failed to update ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

# Управление жизненным циклом сервера
async def startup():
    logger.info("Starting MCP server...")
    # Любой код инициализации

async def shutdown():
    logger.info("Shutting down MCP server...")
    await client.aclose()  # Закрытие соединений HTTP-клиента

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Улучшенные функции:

  • Правильное завершение работы клиента с использованием aclose()
  • Структурированное логирование повсюду
  • Конфигурация через переменные окружения
  • Управление жизненным циклом сервера
  • Дополнительные примеры конечных точек
  • Комплексная обработка ошибок

Лучшие практики для реальных настроек

  • Повторные попытки при неудачных запросах: Реализуйте экспоненциальное отступление для временных сбоев:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • Соблюдайте ограничения скорости: Реализуйте ограничитель скорости, чтобы избежать дросселирования API:

    from aiolimiter import AsyncLimiter
    
    # Максимум 10 запросов в секунду
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • Защита учетных данных: Используйте правильное решение для управления секретами:

    from dotenv import load_dotenv
    
    # Загрузка переменных окружения из файла .env
    load_dotenv()
    
    # Или используйте облачное решение, такое как AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • Обработка тайм-аутов: Настройте тайм-ауты клиента соответствующим образом:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # Тайм-аут соединения
            read=30.0,        # Тайм-аут чтения
            write=30.0,       # Тайм-аут записи
            pool=60.0         # Тайм-аут пула
        )
    )
    
  • Аудит доступа: Реализуйте структурированное логирование с контекстом:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # Генерация ID запроса
        request_id = str(uuid.uuid4())
        
        # Логирование входящего запроса
        logger.info(f"Request {request_id}: {request.method} {request.path}")
        
        # Добавление контекста в логгер
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # Логирование результата
        logger.info(f"Request {request_id} completed with status {response.status_code}")
        return response
    

Заключение

Подключение ваших существующих REST API к серверам MCP позволяет разблокировать реальные данные и действия без переписывания бэкендов. С тщательным проектированием и проверкой вы можете создавать мощных, готовых к производству ИИ-агентов — безопасно и быстро.

В следующем руководстве мы покажем, как запускать фоновые задачи через ваш MCP сервер с асинхронным выполнением инструментов.

Часто задаваемые вопросы

Да. Каждый ресурс или инструмент может вызывать разные API при необходимости.

Только если вы строго проверяете входные данные и тщательно контролируете, какие конечные точки доступны. Предполагайте, что LLM попробует всё.

Всегда предпочитайте асинхронные (httpx.AsyncClient) внутри серверов MCP, чтобы избежать блокировки цикла событий.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers