Back

Comment étendre votre serveur MCP avec un accès à une base de données

Comment étendre votre serveur MCP avec un accès à une base de données

Connecter votre serveur MCP à une véritable base de données le transforme d’une simple démo en un backend IA prêt pour la production. Dans ce guide, vous verrez exactement comment intégrer PostgreSQL à votre serveur MCP et l’exposer en toute sécurité à un LLM.

Points clés

  • Vous pouvez connecter PostgreSQL à votre serveur MCP avec asyncpg
  • Les ressources permettent aux LLM d’extraire des données structurées en temps réel
  • Les outils permettent aux LLM d’insérer, mettre à jour ou supprimer des enregistrements en toute sécurité
  • La validation des entrées et la gestion des transactions sont essentielles pour une utilisation en production
  • La configuration de l’environnement garde vos identifiants sécurisés

Pourquoi connecter une base de données à MCP

Sans accès à une base de données, votre LLM est aveugle aux données réelles de votre application. En le connectant, vous pouvez : permettre à l’IA de répondre aux questions basées sur des utilisateurs, commandes ou tickets réels ; automatiser des actions comme la création d’entrées ou la mise à jour d’enregistrements ; construire des agents internes intelligents sans avoir besoin d’API séparées ; activer des réponses IA contextuelles utilisant l’état de votre application ; créer des analyses alimentées par l’IA sur vos données d’entreprise. C’est la première étape pour transformer un modèle en un véritable assistant d’application qui fournit une valeur commerciale réelle.

Ce dont vous avez besoin avant de commencer

  • Une base de données PostgreSQL en fonctionnement (v12+)
  • Un serveur MCP Python (implémentation de base)
  • Python 3.10+ avec support asynchrone
  • La bibliothèque asyncpg pour l’accès à la base de données
  • Le package mcp-server (SDK Python officiel)
  • Python-dotenv pour la configuration de l’environnement
  • Connaissances de base en SQL et Python asynchrone

Vue d’ensemble de l’architecture

L’architecture implique plusieurs composants : (1) Client LLM : Claude ou un autre LLM qui communique via le protocole MCP, (2) Serveur MCP : Votre serveur Python exposant des ressources et des outils, (3) Pool de connexions : Gère efficacement les connexions à la base de données, (4) PostgreSQL : La base de données sous-jacente stockant les données de votre application.

Cette configuration suit une séparation claire des préoccupations : les ressources fournissent un accès en lecture seule pour les requêtes, les outils permettent des opérations d’écriture contrôlées, le pooling de connexions optimise les performances, la configuration de l’environnement garde les identifiants sécurisés.

Étape 1 : Installer et configurer les dépendances de la base de données

D’abord, installez les packages requis :

pip install asyncpg python-dotenv mcp-server

Créez une structure de projet :

mcp-db-server/
├── .env                  # Variables d'environnement (ne jamais commit sur git)
├── requirements.txt      # Dépendances
├── server.py             # Fichier principal du serveur
├── database.py           # Module de connexion à la base de données
├── resources/            # Ressources de la base de données
│   ├── __init__.py
│   └── users.py          # Ressources liées aux utilisateurs
└── tools/                # Outils de la base de données
    ├── __init__.py
    └── users.py          # Outils liés aux utilisateurs

Étape 2 : Configurer l’environnement

Créez un fichier .env pour vos identifiants de base de données :

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

Ne commitez jamais ce fichier dans le contrôle de version. Ajoutez-le à .gitignore :

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

Créez un fichier database.py pour charger ces variables d’environnement :

import os
import asyncpg
from dotenv import load_dotenv

# Charger les variables d'environnement
load_dotenv()

# Configuration de la base de données
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

Étape 3 : Créer un pool de connexions à la base de données

Étendez votre fichier database.py pour inclure le pooling de connexions :

import os
import asyncpg
import logging
from dotenv import load_dotenv

# Configurer la journalisation
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Charger les variables d'environnement
load_dotenv()

# Configuration de la base de données
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Variable de pool globale
db_pool = None

async def init_db():
    """Initialiser le pool de connexions à la base de données."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Délai d'acquisition de connexion
        )
        logger.info("Pool de connexions à la base de données établi")
        # Tester la connexion
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Connecté à PostgreSQL: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"Échec de création du pool de base de données: {e}")
        raise

async def close_db():
    """Fermer le pool de connexions à la base de données."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Pool de connexions à la base de données fermé")

Cela vous donne : un pool de connexions correctement configuré, l’utilisation de variables d’environnement pour la sécurité, la journalisation pour la surveillance, l’optimisation de la taille du pool, la gestion des délais de connexion, une fonction de fermeture explicite pour des arrêts propres.

Étape 4 : Exposer une ressource en lecture seule

Créez resources/users.py pour exposer les données utilisateur :

from typing import List, Dict, Any, Optional
import logging
from database import db_pool

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """
    Récupérer les utilisateurs les plus récents de la base de données.
    
    Args:
        limit: Nombre maximum d'utilisateurs à retourner (par défaut: 20)
        
    Returns:
        Liste d'objets utilisateur
    """
    try:
        if not db_pool:
            logger.error("Pool de base de données non initialisé")
            return {"error": "Connexion à la base de données non disponible"}
            
        async with db_pool.acquire() as connection:
            # Utiliser une requête paramétrée pour la sécurité
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # Convertir en dictionnaires et gérer la sérialisation des dates
            users = []
            for row in rows:
                user = dict(row)
                # Convertir datetime en format ISO pour la sérialisation JSON
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Récupération de {len(users)} utilisateurs récents")
            return users
    except Exception as e:
        logger.error(f"Erreur lors de la récupération des utilisateurs récents: {e}")
        return {"error": f"Erreur de base de données: {str(e)}"}

Maintenant, mettez à jour votre server.py pour enregistrer cette ressource :

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# Configurer la journalisation
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Créer le serveur MCP
server = MCPServer()

# Enregistrer la ressource
@server.resource(
    name="recent_users", 
    description="Récupérer les utilisateurs les plus récents de la base de données."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Initialiser la base de données
        await init_db()
        
        # Démarrer le serveur
        logger.info("Démarrage du serveur MCP...")
        await server.start()
    except Exception as e:
        logger.error(f"Erreur serveur: {e}")
    finally:
        # Fermer les connexions à la base de données lors de l'arrêt
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Étape 5 : Implémenter des capacités de requête avancées

Créez une ressource qui permet des requêtes plus flexibles avec des paramètres :

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    Récupérer les utilisateurs correspondant à des critères spécifiques.
    
    Args:
        department: Filtrer par département (optionnel)
        role: Filtrer par rôle (optionnel)
        active: Filtrer par statut actif (par défaut: True)
        limit: Nombre maximum de résultats à retourner (par défaut: 20)
        
    Returns:
        Liste d'objets utilisateur correspondants
    """
    try:
        if not db_pool:
            logger.error("Pool de base de données non initialisé")
            return {"error": "Connexion à la base de données non disponible"}
            
        # Construire une requête dynamique
        conditions = ["active = $1"]
        params = [active]
        param_count = 1
        
        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)
            
        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)
            
        # Construire la requête finale
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC 
            LIMIT ${param_count + 1};
        """
        params.append(limit)
        
        async with db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # Convertir en dictionnaires et gérer la sérialisation des dates
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Récupération de {len(users)} utilisateurs correspondant aux critères")
            return users
    except Exception as e:
        logger.error(f"Erreur lors de la récupération des utilisateurs par critères: {e}")
        return {"error": f"Erreur de base de données: {str(e)}"}

Enregistrez ceci comme une ressource paramétrée :

@server.resource(
    name="users_by_criteria", 
    description="Récupérer les utilisateurs correspondant à des critères spécifiques comme le département ou le rôle."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

Cela permet au LLM de demander des sous-ensembles spécifiques d’utilisateurs en fonction des besoins métier.

Étape 6 : Créer un outil sécurisé pour insérer de nouveaux enregistrements

Créez tools/users.py pour les opérations d’écriture :

from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Modèle de validation pour les demandes de création d'utilisateur."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Le nom d'utilisateur doit être alphanumérique')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Créer un nouvel utilisateur dans la base de données.
    
    Args:
        data: Données utilisateur contenant nom d'utilisateur, email, etc.
        
    Returns:
        Réponse avec statut et informations utilisateur
    """
    try:
        # Valider les données d'entrée avec Pydantic
        user_data = CreateUserRequest(**data)
        
        if not db_pool:
            logger.error("Pool de base de données non initialisé")
            return {
                "status": "error",
                "message": "Connexion à la base de données non disponible"
            }
        
        async with db_pool.acquire() as connection:
            # Vérifier si l'utilisateur existe déjà
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "Un utilisateur avec ce nom ou cet email existe déjà"
                }
            
            # Insérer le nouvel utilisateur
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"Nouvel utilisateur créé: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"Utilisateur {user_data.username} créé avec succès",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Erreur lors de la création de l'utilisateur: {e}")
        return {
            "status": "error",
            "message": f"Échec de création de l'utilisateur: {str(e)}"
        }

Enregistrez cet outil dans server.py :

from tools.users import create_user

# ... code existant ...

# Enregistrer l'outil
@server.tool(
    name="create_user",
    description="Créer un nouvel utilisateur dans la base de données."
)
async def create_user_tool(data: dict):
    return await create_user(data)

Fonctionnalités de sécurité clés ajoutées : Validation Pydantic avec contraintes claires, validation d’email avec EmailStr, validation du format de nom d’utilisateur avec regex, vérification des doublons avant insertion, messages d’erreur clairs, journalisation complète, requêtes paramétrées pour prévenir l’injection SQL.

Étape 7 : Gestion des transactions

Pour les opérations nécessitant plusieurs modifications de base de données, utilisez des transactions :

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Transférer un utilisateur vers un nouveau département, en enregistrant le changement dans le journal d'audit.
    
    Args:
        user_id: ID de l'utilisateur à transférer
        new_department: Nom du département cible
        
    Returns:
        Statut de l'opération
    """
    try:
        if not db_pool:
            return {"error": "Connexion à la base de données non disponible"}
            
        async with db_pool.acquire() as connection:
            # Démarrer une transaction
            async with connection.transaction():
                # Obtenir le département actuel
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                if not current_dept:
                    return {"error": "Utilisateur non trouvé"}
                
                # Mettre à jour le département de l'utilisateur
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # Enregistrer le changement dans le journal d'audit
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"Utilisateur {user_id} transféré de {current_dept} vers {new_department}")
                
                return {
                    "status": "success",
                    "message": f"Utilisateur transféré de {current_dept} vers {new_department}"
                }
    except Exception as e:
        logger.error(f"Erreur lors du transfert de l'utilisateur: {e}")
        return {"error": f"Échec du transfert: {str(e)}"}

Enregistrez ceci comme un outil :

@server.tool(
    name="transfer_user",
    description="Transférer un utilisateur vers un nouveau département."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id ou new_department manquant"}
        
    return await transfer_user_to_department(user_id, new_department)

Cela garantit que les deux opérations (mise à jour de l’utilisateur + ajout au journal d’audit) réussissent ou échouent ensemble.

Étape 8 : Exemple de code complet du serveur

Voici un exemple complet qui rassemble tout :

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Configurer la journalisation
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Créer le serveur MCP
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="Serveur MCP avec accès à la base de données PostgreSQL"
)

# Enregistrer les ressources
@server.resource(
    name="recent_users", 
    description="Récupérer les utilisateurs les plus récents de la base de données."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="Récupérer les utilisateurs correspondant à des critères spécifiques comme le département ou le rôle."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Enregistrer les outils
@server.tool(
    name="create_user",
    description="Créer un nouvel utilisateur dans la base de données."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Transférer un utilisateur vers un nouveau département."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id ou new_department manquant"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Initialiser la base de données
        await init_db()
        
        # Démarrer le serveur
        logger.info("Démarrage du serveur MCP...")
        await server.start()
    except Exception as e:
        logger.error(f"Erreur serveur: {e}")
    finally:
        # Fermer les connexions à la base de données lors de l'arrêt
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Considérations de sécurité

Lorsque vous connectez votre serveur MCP à une base de données, suivez ces bonnes pratiques de sécurité :

  1. Utilisez un utilisateur de base de données dédié avec des permissions limitées :

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT ON users TO mcp_user;
    -- N'accordez que ce qui est nécessaire
    
  2. Validez toutes les entrées en utilisant des modèles Pydantic avec des règles de validation strictes.

  3. Utilisez exclusivement des requêtes paramétrées pour prévenir l’injection SQL.

  4. Implémentez l’authentification pour votre serveur MCP :

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):
            return {"error": "Non autorisé"}
        return await next_handler(request)
    
  5. Implémentez la limitation de débit pour prévenir les abus :

    # Limiteur de débit simple en mémoire
    request_counts = {}
    
    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        client_id = request.client.host
        current_time = time.time()
        
        # Effacer les anciennes entrées
        request_counts = {k: v for k, v in request_counts.items() 
                         if v["timestamp"] > current_time - 60}
        
        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 100 requêtes par minute
                return {"error": "Limite de débit dépassée"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}
            
        return await next_handler(request)
    
  6. Implémentez la journalisation des requêtes pour l’audit :

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"Requête: {request.method} {request.path} de {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. Configurez des délais d’expiration pour les requêtes afin d’éviter que des requêtes longues n’affectent les performances du serveur.

Optimisation des performances

  1. Le pooling de connexions est déjà implémenté, mais ajustez les paramètres en fonction de votre charge de travail :

    db_pool = await asyncpg.create_pool(
        **DB_CONFIG,
        min_size=5,       # Définir plus haut pour les scénarios à forte charge
        max_size=20,      # Ajuster en fonction de la capacité de votre serveur de base de données
        statement_cache_size=100,  # Mettre en cache les instructions préparées
        max_inactive_connection_lifetime=300  # Secondes avant de recycler les connexions inactives
    )
    
  2. Créez des index de base de données pour les champs fréquemment interrogés :

    CREATE INDEX idx_users_department ON users(department);
    CREATE INDEX idx_users_created_at ON users(created_at DESC);
    
  3. Implémentez la mise en cache des résultats pour les données fréquemment accédées et rarement modifiées :

    from functools import lru_cache
    from datetime import datetime, timedelta
    
    # Cache qui expire après 5 minutes
    cache_time = None
    cached_result = None
    
    async def fetch_departments_with_caching():
        global cache_time, cached_result
        
        # Vérifier si le cache est valide
        current_time = datetime.now()
        if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
            return cached_result
            
        # Cache manqué - récupérer depuis la base de données
        async with db_pool.acquire() as connection:
            result = await connection.fetch("SELECT * FROM departments")
            
        # Mettre à jour le cache
        cache_time = current_time
        cached_result = [dict(row) for row in result]
        
        return cached_result
    
  4. Utilisez JSONB pour les données complexes qui n’ont pas besoin d’être interrogées de manière approfondie :

    CREATE TABLE user_preferences (
        user_id INTEGER PRIMARY KEY,
        preferences JSONB NOT NULL
    );
    
  5. Pagination pour les grands ensembles de résultats :

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Obtenir le nombre total
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Obtenir les résultats paginés
            rows = await connection.fetch(
                "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
                page_size, offset
            )
            
            return {
                "users": [dict(row) for row in rows],
                "pagination": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "pages": (total + page_size - 1) // page_size
                }
            }
    

Test des interactions avec la base de données

Créez un répertoire tests avec un fichier de test :

# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)
        
        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )
    
    yield
    
    # Cleanup happens automatically for temporary tables

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

Exécutez les tests avec : pytest -xvs tests/

Considérations pour le déploiement

  1. Utilisez des fichiers de configuration spécifiques à l’environnement : .env.development, .env.staging, .env.production

  2. Configurez des migrations de base de données pour les changements de schéma :

    pip install alembic
    alembic init migrations
  3. Déployez avec Docker pour la cohérence :

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
  4. Mettez en place des vérifications de santé :

    @server.route("/health")
    async def health_check():
        try:
            async with db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
  5. Surveillez les performances des requêtes :

    -- Dans PostgreSQL
    CREATE EXTENSION pg_stat_statements;
    
    -- Pour analyser les requêtes lentes
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
  6. Configurez des sauvegardes de base de données :

    # Script de sauvegarde quotidienne
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump

Conclusion

Connecter votre serveur MCP à une base de données PostgreSQL le transforme d’une simple démo en un backend IA prêt pour la production. Avec une attention particulière à la sécurité, aux performances et à la fiabilité, vous pouvez exposer votre base de données aux modèles de langage en toute sécurité, permettant ainsi des flux de travail puissants pilotés par l’IA. Principes clés à retenir : Resources pour les opérations de lecture, Tools pour les opérations d’écriture, Validation pour toutes les entrées, Transactions pour la cohérence, Pooling de connexions pour les performances, Variables d’environnement pour la sécurité, Gestion structurée des erreurs pour la fiabilité. En suivant ces pratiques, vous permettez aux LLM de travailler avec vos applications de base de données existantes de manière sûre et contrôlée — débloquant de nouvelles possibilités pour les flux de travail assistés par l’IA et l’analyse de données. Dans le prochain guide, nous montrerons comment connecter votre backend API existant via MCP sans réécrire la logique métier.

FAQ

Oui. Vous pouvez créer plusieurs gestionnaires de ressources et d'outils, chacun se connectant à une base de données ou un schéma différent.

Vous devriez ajouter une gestion try/except pour renvoyer une erreur claire au LLM au lieu de planter.

Exposez uniquement des opérations de lecture ou d'écriture sélectionnées via des ressources et des outils. Ne donnez jamais au LLM un accès illimité à la base de données.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers