Как расширить ваш MCP-сервер с доступом к базе данных

Подключение вашего MCP-сервера к реальной базе данных превращает его из простой демонстрации в готовое к производству AI-решение. В этом руководстве вы узнаете, как именно интегрировать PostgreSQL в ваш MCP-сервер и безопасно предоставить к нему доступ для LLM.
Ключевые моменты
- Вы можете подключить PostgreSQL к вашему MCP-серверу с помощью asyncpg
- Ресурсы позволяют LLM получать актуальные структурированные данные
- Инструменты позволяют LLM безопасно вставлять, обновлять или удалять записи
- Валидация ввода и управление транзакциями необходимы для производственного использования
- Конфигурация окружения обеспечивает безопасность ваших учетных данных
Зачем подключать базу данных к MCP
Без доступа к базе данных ваша LLM не видит реальных данных приложения. Подключив её, вы можете: позволить ИИ отвечать на вопросы на основе реальных пользователей, заказов, тикетов и т.д.; автоматизировать действия, такие как создание записей или обновление данных; создавать интеллектуальных внутренних агентов без необходимости отдельных API; обеспечить контекстно-зависимые ответы ИИ с использованием состояния вашего приложения; создавать аналитику на базе ИИ для ваших бизнес-данных. Это первый шаг к превращению модели в реального помощника приложения, который обеспечивает реальную бизнес-ценность.
Что вам нужно перед началом
- Работающая база данных PostgreSQL (v12+)
- Python MCP-сервер (базовая реализация)
- Python 3.10+ с поддержкой асинхронности
- Библиотека
asyncpg
для доступа к базе данных - Пакет
mcp-server
(официальный Python SDK) - Python-dotenv для конфигурации окружения
- Базовые знания SQL и асинхронного Python
Обзор архитектуры
Архитектура включает несколько компонентов: (1) LLM-клиент: Claude или другая LLM, которая взаимодействует по протоколу MCP, (2) MCP-сервер: Ваш Python-сервер, предоставляющий ресурсы и инструменты, (3) Пул соединений: Эффективно управляет соединениями с базой данных, (4) PostgreSQL: Базовая база данных, хранящая данные вашего приложения.
Эта настройка следует четкому разделению обязанностей: Ресурсы обеспечивают доступ только для чтения для запросов, Инструменты позволяют выполнять контролируемые операции записи, Пулинг соединений оптимизирует производительность, Конфигурация окружения обеспечивает безопасность учетных данных.
Шаг 1: Установка и настройка зависимостей базы данных
Сначала установите необходимые пакеты:
pip install asyncpg python-dotenv mcp-server
Создайте структуру проекта:
mcp-db-server/
├── .env # Переменные окружения (никогда не коммитить в git)
├── requirements.txt # Зависимости
├── server.py # Основной файл сервера
├── database.py # Модуль подключения к базе данных
├── resources/ # Ресурсы базы данных
│ ├── __init__.py
│ └── users.py # Ресурсы, связанные с пользователями
└── tools/ # Инструменты базы данных
├── __init__.py
└── users.py # Инструменты, связанные с пользователями
Шаг 2: Настройка конфигурации окружения
Создайте файл .env
для учетных данных вашей базы данных:
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432
Никогда не коммитьте этот файл в систему контроля версий. Добавьте его в .gitignore
:
# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class
Создайте файл database.py
для загрузки этих переменных окружения:
import os
import asyncpg
from dotenv import load_dotenv
# Загрузка переменных окружения
load_dotenv()
# Конфигурация базы данных
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
Шаг 3: Создание пула соединений с базой данных
Расширьте ваш файл database.py
, чтобы включить пулинг соединений:
import os
import asyncpg
import logging
from dotenv import load_dotenv
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")
# Загрузка переменных окружения
load_dotenv()
# Конфигурация базы данных
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
# Глобальная переменная пула
db_pool = None
async def init_db():
"""Инициализация пула соединений с базой данных."""
global db_pool
try:
db_pool = await asyncpg.create_pool(
**DB_CONFIG,
min_size=1,
max_size=10,
command_timeout=60,
timeout=10, # Таймаут получения соединения
)
logger.info("Пул соединений с базой данных установлен")
# Проверка соединения
async with db_pool.acquire() as connection:
version = await connection.fetchval("SELECT version();")
logger.info(f"Подключено к PostgreSQL: {version}")
return db_pool
except Exception as e:
logger.error(f"Не удалось создать пул базы данных: {e}")
raise
async def close_db():
"""Закрытие пула соединений с базой данных."""
global db_pool
if db_pool:
await db_pool.close()
logger.info("Пул соединений с базой данных закрыт")
Это дает вам: Правильно настроенный пул соединений, использование переменных окружения для безопасности, логирование для мониторинга, оптимизацию размера пула, обработку таймаутов соединений, явную функцию закрытия для корректного завершения работы.
Шаг 4: Предоставление ресурса только для чтения
Создайте resources/users.py
для предоставления данных о пользователях:
from typing import List, Dict, Any, Optional
import logging
from database import db_pool
logger = logging.getLogger("mcp_database.resources.users")
async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
"""
Получение самых последних пользователей из базы данных.
Args:
limit: Максимальное количество пользователей для возврата (по умолчанию: 20)
Returns:
Список объектов пользователей
"""
try:
if not db_pool:
logger.error("Пул базы данных не инициализирован")
return {"error": "Соединение с базой данных недоступно"}
async with db_pool.acquire() as connection:
# Используем параметризованный запрос для безопасности
query = """
SELECT id, username, email, created_at
FROM users
ORDER BY created_at DESC
LIMIT $1;
"""
rows = await connection.fetch(query, limit)
# Преобразуем в словари и обрабатываем сериализацию datetime
users = []
for row in rows:
user = dict(row)
# Преобразуем datetime в строку формата ISO для сериализации JSON
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"Получено {len(users)} последних пользователей")
return users
except Exception as e:
logger.error(f"Ошибка при получении последних пользователей: {e}")
return {"error": f"Ошибка базы данных: {str(e)}"}
Теперь обновите ваш server.py
, чтобы зарегистрировать этот ресурс:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")
# Создание MCP-сервера
server = MCPServer()
# Регистрация ресурса
@server.resource(
name="recent_users",
description="Получение самых последних пользователей из базы данных."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
async def main():
try:
# Инициализация базы данных
await init_db()
# Запуск сервера
logger.info("Запуск MCP-сервера...")
await server.start()
except Exception as e:
logger.error(f"Ошибка сервера: {e}")
finally:
# Закрытие соединений с базой данных при завершении
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Шаг 5: Реализация расширенных возможностей запросов
Создайте ресурс, который позволяет более гибко запрашивать с параметрами:
async def fetch_users_by_criteria(
department: Optional[str] = None,
role: Optional[str] = None,
active: Optional[bool] = True,
limit: int = 20
) -> List[Dict[str, Any]]:
"""
Получение пользователей, соответствующих определенным критериям.
Args:
department: Фильтр по отделу (опционально)
role: Фильтр по роли (опционально)
active: Фильтр по статусу активности (по умолчанию: True)
limit: Максимальное количество результатов для возврата (по умолчанию: 20)
Returns:
Список соответствующих объектов пользователей
"""
try:
if not db_pool:
logger.error("Пул базы данных не инициализирован")
return {"error": "Соединение с базой данных недоступно"}
# Построение динамического запроса
conditions = ["active = $1"]
params = [active]
param_count = 1
if department:
param_count += 1
conditions.append(f"department = ${param_count}")
params.append(department)
if role:
param_count += 1
conditions.append(f"role = ${param_count}")
params.append(role)
# Построение итогового запроса
query = f"""
SELECT id, username, email, department, role, created_at
FROM users
WHERE {' AND '.join(conditions)}
ORDER BY created_at DESC
LIMIT ${param_count + 1};
"""
params.append(limit)
async with db_pool.acquire() as connection:
rows = await connection.fetch(query, *params)
# Преобразуем в словари и обрабатываем сериализацию datetime
users = []
for row in rows:
user = dict(row)
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"Получено {len(users)} пользователей, соответствующих критериям")
return users
except Exception as e:
logger.error(f"Ошибка при получении пользователей по критериям: {e}")
return {"error": f"Ошибка базы данных: {str(e)}"}
Зарегистрируйте это как параметризованный ресурс:
@server.resource(
name="users_by_criteria",
description="Получение пользователей, соответствующих определенным критериям, таким как отдел или роль."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
Это позволяет LLM запрашивать определенные подмножества пользователей на основе бизнес-потребностей.
Шаг 6: Создание безопасного инструмента для вставки новых записей
Создайте tools/users.py
для операций записи:
from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool
logger = logging.getLogger("mcp_database.tools.users")
class CreateUserRequest(BaseModel):
"""Модель валидации для запросов на создание пользователя."""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
department: Optional[str] = "General"
role: Optional[str] = "User"
@validator('username')
def username_alphanumeric(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Имя пользователя должно быть буквенно-цифровым')
return v
async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Создание нового пользователя в базе данных.
Args:
data: Данные пользователя, содержащие имя пользователя, email и т.д.
Returns:
Ответ со статусом и информацией о пользователе
"""
try:
# Валидация входных данных с помощью Pydantic
user_data = CreateUserRequest(**data)
if not db_pool:
logger.error("Пул базы данных не инициализирован")
return {
"status": "error",
"message": "Соединение с базой данных недоступно"
}
async with db_pool.acquire() as connection:
# Проверка, существует ли пользователь уже
existing_user = await connection.fetchrow(
"SELECT id FROM users WHERE username = $1 OR email = $2",
user_data.username,
user_data.email
)
if existing_user:
return {
"status": "error",
"message": "Пользователь с таким именем или email уже существует"
}
# Вставка нового пользователя
query = """
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
RETURNING id;
"""
user_id = await connection.fetchval(
query,
user_data.username,
user_data.email,
user_data.department,
user_data.role
)
logger.info(f"Создан новый пользователь: {user_data.username} (ID: {user_id})")
return {
"status": "success",
"message": f"Пользователь {user_data.username} успешно создан",
"user_id": user_id
}
except Exception as e:
logger.error(f"Ошибка при создании пользователя: {e}")
return {
"status": "error",
"message": f"Не удалось создать пользователя: {str(e)}"
}
Зарегистрируйте этот инструмент в server.py
:
from tools.users import create_user
# ... существующий код ...
# Регистрация инструмента
@server.tool(
name="create_user",
description="Создание нового пользователя в базе данных."
)
async def create_user_tool(data: dict):
return await create_user(data)
Ключевые функции безопасности: Валидация Pydantic с четкими ограничениями, валидация email с использованием EmailStr, валидация формата имени пользователя с помощью регулярных выражений, проверка дубликатов перед вставкой, четкие сообщения об ошибках, комплексное логирование, параметризованные запросы для предотвращения SQL-инъекций.
Шаг 7: Управление транзакциями
Для операций, требующих нескольких изменений в базе данных, используйте транзакции:
async def transfer_user_to_department(
user_id: int,
new_department: str
) -> Dict[str, Any]:
"""
Перевод пользователя в новый отдел с записью изменения в журнале аудита.
Args:
user_id: ID пользователя для перевода
new_department: Название целевого отдела
Returns:
Статус операции
"""
try:
if not db_pool:
return {"error": "Соединение с базой данных недоступно"}
async with db_pool.acquire() as connection:
# Начало транзакции
async with connection.transaction():
# Получение текущего отдела
current_dept = await connection.fetchval(
"SELECT department FROM users WHERE id = $1",
user_id
)
if not current_dept:
return {"error": "Пользователь не найден"}
# Обновление отдела пользователя
await connection.execute(
"UPDATE users SET department = $1 WHERE id = $2",
new_department,
user_id
)
# Запись изменения в журнале аудита
await connection.execute(
"""
INSERT INTO user_audit_log
(user_id, field_changed, old_value, new_value)
VALUES ($1, $2, $3, $4)
""",
user_id,
"department",
current_dept,
new_department
)
logger.info(f"Пользователь {user_id} переведен из {current_dept} в {new_department}")
return {
"status": "success",
"message": f"Пользователь переведен из {current_dept} в {new_department}"
}
except Exception as e:
logger.error(f"Ошибка при переводе пользователя: {e}")
return {"error": f"Перевод не удался: {str(e)}"}
Зарегистрируйте это как инструмент:
@server.tool(
name="transfer_user",
description="Перевод пользователя в новый отдел."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "Отсутствует user_id или new_department"}
return await transfer_user_to_department(user_id, new_department)
Это гарантирует, что обе операции (обновление пользователя + добавление записи аудита) будут успешными или неуспешными вместе.
Шаг 8: Полный пример кода сервера
Вот полный пример, объединяющий все вместе:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")
# Создание MCP-сервера
server = MCPServer(
name="DatabaseMCPServer",
version="1.0.0",
description="MCP-сервер с доступом к базе данных PostgreSQL"
)
# Регистрация ресурсов
@server.resource(
name="recent_users",
description="Получение самых последних пользователей из базы данных."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
@server.resource(
name="users_by_criteria",
description="Получение пользователей, соответствующих определенным критериям, таким как отдел или роль."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
# Регистрация инструментов
@server.tool(
name="create_user",
description="Создание нового пользователя в базе данных."
)
async def create_user_tool(data: dict):
return await create_user(data)
@server.tool(
name="transfer_user",
description="Перевод пользователя в новый отдел."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "Отсутствует user_id или new_department"}
return await transfer_user_to_department(user_id, new_department)
async def main():
try:
# Инициализация базы данных
await init_db()
# Запуск сервера
logger.info("Запуск MCP-сервера...")
await server.start()
except Exception as e:
logger.error(f"Ошибка сервера: {e}")
finally:
# Закрытие соединений с базой данных при завершении
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Соображения безопасности
При подключении MCP-сервера к базе данных следуйте этим лучшим практикам безопасности:
-
Используйте выделенного пользователя базы данных с ограниченными правами:
CREATE USER mcp_user WITH PASSWORD 'secure_password'; GRANT SELECT, INSERT ON users TO mcp_user; -- Предоставляйте только необходимые права
-
Проверяйте все входные данные с использованием моделей Pydantic со строгими правилами валидации.
-
Используйте параметризованные запросы исключительно для предотвращения SQL-инъекций.
-
Реализуйте аутентификацию для вашего MCP-сервера:
@server.middleware async def auth_middleware(request, next_handler): token = request.headers.get("Authorization") if not token or not verify_token(token): return {"error": "Unauthorized"} return await next_handler(request)
-
Реализуйте ограничение скорости для предотвращения злоупотреблений:
# Простой ограничитель скорости в памяти request_counts = {} @server.middleware async def rate_limit_middleware(request, next_handler): client_id = request.client.host current_time = time.time() # Очистка старых записей request_counts = {k: v for k, v in request_counts.items() if v["timestamp"] > current_time - 60} if client_id in request_counts: if request_counts[client_id]["count"] > 100: # 100 запросов в минуту return {"error": "Превышен лимит запросов"} request_counts[client_id]["count"] += 1 else: request_counts[client_id] = {"count": 1, "timestamp": current_time} return await next_handler(request)
-
Реализуйте логирование запросов для аудита:
@server.middleware async def logging_middleware(request, next_handler): logger.info(f"Запрос: {request.method} {request.path} от {request.client.host}") response = await next_handler(request) return response
-
Настройте таймауты запросов к базе данных для предотвращения влияния долго выполняющихся запросов на производительность сервера.
Оптимизация производительности
-
Пулинг соединений уже реализован, но настройте параметры в зависимости от вашей нагрузки:
db_pool = await asyncpg.create_pool( **DB_CONFIG, min_size=5, # Установите выше для сценариев с высокой нагрузкой max_size=20, # Настройте в зависимости от возможностей вашего сервера базы данных statement_cache_size=100, # Кэширование подготовленных выражений max_inactive_connection_lifetime=300 # Секунды до переработки неактивных соединений )
-
Создайте индексы базы данных для часто запрашиваемых полей:
CREATE INDEX idx_users_department ON users(department); CREATE INDEX idx_users_created_at ON users(created_at DESC);
-
Реализуйте кэширование результатов для часто используемых, редко изменяющихся данных:
from functools import lru_cache from datetime import datetime, timedelta # Кэш, который истекает через 5 минут cache_time = None cached_result = None async def fetch_departments_with_caching(): global cache_time, cached_result # Проверка, действителен ли кэш current_time = datetime.now() if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5): return cached_result # Промах кэша - получение из базы данных async with db_pool.acquire() as connection: result = await connection.fetch("SELECT * FROM departments") # Обновление кэша cache_time = current_time cached_result = [dict(row) for row in result] return cached_result
-
Используйте JSONB для сложных данных, которые не нужно интенсивно запрашивать:
CREATE TABLE user_preferences ( user_id INTEGER PRIMARY KEY, preferences JSONB NOT NULL );
-
Пагинация для больших наборов результатов:
async def fetch_paginated_users(page: int = 1, page_size: int = 20): offset = (page - 1) * page_size async with db_pool.acquire() as connection: # Получение общего количества total = await connection.fetchval("SELECT COUNT(*) FROM users") # Получение результатов с пагинацией rows = await connection.fetch( "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2", page_size, offset ) return { "users": [dict(row) for row in rows], "pagination": { "total": total, "page": page, "page_size": page_size, "pages": (total + page_size - 1) // page_size } }
Тестирование взаимодействий с базой данных
Создайте директорию tests
с тестовым файлом:
# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg
# Load test environment variables
load_dotenv(".env.test")
# Testing database configuration
TEST_DB_CONFIG = {
"user": os.getenv("TEST_DB_USER"),
"password": os.getenv("TEST_DB_PASSWORD"),
"database": os.getenv("TEST_DB_NAME"),
"host": os.getenv("TEST_DB_HOST", "localhost"),
"port": int(os.getenv("TEST_DB_PORT", "5432")),
}
@pytest.fixture
async def db_pool():
"""Create a test database pool."""
pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
yield pool
await pool.close()
@pytest.fixture
async def setup_test_data(db_pool):
"""Set up test data before tests and clean up after."""
async with db_pool.acquire() as conn:
# Create test tables
await conn.execute("""
CREATE TEMPORARY TABLE users (
id SERIAL PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
department TEXT NOT NULL DEFAULT 'General',
role TEXT NOT NULL DEFAULT 'User',
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)
""")
# Insert test users
await conn.executemany(
"""
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
""",
[
("test1", "test1@example.com", "Engineering", "Developer"),
("test2", "test2@example.com", "Marketing", "Manager"),
("test3", "test3@example.com", "Sales", "Representative")
]
)
yield
# Cleanup happens automatically for temporary tables
@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
"""Test that fetching recent users works correctly."""
from resources.users import fetch_recent_users
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Execute the function
result = await fetch_recent_users(limit=10)
# Assertions
assert isinstance(result, list)
assert len(result) == 3
assert result[0]["username"] in ["test1", "test2", "test3"]
assert "email" in result[0]
assert "department" in result[0]
@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
"""Test that creating a user works correctly."""
from tools.users import create_user
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Test data
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"department": "Finance",
"role": "Analyst"
}
# Execute the function
result = await create_user(user_data)
# Assertions
assert result["status"] == "success"
# Verify in database
async with db_pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT * FROM users WHERE username = $1",
"newuser"
)
assert user is not None
assert user["email"] == "newuser@example.com"
assert user["department"] == "Finance"
Запустите тесты с помощью: pytest -xvs tests/
Рекомендации по развертыванию
-
Используйте конфигурационные файлы для разных сред:
.env.development
,.env.staging
,.env.production
-
Настройте миграции базы данных для изменений схемы:
pip install alembic alembic init migrations
-
Разверните с помощью Docker для обеспечения согласованности:
FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["python", "server.py"]
-
Настройте проверки работоспособности:
@server.route("/health") async def health_check(): try: async with db_pool.acquire() as connection: await connection.fetchval("SELECT 1") return {"status": "healthy", "database": "connected"} except Exception as e: return {"status": "unhealthy", "error": str(e)}
-
Мониторинг производительности запросов:
-- В PostgreSQL CREATE EXTENSION pg_stat_statements; -- Для анализа медленных запросов SELECT query, calls, total_time, mean_time FROM pg_stat_statements ORDER BY mean_time DESC LIMIT 10;
-
Настройка резервного копирования базы данных:
# Скрипт ежедневного резервного копирования pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump
Заключение
Подключение вашего MCP-сервера к базе данных PostgreSQL превращает его из простой демонстрации в готовый к промышленной эксплуатации ИИ-бэкенд. Уделяя должное внимание безопасности, производительности и надежности, вы можете безопасно предоставить доступ к вашей базе данных языковым моделям, обеспечивая мощные рабочие процессы на базе ИИ. Ключевые принципы, которые следует помнить: Ресурсы для операций чтения, Инструменты для операций записи, Валидация для всех входных данных, Транзакции для согласованности, Пулы соединений для производительности, Переменные окружения для безопасности, Структурированная обработка ошибок для надежности. Следуя этим практикам, вы позволяете языковым моделям работать вместе с вашими существующими приложениями баз данных безопасным, контролируемым способом — открывая новые возможности для рабочих процессов с помощью ИИ и анализа данных. В следующем руководстве мы покажем, как подключить ваш существующий API-бэкенд через MCP без переписывания бизнес-логики.
Часто задаваемые вопросы
Да. Вы можете создать несколько обработчиков ресурсов и инструментов, каждый из которых подключается к разной базе данных или схеме.
Вы должны добавить обработку try/except, чтобы возвращать понятную ошибку языковой модели вместо аварийного завершения.
Предоставляйте доступ только к выбранным операциям чтения или записи через ресурсы и инструменты. Никогда не давайте языковой модели неограниченный доступ к базе данных.