12k
All articles

Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера

Подключение существующих REST API к MCP серверу с помощью Python, HTTPX и структурированных инструментов для безопасной и контролируемой интеграции с AI.

OpenReplay Team
OpenReplay Team
Как предоставить доступ к вашему внутреннему REST API для вашего MCP сервера

Если у вас уже есть работающие API, вам не нужно перестраивать всё для MCP. Вы можете связать ваши REST-конечные точки с вашим MCP сервером, создав простые инструменты и ресурсы. Это руководство покажет вам, как сделать это правильно, с рабочими примерами на Python.

Ключевые моменты

  • Вы можете подключить ваш REST API к серверам MCP без переписывания бизнес-логики
  • Используйте ресурсы для безопасного получения данных и инструменты для конечных точек, основанных на действиях
  • Всегда проверяйте входные данные и корректно обрабатывайте сбои REST

Зачем подключать существующий REST API

У многих компаний уже есть: Сервисы пользователей, API управления заказами, CRM или системы тикетов поддержки, API инвентаризации и складского учета. MCP позволяет вашей LLM читать и действовать с этими существующими системами без необходимости прямого доступа к базе данных. Это защищает ваши внутренние системы и обеспечивает быструю, контролируемую интеграцию с ИИ.

Что вам понадобится

  • Работающий REST API (публичный или внутренний)
  • Установленный MCP server SDK
  • HTTPX (или другой асинхронный HTTP-клиент для Python)

Установите HTTPX:

pip install httpx mcp-server

Вам следует иметь базовое представление о выполнении API-запросов в Python.

Шаг 1: Установите библиотеки HTTP-клиента

Вместо переписывания логики, вы вызываете ваш REST API изнутри обработчиков MCP.

Пример базовой настройки:

import os
import httpx

# Загрузка из переменных окружения для безопасности
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

Используйте общий асинхронный клиент для производительности:

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Установите разумный тайм-аут
)

Этот клиент будет повторно использоваться во всех обработчиках MCP.

Шаг 2: Предоставьте конечные точки API как ресурсы

Ресурсы получают данные без побочных эффектов. Пример: получение списка активных пользователей.

from mcp_server import MCPServer
import logging

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

Эта реализация правильно обрабатывает ошибки и предоставляет информативные сообщения при сбое вызова API.

Шаг 3: Предоставьте действия API как инструменты

Инструменты выполняют действия, которые изменяют состояние. Пример: создание нового тикета поддержки.

@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

Хорошие практики в этом примере:

  • Проверка входных данных перед обращением к API
  • Проверка допустимых значений (приоритетов)
  • Структурированная обработка ошибок с информативными сообщениями
  • Возвращение ID созданного тикета для ссылки

Шаг 4: Полный рабочий пример сервера

Вот полная версия, объединяющая всё вместе:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# Загрузка конфигурации из окружения
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# Создание сервера
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connects LLMs to internal REST APIs"
)

# Создание общего HTTP-клиента
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# Ресурсы: Безопасное получение данных
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Missing user_id parameter"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"User with ID {user_id} not found"}
        return {"error": f"API error: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

# Инструменты: Действия, которые могут изменять состояние
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
    # Проверка входных данных
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Проверка приоритета
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Подготовка полезной нагрузки
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "Missing ticket_id or status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
        return {"status": "success", "message": f"Ticket status updated to {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} not found"}
        logger.error(f"API error: {status_code} - {e.response.text}")
        return {"error": f"Failed to update ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

# Управление жизненным циклом сервера
async def startup():
    logger.info("Starting MCP server...")
    # Любой код инициализации

async def shutdown():
    logger.info("Shutting down MCP server...")
    await client.aclose()  # Закрытие соединений HTTP-клиента

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Улучшенные функции:

  • Правильное завершение работы клиента с использованием aclose()
  • Структурированное логирование повсюду
  • Конфигурация через переменные окружения
  • Управление жизненным циклом сервера
  • Дополнительные примеры конечных точек
  • Комплексная обработка ошибок

Лучшие практики для реальных настроек

  • Повторные попытки при неудачных запросах: Реализуйте экспоненциальное отступление для временных сбоев:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • Соблюдайте ограничения скорости: Реализуйте ограничитель скорости, чтобы избежать дросселирования API:

    from aiolimiter import AsyncLimiter
    
    # Максимум 10 запросов в секунду
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • Защита учетных данных: Используйте правильное решение для управления секретами:

    from dotenv import load_dotenv
    
    # Загрузка переменных окружения из файла .env
    load_dotenv()
    
    # Или используйте облачное решение, такое как AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • Обработка тайм-аутов: Настройте тайм-ауты клиента соответствующим образом:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # Тайм-аут соединения
            read=30.0,        # Тайм-аут чтения
            write=30.0,       # Тайм-аут записи
            pool=60.0         # Тайм-аут пула
        )
    )
    
  • Аудит доступа: Реализуйте структурированное логирование с контекстом:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # Генерация ID запроса
        request_id = str(uuid.uuid4())
        
        # Логирование входящего запроса
        logger.info(f"Request {request_id}: {request.method} {request.path}")
        
        # Добавление контекста в логгер
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # Логирование результата
        logger.info(f"Request {request_id} completed with status {response.status_code}")
        return response
    

Заключение

Подключение ваших существующих REST API к серверам MCP позволяет разблокировать реальные данные и действия без переписывания бэкендов. С тщательным проектированием и проверкой вы можете создавать мощных, готовых к производству ИИ-агентов — безопасно и быстро.

В следующем руководстве мы покажем, как запускать фоновые задачи через ваш MCP сервер с асинхронным выполнением инструментов.

Часто задаваемые вопросы

Могу ли я использовать разные API внутри одного MCP сервера?

Да. Каждый ресурс или инструмент может вызывать разные API при необходимости.

Безопасно ли предоставлять доступ к конфиденциальным конечным точкам API для LLM?

Только если вы строго проверяете входные данные и тщательно контролируете, какие конечные точки доступны. Предполагайте, что LLM попробует всё.

Следует ли использовать синхронные или асинхронные запросы?

Всегда предпочитайте асинхронные (httpx.AsyncClient) внутри серверов MCP, чтобы избежать блокировки цикла событий.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

We use cookies to improve your experience. By using our site, you accept cookies.