Back

So verbinden Sie Ihren MCP-Server mit Datenbankzugriff

So verbinden Sie Ihren MCP-Server mit Datenbankzugriff

Die Verbindung Ihres MCP-Servers mit einer echten Datenbank verwandelt ihn von einer einfachen Demo in ein produktionsreifes KI-Backend. In dieser Anleitung erfahren Sie genau, wie Sie PostgreSQL in Ihren MCP-Server integrieren und sicher für ein LLM zugänglich machen.

Wichtige Erkenntnisse

  • Sie können PostgreSQL mit asyncpg an Ihren MCP-Server anbinden
  • Ressourcen ermöglichen LLMs den Zugriff auf aktuelle strukturierte Daten
  • Tools erlauben LLMs das sichere Einfügen, Aktualisieren oder Löschen von Datensätzen
  • Eingabevalidierung und Transaktionsmanagement sind für den Produktionseinsatz unerlässlich
  • Umgebungskonfiguration hält Ihre Zugangsdaten sicher

Warum eine Datenbank mit MCP verbinden

Ohne Datenbankzugriff ist Ihr LLM blind gegenüber Ihren tatsächlichen Anwendungsdaten. Durch die Verbindung können Sie: KI Fragen auf Basis echter Nutzer, Bestellungen, Tickets usw. beantworten lassen; Aktionen wie das Erstellen von Einträgen oder Aktualisieren von Datensätzen automatisieren; Intelligente interne Agenten ohne separate APIs erstellen; Kontextbezogene KI-Antworten unter Verwendung Ihres Anwendungsstatus ermöglichen; KI-gestützte Analysen über Ihre Geschäftsdaten erstellen. Dies ist der erste Schritt, um ein Modell in einen echten Anwendungsassistenten zu verwandeln, der echten geschäftlichen Mehrwert bietet.

Was Sie vor dem Start benötigen

  • Eine laufende PostgreSQL-Datenbank (v12+)
  • Einen Python MCP-Server (grundlegende Implementierung)
  • Python 3.10+ mit Async-Unterstützung
  • Die asyncpg-Bibliothek für Datenbankzugriff
  • Das mcp-server-Paket (offizielles Python SDK)
  • Python-dotenv für die Umgebungskonfiguration
  • Grundkenntnisse in SQL und asynchronem Python

Architekturübersicht

Die Architektur umfasst mehrere Komponenten: (1) LLM-Client: Claude oder ein anderes LLM, das über das MCP-Protokoll kommuniziert, (2) MCP-Server: Ihr Python-Server, der Ressourcen und Tools bereitstellt, (3) Connection Pool: Verwaltet Datenbankverbindungen effizient, (4) PostgreSQL: Die zugrunde liegende Datenbank, die Ihre Anwendungsdaten speichert.

Dieser Aufbau folgt einer klaren Trennung der Zuständigkeiten: Ressourcen bieten schreibgeschützten Zugriff für Abfragen, Tools ermöglichen kontrollierte Schreiboperationen, Connection Pooling optimiert die Leistung, Umgebungskonfiguration hält Zugangsdaten sicher.

Schritt 1: Installation und Konfiguration der Datenbankabhängigkeiten

Installieren Sie zunächst die erforderlichen Pakete:

pip install asyncpg python-dotenv mcp-server

Erstellen Sie eine Projektstruktur:

mcp-db-server/
├── .env                  # Umgebungsvariablen (niemals in Git einchecken)
├── requirements.txt      # Abhängigkeiten
├── server.py             # Hauptserverdatei
├── database.py           # Datenbankverbindungsmodul
├── resources/            # Datenbankressourcen
│   ├── __init__.py
│   └── users.py          # Nutzerbezogene Ressourcen
└── tools/                # Datenbanktools
    ├── __init__.py
    └── users.py          # Nutzerbezogene Tools

Schritt 2: Einrichten der Umgebungskonfiguration

Erstellen Sie eine .env-Datei für Ihre Datenbankzugangsdaten:

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

Checken Sie diese Datei niemals in die Versionskontrolle ein. Fügen Sie sie zu .gitignore hinzu:

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

Erstellen Sie eine database.py-Datei, um diese Umgebungsvariablen zu laden:

import os
import asyncpg
from dotenv import load_dotenv

# Umgebungsvariablen laden
load_dotenv()

# Datenbankkonfiguration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

Schritt 3: Erstellen eines Datenbankverbindungspools

Erweitern Sie Ihre database.py-Datei um Connection Pooling:

import os
import asyncpg
import logging
from dotenv import load_dotenv

# Logging konfigurieren
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Umgebungsvariablen laden
load_dotenv()

# Datenbankkonfiguration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Globale Pool-Variable
db_pool = None

async def init_db():
    """Initialisieren des Datenbankverbindungspools."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Timeout für Verbindungsaufbau
        )
        logger.info("Datenbankverbindungspool eingerichtet")
        # Verbindung testen
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Verbunden mit PostgreSQL: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"Fehler beim Erstellen des Datenbankpools: {e}")
        raise

async def close_db():
    """Schließen des Datenbankverbindungspools."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Datenbankverbindungspool geschlossen")

Dies bietet Ihnen: Einen korrekt konfigurierten Verbindungspool, Verwendung von Umgebungsvariablen für Sicherheit, Logging für Überwachung, Optimierung der Poolgröße, Behandlung von Verbindungstimeouts, Explizite Schließfunktion für sauberes Herunterfahren.

Schritt 4: Bereitstellen einer schreibgeschützten Ressource

Erstellen Sie resources/users.py, um Benutzerdaten bereitzustellen:

from typing import List, Dict, Any, Optional
import logging
from database import db_pool

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """
    Abrufen der neuesten Benutzer aus der Datenbank.
    
    Args:
        limit: Maximale Anzahl der zurückzugebenden Benutzer (Standard: 20)
        
    Returns:
        Liste von Benutzerobjekten
    """
    try:
        if not db_pool:
            logger.error("Datenbankpool nicht initialisiert")
            return {"error": "Datenbankverbindung nicht verfügbar"}
            
        async with db_pool.acquire() as connection:
            # Parametrisierte Abfrage für Sicherheit verwenden
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # In Dictionaries umwandeln und Datetime-Serialisierung behandeln
            users = []
            for row in rows:
                user = dict(row)
                # Datetime in ISO-Format für JSON-Serialisierung umwandeln
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"{len(users)} aktuelle Benutzer abgerufen")
            return users
    except Exception as e:
        logger.error(f"Fehler beim Abrufen aktueller Benutzer: {e}")
        return {"error": f"Datenbankfehler: {str(e)}"}

Aktualisieren Sie nun Ihre server.py, um diese Ressource zu registrieren:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# MCP-Server erstellen
server = MCPServer()

# Ressource registrieren
@server.resource(
    name="recent_users", 
    description="Die neuesten Benutzer aus der Datenbank abrufen."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Datenbank initialisieren
        await init_db()
        
        # Server starten
        logger.info("MCP-Server wird gestartet...")
        await server.start()
    except Exception as e:
        logger.error(f"Serverfehler: {e}")
    finally:
        # Datenbankverbindungen beim Herunterfahren schließen
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Schritt 5: Implementierung erweiterter Abfragefunktionen

Erstellen Sie eine Ressource, die flexiblere Abfragen mit Parametern ermöglicht:

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    Benutzer abrufen, die bestimmten Kriterien entsprechen.
    
    Args:
        department: Nach Abteilung filtern (optional)
        role: Nach Rolle filtern (optional)
        active: Nach Aktivitätsstatus filtern (Standard: True)
        limit: Maximale Anzahl zurückzugebender Ergebnisse (Standard: 20)
        
    Returns:
        Liste passender Benutzerobjekte
    """
    try:
        if not db_pool:
            logger.error("Datenbankpool nicht initialisiert")
            return {"error": "Datenbankverbindung nicht verfügbar"}
            
        # Dynamische Abfrage erstellen
        conditions = ["active = $1"]
        params = [active]
        param_count = 1
        
        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)
            
        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)
            
        # Endgültige Abfrage erstellen
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC 
            LIMIT ${param_count + 1};
        """
        params.append(limit)
        
        async with db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # In Dictionaries umwandeln und Datetime-Serialisierung behandeln
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"{len(users)} Benutzer nach Kriterien abgerufen")
            return users
    except Exception as e:
        logger.error(f"Fehler beim Abrufen von Benutzern nach Kriterien: {e}")
        return {"error": f"Datenbankfehler: {str(e)}"}

Registrieren Sie dies als parametrisierte Ressource:

@server.resource(
    name="users_by_criteria", 
    description="Benutzer abrufen, die bestimmten Kriterien wie Abteilung oder Rolle entsprechen."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

Dies ermöglicht dem LLM, bestimmte Teilmengen von Benutzern basierend auf geschäftlichen Anforderungen anzufordern.

Schritt 6: Erstellen eines sicheren Tools zum Einfügen neuer Datensätze

Erstellen Sie tools/users.py für Schreiboperationen:

from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Validierungsmodell für Benutzererststellungsanfragen."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Benutzername muss alphanumerisch sein')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Einen neuen Benutzer in der Datenbank erstellen.
    
    Args:
        data: Benutzerdaten mit username, email, usw.
        
    Returns:
        Antwort mit Status und Benutzerinfo
    """
    try:
        # Eingabedaten mit Pydantic validieren
        user_data = CreateUserRequest(**data)
        
        if not db_pool:
            logger.error("Datenbankpool nicht initialisiert")
            return {
                "status": "error",
                "message": "Datenbankverbindung nicht verfügbar"
            }
        
        async with db_pool.acquire() as connection:
            # Prüfen, ob Benutzer bereits existiert
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "Benutzer mit diesem Benutzernamen oder dieser E-Mail existiert bereits"
                }
            
            # Neuen Benutzer einfügen
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"Neuer Benutzer erstellt: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"Benutzer {user_data.username} erfolgreich erstellt",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Fehler beim Erstellen des Benutzers: {e}")
        return {
            "status": "error",
            "message": f"Benutzer konnte nicht erstellt werden: {str(e)}"
        }

Registrieren Sie dieses Tool in server.py:

from tools.users import create_user

# ... bestehender Code ...

# Tool registrieren
@server.tool(
    name="create_user",
    description="Einen neuen Benutzer in der Datenbank erstellen."
)
async def create_user_tool(data: dict):
    return await create_user(data)

Wichtige Sicherheitsfunktionen: Pydantic-Validierung mit klaren Einschränkungen, E-Mail-Validierung mit EmailStr, Benutzernamenformat-Validierung mit Regex, Duplikatprüfung vor dem Einfügen, Klare Fehlermeldungen, Umfassendes Logging, Parametrisierte Abfragen zur Verhinderung von SQL-Injection.

Schritt 7: Transaktionsmanagement

Für Operationen, die mehrere Datenbankänderungen erfordern, verwenden Sie Transaktionen:

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Einen Benutzer in eine neue Abteilung versetzen und die Änderung im Audit-Log erfassen.
    
    Args:
        user_id: ID des zu versetzenden Benutzers
        new_department: Name der Zielabteilung
        
    Returns:
        Status der Operation
    """
    try:
        if not db_pool:
            return {"error": "Datenbankverbindung nicht verfügbar"}
            
        async with db_pool.acquire() as connection:
            # Transaktion starten
            async with connection.transaction():
                # Aktuelle Abteilung abrufen
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                if not current_dept:
                    return {"error": "Benutzer nicht gefunden"}
                
                # Abteilung des Benutzers aktualisieren
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # Änderung im Audit-Log erfassen
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"Benutzer {user_id} von {current_dept} nach {new_department} versetzt")
                
                return {
                    "status": "success",
                    "message": f"Benutzer von {current_dept} nach {new_department} versetzt"
                }
    except Exception as e:
        logger.error(f"Fehler beim Versetzen des Benutzers: {e}")
        return {"error": f"Versetzung fehlgeschlagen: {str(e)}"}

Registrieren Sie dies als Tool:

@server.tool(
    name="transfer_user",
    description="Einen Benutzer in eine neue Abteilung versetzen."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id oder new_department fehlt"}
        
    return await transfer_user_to_department(user_id, new_department)

Dies stellt sicher, dass beide Operationen (Benutzer aktualisieren + Audit-Log hinzufügen) gemeinsam erfolgreich sind oder fehlschlagen.

Schritt 8: Vollständiges Server-Codebeispiel

Hier ist ein vollständiges Beispiel, das alles zusammenführt:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Logging konfigurieren
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# MCP-Server erstellen
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="MCP-Server mit PostgreSQL-Datenbankzugriff"
)

# Ressourcen registrieren
@server.resource(
    name="recent_users", 
    description="Die neuesten Benutzer aus der Datenbank abrufen."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="Benutzer abrufen, die bestimmten Kriterien wie Abteilung oder Rolle entsprechen."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Tools registrieren
@server.tool(
    name="create_user",
    description="Einen neuen Benutzer in der Datenbank erstellen."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Einen Benutzer in eine neue Abteilung versetzen."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id oder new_department fehlt"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Datenbank initialisieren
        await init_db()
        
        # Server starten
        logger.info("MCP-Server wird gestartet...")
        await server.start()
    except Exception as e:
        logger.error(f"Serverfehler: {e}")
    finally:
        # Datenbankverbindungen beim Herunterfahren schließen
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Sicherheitsüberlegungen

Wenn Sie Ihren MCP-Server mit einer Datenbank verbinden, befolgen Sie diese Sicherheitsempfehlungen:

  1. Verwenden Sie einen dedizierten Datenbankbenutzer mit eingeschränkten Berechtigungen:

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT ON users TO mcp_user;
    -- Nur das Notwendige gewähren
    
  2. Validieren Sie alle Eingaben mit Pydantic-Modellen mit strengen Validierungsregeln.

  3. Verwenden Sie ausschließlich parametrisierte Abfragen, um SQL-Injection zu verhindern.

  4. Implementieren Sie Authentifizierung für Ihren MCP-Server:

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):
            return {"error": "Nicht autorisiert"}
        return await next_handler(request)
    
  5. Implementieren Sie Rate-Limiting, um Missbrauch zu verhindern:

    # Einfacher In-Memory-Rate-Limiter
    request_counts = {}
    
    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        client_id = request.client.host
        current_time = time.time()
        
        # Alte Einträge löschen
        request_counts = {k: v for k, v in request_counts.items() 
                         if v["timestamp"] > current_time - 60}
        
        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 100 Anfragen pro Minute
                return {"error": "Rate-Limit überschritten"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}
            
        return await next_handler(request)
    
  6. Implementieren Sie Request-Logging für Auditing:

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"Anfrage: {request.method} {request.path} von {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. Richten Sie Datenbank-Query-Timeouts ein, um zu verhindern, dass lang laufende Abfragen die Serverleistung beeinträchtigen.

Leistungsoptimierung

  1. Connection Pooling ist bereits implementiert, aber passen Sie die Parameter basierend auf Ihrer Arbeitslast an:

    db_pool = await asyncpg.create_pool(
        **DB_CONFIG,
        min_size=5,       # Höher setzen für Szenarien mit hoher Last
        max_size=20,      # Anpassen basierend auf der Kapazität Ihres Datenbankservers
        statement_cache_size=100,  # Vorbereitete Anweisungen cachen
        max_inactive_connection_lifetime=300  # Sekunden bis zum Recycling inaktiver Verbindungen
    )
    
  2. Erstellen Sie Datenbankindizes für häufig abgefragte Felder:

    CREATE INDEX idx_users_department ON users(department);
    CREATE INDEX idx_users_created_at ON users(created_at DESC);
    
  3. Implementieren Sie Ergebnis-Caching für häufig abgerufene, selten geänderte Daten:

    from functools import lru_cache
    from datetime import datetime, timedelta
    
    # Cache, der nach 5 Minuten abläuft
    cache_time = None
    cached_result = None
    
    async def fetch_departments_with_caching():
        global cache_time, cached_result
        
        # Prüfen, ob Cache gültig ist
        current_time = datetime.now()
        if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
            return cached_result
            
        # Cache-Miss - aus Datenbank abrufen
        async with db_pool.acquire() as connection:
            result = await connection.fetch("SELECT * FROM departments")
            
        # Cache aktualisieren
        cache_time = current_time
        cached_result = [dict(row) for row in result]
        
        return cached_result
    
  4. Verwenden Sie JSONB für komplexe Daten, die nicht umfangreich abgefragt werden müssen:

    CREATE TABLE user_preferences (
        user_id INTEGER PRIMARY KEY,
        preferences JSONB NOT NULL
    );
    
  5. Paginierung für große Ergebnismengen:

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Gesamtzahl abrufen
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Paginierte Ergebnisse abrufen
            rows = await connection.fetch(
                "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
                page_size, offset
            )
            
            return {
                "users": [dict(row) for row in rows],
                "pagination": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "pages": (total + page_size - 1) // page_size
                }
            }
    

Testen von Datenbankinteraktionen

Erstellen Sie ein tests-Verzeichnis mit einer Testdatei:

# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)
        
        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )
    
    yield
    
    # Cleanup happens automatically for temporary tables

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

Tests ausführen mit: pytest -xvs tests/

Überlegungen zur Bereitstellung

  1. Umgebungsspezifische Konfigurationsdateien verwenden: .env.development, .env.staging, .env.production

  2. Datenbank-Migrationen für Schemaänderungen einrichten:

    pip install alembic
    alembic init migrations
  3. Mit Docker bereitstellen für Konsistenz:

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
  4. Gesundheitsprüfungen einrichten:

    @server.route("/health")
    async def health_check():
        try:
            async with db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
  5. Abfrageleistung überwachen:

    -- In PostgreSQL
    CREATE EXTENSION pg_stat_statements;
    
    -- Um langsame Abfragen zu analysieren
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
  6. Datenbank-Backups einrichten:

    # Tägliches Backup-Skript
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump

Fazit

Die Verbindung Ihres MCP-Servers mit einer PostgreSQL-Datenbank verwandelt ihn von einer einfachen Demo in ein produktionsreifes KI-Backend. Mit sorgfältiger Beachtung von Sicherheit, Leistung und Zuverlässigkeit können Sie Ihre Datenbank sicher für Sprachmodelle zugänglich machen und leistungsstarke KI-gestützte Workflows ermöglichen. Wichtige Prinzipien: Resources für Leseoperationen, Tools für Schreiboperationen, Validierung für alle Eingaben, Transaktionen für Konsistenz, Connection Pooling für Leistung, Umgebungsvariablen für Sicherheit, strukturierte Fehlerbehandlung für Zuverlässigkeit. Durch Befolgen dieser Praktiken ermöglichen Sie LLMs, sicher und kontrolliert mit Ihren bestehenden Datenbankanwendungen zusammenzuarbeiten — und erschließen neue Möglichkeiten für KI-unterstützte Workflows und Datenanalyse. Im nächsten Leitfaden zeigen wir, wie Sie Ihr bestehendes API-Backend über MCP anbinden können, ohne Geschäftslogik neu zu schreiben.

FAQs

Ja. Sie können mehrere Resource- und Tool-Handler erstellen, die jeweils mit einer anderen Datenbank oder einem anderen Schema verbunden sind.

Sie sollten try/except-Behandlung hinzufügen, um dem LLM einen klaren Fehler zurückzugeben, anstatt abzustürzen.

Stellen Sie nur ausgewählte Lese- oder Schreiboperationen über Resources und Tools zur Verfügung. Geben Sie dem LLM niemals uneingeschränkten Datenbankzugriff.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers