So verbinden Sie Ihren MCP-Server mit Datenbankzugriff

Die Verbindung Ihres MCP-Servers mit einer echten Datenbank verwandelt ihn von einer einfachen Demo in ein produktionsreifes KI-Backend. In dieser Anleitung erfahren Sie genau, wie Sie PostgreSQL in Ihren MCP-Server integrieren und sicher für ein LLM zugänglich machen.
Wichtige Erkenntnisse
- Sie können PostgreSQL mit asyncpg an Ihren MCP-Server anbinden
- Ressourcen ermöglichen LLMs den Zugriff auf aktuelle strukturierte Daten
- Tools erlauben LLMs das sichere Einfügen, Aktualisieren oder Löschen von Datensätzen
- Eingabevalidierung und Transaktionsmanagement sind für den Produktionseinsatz unerlässlich
- Umgebungskonfiguration hält Ihre Zugangsdaten sicher
Warum eine Datenbank mit MCP verbinden
Ohne Datenbankzugriff ist Ihr LLM blind gegenüber Ihren tatsächlichen Anwendungsdaten. Durch die Verbindung können Sie: KI Fragen auf Basis echter Nutzer, Bestellungen, Tickets usw. beantworten lassen; Aktionen wie das Erstellen von Einträgen oder Aktualisieren von Datensätzen automatisieren; Intelligente interne Agenten ohne separate APIs erstellen; Kontextbezogene KI-Antworten unter Verwendung Ihres Anwendungsstatus ermöglichen; KI-gestützte Analysen über Ihre Geschäftsdaten erstellen. Dies ist der erste Schritt, um ein Modell in einen echten Anwendungsassistenten zu verwandeln, der echten geschäftlichen Mehrwert bietet.
Was Sie vor dem Start benötigen
- Eine laufende PostgreSQL-Datenbank (v12+)
- Einen Python MCP-Server (grundlegende Implementierung)
- Python 3.10+ mit Async-Unterstützung
- Die
asyncpg
-Bibliothek für Datenbankzugriff - Das
mcp-server
-Paket (offizielles Python SDK) - Python-dotenv für die Umgebungskonfiguration
- Grundkenntnisse in SQL und asynchronem Python
Architekturübersicht
Die Architektur umfasst mehrere Komponenten: (1) LLM-Client: Claude oder ein anderes LLM, das über das MCP-Protokoll kommuniziert, (2) MCP-Server: Ihr Python-Server, der Ressourcen und Tools bereitstellt, (3) Connection Pool: Verwaltet Datenbankverbindungen effizient, (4) PostgreSQL: Die zugrunde liegende Datenbank, die Ihre Anwendungsdaten speichert.
Dieser Aufbau folgt einer klaren Trennung der Zuständigkeiten: Ressourcen bieten schreibgeschützten Zugriff für Abfragen, Tools ermöglichen kontrollierte Schreiboperationen, Connection Pooling optimiert die Leistung, Umgebungskonfiguration hält Zugangsdaten sicher.
Schritt 1: Installation und Konfiguration der Datenbankabhängigkeiten
Installieren Sie zunächst die erforderlichen Pakete:
pip install asyncpg python-dotenv mcp-server
Erstellen Sie eine Projektstruktur:
mcp-db-server/
├── .env # Umgebungsvariablen (niemals in Git einchecken)
├── requirements.txt # Abhängigkeiten
├── server.py # Hauptserverdatei
├── database.py # Datenbankverbindungsmodul
├── resources/ # Datenbankressourcen
│ ├── __init__.py
│ └── users.py # Nutzerbezogene Ressourcen
└── tools/ # Datenbanktools
├── __init__.py
└── users.py # Nutzerbezogene Tools
Schritt 2: Einrichten der Umgebungskonfiguration
Erstellen Sie eine .env
-Datei für Ihre Datenbankzugangsdaten:
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432
Checken Sie diese Datei niemals in die Versionskontrolle ein. Fügen Sie sie zu .gitignore
hinzu:
# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class
Erstellen Sie eine database.py
-Datei, um diese Umgebungsvariablen zu laden:
import os
import asyncpg
from dotenv import load_dotenv
# Umgebungsvariablen laden
load_dotenv()
# Datenbankkonfiguration
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
Schritt 3: Erstellen eines Datenbankverbindungspools
Erweitern Sie Ihre database.py
-Datei um Connection Pooling:
import os
import asyncpg
import logging
from dotenv import load_dotenv
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")
# Umgebungsvariablen laden
load_dotenv()
# Datenbankkonfiguration
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
# Globale Pool-Variable
db_pool = None
async def init_db():
"""Initialisieren des Datenbankverbindungspools."""
global db_pool
try:
db_pool = await asyncpg.create_pool(
**DB_CONFIG,
min_size=1,
max_size=10,
command_timeout=60,
timeout=10, # Timeout für Verbindungsaufbau
)
logger.info("Datenbankverbindungspool eingerichtet")
# Verbindung testen
async with db_pool.acquire() as connection:
version = await connection.fetchval("SELECT version();")
logger.info(f"Verbunden mit PostgreSQL: {version}")
return db_pool
except Exception as e:
logger.error(f"Fehler beim Erstellen des Datenbankpools: {e}")
raise
async def close_db():
"""Schließen des Datenbankverbindungspools."""
global db_pool
if db_pool:
await db_pool.close()
logger.info("Datenbankverbindungspool geschlossen")
Dies bietet Ihnen: Einen korrekt konfigurierten Verbindungspool, Verwendung von Umgebungsvariablen für Sicherheit, Logging für Überwachung, Optimierung der Poolgröße, Behandlung von Verbindungstimeouts, Explizite Schließfunktion für sauberes Herunterfahren.
Schritt 4: Bereitstellen einer schreibgeschützten Ressource
Erstellen Sie resources/users.py
, um Benutzerdaten bereitzustellen:
from typing import List, Dict, Any, Optional
import logging
from database import db_pool
logger = logging.getLogger("mcp_database.resources.users")
async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
"""
Abrufen der neuesten Benutzer aus der Datenbank.
Args:
limit: Maximale Anzahl der zurückzugebenden Benutzer (Standard: 20)
Returns:
Liste von Benutzerobjekten
"""
try:
if not db_pool:
logger.error("Datenbankpool nicht initialisiert")
return {"error": "Datenbankverbindung nicht verfügbar"}
async with db_pool.acquire() as connection:
# Parametrisierte Abfrage für Sicherheit verwenden
query = """
SELECT id, username, email, created_at
FROM users
ORDER BY created_at DESC
LIMIT $1;
"""
rows = await connection.fetch(query, limit)
# In Dictionaries umwandeln und Datetime-Serialisierung behandeln
users = []
for row in rows:
user = dict(row)
# Datetime in ISO-Format für JSON-Serialisierung umwandeln
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"{len(users)} aktuelle Benutzer abgerufen")
return users
except Exception as e:
logger.error(f"Fehler beim Abrufen aktueller Benutzer: {e}")
return {"error": f"Datenbankfehler: {str(e)}"}
Aktualisieren Sie nun Ihre server.py
, um diese Ressource zu registrieren:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users
# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")
# MCP-Server erstellen
server = MCPServer()
# Ressource registrieren
@server.resource(
name="recent_users",
description="Die neuesten Benutzer aus der Datenbank abrufen."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
async def main():
try:
# Datenbank initialisieren
await init_db()
# Server starten
logger.info("MCP-Server wird gestartet...")
await server.start()
except Exception as e:
logger.error(f"Serverfehler: {e}")
finally:
# Datenbankverbindungen beim Herunterfahren schließen
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Schritt 5: Implementierung erweiterter Abfragefunktionen
Erstellen Sie eine Ressource, die flexiblere Abfragen mit Parametern ermöglicht:
async def fetch_users_by_criteria(
department: Optional[str] = None,
role: Optional[str] = None,
active: Optional[bool] = True,
limit: int = 20
) -> List[Dict[str, Any]]:
"""
Benutzer abrufen, die bestimmten Kriterien entsprechen.
Args:
department: Nach Abteilung filtern (optional)
role: Nach Rolle filtern (optional)
active: Nach Aktivitätsstatus filtern (Standard: True)
limit: Maximale Anzahl zurückzugebender Ergebnisse (Standard: 20)
Returns:
Liste passender Benutzerobjekte
"""
try:
if not db_pool:
logger.error("Datenbankpool nicht initialisiert")
return {"error": "Datenbankverbindung nicht verfügbar"}
# Dynamische Abfrage erstellen
conditions = ["active = $1"]
params = [active]
param_count = 1
if department:
param_count += 1
conditions.append(f"department = ${param_count}")
params.append(department)
if role:
param_count += 1
conditions.append(f"role = ${param_count}")
params.append(role)
# Endgültige Abfrage erstellen
query = f"""
SELECT id, username, email, department, role, created_at
FROM users
WHERE {' AND '.join(conditions)}
ORDER BY created_at DESC
LIMIT ${param_count + 1};
"""
params.append(limit)
async with db_pool.acquire() as connection:
rows = await connection.fetch(query, *params)
# In Dictionaries umwandeln und Datetime-Serialisierung behandeln
users = []
for row in rows:
user = dict(row)
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"{len(users)} Benutzer nach Kriterien abgerufen")
return users
except Exception as e:
logger.error(f"Fehler beim Abrufen von Benutzern nach Kriterien: {e}")
return {"error": f"Datenbankfehler: {str(e)}"}
Registrieren Sie dies als parametrisierte Ressource:
@server.resource(
name="users_by_criteria",
description="Benutzer abrufen, die bestimmten Kriterien wie Abteilung oder Rolle entsprechen."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
Dies ermöglicht dem LLM, bestimmte Teilmengen von Benutzern basierend auf geschäftlichen Anforderungen anzufordern.
Schritt 6: Erstellen eines sicheren Tools zum Einfügen neuer Datensätze
Erstellen Sie tools/users.py
für Schreiboperationen:
from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool
logger = logging.getLogger("mcp_database.tools.users")
class CreateUserRequest(BaseModel):
"""Validierungsmodell für Benutzererststellungsanfragen."""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
department: Optional[str] = "General"
role: Optional[str] = "User"
@validator('username')
def username_alphanumeric(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Benutzername muss alphanumerisch sein')
return v
async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Einen neuen Benutzer in der Datenbank erstellen.
Args:
data: Benutzerdaten mit username, email, usw.
Returns:
Antwort mit Status und Benutzerinfo
"""
try:
# Eingabedaten mit Pydantic validieren
user_data = CreateUserRequest(**data)
if not db_pool:
logger.error("Datenbankpool nicht initialisiert")
return {
"status": "error",
"message": "Datenbankverbindung nicht verfügbar"
}
async with db_pool.acquire() as connection:
# Prüfen, ob Benutzer bereits existiert
existing_user = await connection.fetchrow(
"SELECT id FROM users WHERE username = $1 OR email = $2",
user_data.username,
user_data.email
)
if existing_user:
return {
"status": "error",
"message": "Benutzer mit diesem Benutzernamen oder dieser E-Mail existiert bereits"
}
# Neuen Benutzer einfügen
query = """
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
RETURNING id;
"""
user_id = await connection.fetchval(
query,
user_data.username,
user_data.email,
user_data.department,
user_data.role
)
logger.info(f"Neuer Benutzer erstellt: {user_data.username} (ID: {user_id})")
return {
"status": "success",
"message": f"Benutzer {user_data.username} erfolgreich erstellt",
"user_id": user_id
}
except Exception as e:
logger.error(f"Fehler beim Erstellen des Benutzers: {e}")
return {
"status": "error",
"message": f"Benutzer konnte nicht erstellt werden: {str(e)}"
}
Registrieren Sie dieses Tool in server.py
:
from tools.users import create_user
# ... bestehender Code ...
# Tool registrieren
@server.tool(
name="create_user",
description="Einen neuen Benutzer in der Datenbank erstellen."
)
async def create_user_tool(data: dict):
return await create_user(data)
Wichtige Sicherheitsfunktionen: Pydantic-Validierung mit klaren Einschränkungen, E-Mail-Validierung mit EmailStr, Benutzernamenformat-Validierung mit Regex, Duplikatprüfung vor dem Einfügen, Klare Fehlermeldungen, Umfassendes Logging, Parametrisierte Abfragen zur Verhinderung von SQL-Injection.
Schritt 7: Transaktionsmanagement
Für Operationen, die mehrere Datenbankänderungen erfordern, verwenden Sie Transaktionen:
async def transfer_user_to_department(
user_id: int,
new_department: str
) -> Dict[str, Any]:
"""
Einen Benutzer in eine neue Abteilung versetzen und die Änderung im Audit-Log erfassen.
Args:
user_id: ID des zu versetzenden Benutzers
new_department: Name der Zielabteilung
Returns:
Status der Operation
"""
try:
if not db_pool:
return {"error": "Datenbankverbindung nicht verfügbar"}
async with db_pool.acquire() as connection:
# Transaktion starten
async with connection.transaction():
# Aktuelle Abteilung abrufen
current_dept = await connection.fetchval(
"SELECT department FROM users WHERE id = $1",
user_id
)
if not current_dept:
return {"error": "Benutzer nicht gefunden"}
# Abteilung des Benutzers aktualisieren
await connection.execute(
"UPDATE users SET department = $1 WHERE id = $2",
new_department,
user_id
)
# Änderung im Audit-Log erfassen
await connection.execute(
"""
INSERT INTO user_audit_log
(user_id, field_changed, old_value, new_value)
VALUES ($1, $2, $3, $4)
""",
user_id,
"department",
current_dept,
new_department
)
logger.info(f"Benutzer {user_id} von {current_dept} nach {new_department} versetzt")
return {
"status": "success",
"message": f"Benutzer von {current_dept} nach {new_department} versetzt"
}
except Exception as e:
logger.error(f"Fehler beim Versetzen des Benutzers: {e}")
return {"error": f"Versetzung fehlgeschlagen: {str(e)}"}
Registrieren Sie dies als Tool:
@server.tool(
name="transfer_user",
description="Einen Benutzer in eine neue Abteilung versetzen."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "user_id oder new_department fehlt"}
return await transfer_user_to_department(user_id, new_department)
Dies stellt sicher, dass beide Operationen (Benutzer aktualisieren + Audit-Log hinzufügen) gemeinsam erfolgreich sind oder fehlschlagen.
Schritt 8: Vollständiges Server-Codebeispiel
Hier ist ein vollständiges Beispiel, das alles zusammenführt:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")
# MCP-Server erstellen
server = MCPServer(
name="DatabaseMCPServer",
version="1.0.0",
description="MCP-Server mit PostgreSQL-Datenbankzugriff"
)
# Ressourcen registrieren
@server.resource(
name="recent_users",
description="Die neuesten Benutzer aus der Datenbank abrufen."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
@server.resource(
name="users_by_criteria",
description="Benutzer abrufen, die bestimmten Kriterien wie Abteilung oder Rolle entsprechen."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
# Tools registrieren
@server.tool(
name="create_user",
description="Einen neuen Benutzer in der Datenbank erstellen."
)
async def create_user_tool(data: dict):
return await create_user(data)
@server.tool(
name="transfer_user",
description="Einen Benutzer in eine neue Abteilung versetzen."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "user_id oder new_department fehlt"}
return await transfer_user_to_department(user_id, new_department)
async def main():
try:
# Datenbank initialisieren
await init_db()
# Server starten
logger.info("MCP-Server wird gestartet...")
await server.start()
except Exception as e:
logger.error(f"Serverfehler: {e}")
finally:
# Datenbankverbindungen beim Herunterfahren schließen
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Sicherheitsüberlegungen
Wenn Sie Ihren MCP-Server mit einer Datenbank verbinden, befolgen Sie diese Sicherheitsempfehlungen:
-
Verwenden Sie einen dedizierten Datenbankbenutzer mit eingeschränkten Berechtigungen:
CREATE USER mcp_user WITH PASSWORD 'secure_password'; GRANT SELECT, INSERT ON users TO mcp_user; -- Nur das Notwendige gewähren
-
Validieren Sie alle Eingaben mit Pydantic-Modellen mit strengen Validierungsregeln.
-
Verwenden Sie ausschließlich parametrisierte Abfragen, um SQL-Injection zu verhindern.
-
Implementieren Sie Authentifizierung für Ihren MCP-Server:
@server.middleware async def auth_middleware(request, next_handler): token = request.headers.get("Authorization") if not token or not verify_token(token): return {"error": "Nicht autorisiert"} return await next_handler(request)
-
Implementieren Sie Rate-Limiting, um Missbrauch zu verhindern:
# Einfacher In-Memory-Rate-Limiter request_counts = {} @server.middleware async def rate_limit_middleware(request, next_handler): client_id = request.client.host current_time = time.time() # Alte Einträge löschen request_counts = {k: v for k, v in request_counts.items() if v["timestamp"] > current_time - 60} if client_id in request_counts: if request_counts[client_id]["count"] > 100: # 100 Anfragen pro Minute return {"error": "Rate-Limit überschritten"} request_counts[client_id]["count"] += 1 else: request_counts[client_id] = {"count": 1, "timestamp": current_time} return await next_handler(request)
-
Implementieren Sie Request-Logging für Auditing:
@server.middleware async def logging_middleware(request, next_handler): logger.info(f"Anfrage: {request.method} {request.path} von {request.client.host}") response = await next_handler(request) return response
-
Richten Sie Datenbank-Query-Timeouts ein, um zu verhindern, dass lang laufende Abfragen die Serverleistung beeinträchtigen.
Leistungsoptimierung
-
Connection Pooling ist bereits implementiert, aber passen Sie die Parameter basierend auf Ihrer Arbeitslast an:
db_pool = await asyncpg.create_pool( **DB_CONFIG, min_size=5, # Höher setzen für Szenarien mit hoher Last max_size=20, # Anpassen basierend auf der Kapazität Ihres Datenbankservers statement_cache_size=100, # Vorbereitete Anweisungen cachen max_inactive_connection_lifetime=300 # Sekunden bis zum Recycling inaktiver Verbindungen )
-
Erstellen Sie Datenbankindizes für häufig abgefragte Felder:
CREATE INDEX idx_users_department ON users(department); CREATE INDEX idx_users_created_at ON users(created_at DESC);
-
Implementieren Sie Ergebnis-Caching für häufig abgerufene, selten geänderte Daten:
from functools import lru_cache from datetime import datetime, timedelta # Cache, der nach 5 Minuten abläuft cache_time = None cached_result = None async def fetch_departments_with_caching(): global cache_time, cached_result # Prüfen, ob Cache gültig ist current_time = datetime.now() if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5): return cached_result # Cache-Miss - aus Datenbank abrufen async with db_pool.acquire() as connection: result = await connection.fetch("SELECT * FROM departments") # Cache aktualisieren cache_time = current_time cached_result = [dict(row) for row in result] return cached_result
-
Verwenden Sie JSONB für komplexe Daten, die nicht umfangreich abgefragt werden müssen:
CREATE TABLE user_preferences ( user_id INTEGER PRIMARY KEY, preferences JSONB NOT NULL );
-
Paginierung für große Ergebnismengen:
async def fetch_paginated_users(page: int = 1, page_size: int = 20): offset = (page - 1) * page_size async with db_pool.acquire() as connection: # Gesamtzahl abrufen total = await connection.fetchval("SELECT COUNT(*) FROM users") # Paginierte Ergebnisse abrufen rows = await connection.fetch( "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2", page_size, offset ) return { "users": [dict(row) for row in rows], "pagination": { "total": total, "page": page, "page_size": page_size, "pages": (total + page_size - 1) // page_size } }
Testen von Datenbankinteraktionen
Erstellen Sie ein tests
-Verzeichnis mit einer Testdatei:
# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg
# Load test environment variables
load_dotenv(".env.test")
# Testing database configuration
TEST_DB_CONFIG = {
"user": os.getenv("TEST_DB_USER"),
"password": os.getenv("TEST_DB_PASSWORD"),
"database": os.getenv("TEST_DB_NAME"),
"host": os.getenv("TEST_DB_HOST", "localhost"),
"port": int(os.getenv("TEST_DB_PORT", "5432")),
}
@pytest.fixture
async def db_pool():
"""Create a test database pool."""
pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
yield pool
await pool.close()
@pytest.fixture
async def setup_test_data(db_pool):
"""Set up test data before tests and clean up after."""
async with db_pool.acquire() as conn:
# Create test tables
await conn.execute("""
CREATE TEMPORARY TABLE users (
id SERIAL PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
department TEXT NOT NULL DEFAULT 'General',
role TEXT NOT NULL DEFAULT 'User',
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)
""")
# Insert test users
await conn.executemany(
"""
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
""",
[
("test1", "test1@example.com", "Engineering", "Developer"),
("test2", "test2@example.com", "Marketing", "Manager"),
("test3", "test3@example.com", "Sales", "Representative")
]
)
yield
# Cleanup happens automatically for temporary tables
@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
"""Test that fetching recent users works correctly."""
from resources.users import fetch_recent_users
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Execute the function
result = await fetch_recent_users(limit=10)
# Assertions
assert isinstance(result, list)
assert len(result) == 3
assert result[0]["username"] in ["test1", "test2", "test3"]
assert "email" in result[0]
assert "department" in result[0]
@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
"""Test that creating a user works correctly."""
from tools.users import create_user
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Test data
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"department": "Finance",
"role": "Analyst"
}
# Execute the function
result = await create_user(user_data)
# Assertions
assert result["status"] == "success"
# Verify in database
async with db_pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT * FROM users WHERE username = $1",
"newuser"
)
assert user is not None
assert user["email"] == "newuser@example.com"
assert user["department"] == "Finance"
Tests ausführen mit: pytest -xvs tests/
Überlegungen zur Bereitstellung
-
Umgebungsspezifische Konfigurationsdateien verwenden:
.env.development
,.env.staging
,.env.production
-
Datenbank-Migrationen für Schemaänderungen einrichten:
pip install alembic alembic init migrations
-
Mit Docker bereitstellen für Konsistenz:
FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["python", "server.py"]
-
Gesundheitsprüfungen einrichten:
@server.route("/health") async def health_check(): try: async with db_pool.acquire() as connection: await connection.fetchval("SELECT 1") return {"status": "healthy", "database": "connected"} except Exception as e: return {"status": "unhealthy", "error": str(e)}
-
Abfrageleistung überwachen:
-- In PostgreSQL CREATE EXTENSION pg_stat_statements; -- Um langsame Abfragen zu analysieren SELECT query, calls, total_time, mean_time FROM pg_stat_statements ORDER BY mean_time DESC LIMIT 10;
-
Datenbank-Backups einrichten:
# Tägliches Backup-Skript pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump
Fazit
Die Verbindung Ihres MCP-Servers mit einer PostgreSQL-Datenbank verwandelt ihn von einer einfachen Demo in ein produktionsreifes KI-Backend. Mit sorgfältiger Beachtung von Sicherheit, Leistung und Zuverlässigkeit können Sie Ihre Datenbank sicher für Sprachmodelle zugänglich machen und leistungsstarke KI-gestützte Workflows ermöglichen. Wichtige Prinzipien: Resources für Leseoperationen, Tools für Schreiboperationen, Validierung für alle Eingaben, Transaktionen für Konsistenz, Connection Pooling für Leistung, Umgebungsvariablen für Sicherheit, strukturierte Fehlerbehandlung für Zuverlässigkeit. Durch Befolgen dieser Praktiken ermöglichen Sie LLMs, sicher und kontrolliert mit Ihren bestehenden Datenbankanwendungen zusammenzuarbeiten — und erschließen neue Möglichkeiten für KI-unterstützte Workflows und Datenanalyse. Im nächsten Leitfaden zeigen wir, wie Sie Ihr bestehendes API-Backend über MCP anbinden können, ohne Geschäftslogik neu zu schreiben.
FAQs
Ja. Sie können mehrere Resource- und Tool-Handler erstellen, die jeweils mit einer anderen Datenbank oder einem anderen Schema verbunden sind.
Sie sollten try/except-Behandlung hinzufügen, um dem LLM einen klaren Fehler zurückzugeben, anstatt abzustürzen.
Stellen Sie nur ausgewählte Lese- oder Schreiboperationen über Resources und Tools zur Verfügung. Geben Sie dem LLM niemals uneingeschränkten Datenbankzugriff.