Back

So verbinden Sie Ihre interne REST-API mit Ihrem MCP-Server

So verbinden Sie Ihre interne REST-API mit Ihrem MCP-Server

Wenn Sie bereits APIs betreiben, müssen Sie nicht alles für MCP neu erstellen. Sie können Ihre REST-Endpunkte mit Ihrem MCP-Server verbinden, indem Sie einfache Tools und Ressourcen erstellen. Diese Anleitung zeigt Ihnen, wie Sie dies richtig umsetzen können, mit funktionierenden Python-Beispielen.

Wichtige Erkenntnisse

  • Sie können Ihre REST-API mit MCP-Servern verbinden, ohne Geschäftslogik neu zu schreiben
  • Verwenden Sie Ressourcen für sicheres Datenabrufen und Tools für aktionsbasierte Endpunkte
  • Validieren Sie immer Eingaben und behandeln Sie REST-Fehler sauber

Warum eine bestehende REST-API anbinden

Viele Unternehmen verfügen bereits über: Benutzerdienste, Auftragsverwaltungs-APIs, CRM- oder Support-Ticket-Systeme, Bestands- und Lager-APIs. MCP ermöglicht es Ihrem LLM, diese vorhandenen Systeme zu lesen und darauf zu agieren, ohne direkten Datenbankzugriff zu benötigen. Dies schützt Ihre internen Systeme und ermöglicht schnelle, kontrollierte KI-Integrationen.

Was Sie benötigen

  • Eine laufende REST-API (öffentlich oder intern)
  • MCP-Server-SDK installiert
  • HTTPX (oder einen anderen asynchronen HTTP-Client für Python)

Installieren Sie HTTPX:

pip install httpx mcp-server

Sie sollten grundlegende Kenntnisse im Umgang mit API-Anfragen in Python haben.

Schritt 1: HTTP-Client-Bibliotheken installieren

Anstatt Logik neu zu schreiben, rufen Sie Ihre REST-API innerhalb von MCP-Handlern auf.

Beispiel für die Basiseinrichtung:

import os
import httpx

# Aus Umgebungsvariablen laden für mehr Sicherheit
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

Verwenden Sie einen gemeinsamen asynchronen Client für bessere Leistung:

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # Setzen Sie ein angemessenes Timeout
)

Dieser Client wird in allen MCP-Handlern wiederverwendet.

Schritt 2: API-Endpunkte als Ressourcen bereitstellen

Ressourcen rufen Daten ohne Nebeneffekte ab. Beispiel: Abrufen einer Liste aktiver Benutzer.

from mcp_server import MCPServer
import logging

# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

Diese Implementierung behandelt Fehler ordnungsgemäß und liefert informative Meldungen, wenn der API-Aufruf fehlschlägt.

Schritt 3: API-Aktionen als Tools bereitstellen

Tools führen Aktionen aus, die den Zustand verändern. Beispiel: Erstellen eines neuen Support-Tickets.

@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
    # Eingabevalidierung
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Priorität validieren
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Payload vorbereiten
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

Gute Praktiken in diesem Beispiel:

  • Eingabevalidierung vor dem API-Aufruf
  • Validierung erlaubter Werte (Prioritäten)
  • Strukturierte Fehlerbehandlung mit informativen Meldungen
  • Rückgabe der erstellten Ticket-ID als Referenz

Schritt 4: Vollständiges Serverbeispiel

Hier ist die vollständige Version, die alles zusammenführt:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# Logging konfigurieren
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# Konfiguration aus Umgebungsvariablen laden
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# Server erstellen
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connects LLMs to internal REST APIs"
)

# Gemeinsamen HTTP-Client erstellen
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# Ressourcen: Sicheres Datenabrufen
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Missing user_id parameter"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"User with ID {user_id} not found"}
        return {"error": f"API error: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}

# Tools: Aktionen, die den Zustand ändern können
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
    # Eingabevalidierung
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    
    # Priorität validieren
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    
    # Payload vorbereiten
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "Missing ticket_id or status"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
        return {"status": "success", "message": f"Ticket status updated to {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} not found"}
        logger.error(f"API error: {status_code} - {e.response.text}")
        return {"error": f"Failed to update ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}

# Server-Lebenszyklusverwaltung
async def startup():
    logger.info("Starting MCP server...")
    # Beliebiger Initialisierungscode

async def shutdown():
    logger.info("Shutting down MCP server...")
    await client.aclose()  # HTTP-Client-Verbindungen schließen

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Verbesserte Funktionen:

  • Ordnungsgemäßes Client-Herunterfahren mit aclose()
  • Strukturiertes Logging durchgängig
  • Konfiguration über Umgebungsvariablen
  • Server-Lebenszyklusverwaltung
  • Zusätzliche Beispiel-Endpunkte
  • Umfassende Fehlerbehandlung

Best Practices für reale Umgebungen

  • Fehlgeschlagene Anfragen wiederholen: Implementieren Sie exponentielles Backoff für vorübergehende Fehler:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • Rate Limits beachten: Implementieren Sie einen Rate Limiter, um API-Drosselung zu vermeiden:

    from aiolimiter import AsyncLimiter
    
    # Maximal 10 Anfragen pro Sekunde
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • Sichere Anmeldedaten: Verwenden Sie eine geeignete Lösung zur Geheimnisverwaltung:

    from dotenv import load_dotenv
    
    # Umgebungsvariablen aus .env-Datei laden
    load_dotenv()
    
    # Oder verwenden Sie eine Cloud-basierte Lösung wie AWS Secrets Manager
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • Timeouts behandeln: Konfigurieren Sie Client-Timeouts angemessen:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # Verbindungs-Timeout
            read=30.0,        # Lese-Timeout
            write=30.0,       # Schreib-Timeout
            pool=60.0         # Pool-Timeout
        )
    )
    
  • Zugriff überwachen: Implementieren Sie strukturiertes Logging mit Kontext:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # Request-ID generieren
        request_id = str(uuid.uuid4())
        
        # Eingehende Anfrage protokollieren
        logger.info(f"Request {request_id}: {request.method} {request.path}")
        
        # Kontext zum Logger hinzufügen
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # Ergebnis protokollieren
        logger.info(f"Request {request_id} completed with status {response.status_code}")
        return response
    

Fazit

Die Verbindung Ihrer bestehenden REST-APIs mit MCP-Servern ermöglicht es Ihnen, reale Daten und Aktionen zu nutzen, ohne Backends neu zu schreiben. Mit sorgfältigem Design und Validierung können Sie leistungsstarke, produktionsreife KI-Agenten erstellen – sicher und schnell.

In der nächsten Anleitung zeigen wir, wie Sie Hintergrundaufgaben über Ihren MCP-Server mit asynchroner Tool-Ausführung auslösen können.

FAQs

Ja. Jede Ressource oder jedes Tool kann bei Bedarf eine andere API aufrufen.

Nur, wenn Sie Eingaben streng validieren und sorgfältig kontrollieren, welche Endpunkte zugänglich sind. Gehen Sie davon aus, dass das LLM alles ausprobieren wird.

Bevorzugen Sie immer asynchrone Anfragen (httpx.AsyncClient) innerhalb von MCP-Servern, um ein Blockieren des Event-Loops zu vermeiden.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers