如何使用数据库访问扩展你的MCP服务器

将你的MCP服务器连接到真实数据库可以将其从简单的演示转变为生产就绪的AI后端。在本指南中,你将确切了解如何将PostgreSQL集成到MCP服务器中,并安全地将其暴露给LLM。
关键要点
- 你可以使用asyncpg将PostgreSQL连接到MCP服务器
- 资源(Resources)允许LLM获取实时结构化数据
- 工具(Tools)允许LLM安全地插入、更新或删除记录
- 输入验证和事务管理对生产环境使用至关重要
- 环境配置保证凭据安全
为什么要将数据库连接到MCP
没有数据库访问权限,你的LLM对实际应用数据一无所知。通过连接数据库,你可以:让AI基于真实用户、订单、工单等回答问题;自动执行创建条目或更新记录等操作;构建智能内部代理,无需单独的API;使用应用程序状态启用上下文感知的AI响应;基于业务数据创建AI驱动的分析。这是将模型转变为提供真正业务价值的实际应用助手的第一步。
开始前需要准备的内容
- 一个运行中的PostgreSQL数据库(v12+)
- Python MCP服务器(基本实现)
- 支持异步的Python 3.10+
- 用于数据库访问的asyncpg库
- mcp-server包(官方Python SDK)
- 用于环境配置的python-dotenv
- SQL和异步Python的基础知识
架构概述
该架构涉及几个组件:(1) LLM客户端:通过MCP协议通信的Claude或其他LLM,(2) MCP服务器:暴露资源和工具的Python服务器,(3) 连接池:高效管理数据库连接,(4) PostgreSQL:存储应用程序数据的底层数据库。
这种设置遵循关注点分离原则:资源提供只读访问权限用于查询,工具启用受控的写操作,连接池优化性能,环境配置保证凭据安全。
步骤1:安装和配置数据库依赖
首先,安装所需的包:
# pydantic[email] provides BaseModel and EmailStr, which the validation model in step 6 imports
pip install asyncpg python-dotenv mcp-server "pydantic[email]"
创建项目结构:
mcp-db-server/
├── .env                  # 环境变量(永远不要提交到git)
├── requirements.txt      # 依赖项
├── server.py             # 主服务器文件
├── database.py           # 数据库连接模块
├── resources/            # 数据库资源
│   ├── __init__.py
│   └── users.py          # 用户相关资源
└── tools/                # 数据库工具
    ├── __init__.py
    └── users.py          # 用户相关工具
步骤2:设置环境配置
创建一个.env文件来存储数据库凭据:
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432
永远不要将此文件提交到版本控制中。将其添加到.gitignore:
# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class
创建一个database.py文件来加载这些环境变量:
import os
import asyncpg
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# Database connection settings, read from environment variables.
# Host and port fall back to a local PostgreSQL default when unset.
DB_CONFIG = dict(
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    database=os.getenv("DB_NAME"),
    host=os.getenv("DB_HOST", "localhost"),
    port=int(os.getenv("DB_PORT", "5432")),
)
步骤3:创建数据库连接池
扩展你的database.py文件以包含连接池:
import os
import asyncpg
import logging
from dotenv import load_dotenv
# Logging setup: INFO level; each record shows timestamp, logger name,
# severity and message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("mcp_database")
# 加载环境变量
load_dotenv()
# Database connection settings, sourced from the process environment.
DB_CONFIG = {
    "user": os.environ.get("DB_USER"),
    "password": os.environ.get("DB_PASSWORD"),
    "database": os.environ.get("DB_NAME"),
    "host": os.environ.get("DB_HOST", "localhost"),
    "port": int(os.environ.get("DB_PORT", "5432")),
}

# Module-wide connection pool; None until a pool is assigned to it.
db_pool = None
async def init_db():
    """Create the global asyncpg connection pool and verify that it works.

    The pool is published in the module-level ``db_pool`` only after a test
    query has succeeded. If pool creation or the test query fails, any
    partially created pool is closed so no connections are leaked.

    Returns:
        The initialised pool (the same object stored in ``db_pool``).

    Raises:
        Exception: whatever ``asyncpg.create_pool`` or the test query raised,
            re-raised after being logged.
    """
    global db_pool
    pool = None
    try:
        pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            # Forwarded to asyncpg.connect(): time allowed to establish each
            # connection (not the time allowed to acquire one from the pool).
            timeout=10,
        )
        logger.info("数据库连接池已建立")
        # Round-trip one query so misconfiguration surfaces at startup.
        async with pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"已连接到PostgreSQL: {version}")
    except Exception as e:
        logger.error(f"创建数据库池失败: {e}")
        if pool is not None:
            # Do not leave a half-initialised pool holding connections open.
            await pool.close()
        raise
    db_pool = pool
    return db_pool
async def close_db():
    """Close the global connection pool, if one is open.

    The module-level ``db_pool`` is reset to None so later "is the pool
    initialised?" checks do not pass for a pool that is already closed.
    Calling this when no pool exists is a no-op.
    """
    global db_pool
    if db_pool is None:
        return
    # Clear the global before awaiting so a second caller cannot close twice.
    pool, db_pool = db_pool, None
    await pool.close()
    logger.info("数据库连接池已关闭")
这为你提供了:正确配置的连接池,使用环境变量以确保安全,用于监控的日志记录,池大小优化,连接超时处理,用于干净关闭的显式关闭函数。
步骤4:暴露只读资源
创建resources/users.py以暴露用户数据:
from typing import List, Dict, Any, Optional
import logging
from database import db_pool
logger = logging.getLogger("mcp_database.resources.users")
async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """Return the most recently created users, newest first.

    Args:
        limit: Maximum number of users to return (default: 20).

    Returns:
        A list of user dicts with the keys ``id``, ``username``, ``email``
        and ``created_at`` (converted to an ISO-8601 string). If the pool is
        not initialised or the query fails, a dict of the form
        ``{"error": "..."}`` is returned instead; no exception is raised.
    """
    # Read the pool from the module at call time. A module-level
    # ``from database import db_pool`` copies the value present at import
    # time (None), so it never sees the pool that init_db() assigns later.
    import database

    try:
        pool = database.db_pool
        if pool is None:
            logger.error("数据库池未初始化")
            return {"error": "数据库连接不可用"}

        # Parameterised query: the limit is bound as $1, never interpolated.
        query = """
            SELECT id, username, email, created_at
            FROM users
            ORDER BY created_at DESC
            LIMIT $1;
        """
        async with pool.acquire() as connection:
            rows = await connection.fetch(query, limit)

        users = []
        for row in rows:
            user = dict(row)
            # datetime is not JSON-serialisable; send it as an ISO string.
            if user.get("created_at"):
                user["created_at"] = user["created_at"].isoformat()
            users.append(user)

        logger.info(f"获取了{len(users)}个最近用户")
        return users
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"获取最近用户时出错: {e}")
        return {"error": f"数据库错误: {str(e)}"}
现在,更新你的server.py来注册这个资源:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users
# Configure logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")
# Create the MCP server instance that resources and tools are registered on.
# NOTE(review): MCPServer comes from the `mcp_server` import above — confirm
# that this matches the API of the MCP SDK version actually installed.
server = MCPServer()
# Register the read-only resource.
@server.resource(
    name="recent_users",
    description="从数据库获取最近的用户。",
)
async def recent_users_resource():
    """Resource handler: return the result of fetch_recent_users(limit=20)."""
    recent = await fetch_recent_users(limit=20)
    return recent
async def main():
    """Initialise the database, run the MCP server, and always close the pool.

    Raises:
        Exception: any error from init_db() or server.start() is logged with
            its traceback and re-raised, so a failed start-up ends the
            process with a non-zero exit status instead of looking like a
            clean shutdown.
    """
    try:
        # Initialise the database before accepting requests.
        await init_db()
        logger.info("启动MCP服务器...")
        await server.start()
    except Exception as e:
        logger.exception(f"服务器错误: {e}")
        raise
    finally:
        # Release database connections on every exit path.
        await close_db()


if __name__ == "__main__":
    asyncio.run(main())
步骤5:实现高级查询功能
创建一个允许更灵活查询并带参数的资源:
async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """Return users matching the given filters, newest first.

    Args:
        department: Only users in this department; skipped when None/empty.
        role: Only users with this role; skipped when None/empty.
        active: Only users with this active flag (default: True). Pass None
            to return users regardless of their active flag.
        limit: Maximum number of rows to return (default: 20).

    Returns:
        A list of user dicts (``id``, ``username``, ``email``,
        ``department``, ``role``, ``created_at`` as an ISO-8601 string). If
        the pool is not initialised or the query fails, a dict of the form
        ``{"error": "..."}`` is returned instead; no exception is raised.
    """
    # Read the pool from the module at call time. A module-level
    # ``from database import db_pool`` copies the value present at import
    # time (None), so it never sees the pool that init_db() assigns later.
    import database

    try:
        pool = database.db_pool
        if pool is None:
            logger.error("数据库池未初始化")
            return {"error": "数据库连接不可用"}

        # (column, value, enabled). Column names are fixed literals; only the
        # values are user-controlled and they are always bound as parameters.
        # ``active`` is skipped only for None: ``active = NULL`` is never
        # true in SQL, so binding None would silently return no rows.
        candidate_filters = (
            ("active", active, active is not None),
            ("department", department, bool(department)),
            ("role", role, bool(role)),
        )
        conditions = []
        params = []
        for column, value, enabled in candidate_filters:
            if enabled:
                params.append(value)
                conditions.append(f"{column} = ${len(params)}")

        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
        params.append(limit)
        query = f"""
            SELECT id, username, email, department, role, created_at
            FROM users
            {where_clause}
            ORDER BY created_at DESC
            LIMIT ${len(params)};
        """

        async with pool.acquire() as connection:
            rows = await connection.fetch(query, *params)

        users = []
        for row in rows:
            user = dict(row)
            # datetime is not JSON-serialisable; send it as an ISO string.
            if user.get("created_at"):
                user["created_at"] = user["created_at"].isoformat()
            users.append(user)

        logger.info(f"获取了{len(users)}个符合条件的用户")
        return users
    except Exception as e:
        logger.exception(f"按条件获取用户时出错: {e}")
        return {"error": f"数据库错误: {str(e)}"}
将其注册为参数化资源:
@server.resource(
    name="users_by_criteria",
    description="获取符合特定条件(如部门或角色)的用户。",
)
async def users_by_criteria_resource(data: dict):
    """Resource handler: map the request dict onto fetch_users_by_criteria().

    Missing keys fall back to ``active=True`` and ``limit=20``; a missing
    ``department`` or ``role`` is passed as None.
    """
    criteria = {
        "department": data.get("department"),
        "role": data.get("role"),
        "active": data.get("active", True),
        "limit": data.get("limit", 20),
    }
    return await fetch_users_by_criteria(**criteria)
这允许LLM根据业务需求请求特定的用户子集。
步骤6:创建安全的工具来插入新记录
创建tools/users.py用于写操作:
from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool
logger = logging.getLogger("mcp_database.tools.users")
class CreateUserRequest(BaseModel):
    """Validation model for a user-creation request.

    Fields:
        username: 3-50 characters; ASCII letters, digits and underscores only.
        email: validated by pydantic's ``EmailStr``.
        department: defaults to "General".
        role: defaults to "User".
    """

    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"

    # NOTE(review): ``validator`` is the pydantic v1 decorator; pydantic v2
    # deprecates it in favour of ``field_validator`` — confirm the version.
    @validator('username')
    def username_alphanumeric(cls, v):
        """Reject usernames containing anything but letters, digits or '_'."""
        # fullmatch instead of match + '$': '$' also matches just before a
        # trailing newline, so "name\n" would otherwise pass validation.
        if not re.fullmatch(r'[a-zA-Z0-9_]+', v):
            # The message now names the underscore, which the pattern allows.
            raise ValueError('用户名只能包含字母、数字和下划线')
        return v
async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the payload and insert a new row into ``users``.

    Args:
        data: Raw user fields (username, email, optional department/role);
            validated with ``CreateUserRequest``.

    Returns:
        ``{"status": "success", "message": ..., "user_id": ...}`` on success,
        otherwise ``{"status": "error", "message": ...}``. Validation and
        database errors are reported in the response, not raised.
    """
    import asyncpg
    # Read the pool from the module at call time. A module-level
    # ``from database import db_pool`` copies the value present at import
    # time (None), so it never sees the pool that init_db() assigns later.
    import database

    duplicate_response = {
        "status": "error",
        "message": "具有此用户名或电子邮件的用户已存在"
    }
    try:
        # Validate the input with Pydantic before touching the database.
        user_data = CreateUserRequest(**data)

        pool = database.db_pool
        if pool is None:
            logger.error("数据库池未初始化")
            return {
                "status": "error",
                "message": "数据库连接不可用"
            }

        async with pool.acquire() as connection:
            # Friendly pre-check for an existing username or email.
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            if existing_user:
                return duplicate_response

            query = """
                INSERT INTO users (username, email, department, role)
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            try:
                user_id = await connection.fetchval(
                    query,
                    user_data.username,
                    user_data.email,
                    user_data.department,
                    user_data.role
                )
            except asyncpg.UniqueViolationError:
                # Another request inserted the same username/email between
                # the pre-check and this INSERT. Only raised when the table
                # has UNIQUE constraints, as the test schema does — verify
                # that the production schema has them too.
                return duplicate_response

        logger.info(f"创建了新用户: {user_data.username} (ID: {user_id})")
        return {
            "status": "success",
            "message": f"用户 {user_data.username} 创建成功",
            "user_id": user_id
        }
    except Exception as e:
        logger.exception(f"创建用户时出错: {e}")
        return {
            "status": "error",
            "message": f"创建用户失败: {str(e)}"
        }
在server.py中注册这个工具:
from tools.users import create_user
# ... 现有代码 ...
# Register the write tool.
@server.tool(
    name="create_user",
    description="在数据库中创建新用户。",
)
async def create_user_tool(data: dict):
    """Tool handler: pass the request payload to create_user() unchanged."""
    outcome = await create_user(data)
    return outcome
添加的关键安全功能: 带有明确约束的Pydantic验证,使用EmailStr进行电子邮件验证,使用正则表达式进行用户名格式验证,插入前的重复检查,清晰的错误消息,全面的日志记录,参数化查询以防止SQL注入。
步骤7:事务管理
对于需要多个数据库更改的操作,使用事务:
async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """Move a user to another department and record the change in the audit log.

    The department update and the audit-log insert run in one transaction,
    so either both are committed or neither is.

    Args:
        user_id: ID of the user to transfer.
        new_department: Name of the target department.

    Returns:
        ``{"status": "success", "message": ...}`` on success, otherwise
        ``{"error": ...}``. Database errors are reported, not raised.
    """
    # Read the pool from the module at call time. A module-level
    # ``from database import db_pool`` copies the value present at import
    # time (None), so it never sees the pool that init_db() assigns later.
    import database

    try:
        pool = database.db_pool
        if pool is None:
            return {"error": "数据库连接不可用"}

        async with pool.acquire() as connection:
            async with connection.transaction():
                # fetchrow tells "no such user" (None) apart from a user
                # whose department is NULL or empty. FOR UPDATE locks the row
                # so the old value written to the audit log cannot be changed
                # by a concurrent transfer before our UPDATE runs.
                row = await connection.fetchrow(
                    "SELECT department FROM users WHERE id = $1 FOR UPDATE",
                    user_id
                )
                if row is None:
                    return {"error": "未找到用户"}
                current_dept = row["department"]

                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                await connection.execute(
                    """
                    INSERT INTO user_audit_log
                    (user_id, field_changed, old_value, new_value)
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )

        # Reached only after the transaction block has exited without error.
        logger.info(f"将用户 {user_id} 从 {current_dept} 转移到 {new_department}")
        return {
            "status": "success",
            "message": f"用户从 {current_dept} 转移到 {new_department}"
        }
    except Exception as e:
        logger.exception(f"转移用户时出错: {e}")
        return {"error": f"转移失败: {str(e)}"}
将其注册为工具:
@server.tool(
    name="transfer_user",
    description="将用户转移到新部门。",
)
async def transfer_user_tool(data: dict):
    """Tool handler: check both arguments are present, then run the transfer.

    Returns an error dict when ``user_id`` or ``new_department`` is missing
    or falsy; otherwise returns the result of transfer_user_to_department().
    """
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    if user_id and new_department:
        return await transfer_user_to_department(user_id, new_department)
    return {"error": "缺少user_id或new_department"}
这确保两个操作(更新用户+添加审计日志)要么一起成功,要么一起失败。
步骤8:完整服务器代码示例
这是一个将所有内容整合在一起的完整示例:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department
# Configure logging: INFO level; each record shows timestamp, logger name,
# severity and message.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")
# Create the MCP server instance that resources and tools are registered on.
# NOTE(review): MCPServer and its name/version/description arguments come
# from the `mcp_server` import above — confirm they match the installed SDK.
server = MCPServer(
name="DatabaseMCPServer",
version="1.0.0",
description="具有PostgreSQL数据库访问功能的MCP服务器"
)
# Register the read-only resources.
@server.resource(
    name="recent_users",
    description="从数据库获取最近的用户。",
)
async def recent_users_resource():
    """Resource handler: return the result of fetch_recent_users(limit=20)."""
    recent = await fetch_recent_users(limit=20)
    return recent
@server.resource(
    name="users_by_criteria",
    description="获取符合特定条件(如部门或角色)的用户。",
)
async def users_by_criteria_resource(data: dict):
    """Resource handler: map the request dict onto fetch_users_by_criteria().

    Missing keys fall back to ``active=True`` and ``limit=20``; a missing
    ``department`` or ``role`` is passed as None.
    """
    criteria = {
        "department": data.get("department"),
        "role": data.get("role"),
        "active": data.get("active", True),
        "limit": data.get("limit", 20),
    }
    return await fetch_users_by_criteria(**criteria)
# Register the write tools.
@server.tool(
    name="create_user",
    description="在数据库中创建新用户。",
)
async def create_user_tool(data: dict):
    """Tool handler: pass the request payload to create_user() unchanged."""
    outcome = await create_user(data)
    return outcome
@server.tool(
    name="transfer_user",
    description="将用户转移到新部门。",
)
async def transfer_user_tool(data: dict):
    """Tool handler: check both arguments are present, then run the transfer.

    Returns an error dict when ``user_id`` or ``new_department`` is missing
    or falsy; otherwise returns the result of transfer_user_to_department().
    """
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    if user_id and new_department:
        return await transfer_user_to_department(user_id, new_department)
    return {"error": "缺少user_id或new_department"}
async def main():
    """Initialise the database, run the MCP server, and always close the pool.

    Raises:
        Exception: any error from init_db() or server.start() is logged with
            its traceback and re-raised, so a failed start-up ends the
            process with a non-zero exit status instead of looking like a
            clean shutdown.
    """
    try:
        # Initialise the database before accepting requests.
        await init_db()
        logger.info("启动MCP服务器...")
        await server.start()
    except Exception as e:
        logger.exception(f"服务器错误: {e}")
        raise
    finally:
        # Release database connections on every exit path.
        await close_db()


if __name__ == "__main__":
    asyncio.run(main())
安全考虑
将MCP服务器连接到数据库时,请遵循以下安全最佳实践:
- 使用具有有限权限的专用数据库用户:
-- Dedicated low-privilege role for the MCP server.
CREATE USER mcp_user WITH PASSWORD 'secure_password';

-- Grant only the privileges the resources and tools in this guide use:
--   users:          SELECT (resources, duplicate check), INSERT (create_user),
--                   UPDATE (transfer_user)
--   user_audit_log: INSERT (transfer_user)
GRANT SELECT, INSERT, UPDATE ON users TO mcp_user;
GRANT INSERT ON user_audit_log TO mcp_user;

-- NOTE(review): if users.id is a SERIAL column (as in the test schema),
-- INSERT also needs USAGE on its sequence. The name below is PostgreSQL's
-- default for SERIAL — verify it against your schema.
GRANT USAGE ON SEQUENCE users_id_seq TO mcp_user;
- 使用Pydantic模型和严格验证规则验证所有输入。
- 始终只使用参数化查询以防止SQL注入。
- 为MCP服务器实现身份验证:
@server.middleware
async def auth_middleware(request, next_handler):
    """Reject requests that lack a valid ``Authorization`` header.

    NOTE(review): ``verify_token`` is not defined in this snippet — it must
    be supplied by the application.
    """
    token = request.headers.get("Authorization")
    if token and verify_token(token):
        return await next_handler(request)
    return {"error": "未授权"}
- 实现速率限制以防止滥用:
import time

# Simple in-memory rate limiter: per client host, at most 100 requests in
# the 60 seconds following that client's first counted request. State lives
# in this process only.
request_counts = {}


@server.middleware
async def rate_limit_middleware(request, next_handler):
    """Return an error once a client exceeds 100 requests per minute."""
    client_id = request.client.host
    current_time = time.time()

    # Drop expired windows in place. Rebinding ``request_counts`` here would
    # make it a local variable and raise UnboundLocalError on first read.
    expired = [
        key for key, entry in request_counts.items()
        if entry["timestamp"] <= current_time - 60
    ]
    for key in expired:
        del request_counts[key]

    entry = request_counts.get(client_id)
    if entry is None:
        request_counts[client_id] = {"count": 1, "timestamp": current_time}
    elif entry["count"] >= 100:  # 100 requests per minute
        return {"error": "超出速率限制"}
    else:
        entry["count"] += 1

    return await next_handler(request)
- 实现请求日志记录以进行审计:
@server.middleware
async def logging_middleware(request, next_handler):
    """Log method, path and client host of each request, then pass it on."""
    logger.info(f"请求: {request.method} {request.path} 来自 {request.client.host}")
    return await next_handler(request)
- 设置数据库查询超时以防止长时间运行的查询影响服务器性能。
性能优化
- 连接池已经实现,但应根据你的工作负载调整参数:
db_pool = await asyncpg.create_pool( **DB_CONFIG, min_size=5, # 对于高负载场景设置更高 max_size=20, # 根据数据库服务器容量调整 statement_cache_size=100, # 缓存预处理语句 max_inactive_connection_lifetime=300 # 闲置连接回收前的秒数 )
- 为经常查询的字段创建数据库索引:
-- Indexes for the department filter and the newest-first ordering.
CREATE INDEX idx_users_department
    ON users (department);

CREATE INDEX idx_users_created_at
    ON users (created_at DESC);
- 为经常访问、很少更改的数据实现结果缓存:
from functools import lru_cache
from datetime import datetime, timedelta

# Single-entry cache that expires after 5 minutes.
cache_time = None
cached_result = None


async def fetch_departments_with_caching():
    """Return all rows of ``departments`` as dicts, cached for 5 minutes.

    The database is queried only when nothing is cached yet or the cached
    value is older than 5 minutes.
    """
    global cache_time, cached_result

    current_time = datetime.now()
    # Compare against None rather than truthiness: an empty result list is a
    # valid value and must be served from the cache too.
    cache_is_fresh = (
        cache_time is not None
        and cached_result is not None
        and current_time - cache_time < timedelta(minutes=5)
    )
    if cache_is_fresh:
        return cached_result

    # Cache miss: load from the database.
    async with db_pool.acquire() as connection:
        result = await connection.fetch("SELECT * FROM departments")

    cache_time = current_time
    cached_result = [dict(row) for row in result]
    return cached_result
- 对不需要大量查询的复杂数据使用JSONB:
-- One JSONB document of preferences per user.
CREATE TABLE user_preferences (
    user_id     INTEGER PRIMARY KEY,
    preferences JSONB   NOT NULL
);
- 对大结果集进行分页:
async def fetch_paginated_users(page: int = 1, page_size: int = 20):
    """Return one page of users (newest first) plus pagination metadata.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1.

    Returns:
        ``{"users": [...], "pagination": {"total", "page", "page_size",
        "pages"}}`` where ``page`` and ``page_size`` are the values used.
    """
    # Clamp rather than fail: page < 1 would produce a negative OFFSET, and
    # page_size == 0 would divide by zero in the page-count calculation.
    page = max(1, page)
    page_size = max(1, page_size)
    offset = (page - 1) * page_size

    async with db_pool.acquire() as connection:
        # Total row count, used for the page count.
        total = await connection.fetchval("SELECT COUNT(*) FROM users")
        # ``id`` breaks ties between rows with equal created_at, so a row
        # cannot appear on two pages or be skipped between requests.
        rows = await connection.fetch(
            "SELECT * FROM users ORDER BY created_at DESC, id DESC LIMIT $1 OFFSET $2",
            page_size,
            offset,
        )

    return {
        "users": [dict(row) for row in rows],
        "pagination": {
            "total": total,
            "page": page,
            "page_size": page_size,
            "pages": (total + page_size - 1) // page_size,
        },
    }
测试数据库交互
创建一个tests目录和测试文件:
# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg
# Load test environment variables
load_dotenv(".env.test")
# Connection settings for the test database, read from TEST_DB_* variables.
TEST_DB_CONFIG = {
    "user": os.environ.get("TEST_DB_USER"),
    "password": os.environ.get("TEST_DB_PASSWORD"),
    "database": os.environ.get("TEST_DB_NAME"),
    "host": os.environ.get("TEST_DB_HOST", "localhost"),
    "port": int(os.environ.get("TEST_DB_PORT", "5432")),
}
# NOTE(review): with pytest-asyncio in strict mode, async fixtures must be
# declared with @pytest_asyncio.fixture — confirm the configured mode.
@pytest.fixture
async def db_pool():
    """Create a test database pool limited to a single connection.

    The test data lives in a TEMPORARY table, which PostgreSQL makes visible
    only to the session that created it. Pinning the pool to one connection
    guarantees that the fixture, the code under test and the verification
    queries all run in that same session.
    """
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG, min_size=1, max_size=1)
    yield pool
    await pool.close()
@pytest.fixture
async def setup_test_data(db_pool):
    """Create a temporary ``users`` table and seed it with three users."""
    create_users_table = """
        CREATE TEMPORARY TABLE users (
            id SERIAL PRIMARY KEY,
            username TEXT UNIQUE NOT NULL,
            email TEXT UNIQUE NOT NULL,
            department TEXT NOT NULL DEFAULT 'General',
            role TEXT NOT NULL DEFAULT 'User',
            active BOOLEAN NOT NULL DEFAULT TRUE,
            created_at TIMESTAMP NOT NULL DEFAULT NOW()
        )
    """
    insert_user = """
        INSERT INTO users (username, email, department, role)
        VALUES ($1, $2, $3, $4)
    """
    seed_users = [
        ("test1", "test1@example.com", "Engineering", "Developer"),
        ("test2", "test2@example.com", "Marketing", "Manager"),
        ("test3", "test3@example.com", "Sales", "Representative"),
    ]

    async with db_pool.acquire() as conn:
        await conn.execute(create_users_table)
        await conn.executemany(insert_user, seed_users)

    yield
    # No explicit cleanup: the temporary table goes away with its session.
@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data, monkeypatch):
    """Fetching recent users returns the three seeded rows."""
    import database
    import resources.users as users_resources
    from resources.users import fetch_recent_users

    # Patch the pool on both modules: resources.users may hold its own copy
    # of the name (``from database import db_pool``), which assigning
    # database.db_pool alone would not change. monkeypatch restores both
    # values after the test.
    monkeypatch.setattr(database, "db_pool", db_pool)
    monkeypatch.setattr(users_resources, "db_pool", db_pool, raising=False)

    result = await fetch_recent_users(limit=10)

    assert isinstance(result, list)
    assert len(result) == 3
    assert {user["username"] for user in result} == {"test1", "test2", "test3"}
    assert "email" in result[0]
    # fetch_recent_users selects id, username, email and created_at only, so
    # assert on created_at rather than department.
    assert "created_at" in result[0]
@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data, monkeypatch):
    """Creating a user reports success and stores the row."""
    import database
    import tools.users as users_tools
    from tools.users import create_user

    # Patch the pool on both modules: tools.users may hold its own copy of
    # the name (``from database import db_pool``), which assigning
    # database.db_pool alone would not change. monkeypatch restores both
    # values after the test.
    monkeypatch.setattr(database, "db_pool", db_pool)
    monkeypatch.setattr(users_tools, "db_pool", db_pool, raising=False)

    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }

    result = await create_user(user_data)

    assert result["status"] == "success"

    # Verify the row that was actually written.
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"
运行测试命令:pytest -xvs tests/
部署注意事项
- 使用环境特定的配置文件:.env.development、.env.staging、.env.production
- 设置数据库迁移以进行架构更改:
# Two separate commands: install Alembic, then scaffold the migrations directory
pip install alembic
alembic init migrations
- 使用Docker部署以保持一致性:
FROM python:3.10-slim

WORKDIR /app

# Install dependencies first so this layer is cached when only code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "server.py"]
- 设置健康检查:
@server.route("/health")
async def health_check():
    """Report whether a trivial query can be run on a pooled connection."""
    try:
        async with db_pool.acquire() as connection:
            await connection.fetchval("SELECT 1")
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
    return {"status": "healthy", "database": "connected"}
- 监控查询性能:
-- In PostgreSQL. The module must also be listed in shared_preload_libraries
-- (postgresql.conf; requires a server restart).
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;

-- To analyze slow queries (PostgreSQL 13 and later).
-- On PostgreSQL 12 and earlier the columns are named total_time / mean_time.
SELECT query, calls, total_exec_time, mean_exec_time
FROM pg_stat_statements
ORDER BY mean_exec_time DESC
LIMIT 10;
- 设置数据库备份:
# Daily backup script: custom-format dump named after the current date
pg_dump -U user -d database -F c -f "/backups/db_$(date +%Y%m%d).dump"
结论
将MCP服务器连接到PostgreSQL数据库可以将其从简单的演示转变为生产就绪的AI后端。通过对安全性、性能和可靠性的仔细关注,您可以安全地将数据库暴露给语言模型,从而实现强大的AI驱动工作流程。需要记住的关键原则:资源用于读取操作,工具用于写入操作,对所有输入进行验证,事务用于一致性,连接池用于性能,环境变量用于安全性,结构化错误处理用于可靠性。通过遵循这些实践,您可以使LLM以安全、受控的方式与现有数据库应用程序一起工作——为AI辅助工作流程和数据分析开启新的可能性。在下一个指南中,我们将展示如何通过MCP连接您的现有API后端,而无需重写业务逻辑。
常见问题
问:我可以将MCP服务器连接到多个数据库吗?
答:是的。您可以创建多个资源和工具处理程序,每个处理程序连接到不同的数据库或架构。
问:如果数据库连接或查询失败怎么办?
答:您应该添加try/except处理,向LLM返回清晰的错误,而不是导致崩溃。
问:如何防止LLM执行不安全的数据库操作?
答:仅通过资源和工具公开选定的读取或写入操作。永远不要给LLM不受限制的数据库访问权限。