Back

内部のREST APIをMCPサーバーに公開する方法

内部のREST APIをMCPサーバーに公開する方法

すでにAPIを実行している場合、MCPのためにすべてを再構築する必要はありません。シンプルなツールとリソースを作成することで、RESTエンドポイントをMCPサーバーに橋渡しできます。このガイドでは、Pythonの実用例を使って、適切な方法で実装する方法を紹介します。

重要なポイント

  • 既存のREST APIをビジネスロジックを書き直すことなくMCPサーバーに接続できます
  • 安全なデータ取得にはリソースを、アクションベースのエンドポイントにはツールを使用します
  • 常に入力を検証し、REST障害をクリーンに処理します

既存のREST APIを接続する理由

多くの企業はすでに以下のようなシステムを持っています:ユーザーサービス、注文管理API、CRMやサポートチケットシステム、在庫管理API。MCPを使用すると、LLMがデータベースに直接アクセスすることなく、これらの既存システムを読み取り操作できるようになります。これにより内部システムを保護し、迅速で制御されたAI統合が可能になります。

必要なもの

  • 稼働中のREST API(公開または内部)
  • インストール済みのMCPサーバーSDK
  • HTTPX(または他の非同期HTTPクライアント for Python)

HTTPXのインストール:

pip install httpx mcp-server

PythonでのAPIリクエスト作成に関する基本的な知識が必要です。

ステップ1:HTTPクライアントライブラリのインストール

ロジックを書き直す代わりに、MCPハンドラー内からREST APIを呼び出します。

基本的なセットアップ例:

import os
import httpx

# セキュリティのために環境変数から読み込み
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

パフォーマンスのために共有非同期クライアントを使用:

client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0  # 適切なタイムアウトを設定
)

このクライアントはすべてのMCPハンドラーで再利用されます。

ステップ2:APIエンドポイントをリソースとして公開

リソースは副作用なくデータを取得します。例:アクティブユーザーのリスト取得。

from mcp_server import MCPServer
import logging

# ロギングの設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")

server = MCPServer()

@server.resource(name="active_users", description="アクティブユーザーのリストを取得します。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"{len(users)}人のアクティブユーザーを取得しました")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"APIエラー: {e.response.status_code} - {e.response.text}")
        return {"error": f"APIエラー: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "APIへの接続に失敗しました"}

この実装は適切にエラーを処理し、API呼び出しが失敗した場合に情報提供のメッセージを表示します。

ステップ3:APIアクションをツールとして公開

ツールは状態を変更するアクションを実行します。例:新しいサポートチケットの作成。

@server.tool(name="create_support_ticket", description="ユーザーの新しいサポートチケットを作成します。")
async def create_ticket(data: dict):
    # 入力検証
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "必須フィールドが不足しています: user_id、subject、descriptionが必要です"}
    
    # 優先度の検証
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"無効な優先度です。次のいずれかである必要があります: {', '.join(valid_priorities)}"}
    
    # ペイロードの準備
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"ユーザー{user_id}のチケット#{ticket_data.get('id')}を作成しました")
        return {
            "status": "success",
            "message": "チケットが正常に作成されました",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"チケット作成に失敗しました: {e.response.status_code} - {e.response.text}")
        return {"error": "チケット作成に失敗しました", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "接続エラー", "details": str(e)}

この例の良い実践点:

  • APIにアクセスする前の入力検証
  • 許可値(優先度)の検証
  • 情報提供のメッセージを含む構造化されたエラー処理
  • 参照用に作成されたチケットIDの返却

ステップ4:完全な動作サーバー例

すべてをまとめた完全版:

import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer

# ロギングの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")

# 環境から設定を読み込み
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")

# サーバーの作成
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="LLMを内部REST APIに接続します"
)

# 共有HTTPクライアントの作成
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)

# リソース:安全なデータ取得
@server.resource(name="active_users", description="CRMシステムからアクティブユーザーを取得します。")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"{len(users)}人のアクティブユーザーを取得しました")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"APIエラー: {e.response.status_code} - {e.response.text}")
        return {"error": f"APIエラー: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "APIへの接続に失敗しました"}

@server.resource(name="user_details", description="特定のユーザーに関する詳細情報を取得します。")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "user_idパラメータが不足しています"}
    
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"APIエラー: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"ID {user_id}のユーザーが見つかりません"}
        return {"error": f"APIエラー: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "APIへの接続に失敗しました"}

# ツール:状態を変更する可能性のあるアクション
@server.tool(name="create_support_ticket", description="サポートチケットを作成します。")
async def create_ticket(data: dict):
    # 入力検証
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    
    if not all([user_id, subject, description]):
        return {"error": "必須フィールドが不足しています: user_id、subject、descriptionが必要です"}
    
    # 優先度の検証
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"無効な優先度です。次のいずれかである必要があります: {', '.join(valid_priorities)}"}
    
    # ペイロードの準備
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"ユーザー{user_id}のチケット#{ticket_data.get('id')}を作成しました")
        return {
            "status": "success",
            "message": "チケットが正常に作成されました",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"チケット作成に失敗しました: {e.response.status_code} - {e.response.text}")
        return {"error": "チケット作成に失敗しました", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "接続エラー", "details": str(e)}

@server.tool(name="update_ticket_status", description="既存のサポートチケットのステータスを更新します。")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    
    if not ticket_id or not new_status:
        return {"error": "ticket_idまたはstatusが不足しています"}
    
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"無効なステータスです。次のいずれかである必要があります: {', '.join(valid_statuses)}"}
    
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"チケット#{ticket_id}のステータスを{new_status}に更新しました")
        return {"status": "success", "message": f"チケットステータスを{new_status}に更新しました"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"チケット{ticket_id}が見つかりません"}
        logger.error(f"APIエラー: {status_code} - {e.response.text}")
        return {"error": f"チケット更新に失敗しました: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"リクエストエラー: {str(e)}")
        return {"error": "接続エラー", "details": str(e)}

# サーバーライフサイクル管理
async def startup():
    logger.info("MCPサーバーを起動しています...")
    # 初期化コード

async def shutdown():
    logger.info("MCPサーバーをシャットダウンしています...")
    await client.aclose()  # HTTPクライアント接続を閉じる

async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()

if __name__ == "__main__":
    asyncio.run(main())

改善された機能:

  • aclose()を使用した適切なクライアントシャットダウン
  • 全体的な構造化ロギング
  • 環境変数による設定
  • サーバーライフサイクル管理
  • 追加のエンドポイント例
  • 包括的なエラー処理

実環境での設定におけるベストプラクティス

  • 失敗したリクエストの再試行:一時的な障害に対して指数バックオフを実装:

    from tenacity import retry, stop_after_attempt, wait_exponential
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def fetch_with_retry(endpoint, params=None):
        response = await client.get(endpoint, params=params)
        response.raise_for_status()
        return response.json()
    
  • レート制限の尊重:APIスロットリングを避けるためのレート制限の実装:

    from aiolimiter import AsyncLimiter
    
    # 最大で1秒あたり10リクエスト
    rate_limiter = AsyncLimiter(10, 1)
    
    async def rate_limited_request(method, url, **kwargs):
        async with rate_limiter:
            return await client.request(method, url, **kwargs)
    
  • 認証情報の安全確保:適切なシークレット管理ソリューションの使用:

    from dotenv import load_dotenv
    
    # .envファイルから環境変数を読み込み
    load_dotenv()
    
    # またはAWS Secrets Managerなどのクラウドベースのソリューションを使用
    # import boto3
    # client = boto3.client('secretsmanager')
    # response = client.get_secret_value(SecretId='api-credentials')
    
  • タイムアウトの処理:クライアントタイムアウトを適切に設定:

    client = httpx.AsyncClient(
        timeout=httpx.Timeout(
            connect=5.0,      # 接続タイムアウト
            read=30.0,        # 読み取りタイムアウト
            write=30.0,       # 書き込みタイムアウト
            pool=60.0         # プールタイムアウト
        )
    )
    
  • アクセス監査:コンテキストを持つ構造化ロギングの実装:

    @server.middleware
    async def audit_middleware(request, next_handler):
        # リクエストIDの生成
        request_id = str(uuid.uuid4())
        
        # 受信リクエストのログ記録
        logger.info(f"リクエスト {request_id}: {request.method} {request.path}")
        
        # ロガーにコンテキストを追加
        with logging.contextvars.ContextVar("request_id", default=request_id):
            response = await next_handler(request)
            
        # 結果のログ記録
        logger.info(f"リクエスト {request_id} はステータス {response.status_code} で完了しました")
        return response
    

結論

既存のREST APIをMCPサーバーに接続することで、バックエンドを書き直すことなく実世界のデータとアクションを活用できます。慎重な設計と検証により、安全かつ迅速に強力な本番環境対応のAIエージェントを構築できます。

次のガイドでは、非同期ツール実行を通じてMCPサーバーからバックグラウンドジョブをトリガーする方法を紹介します。

よくある質問

はい。必要に応じて、各リソースやツールが異なるAPIを呼び出すことができます。

入力を厳密に検証し、アクセス可能なエンドポイントを慎重に制御する場合のみ安全です。LLMはあらゆることを試みると想定してください。

MCPサーバー内ではイベントループをブロックしないよう、常に非同期(httpx.AsyncClient)を優先してください。

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers