Cómo exponer tu API REST interna a tu servidor MCP

Si ya tienes APIs en funcionamiento, no necesitas reconstruir todo para MCP. Puedes conectar tus endpoints REST a tu servidor MCP creando herramientas y recursos simples. Esta guía te mostrará cómo hacerlo correctamente, con ejemplos funcionales en Python.
Puntos clave
- Puedes conectar tu API REST a servidores MCP sin reescribir la lógica de negocio
- Usa recursos para obtener datos de forma segura y herramientas para endpoints basados en acciones
- Valida siempre las entradas y maneja los fallos REST de manera limpia
Por qué conectar una API REST existente
Muchas empresas ya tienen: Servicios de usuarios, APIs de gestión de pedidos, Sistemas CRM o de tickets de soporte, APIs de inventario y stock. MCP permite que tu LLM lea y actúe sobre estos sistemas existentes sin necesitar acceso directo a la base de datos. Esto protege tus sistemas internos y permite integraciones de IA rápidas y controladas.
Lo que necesitas
- Una API REST en funcionamiento (pública o interna)
- SDK del servidor MCP instalado
- HTTPX (u otro cliente HTTP asíncrono para Python)
Instala HTTPX:
pip install httpx mcp-server
Deberías tener familiaridad básica con la realización de solicitudes API en Python.
Paso 1: Instalar bibliotecas cliente HTTP
En lugar de reescribir la lógica, llamas a tu API REST desde dentro de los manejadores MCP.
Ejemplo de configuración básica:
import os
import httpx
# Cargar desde variables de entorno por seguridad
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Usa un cliente asíncrono compartido para mejor rendimiento:
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0 # Establece un tiempo de espera razonable
)
Este cliente será reutilizado en todos los manejadores MCP.
Paso 2: Exponer endpoints de API como recursos
Los recursos obtienen datos sin efectos secundarios. Ejemplo: obtener una lista de usuarios activos.
from mcp_server import MCPServer
import logging
# Configurar registro
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Obtener una lista de usuarios activos.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Recuperados {len(users)} usuarios activos")
return users
except httpx.HTTPStatusError as e:
logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
return {"error": f"Error de API: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "No se pudo conectar a la API"}
Esta implementación maneja adecuadamente los errores y proporciona mensajes informativos cuando la llamada a la API falla.
Paso 3: Exponer acciones de API como herramientas
Las herramientas realizan acciones que modifican el estado. Ejemplo: crear un nuevo ticket de soporte.
@server.tool(name="create_support_ticket", description="Crear un nuevo ticket de soporte para un usuario.")
async def create_ticket(data: dict):
# Validación de entrada
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Faltan campos obligatorios: user_id, subject y description son requeridos"}
# Validar prioridad
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Prioridad inválida. Debe ser una de: {', '.join(valid_priorities)}"}
# Preparar payload
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Ticket #{ticket_data.get('id')} creado para el usuario {user_id}")
return {
"status": "success",
"message": "Ticket creado exitosamente",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Error al crear ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Error al crear ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "Error de conexión", "details": str(e)}
Buenas prácticas en este ejemplo:
- Validación de entrada antes de llamar a la API
- Validación de valores permitidos (prioridades)
- Manejo estructurado de errores con mensajes informativos
- Devolución del ID del ticket creado como referencia
Paso 4: Ejemplo completo de servidor funcional
Aquí está la versión completa que lo reúne todo:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Configurar registro
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Cargar configuración desde el entorno
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Crear servidor
server = MCPServer(
name="REST API MCP Server",
version="1.0.0",
description="Conecta LLMs a APIs REST internas"
)
# Crear cliente HTTP compartido
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
},
timeout=30.0
)
# Recursos: Obtención segura de datos
@server.resource(name="active_users", description="Obtener usuarios activos del sistema CRM.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Recuperados {len(users)} usuarios activos")
return users
except httpx.HTTPStatusError as e:
logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
return {"error": f"Error de API: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "No se pudo conectar a la API"}
@server.resource(name="user_details", description="Obtener información detallada sobre un usuario específico.")
async def get_user_details(data: dict):
user_id = data.get("user_id")
if not user_id:
return {"error": "Falta el parámetro user_id"}
try:
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"Error de API: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 404:
return {"error": f"Usuario con ID {user_id} no encontrado"}
return {"error": f"Error de API: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "No se pudo conectar a la API"}
# Herramientas: Acciones que pueden modificar el estado
@server.tool(name="create_support_ticket", description="Crear un ticket de soporte.")
async def create_ticket(data: dict):
# Validación de entrada
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Faltan campos obligatorios: user_id, subject y description son requeridos"}
# Validar prioridad
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Prioridad inválida. Debe ser una de: {', '.join(valid_priorities)}"}
# Preparar payload
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Ticket #{ticket_data.get('id')} creado para el usuario {user_id}")
return {
"status": "success",
"message": "Ticket creado exitosamente",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Error al crear ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Error al crear ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "Error de conexión", "details": str(e)}
@server.tool(name="update_ticket_status", description="Actualizar el estado de un ticket de soporte existente.")
async def update_ticket_status(data: dict):
ticket_id = data.get("ticket_id")
new_status = data.get("status")
if not ticket_id or not new_status:
return {"error": "Falta ticket_id o status"}
valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
if new_status not in valid_statuses:
return {"error": f"Estado inválido. Debe ser uno de: {', '.join(valid_statuses)}"}
try:
response = await client.patch(
f"/tickets/{ticket_id}",
json={"status": new_status}
)
response.raise_for_status()
logger.info(f"Estado del ticket #{ticket_id} actualizado a {new_status}")
return {"status": "success", "message": f"Estado del ticket actualizado a {new_status}"}
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 404:
return {"error": f"Ticket {ticket_id} no encontrado"}
logger.error(f"Error de API: {status_code} - {e.response.text}")
return {"error": f"Error al actualizar ticket: {e.response.text}"}
except httpx.RequestError as e:
logger.error(f"Error de solicitud: {str(e)}")
return {"error": "Error de conexión", "details": str(e)}
# Gestión del ciclo de vida del servidor
async def startup():
logger.info("Iniciando servidor MCP...")
# Cualquier código de inicialización
async def shutdown():
logger.info("Apagando servidor MCP...")
await client.aclose() # Cerrar conexiones del cliente HTTP
async def main():
try:
await startup()
await server.start()
finally:
await shutdown()
if __name__ == "__main__":
asyncio.run(main())
Características mejoradas:
- Cierre adecuado del cliente usando
aclose()
- Registro estructurado en todo el código
- Configuración mediante variables de entorno
- Gestión del ciclo de vida del servidor
- Endpoints de ejemplo adicionales
- Manejo integral de errores
Mejores prácticas para configuraciones en entornos reales
-
Reintentar solicitudes fallidas: Implementar retroceso exponencial para fallos transitorios:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json()
-
Respetar límites de tasa: Implementar un limitador de tasa para evitar la limitación de la API:
from aiolimiter import AsyncLimiter # Máximo 10 solicitudes por segundo rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs)
-
Asegurar credenciales: Usar una solución adecuada de gestión de secretos:
from dotenv import load_dotenv # Cargar variables de entorno desde archivo .env load_dotenv() # O usar una solución basada en la nube como AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials')
-
Manejar tiempos de espera: Configurar tiempos de espera del cliente adecuadamente:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # Tiempo de espera de conexión read=30.0, # Tiempo de espera de lectura write=30.0, # Tiempo de espera de escritura pool=60.0 # Tiempo de espera del pool ) )
-
Auditar accesos: Implementar registro estructurado con contexto:
@server.middleware async def audit_middleware(request, next_handler): # Generar un ID de solicitud request_id = str(uuid.uuid4()) # Registrar la solicitud entrante logger.info(f"Solicitud {request_id}: {request.method} {request.path}") # Añadir contexto al logger with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # Registrar el resultado logger.info(f"Solicitud {request_id} completada con estado {response.status_code}") return response
Conclusión
Conectar tus APIs REST existentes a servidores MCP te permite desbloquear datos y acciones del mundo real sin reescribir backends. Con un diseño cuidadoso y validación, puedes construir agentes de IA potentes y listos para producción, de forma segura y rápida.
En la próxima guía, mostraremos cómo activar trabajos en segundo plano a través de tu servidor MCP con ejecución asíncrona de herramientas.
Preguntas frecuentes
Sí. Cada recurso o herramienta puede llamar a una API diferente si es necesario.
Solo si validas estrictamente las entradas y controlas cuidadosamente qué endpoints son accesibles. Asume que el LLM intentará todo.
Siempre es preferible usar asíncronas (httpx.AsyncClient) dentro de servidores MCP para evitar bloquear el bucle de eventos.