Back

Cómo exponer tu API REST interna a tu servidor MCP

Cómo exponer tu API REST interna a tu servidor MCP

Si ya tienes APIs en funcionamiento, no necesitas reconstruir todo para MCP. Puedes conectar tus endpoints REST a tu servidor MCP creando herramientas y recursos simples. Esta guía te mostrará cómo hacerlo correctamente, con ejemplos funcionales en Python.

Puntos clave

  • Puedes conectar tu API REST a servidores MCP sin reescribir la lógica de negocio
  • Usa recursos para obtener datos de forma segura y herramientas para endpoints basados en acciones
  • Valida siempre las entradas y maneja los fallos REST de manera limpia

Por qué conectar una API REST existente

Muchas empresas ya tienen: Servicios de usuarios, APIs de gestión de pedidos, Sistemas CRM o de tickets de soporte, APIs de inventario y stock. MCP permite que tu LLM lea y actúe sobre estos sistemas existentes sin necesitar acceso directo a la base de datos. Esto protege tus sistemas internos y permite integraciones de IA rápidas y controladas.

Lo que necesitas

  • Una API REST en funcionamiento (pública o interna)
  • SDK del servidor MCP instalado
  • HTTPX (u otro cliente HTTP asíncrono para Python)

Instala HTTPX:

pip install httpx mcp-server

Deberías tener familiaridad básica con la realización de solicitudes API en Python.

Paso 1: Instalar bibliotecas cliente HTTP

En lugar de reescribir la lógica, llamas a tu API REST desde dentro de los manejadores MCP.

Ejemplo de configuración básica:

import os
import httpx

# Cargar desde variables de entorno por seguridad
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

Usa un cliente asíncrono compartido para mejor rendimiento:

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Establece un tiempo de espera razonable
)

Este cliente será reutilizado en todos los manejadores MCP.

Paso 2: Exponer endpoints de API como recursos

Los recursos obtienen datos sin efectos secundarios. Ejemplo: obtener una lista de usuarios activos.

from mcp_server import MCPServer
import logging

# Configurar registro
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="Obtener una lista de usuarios activos.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Recuperados {len(users)} usuarios activos")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
        return {"error": f"Error de API: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "No se pudo conectar a la API"}

Esta implementación maneja adecuadamente los errores y proporciona mensajes informativos cuando la llamada a la API falla.

Paso 3: Exponer acciones de API como herramientas

Las herramientas realizan acciones que modifican el estado. Ejemplo: crear un nuevo ticket de soporte.

@server.tool(name="create_support_ticket", description="Crear un nuevo ticket de soporte para un usuario.")
async def create_ticket(data: dict):
    # Validación de entrada
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Faltan campos obligatorios: user_id, subject y description son requeridos"}
    
    # Validar prioridad
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Prioridad inválida. Debe ser una de: {', '.join(valid_priorities)}"}
    
    # Preparar payload
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Ticket #{ticket_data.get('id')} creado para el usuario {user_id}")
        return {
            "status": "success",
            "message": "Ticket creado exitosamente",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Error al crear ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Error al crear ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "Error de conexión", "details": str(e)}

Buenas prácticas en este ejemplo:

  • Validación de entrada antes de llamar a la API
  • Validación de valores permitidos (prioridades)
  • Manejo estructurado de errores con mensajes informativos
  • Devolución del ID del ticket creado como referencia

Paso 4: Ejemplo completo de servidor funcional

Aquí está la versión completa que lo reúne todo:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# Configurar registro
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# Cargar configuración desde el entorno
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# Crear servidor
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Conecta LLMs a APIs REST internas"
)

# Crear cliente HTTP compartido
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# Recursos: Obtención segura de datos
@server.resource(name="active_users", description="Obtener usuarios activos del sistema CRM.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Recuperados {len(users)} usuarios activos")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
        return {"error": f"Error de API: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "No se pudo conectar a la API"}

@server.resource(name="user_details", description="Obtener información detallada sobre un usuario específico.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Falta el parámetro user_id"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"Usuario con ID {user_id} no encontrado"}
        return {"error": f"Error de API: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "No se pudo conectar a la API"}

# Herramientas: Acciones que pueden modificar el estado
@server.tool(name="create_support_ticket", description="Crear un ticket de soporte.")
async def create_ticket(data: dict):
    # Validación de entrada
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Faltan campos obligatorios: user_id, subject y description son requeridos"}
    
    # Validar prioridad
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Prioridad inválida. Debe ser una de: {', '.join(valid_priorities)}"}
    
    # Preparar payload
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Ticket #{ticket_data.get('id')} creado para el usuario {user_id}")
        return {
            "status": "success",
            "message": "Ticket creado exitosamente",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Error al crear ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Error al crear ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "Error de conexión", "details": str(e)}

@server.tool(name="update_ticket_status", description="Actualizar el estado de un ticket de soporte existente.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "Falta ticket_id o status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Estado inválido. Debe ser uno de: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Estado del ticket #{ticket_id} actualizado a {new_status}")
        return {"status": "success", "message": f"Estado del ticket actualizado a {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} no encontrado"}
        logger.error(f"Error de API: {status_code} - {e.response.text}")
        return {"error": f"Error al actualizar ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Error de solicitud: {str(e)}")
        return {"error": "Error de conexión", "details": str(e)}

# Gestión del ciclo de vida del servidor
async def startup():
    logger.info("Iniciando servidor MCP...")
    # Cualquier código de inicialización

async def shutdown():
    logger.info("Apagando servidor MCP...")
    await client.aclose()  # Cerrar conexiones del cliente HTTP

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Características mejoradas:

  • Cierre adecuado del cliente usando aclose()
  • Registro estructurado en todo el código
  • Configuración mediante variables de entorno
  • Gestión del ciclo de vida del servidor
  • Endpoints de ejemplo adicionales
  • Manejo integral de errores

Mejores prácticas para configuraciones en entornos reales

  • Reintentar solicitudes fallidas: Implementar retroceso exponencial para fallos transitorios:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • Respetar límites de tasa: Implementar un limitador de tasa para evitar la limitación de la API:

    from aiolimiter import AsyncLimiter
    
    # Máximo 10 solicitudes por segundo
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • Asegurar credenciales: Usar una solución adecuada de gestión de secretos:

    from dotenv import load_dotenv
    
    # Cargar variables de entorno desde archivo .env
    load_dotenv()
    
    # O usar una solución basada en la nube como AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • Manejar tiempos de espera: Configurar tiempos de espera del cliente adecuadamente:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # Tiempo de espera de conexión
            read=30.0,        # Tiempo de espera de lectura
            write=30.0,       # Tiempo de espera de escritura
            pool=60.0         # Tiempo de espera del pool
        )
    )
    
  • Auditar accesos: Implementar registro estructurado con contexto:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # Generar un ID de solicitud
        request_id = str(uuid.uuid4())
        
        # Registrar la solicitud entrante
        logger.info(f"Solicitud {request_id}: {request.method} {request.path}")
        
        # Añadir contexto al logger
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # Registrar el resultado
        logger.info(f"Solicitud {request_id} completada con estado {response.status_code}")
        return response
    

Conclusión

Conectar tus APIs REST existentes a servidores MCP te permite desbloquear datos y acciones del mundo real sin reescribir backends. Con un diseño cuidadoso y validación, puedes construir agentes de IA potentes y listos para producción, de forma segura y rápida.

En la próxima guía, mostraremos cómo activar trabajos en segundo plano a través de tu servidor MCP con ejecución asíncrona de herramientas.

Preguntas frecuentes

Sí. Cada recurso o herramienta puede llamar a una API diferente si es necesario.

Solo si validas estrictamente las entradas y controlas cuidadosamente qué endpoints son accesibles. Asume que el LLM intentará todo.

Siempre es preferible usar asíncronas (httpx.AsyncClient) dentro de servidores MCP para evitar bloquear el bucle de eventos.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers