Back

Comment exposer votre API REST interne à votre serveur MCP

Comment exposer votre API REST interne à votre serveur MCP

Si vous disposez déjà d’API en fonctionnement, vous n’avez pas besoin de tout reconstruire pour MCP. Vous pouvez connecter vos points de terminaison REST à votre serveur MCP en créant des outils et des ressources simples. Ce guide vous montrera comment le faire correctement, avec des exemples Python fonctionnels.

Points clés

  • Vous pouvez connecter votre API REST aux serveurs MCP sans réécrire la logique métier
  • Utilisez des ressources pour la récupération sécurisée de données et des outils pour les points de terminaison basés sur des actions
  • Validez toujours les entrées et gérez proprement les échecs REST

Pourquoi connecter une API REST existante

De nombreuses entreprises disposent déjà de : Services utilisateurs, API de gestion des commandes, CRM ou systèmes de tickets d’assistance, API d’inventaire et de stock. MCP permet à votre LLM de lire et d’agir sur ces systèmes existants sans nécessiter d’accès direct à la base de données. Cela protège vos systèmes internes et permet des intégrations d’IA rapides et contrôlées.

Ce dont vous avez besoin

  • Une API REST en fonctionnement (publique ou interne)
  • SDK du serveur MCP installé
  • HTTPX (ou un autre client HTTP asynchrone pour Python)

Installez HTTPX :

pip install httpx mcp-server

Vous devriez avoir une connaissance de base des requêtes API en Python.

Étape 1 : Installer les bibliothèques client HTTP

Au lieu de réécrire la logique, vous appelez votre API REST depuis les gestionnaires MCP.

Exemple de configuration de base :

import os
import httpx

# Chargement depuis les variables d'environnement pour la sécurité
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

Utilisez un client asynchrone partagé pour de meilleures performances :

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Définir un délai d'attente raisonnable
)

Ce client sera réutilisé dans tous les gestionnaires MCP.

Étape 2 : Exposer les points de terminaison API comme ressources

Les ressources récupèrent des données sans effets secondaires. Exemple : récupération d’une liste d’utilisateurs actifs.

from mcp_server import MCPServer
import logging

# Configurer la journalisation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="Obtenir une liste des utilisateurs actifs.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Récupération de {len(users)} utilisateurs actifs")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"Erreur API : {e.response.status_code} - {e.response.text}")
        return {"error": f"Erreur API : {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Échec de connexion à l'API"}

Cette implémentation gère correctement les erreurs et fournit des messages informatifs en cas d’échec de l’appel API.

Étape 3 : Exposer les actions API comme outils

Les outils exécutent des actions qui modifient l’état. Exemple : création d’un nouveau ticket d’assistance.

@server.tool(name="create_support_ticket", description="Créer un nouveau ticket d'assistance pour un utilisateur.")
async def create_ticket(data: dict):
    # Validation des entrées
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Champs obligatoires manquants : user_id, subject et description sont requis"}
    
    # Validation de la priorité
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Priorité invalide. Doit être l'une des suivantes : {', '.join(valid_priorities)}"}
    
    # Préparation de la charge utile
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Ticket #{ticket_data.get('id')} créé pour l'utilisateur {user_id}")
        return {
            "status": "success",
            "message": "Ticket créé avec succès",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Échec de création du ticket : {e.response.status_code} - {e.response.text}")
        return {"error": "Échec de création du ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Erreur de connexion", "details": str(e)}

Bonnes pratiques dans cet exemple :

  • Validation des entrées avant d’atteindre l’API
  • Validation des valeurs autorisées (priorités)
  • Gestion structurée des erreurs avec messages informatifs
  • Retour de l’ID du ticket créé pour référence

Étape 4 : Exemple complet de serveur fonctionnel

Voici la version complète qui rassemble tout :

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# Configurer la journalisation
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# Charger la configuration depuis l'environnement
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# Créer le serveur
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connecte les LLM aux API REST internes"
)

# Créer un client HTTP partagé
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# Ressources : Récupération sécurisée de données
@server.resource(name="active_users", description="Obtenir les utilisateurs actifs du système CRM.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Récupération de {len(users)} utilisateurs actifs")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"Erreur API : {e.response.status_code} - {e.response.text}")
        return {"error": f"Erreur API : {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Échec de connexion à l'API"}

@server.resource(name="user_details", description="Obtenir des informations détaillées sur un utilisateur spécifique.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Paramètre user_id manquant"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"Erreur API : {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"Utilisateur avec ID {user_id} non trouvé"}
        return {"error": f"Erreur API : {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Échec de connexion à l'API"}

# Outils : Actions qui peuvent modifier l'état
@server.tool(name="create_support_ticket", description="Créer un ticket d'assistance.")
async def create_ticket(data: dict):
    # Validation des entrées
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Champs obligatoires manquants : user_id, subject et description sont requis"}
    
    # Validation de la priorité
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Priorité invalide. Doit être l'une des suivantes : {', '.join(valid_priorities)}"}
    
    # Préparation de la charge utile
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Ticket #{ticket_data.get('id')} créé pour l'utilisateur {user_id}")
        return {
            "status": "success",
            "message": "Ticket créé avec succès",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Échec de création du ticket : {e.response.status_code} - {e.response.text}")
        return {"error": "Échec de création du ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Erreur de connexion", "details": str(e)}

@server.tool(name="update_ticket_status", description="Mettre à jour le statut d'un ticket d'assistance existant.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "ticket_id ou status manquant"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Statut invalide. Doit être l'un des suivants : {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Statut du ticket #{ticket_id} mis à jour vers {new_status}")
        return {"status": "success", "message": f"Statut du ticket mis à jour vers {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} non trouvé"}
        logger.error(f"Erreur API : {status_code} - {e.response.text}")
        return {"error": f"Échec de mise à jour du ticket : {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Erreur de requête : {str(e)}")
        return {"error": "Erreur de connexion", "details": str(e)}

# Gestion du cycle de vie du serveur
async def startup():
    logger.info("Démarrage du serveur MCP...")
    # Code d'initialisation

async def shutdown():
    logger.info("Arrêt du serveur MCP...")
    await client.aclose()  # Fermer les connexions du client HTTP

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Fonctionnalités améliorées :

  • Fermeture correcte du client avec aclose()
  • Journalisation structurée
  • Configuration par variables d’environnement
  • Gestion du cycle de vie du serveur
  • Points de terminaison d’exemple supplémentaires
  • Gestion complète des erreurs

Meilleures pratiques pour les configurations réelles

  • Réessayer les requêtes échouées : Implémentez un backoff exponentiel pour les échecs transitoires :

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • Respecter les limites de débit : Implémentez un limiteur de débit pour éviter le throttling de l’API :

    from aiolimiter import AsyncLimiter
    
    # Maximum 10 requêtes par seconde
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • Sécuriser les identifiants : Utilisez une solution de gestion des secrets appropriée :

    from dotenv import load_dotenv
    
    # Charger les variables d'environnement depuis le fichier .env
    load_dotenv()
    
    # Ou utilisez une solution cloud comme AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • Gérer les délais d’attente : Configurez les délais du client de manière appropriée :

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # Délai de connexion
            read=30.0,        # Délai de lecture
            write=30.0,       # Délai d'écriture
            pool=60.0         # Délai de pool
        )
    )
    
  • Auditer les accès : Implémentez une journalisation structurée avec contexte :

    @server.middleware
    async def audit_middleware(request, next_handler):
        # Générer un ID de requête
        request_id = str(uuid.uuid4())
        
        # Journaliser la requête entrante
        logger.info(f"Requête {request_id} : {request.method} {request.path}")
        
        # Ajouter le contexte au logger
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # Journaliser le résultat
        logger.info(f"Requête {request_id} terminée avec statut {response.status_code}")
        return response
    

Conclusion

Connecter vos API REST existantes aux serveurs MCP vous permet d’exploiter des données et des actions du monde réel sans réécrire les backends. Avec une conception et une validation soignées, vous pouvez créer des agents IA puissants et prêts pour la production — de manière sûre et rapide.

Dans le prochain guide, nous vous montrerons comment déclencher des tâches en arrière-plan via votre serveur MCP avec une exécution d’outils asynchrone.

FAQ

Oui. Chaque ressource ou outil peut appeler une API différente si nécessaire.

Seulement si vous validez strictement les entrées et contrôlez soigneusement quels points de terminaison sont accessibles. Supposez que le LLM essaiera tout.

Préférez toujours les requêtes asynchrones (httpx.AsyncClient) dans les serveurs MCP pour éviter de bloquer la boucle d'événements.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers