Back

Как расширить ваш MCP-сервер с доступом к базе данных

Как расширить ваш MCP-сервер с доступом к базе данных

Подключение вашего MCP-сервера к реальной базе данных превращает его из простой демонстрации в готовое к производству AI-решение. В этом руководстве вы узнаете, как именно интегрировать PostgreSQL в ваш MCP-сервер и безопасно предоставить к нему доступ для LLM.

Ключевые моменты

  • Вы можете подключить PostgreSQL к вашему MCP-серверу с помощью asyncpg
  • Ресурсы позволяют LLM получать актуальные структурированные данные
  • Инструменты позволяют LLM безопасно вставлять, обновлять или удалять записи
  • Валидация ввода и управление транзакциями необходимы для производственного использования
  • Конфигурация окружения обеспечивает безопасность ваших учетных данных

Зачем подключать базу данных к MCP

Без доступа к базе данных ваша LLM не видит реальных данных приложения. Подключив её, вы можете: позволить ИИ отвечать на вопросы на основе реальных пользователей, заказов, тикетов и т.д.; автоматизировать действия, такие как создание записей или обновление данных; создавать интеллектуальных внутренних агентов без необходимости отдельных API; обеспечить контекстно-зависимые ответы ИИ с использованием состояния вашего приложения; создавать аналитику на базе ИИ для ваших бизнес-данных. Это первый шаг к превращению модели в реального помощника приложения, который обеспечивает реальную бизнес-ценность.

Что вам нужно перед началом

  • Работающая база данных PostgreSQL (v12+)
  • Python MCP-сервер (базовая реализация)
  • Python 3.10+ с поддержкой асинхронности
  • Библиотека asyncpg для доступа к базе данных
  • Пакет mcp-server (официальный Python SDK)
  • Python-dotenv для конфигурации окружения
  • Базовые знания SQL и асинхронного Python

Обзор архитектуры

Архитектура включает несколько компонентов: (1) LLM-клиент: Claude или другая LLM, которая взаимодействует по протоколу MCP, (2) MCP-сервер: Ваш Python-сервер, предоставляющий ресурсы и инструменты, (3) Пул соединений: Эффективно управляет соединениями с базой данных, (4) PostgreSQL: Базовая база данных, хранящая данные вашего приложения.

Эта настройка следует четкому разделению обязанностей: Ресурсы обеспечивают доступ только для чтения для запросов, Инструменты позволяют выполнять контролируемые операции записи, Пулинг соединений оптимизирует производительность, Конфигурация окружения обеспечивает безопасность учетных данных.

Шаг 1: Установка и настройка зависимостей базы данных

Сначала установите необходимые пакеты:

pip install asyncpg python-dotenv mcp-server

Создайте структуру проекта:

mcp-db-server/
├── .env                  # Переменные окружения (никогда не коммитить в git)
├── requirements.txt      # Зависимости
├── server.py             # Основной файл сервера
├── database.py           # Модуль подключения к базе данных
├── resources/            # Ресурсы базы данных
│   ├── __init__.py
│   └── users.py          # Ресурсы, связанные с пользователями
└── tools/                # Инструменты базы данных
    ├── __init__.py
    └── users.py          # Инструменты, связанные с пользователями

Шаг 2: Настройка конфигурации окружения

Создайте файл .env для учетных данных вашей базы данных:

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

Никогда не коммитьте этот файл в систему контроля версий. Добавьте его в .gitignore:

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

Создайте файл database.py для загрузки этих переменных окружения:

import os
import asyncpg
from dotenv import load_dotenv

# Загрузка переменных окружения
load_dotenv()

# Конфигурация базы данных
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

Шаг 3: Создание пула соединений с базой данных

Расширьте ваш файл database.py, чтобы включить пулинг соединений:

import os
import asyncpg
import logging
from dotenv import load_dotenv

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Загрузка переменных окружения
load_dotenv()

# Конфигурация базы данных
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Глобальная переменная пула
db_pool = None

async def init_db():
    """Инициализация пула соединений с базой данных."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Таймаут получения соединения
        )
        logger.info("Пул соединений с базой данных установлен")
        # Проверка соединения
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Подключено к PostgreSQL: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"Не удалось создать пул базы данных: {e}")
        raise

async def close_db():
    """Закрытие пула соединений с базой данных."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Пул соединений с базой данных закрыт")

Это дает вам: Правильно настроенный пул соединений, использование переменных окружения для безопасности, логирование для мониторинга, оптимизацию размера пула, обработку таймаутов соединений, явную функцию закрытия для корректного завершения работы.

Шаг 4: Предоставление ресурса только для чтения

Создайте resources/users.py для предоставления данных о пользователях:

from typing import List, Dict, Any, Optional
import logging
from database import db_pool

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """
    Получение самых последних пользователей из базы данных.
    
    Args:
        limit: Максимальное количество пользователей для возврата (по умолчанию: 20)
        
    Returns:
        Список объектов пользователей
    """
    try:
        if not db_pool:
            logger.error("Пул базы данных не инициализирован")
            return {"error": "Соединение с базой данных недоступно"}
            
        async with db_pool.acquire() as connection:
            # Используем параметризованный запрос для безопасности
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # Преобразуем в словари и обрабатываем сериализацию datetime
            users = []
            for row in rows:
                user = dict(row)
                # Преобразуем datetime в строку формата ISO для сериализации JSON
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Получено {len(users)} последних пользователей")
            return users
    except Exception as e:
        logger.error(f"Ошибка при получении последних пользователей: {e}")
        return {"error": f"Ошибка базы данных: {str(e)}"}

Теперь обновите ваш server.py, чтобы зарегистрировать этот ресурс:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Создание MCP-сервера
server = MCPServer()

# Регистрация ресурса
@server.resource(
    name="recent_users", 
    description="Получение самых последних пользователей из базы данных."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Инициализация базы данных
        await init_db()
        
        # Запуск сервера
        logger.info("Запуск MCP-сервера...")
        await server.start()
    except Exception as e:
        logger.error(f"Ошибка сервера: {e}")
    finally:
        # Закрытие соединений с базой данных при завершении
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Шаг 5: Реализация расширенных возможностей запросов

Создайте ресурс, который позволяет более гибко запрашивать с параметрами:

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    Получение пользователей, соответствующих определенным критериям.
    
    Args:
        department: Фильтр по отделу (опционально)
        role: Фильтр по роли (опционально)
        active: Фильтр по статусу активности (по умолчанию: True)
        limit: Максимальное количество результатов для возврата (по умолчанию: 20)
        
    Returns:
        Список соответствующих объектов пользователей
    """
    try:
        if not db_pool:
            logger.error("Пул базы данных не инициализирован")
            return {"error": "Соединение с базой данных недоступно"}
            
        # Построение динамического запроса
        conditions = ["active = $1"]
        params = [active]
        param_count = 1
        
        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)
            
        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)
            
        # Построение итогового запроса
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC 
            LIMIT ${param_count + 1};
        """
        params.append(limit)
        
        async with db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # Преобразуем в словари и обрабатываем сериализацию datetime
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Получено {len(users)} пользователей, соответствующих критериям")
            return users
    except Exception as e:
        logger.error(f"Ошибка при получении пользователей по критериям: {e}")
        return {"error": f"Ошибка базы данных: {str(e)}"}

Зарегистрируйте это как параметризованный ресурс:

@server.resource(
    name="users_by_criteria", 
    description="Получение пользователей, соответствующих определенным критериям, таким как отдел или роль."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

Это позволяет LLM запрашивать определенные подмножества пользователей на основе бизнес-потребностей.

Шаг 6: Создание безопасного инструмента для вставки новых записей

Создайте tools/users.py для операций записи:

from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Модель валидации для запросов на создание пользователя."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Имя пользователя должно быть буквенно-цифровым')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Создание нового пользователя в базе данных.
    
    Args:
        data: Данные пользователя, содержащие имя пользователя, email и т.д.
        
    Returns:
        Ответ со статусом и информацией о пользователе
    """
    try:
        # Валидация входных данных с помощью Pydantic
        user_data = CreateUserRequest(**data)
        
        if not db_pool:
            logger.error("Пул базы данных не инициализирован")
            return {
                "status": "error",
                "message": "Соединение с базой данных недоступно"
            }
        
        async with db_pool.acquire() as connection:
            # Проверка, существует ли пользователь уже
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "Пользователь с таким именем или email уже существует"
                }
            
            # Вставка нового пользователя
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"Создан новый пользователь: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"Пользователь {user_data.username} успешно создан",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Ошибка при создании пользователя: {e}")
        return {
            "status": "error",
            "message": f"Не удалось создать пользователя: {str(e)}"
        }

Зарегистрируйте этот инструмент в server.py:

from tools.users import create_user

# ... существующий код ...

# Регистрация инструмента
@server.tool(
    name="create_user",
    description="Создание нового пользователя в базе данных."
)
async def create_user_tool(data: dict):
    return await create_user(data)

Ключевые функции безопасности: Валидация Pydantic с четкими ограничениями, валидация email с использованием EmailStr, валидация формата имени пользователя с помощью регулярных выражений, проверка дубликатов перед вставкой, четкие сообщения об ошибках, комплексное логирование, параметризованные запросы для предотвращения SQL-инъекций.

Шаг 7: Управление транзакциями

Для операций, требующих нескольких изменений в базе данных, используйте транзакции:

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Перевод пользователя в новый отдел с записью изменения в журнале аудита.
    
    Args:
        user_id: ID пользователя для перевода
        new_department: Название целевого отдела
        
    Returns:
        Статус операции
    """
    try:
        if not db_pool:
            return {"error": "Соединение с базой данных недоступно"}
            
        async with db_pool.acquire() as connection:
            # Начало транзакции
            async with connection.transaction():
                # Получение текущего отдела
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                if not current_dept:
                    return {"error": "Пользователь не найден"}
                
                # Обновление отдела пользователя
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # Запись изменения в журнале аудита
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"Пользователь {user_id} переведен из {current_dept} в {new_department}")
                
                return {
                    "status": "success",
                    "message": f"Пользователь переведен из {current_dept} в {new_department}"
                }
    except Exception as e:
        logger.error(f"Ошибка при переводе пользователя: {e}")
        return {"error": f"Перевод не удался: {str(e)}"}

Зарегистрируйте это как инструмент:

@server.tool(
    name="transfer_user",
    description="Перевод пользователя в новый отдел."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "Отсутствует user_id или new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

Это гарантирует, что обе операции (обновление пользователя + добавление записи аудита) будут успешными или неуспешными вместе.

Шаг 8: Полный пример кода сервера

Вот полный пример, объединяющий все вместе:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Создание MCP-сервера
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="MCP-сервер с доступом к базе данных PostgreSQL"
)

# Регистрация ресурсов
@server.resource(
    name="recent_users", 
    description="Получение самых последних пользователей из базы данных."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="Получение пользователей, соответствующих определенным критериям, таким как отдел или роль."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Регистрация инструментов
@server.tool(
    name="create_user",
    description="Создание нового пользователя в базе данных."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Перевод пользователя в новый отдел."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "Отсутствует user_id или new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Инициализация базы данных
        await init_db()
        
        # Запуск сервера
        logger.info("Запуск MCP-сервера...")
        await server.start()
    except Exception as e:
        logger.error(f"Ошибка сервера: {e}")
    finally:
        # Закрытие соединений с базой данных при завершении
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Соображения безопасности

При подключении MCP-сервера к базе данных следуйте этим лучшим практикам безопасности:

  1. Используйте выделенного пользователя базы данных с ограниченными правами:

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT ON users TO mcp_user;
    -- Предоставляйте только необходимые права
    
  2. Проверяйте все входные данные с использованием моделей Pydantic со строгими правилами валидации.

  3. Используйте параметризованные запросы исключительно для предотвращения SQL-инъекций.

  4. Реализуйте аутентификацию для вашего MCP-сервера:

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):
            return {"error": "Unauthorized"}
        return await next_handler(request)
    
  5. Реализуйте ограничение скорости для предотвращения злоупотреблений:

    # Простой ограничитель скорости в памяти
    request_counts = {}
    
    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        client_id = request.client.host
        current_time = time.time()
        
        # Очистка старых записей
        request_counts = {k: v for k, v in request_counts.items() 
                         if v["timestamp"] > current_time - 60}
        
        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 100 запросов в минуту
                return {"error": "Превышен лимит запросов"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}
            
        return await next_handler(request)
    
  6. Реализуйте логирование запросов для аудита:

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"Запрос: {request.method} {request.path} от {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. Настройте таймауты запросов к базе данных для предотвращения влияния долго выполняющихся запросов на производительность сервера.

Оптимизация производительности

  1. Пулинг соединений уже реализован, но настройте параметры в зависимости от вашей нагрузки:

    db_pool = await asyncpg.create_pool(
        **DB_CONFIG,
        min_size=5,       # Установите выше для сценариев с высокой нагрузкой
        max_size=20,      # Настройте в зависимости от возможностей вашего сервера базы данных
        statement_cache_size=100,  # Кэширование подготовленных выражений
        max_inactive_connection_lifetime=300  # Секунды до переработки неактивных соединений
    )
    
  2. Создайте индексы базы данных для часто запрашиваемых полей:

    CREATE INDEX idx_users_department ON users(department);
    CREATE INDEX idx_users_created_at ON users(created_at DESC);
    
  3. Реализуйте кэширование результатов для часто используемых, редко изменяющихся данных:

    from functools import lru_cache
    from datetime import datetime, timedelta
    
    # Кэш, который истекает через 5 минут
    cache_time = None
    cached_result = None
    
    async def fetch_departments_with_caching():
        global cache_time, cached_result
        
        # Проверка, действителен ли кэш
        current_time = datetime.now()
        if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
            return cached_result
            
        # Промах кэша - получение из базы данных
        async with db_pool.acquire() as connection:
            result = await connection.fetch("SELECT * FROM departments")
            
        # Обновление кэша
        cache_time = current_time
        cached_result = [dict(row) for row in result]
        
        return cached_result
    
  4. Используйте JSONB для сложных данных, которые не нужно интенсивно запрашивать:

    CREATE TABLE user_preferences (
        user_id INTEGER PRIMARY KEY,
        preferences JSONB NOT NULL
    );
    
  5. Пагинация для больших наборов результатов:

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Получение общего количества
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Получение результатов с пагинацией
            rows = await connection.fetch(
             "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
             page_size, offset
         )
         
         return {
             "users": [dict(row) for row in rows],
             "pagination": {
                 "total": total,
                 "page": page,
                 "page_size": page_size,
                 "pages": (total + page_size - 1) // page_size
             }
         }

Тестирование взаимодействий с базой данных

Создайте директорию tests с тестовым файлом:

# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)
        
        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )
    
    yield
    
    # Cleanup happens automatically for temporary tables

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

Запустите тесты с помощью: pytest -xvs tests/

Рекомендации по развертыванию

  1. Используйте конфигурационные файлы для разных сред: .env.development, .env.staging, .env.production

  2. Настройте миграции базы данных для изменений схемы:

    pip install alembic
    alembic init migrations
  3. Разверните с помощью Docker для обеспечения согласованности:

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
  4. Настройте проверки работоспособности:

    @server.route("/health")
    async def health_check():
        try:
            async with db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
  5. Мониторинг производительности запросов:

    -- В PostgreSQL
    CREATE EXTENSION pg_stat_statements;
    
    -- Для анализа медленных запросов
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
  6. Настройка резервного копирования базы данных:

    # Скрипт ежедневного резервного копирования
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump

Заключение

Подключение вашего MCP-сервера к базе данных PostgreSQL превращает его из простой демонстрации в готовый к промышленной эксплуатации ИИ-бэкенд. Уделяя должное внимание безопасности, производительности и надежности, вы можете безопасно предоставить доступ к вашей базе данных языковым моделям, обеспечивая мощные рабочие процессы на базе ИИ. Ключевые принципы, которые следует помнить: Ресурсы для операций чтения, Инструменты для операций записи, Валидация для всех входных данных, Транзакции для согласованности, Пулы соединений для производительности, Переменные окружения для безопасности, Структурированная обработка ошибок для надежности. Следуя этим практикам, вы позволяете языковым моделям работать вместе с вашими существующими приложениями баз данных безопасным, контролируемым способом — открывая новые возможности для рабочих процессов с помощью ИИ и анализа данных. В следующем руководстве мы покажем, как подключить ваш существующий API-бэкенд через MCP без переписывания бизнес-логики.

Часто задаваемые вопросы

Да. Вы можете создать несколько обработчиков ресурсов и инструментов, каждый из которых подключается к разной базе данных или схеме.

Вы должны добавить обработку try/except, чтобы возвращать понятную ошибку языковой модели вместо аварийного завершения.

Предоставляйте доступ только к выбранным операциям чтения или записи через ресурсы и инструменты. Никогда не давайте языковой модели неограниченный доступ к базе данных.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers