Back

MCPサーバーをデータベースアクセスで拡張する方法

MCPサーバーをデータベースアクセスで拡張する方法

MCPサーバーを実際のデータベースに接続することで、単純なデモから本番環境対応のAIバックエンドに変えることができます。このガイドでは、PostgreSQLをMCPサーバーに統合し、LLMに安全に公開する方法を詳しく説明します。

重要なポイント

  • asyncpgを使ってPostgreSQLをMCPサーバーに接続できます
  • リソースによりLLMがライブの構造化データを取得できます
  • ツールによりLLMが安全にレコードの挿入、更新、削除を行えます
  • 入力検証とトランザクション管理は本番環境での使用に不可欠です
  • 環境設定により認証情報を安全に保管できます

MCPにデータベースを接続する理由

データベースアクセスがなければ、LLMは実際のアプリケーションデータを見ることができません。接続することで以下が可能になります:実際のユーザー、注文、チケットなどに基づいて質問に答えさせる;エントリの作成やレコードの更新などのアクションを自動化する;別のAPIを必要とせずにインテリジェントな内部エージェントを構築する;アプリケーションの状態を使用したコンテキスト対応のAI応答を可能にする;ビジネスデータに対するAI駆動の分析を作成する。これは、モデルを実際のビジネス価値を提供する本物のアプリケーションアシスタントに変える最初のステップです。

始める前に必要なもの

  • 稼働中のPostgreSQLデータベース(v12以上)
  • PythonのMCPサーバー(基本的な実装)
  • 非同期サポート付きのPython 3.10以上
  • データベースアクセス用のasyncpgライブラリ
  • mcp-serverパッケージ(公式Python SDK)
  • 環境設定用のPython-dotenv
  • SQLと非同期Pythonの基本知識

アーキテクチャの概要

アーキテクチャにはいくつかのコンポーネントが含まれます:(1) LLMクライアント:MCPプロトコルを介して通信するClaudeまたは他のLLM、(2) MCPサーバー:リソースとツールを公開するPythonサーバー、(3) 接続プール:データベース接続を効率的に管理、(4) PostgreSQL:アプリケーションデータを格納する基盤となるデータベース。

このセットアップは関心の明確な分離に従っています:リソースはクエリのための読み取り専用アクセスを提供、ツールは制御された書き込み操作を可能に、接続プーリングはパフォーマンスを最適化、環境設定は認証情報を安全に保管します。

ステップ1:データベース依存関係のインストールと設定

まず、必要なパッケージをインストールします:

pip install asyncpg python-dotenv mcp-server

プロジェクト構造を作成します:

mcp-db-server/
├── .env                  # 環境変数(gitにコミットしないこと)
├── requirements.txt      # 依存関係
├── server.py             # メインサーバーファイル
├── database.py           # データベース接続モジュール
├── resources/            # データベースリソース
│   ├── __init__.py
│   └── users.py          # ユーザー関連リソース
└── tools/                # データベースツール
    ├── __init__.py
    └── users.py          # ユーザー関連ツール

ステップ2:環境設定のセットアップ

データベース認証情報のための.envファイルを作成します:

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

このファイルはバージョン管理にコミットしないでください。 .gitignoreに追加します:

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

これらの環境変数を読み込むためのdatabase.pyファイルを作成します:

import os
import asyncpg
from dotenv import load_dotenv

# 環境変数を読み込む
load_dotenv()

# データベース設定
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

ステップ3:データベース接続プーリングの作成

database.pyファイルを拡張して接続プーリングを含めます:

import os
import asyncpg
import logging
from dotenv import load_dotenv

# ロギングの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# 環境変数を読み込む
load_dotenv()

# データベース設定
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# グローバルプール変数
db_pool = None

async def init_db():
    """データベース接続プールを初期化します。"""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # 接続取得タイムアウト
        )
        logger.info("データベース接続プールが確立されました")
        # 接続をテスト
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"PostgreSQLに接続しました: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"データベースプールの作成に失敗しました: {e}")
        raise

async def close_db():
    """データベース接続プールを閉じます。"""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("データベース接続プールを閉じました")

これにより以下が得られます:適切に設定された接続プール、セキュリティのための環境変数の使用、監視のためのロギング、プールサイズの最適化、接続タイムアウトの処理、クリーンなシャットダウンのための明示的なクローズ関数。

ステップ4:読み取り専用リソースの公開

ユーザーデータを公開するためのresources/users.pyを作成します:

from typing import List, Dict, Any, Optional
import logging
from database import db_pool

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """
    データベースから最近のユーザーを取得します。
    
    引数:
        limit: 返すユーザーの最大数(デフォルト: 20)
        
    戻り値:
        ユーザーオブジェクトのリスト
    """
    try:
        if not db_pool:
            logger.error("データベースプールが初期化されていません")
            return {"error": "データベース接続が利用できません"}
            
        async with db_pool.acquire() as connection:
            # 安全のためにパラメータ化クエリを使用
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # 辞書に変換し、datetime のシリアル化を処理
            users = []
            for row in rows:
                user = dict(row)
                # datetime を JSON シリアル化のための ISO 形式の文字列に変換
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"{len(users)}人の最近のユーザーを取得しました")
            return users
    except Exception as e:
        logger.error(f"最近のユーザーの取得中にエラーが発生しました: {e}")
        return {"error": f"データベースエラー: {str(e)}"}

次に、server.pyを更新してこのリソースを登録します:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# ロギングの設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# MCPサーバーの作成
server = MCPServer()

# リソースの登録
@server.resource(
    name="recent_users", 
    description="データベースから最近のユーザーを取得します。"
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # データベースの初期化
        await init_db()
        
        # サーバーの起動
        logger.info("MCPサーバーを起動しています...")
        await server.start()
    except Exception as e:
        logger.error(f"サーバーエラー: {e}")
    finally:
        # シャットダウン時にデータベース接続を閉じる
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

ステップ5:高度なクエリ機能の実装

より柔軟なパラメータ付きクエリを可能にするリソースを作成します:

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    特定の条件に一致するユーザーを取得します。
    
    引数:
        department: 部門でフィルタリング(オプション)
        role: 役割でフィルタリング(オプション)
        active: アクティブステータスでフィルタリング(デフォルト: True)
        limit: 返す結果の最大数(デフォルト: 20)
        
    戻り値:
        一致するユーザーオブジェクトのリスト
    """
    try:
        if not db_pool:
            logger.error("データベースプールが初期化されていません")
            return {"error": "データベース接続が利用できません"}
            
        # 動的クエリの構築
        conditions = ["active = $1"]
        params = [active]
        param_count = 1
        
        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)
            
        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)
            
        # 最終クエリの構築
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC 
            LIMIT ${param_count + 1};
        """
        params.append(limit)
        
        async with db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # 辞書に変換し、datetime のシリアル化を処理
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"条件に一致する{len(users)}人のユーザーを取得しました")
            return users
    except Exception as e:
        logger.error(f"条件によるユーザー取得中にエラーが発生しました: {e}")
        return {"error": f"データベースエラー: {str(e)}"}

これをパラメータ化されたリソースとして登録します:

@server.resource(
    name="users_by_criteria", 
    description="部門や役割などの特定の条件に一致するユーザーを取得します。"
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

これにより、LLMはビジネスニーズに基づいて特定のユーザーサブセットをリクエストできるようになります。

ステップ6:新しいレコードを挿入するための安全なツールの作成

書き込み操作のためのtools/users.pyを作成します:

from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """ユーザー作成リクエストの検証モデル。"""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('ユーザー名は英数字である必要があります')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    データベースに新しいユーザーを作成します。
    
    引数:
        data: ユーザー名、メールなどを含むユーザーデータ
        
    戻り値:
        ステータスとユーザー情報を含むレスポンス
    """
    try:
        # Pydanticで入力データを検証
        user_data = CreateUserRequest(**data)
        
        if not db_pool:
            logger.error("データベースプールが初期化されていません")
            return {
                "status": "error",
                "message": "データベース接続が利用できません"
            }
        
        async with db_pool.acquire() as connection:
            # ユーザーが既に存在するか確認
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "このユーザー名またはメールアドレスを持つユーザーは既に存在します"
                }
            
            # 新しいユーザーを挿入
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"新しいユーザーを作成しました: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"ユーザー {user_data.username} が正常に作成されました",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"ユーザー作成中にエラーが発生しました: {e}")
        return {
            "status": "error",
            "message": f"ユーザーの作成に失敗しました: {str(e)}"
        }

このツールをserver.pyに登録します:

from tools.users import create_user

# ... 既存のコード ...

# ツールの登録
@server.tool(
    name="create_user",
    description="データベースに新しいユーザーを作成します。"
)
async def create_user_tool(data: dict):
    return await create_user(data)

追加されたセキュリティ機能: 明確な制約を持つPydanticバリデーション、EmailStrを使用したメール検証、正規表現によるユーザー名形式の検証、挿入前の重複チェック、明確なエラーメッセージ、包括的なロギング、SQLインジェクションを防ぐパラメータ化クエリ。

ステップ7:トランザクション管理

複数のデータベース変更を必要とする操作には、トランザクションを使用します:

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    ユーザーを新しい部門に移動し、監査ログに変更を記録します。
    
    引数:
        user_id: 移動するユーザーのID
        new_department: 対象部門の名前
        
    戻り値:
        操作のステータス
    """
    try:
        if not db_pool:
            return {"error": "データベース接続が利用できません"}
            
        async with db_pool.acquire() as connection:
            # トランザクションを開始
            async with connection.transaction():
                # 現在の部門を取得
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                if not current_dept:
                    return {"error": "ユーザーが見つかりません"}
                
                # ユーザーの部門を更新
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # 監査ログに変更を記録
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"ユーザー {user_id}{current_dept} から {new_department} に移動しました")
                
                return {
                    "status": "success",
                    "message": f"ユーザーを {current_dept} から {new_department} に移動しました"
                }
    except Exception as e:
        logger.error(f"ユーザー移動中にエラーが発生しました: {e}")
        return {"error": f"移動に失敗しました: {str(e)}"}

これをツールとして登録します:

@server.tool(
    name="transfer_user",
    description="ユーザーを新しい部門に移動します。"
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id または new_department が不足しています"}
        
    return await transfer_user_to_department(user_id, new_department)

これにより、両方の操作(ユーザー更新と監査ログ追加)が一緒に成功または失敗することが保証されます。

ステップ8:完全なサーバーコード例

すべてをまとめた完全な例を以下に示します:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# ロギングの設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# MCPサーバーの作成
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="PostgreSQLデータベースアクセスを持つMCPサーバー"
)

# リソースの登録
@server.resource(
    name="recent_users", 
    description="データベースから最近のユーザーを取得します。"
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="部門や役割などの特定の条件に一致するユーザーを取得します。"
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# ツールの登録
@server.tool(
    name="create_user",
    description="データベースに新しいユーザーを作成します。"
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="ユーザーを新しい部門に移動します。"
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "user_id または new_department が不足しています"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # データベースの初期化
        await init_db()
        
        # サーバーの起動
        logger.info("MCPサーバーを起動しています...")
        await server.start()
    except Exception as e:
        logger.error(f"サーバーエラー: {e}")
    finally:
        # シャットダウン時にデータベース接続を閉じる
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

セキュリティに関する考慮事項

MCPサーバーをデータベースに接続する際は、以下のセキュリティのベストプラクティスに従ってください:

  1. 限定された権限を持つ専用のデータベースユーザーを使用する:

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT ON users TO mcp_user;
    -- 必要なものだけ許可する
    
  2. Pydanticモデルを使用して厳格な検証ルールですべての入力を検証する。

  3. SQLインジェクションを防ぐためにパラメータ化クエリのみを使用する。

  4. MCPサーバーの認証を実装する:

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):
            return {"error": "Unauthorized"}
        return await next_handler(request)
    
  5. 乱用を防ぐためのレート制限を実装する:

    # 簡単なインメモリレート制限
    request_counts = {}
    
    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        client_id = request.client.host
        current_time = time.time()
        
        # 古いエントリをクリア
        request_counts = {k: v for k, v in request_counts.items() 
                         if v["timestamp"] > current_time - 60}
        
        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 1分あたり100リクエスト
                return {"error": "レート制限を超えました"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}
            
        return await next_handler(request)
    
  6. 監査のためのリクエストロギングを実装する:

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"リクエスト: {request.method} {request.path} from {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. サーバーのパフォーマンスに影響を与える長時間実行クエリを防ぐためのデータベースクエリタイムアウトを設定する。

パフォーマンスの最適化

  1. コネクションプーリングは既に実装されていますが、作業負荷に応じてパラメータを調整してください: python db_pool = await asyncpg.create_pool( db_pool = await asyncpg.create_pool( **DB_CONFIG、 min_size=5, # 高負荷シナリオの場合は高く設定する。 max_size=20, # データベースサーバーの容量に応じて調整する。 statement_cache_size=100, # プリペアドステートメントをキャッシュする max_inactive_connection_lifetime=300 # アイドル状態の接続をリサイクルするまでの秒数 )

  2. 頻繁にクエリーされるフィールドのためにデータベースインデックス を作成する: “sql CREATE INDEX idx_users_department ON users(department); CREATE INDEX idx_users_created_at ON users(created_at DESC);

  3. 頻繁にアクセスされ、めったに変更されないデータに対して結果キャッシュを実装する: python from functools import lru_cache from datetime import datetime, timedelta

    5分後に有効期限が切れるキャッシュ

    cache_time = なし cached_result = なし

    非同期 def fetch_departments_with_caching(): グローバル cache_time, cached_result

    キャッシュが有効かどうかをチェックする

    current_time = datetime.now() if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5): return cached_result

    キャッシュミス - データベースから取得

    async with db_pool.acquire() as connection: 結果 = await connection.fetch(「SELECT * FROM departments」)

    キャッシュの更新

    cache_time = current_time cached_result = [dict(row) for row in result] キャッシュされた結果。

    return cached_result

  4. 複雑なデータの場合は、JSONBを使用する: “sql CREATE TABLE user_preferences ( user_id INTEGER PRIMARY KEY、 preferences JSONB NOT NULL );

    src/pages/ja/mcpサーバー-データベースアクセス拡張
    
  5. 大きな結果セットのためのページネーション

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Get total count
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Get paginated results
            rows = await connection.fetch(
                "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
                page_size, offset
            )
            
            return {
                "users": [dict(row) for row in rows],
                "pagination": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "pages": (total + page_size - 1) // page_size
                }
            }
    

データベース操作のテスト

testsディレクトリにテストファイルを作成します:

# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)
        
        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )
    
    yield
    
    # Cleanup happens automatically for temporary tables

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

テストの実行方法:pytest -xvs tests/

デプロイメントに関する考慮事項

  1. 環境固有の設定ファイルを使用する.env.development.env.staging.env.production

  2. スキーマ変更のためのデータベースマイグレーションをセットアップする

    pip install alembic
    alembic init migrations
  3. 一貫性のためにDockerでデプロイする

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
  4. ヘルスチェックをセットアップする

    @server.route("/health")
    async def health_check():
        try:
            async with db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
  5. クエリのパフォーマンスをモニタリングする

    -- PostgreSQLにて
    CREATE EXTENSION pg_stat_statements;
    
    -- 遅いクエリを分析するには
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
  6. データベースバックアップをセットアップする

    # 日次バックアップスクリプト
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump

結論

MCPサーバーをPostgreSQLデータベースに接続することで、単純なデモから本番環境対応のAIバックエンドに変わります。セキュリティ、パフォーマンス、信頼性に注意を払うことで、言語モデルに安全にデータベースを公開し、強力なAI駆動ワークフローを実現できます。覚えておくべき重要な原則:読み取り操作にはリソース、書き込み操作にはツール、すべての入力に対する検証、一貫性のためのトランザクション、パフォーマンスのための接続プーリング、セキュリティのための環境変数、信頼性のための構造化されたエラー処理。これらの実践に従うことで、LLMが既存のデータベースアプリケーションと安全かつ制御された方法で連携できるようになり、AI支援ワークフローとデータ分析の新たな可能性が開かれます。次のガイドでは、ビジネスロジックを書き直すことなく、既存のAPIバックエンドをMCP経由で接続する方法を紹介します。

よくある質問

はい。異なるデータベースやスキーマに接続する複数のリソースおよびツールハンドラーを作成できます。

クラッシュする代わりに、LLMに明確なエラーを返すためにtry/except処理を追加する必要があります。

リソースとツールを通じて選択された読み取りまたは書き込み操作のみを公開します。LLMに制限のないデータベースアクセスを与えないでください。

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers