So verbinden Sie Ihre interne REST-API mit Ihrem MCP-Server

Wenn Sie bereits APIs betreiben, müssen Sie nicht alles für MCP neu erstellen. Sie können Ihre REST-Endpunkte mit Ihrem MCP-Server verbinden, indem Sie einfache Tools und Ressourcen erstellen. Diese Anleitung zeigt Ihnen, wie Sie dies richtig umsetzen können, mit funktionierenden Python-Beispielen.
Wichtige Erkenntnisse
- Sie können Ihre REST-API mit MCP-Servern verbinden, ohne Geschäftslogik neu zu schreiben
- Verwenden Sie Ressourcen für sicheres Datenabrufen und Tools für aktionsbasierte Endpunkte
- Validieren Sie immer Eingaben und behandeln Sie REST-Fehler sauber
Warum eine bestehende REST-API anbinden
Viele Unternehmen verfügen bereits über: Benutzerdienste, Auftragsverwaltungs-APIs, CRM- oder Support-Ticket-Systeme, Bestands- und Lager-APIs. MCP ermöglicht es Ihrem LLM, diese vorhandenen Systeme zu lesen und darauf zu agieren, ohne direkten Datenbankzugriff zu benötigen. Dies schützt Ihre internen Systeme und ermöglicht schnelle, kontrollierte KI-Integrationen.
Was Sie benötigen
- Eine laufende REST-API (öffentlich oder intern)
- MCP-Server-SDK installiert
- HTTPX (oder einen anderen asynchronen HTTP-Client für Python)
Installieren Sie HTTPX:
pip install httpx mcp-server
Sie sollten grundlegende Kenntnisse im Umgang mit API-Anfragen in Python haben.
Schritt 1: HTTP-Client-Bibliotheken installieren
Anstatt Logik neu zu schreiben, rufen Sie Ihre REST-API innerhalb von MCP-Handlern auf.
Beispiel für die Basiseinrichtung:
import os
import httpx
# Aus Umgebungsvariablen laden für mehr Sicherheit
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Verwenden Sie einen gemeinsamen asynchronen Client für bessere Leistung:
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0 # Setzen Sie ein angemessenes Timeout
)
Dieser Client wird in allen MCP-Handlern wiederverwendet.
Schritt 2: API-Endpunkte als Ressourcen bereitstellen
Ressourcen rufen Daten ohne Nebeneffekte ab. Beispiel: Abrufen einer Liste aktiver Benutzer.
from mcp_server import MCPServer
import logging
# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Retrieved {len(users)} active users")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
Diese Implementierung behandelt Fehler ordnungsgemäß und liefert informative Meldungen, wenn der API-Aufruf fehlschlägt.
Schritt 3: API-Aktionen als Tools bereitstellen
Tools führen Aktionen aus, die den Zustand verändern. Beispiel: Erstellen eines neuen Support-Tickets.
@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
# Eingabevalidierung
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Missing required fields: user_id, subject, and description are required"}
# Priorität validieren
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
# Payload vorbereiten
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
return {
"status": "success",
"message": "Ticket created successfully",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Failed to create ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
Gute Praktiken in diesem Beispiel:
- Eingabevalidierung vor dem API-Aufruf
- Validierung erlaubter Werte (Prioritäten)
- Strukturierte Fehlerbehandlung mit informativen Meldungen
- Rückgabe der erstellten Ticket-ID als Referenz
Schritt 4: Vollständiges Serverbeispiel
Hier ist die vollständige Version, die alles zusammenführt:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Logging konfigurieren
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Konfiguration aus Umgebungsvariablen laden
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Server erstellen
server = MCPServer(
name="REST API MCP Server",
version="1.0.0",
description="Connects LLMs to internal REST APIs"
)
# Gemeinsamen HTTP-Client erstellen
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
},
timeout=30.0
)
# Ressourcen: Sicheres Datenabrufen
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Retrieved {len(users)} active users")
return users
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
user_id = data.get("user_id")
if not user_id:
return {"error": "Missing user_id parameter"}
try:
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"API error: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 404:
return {"error": f"User with ID {user_id} not found"}
return {"error": f"API error: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Failed to connect to API"}
# Tools: Aktionen, die den Zustand ändern können
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
# Eingabevalidierung
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Missing required fields: user_id, subject, and description are required"}
# Priorität validieren
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
# Payload vorbereiten
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
return {
"status": "success",
"message": "Ticket created successfully",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Failed to create ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
ticket_id = data.get("ticket_id")
new_status = data.get("status")
if not ticket_id or not new_status:
return {"error": "Missing ticket_id or status"}
valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
if new_status not in valid_statuses:
return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
try:
response = await client.patch(
f"/tickets/{ticket_id}",
json={"status": new_status}
)
response.raise_for_status()
logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
return {"status": "success", "message": f"Ticket status updated to {new_status}"}
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 404:
return {"error": f"Ticket {ticket_id} not found"}
logger.error(f"API error: {status_code} - {e.response.text}")
return {"error": f"Failed to update ticket: {e.response.text}"}
except httpx.RequestError as e:
logger.error(f"Request error: {str(e)}")
return {"error": "Connection error", "details": str(e)}
# Server-Lebenszyklusverwaltung
async def startup():
logger.info("Starting MCP server...")
# Beliebiger Initialisierungscode
async def shutdown():
logger.info("Shutting down MCP server...")
await client.aclose() # HTTP-Client-Verbindungen schließen
async def main():
try:
await startup()
await server.start()
finally:
await shutdown()
if __name__ == "__main__":
asyncio.run(main())
Verbesserte Funktionen:
- Ordnungsgemäßes Client-Herunterfahren mit
aclose()
- Strukturiertes Logging durchgängig
- Konfiguration über Umgebungsvariablen
- Server-Lebenszyklusverwaltung
- Zusätzliche Beispiel-Endpunkte
- Umfassende Fehlerbehandlung
Best Practices für reale Umgebungen
-
Fehlgeschlagene Anfragen wiederholen: Implementieren Sie exponentielles Backoff für vorübergehende Fehler:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json()
-
Rate Limits beachten: Implementieren Sie einen Rate Limiter, um API-Drosselung zu vermeiden:
from aiolimiter import AsyncLimiter # Maximal 10 Anfragen pro Sekunde rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs)
-
Sichere Anmeldedaten: Verwenden Sie eine geeignete Lösung zur Geheimnisverwaltung:
from dotenv import load_dotenv # Umgebungsvariablen aus .env-Datei laden load_dotenv() # Oder verwenden Sie eine Cloud-basierte Lösung wie AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials')
-
Timeouts behandeln: Konfigurieren Sie Client-Timeouts angemessen:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # Verbindungs-Timeout read=30.0, # Lese-Timeout write=30.0, # Schreib-Timeout pool=60.0 # Pool-Timeout ) )
-
Zugriff überwachen: Implementieren Sie strukturiertes Logging mit Kontext:
@server.middleware async def audit_middleware(request, next_handler): # Request-ID generieren request_id = str(uuid.uuid4()) # Eingehende Anfrage protokollieren logger.info(f"Request {request_id}: {request.method} {request.path}") # Kontext zum Logger hinzufügen with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # Ergebnis protokollieren logger.info(f"Request {request_id} completed with status {response.status_code}") return response
Fazit
Die Verbindung Ihrer bestehenden REST-APIs mit MCP-Servern ermöglicht es Ihnen, reale Daten und Aktionen zu nutzen, ohne Backends neu zu schreiben. Mit sorgfältigem Design und Validierung können Sie leistungsstarke, produktionsreife KI-Agenten erstellen – sicher und schnell.
In der nächsten Anleitung zeigen wir, wie Sie Hintergrundaufgaben über Ihren MCP-Server mit asynchroner Tool-Ausführung auslösen können.
FAQs
Ja. Jede Ressource oder jedes Tool kann bei Bedarf eine andere API aufrufen.
Nur, wenn Sie Eingaben streng validieren und sorgfältig kontrollieren, welche Endpunkte zugänglich sind. Gehen Sie davon aus, dass das LLM alles ausprobieren wird.
Bevorzugen Sie immer asynchrone Anfragen (httpx.AsyncClient) innerhalb von MCP-Servern, um ein Blockieren des Event-Loops zu vermeiden.