Como estender seu servidor MCP com acesso a banco de dados

Conectar seu servidor MCP a um banco de dados real o transforma de uma simples demonstração em um backend de IA pronto para produção. Neste guia, você verá exatamente como integrar o PostgreSQL ao seu servidor MCP e expô-lo com segurança para um LLM.
Principais aprendizados
- Você pode conectar o PostgreSQL ao seu servidor MCP com asyncpg
- Os recursos permitem que LLMs busquem dados estruturados em tempo real
- As ferramentas permitem que LLMs insiram, atualizem ou excluam registros com segurança
- Validação de entrada e gerenciamento de transações são essenciais para uso em produção
- A configuração do ambiente mantém suas credenciais seguras
Por que conectar um banco de dados ao MCP
Sem acesso ao banco de dados, seu LLM está cego para os dados reais de sua aplicação. Ao conectá-lo, você pode: Permitir que a IA responda perguntas baseadas em usuários reais, pedidos, tickets, etc; Automatizar ações como criar entradas ou atualizar registros; Construir agentes internos inteligentes sem precisar de APIs separadas; Habilitar respostas de IA sensíveis ao contexto usando o estado da sua aplicação; Criar análises baseadas em IA sobre seus dados de negócios. Este é o primeiro passo para transformar um modelo em um assistente de aplicação real que fornece valor genuíno ao negócio.
O que você precisa antes de começar
- Um banco de dados PostgreSQL em execução (v12+)
- Um servidor Python MCP (implementação básica)
- Python 3.10+ com suporte assíncrono
- A biblioteca
asyncpg
para acesso ao banco de dados - O pacote
mcp-server
(SDK oficial Python) - Python-dotenv para configuração de ambiente
- Conhecimento básico de SQL e Python assíncrono
Visão geral da arquitetura
A arquitetura envolve vários componentes: (1) Cliente LLM: Claude ou outro LLM que se comunica via protocolo MCP, (2) Servidor MCP: Seu servidor Python expondo recursos e ferramentas, (3) Pool de Conexões: Gerencia conexões de banco de dados de forma eficiente, (4) PostgreSQL: O banco de dados subjacente armazenando os dados da sua aplicação.
Esta configuração segue uma separação limpa de responsabilidades: Recursos fornecem acesso somente leitura para consultas, Ferramentas permitem operações de escrita controladas, Pooling de conexões otimiza o desempenho, Configuração de ambiente mantém as credenciais seguras.
Passo 1: Instalar e configurar dependências do banco de dados
Primeiro, instale os pacotes necessários:
pip install asyncpg python-dotenv mcp-server
Crie uma estrutura de projeto:
mcp-db-server/
├── .env # Variáveis de ambiente (nunca comitar para o git)
├── requirements.txt # Dependências
├── server.py # Arquivo principal do servidor
├── database.py # Módulo de conexão com o banco de dados
├── resources/ # Recursos do banco de dados
│ ├── __init__.py
│ └── users.py # Recursos relacionados a usuários
└── tools/ # Ferramentas do banco de dados
├── __init__.py
└── users.py # Ferramentas relacionadas a usuários
Passo 2: Configurar as variáveis de ambiente
Crie um arquivo .env
para suas credenciais de banco de dados:
DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432
Nunca comite este arquivo para controle de versão. Adicione-o ao .gitignore
:
# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class
Crie um arquivo database.py
para carregar essas variáveis de ambiente:
import os
import asyncpg
from dotenv import load_dotenv
# Carrega variáveis de ambiente
load_dotenv()
# Configuração do banco de dados
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
Passo 3: Criar pool de conexões do banco de dados
Estenda seu arquivo database.py
para incluir o pool de conexões:
import os
import asyncpg
import logging
from dotenv import load_dotenv
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")
# Carregar variáveis de ambiente
load_dotenv()
# Configuração do banco de dados
DB_CONFIG = {
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASSWORD"),
"database": os.getenv("DB_NAME"),
"host": os.getenv("DB_HOST", "localhost"),
"port": int(os.getenv("DB_PORT", "5432")),
}
# Variável global do pool
db_pool = None
async def init_db():
"""Inicializa o pool de conexões do banco de dados."""
global db_pool
try:
db_pool = await asyncpg.create_pool(
**DB_CONFIG,
min_size=1,
max_size=10,
command_timeout=60,
timeout=10, # Tempo limite para aquisição de conexão
)
logger.info("Pool de conexões do banco de dados estabelecido")
# Testar a conexão
async with db_pool.acquire() as connection:
version = await connection.fetchval("SELECT version();")
logger.info(f"Conectado ao PostgreSQL: {version}")
return db_pool
except Exception as e:
logger.error(f"Falha ao criar pool de banco de dados: {e}")
raise
async def close_db():
"""Fecha o pool de conexões do banco de dados."""
global db_pool
if db_pool:
await db_pool.close()
logger.info("Pool de conexões do banco de dados fechado")
Isso fornece: Um pool de conexões configurado adequadamente, Uso de variáveis de ambiente para segurança, Logging para monitoramento, Otimização do tamanho do pool, Tratamento de timeout de conexão, Função explícita de fechamento para desligamentos limpos.
Passo 4: Expor um recurso somente leitura
Crie resources/users.py
para expor dados de usuários:
from typing import List, Dict, Any, Optional
import logging
from database import db_pool
logger = logging.getLogger("mcp_database.resources.users")
async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
"""
Busca os usuários mais recentes do banco de dados.
Args:
limit: Número máximo de usuários a retornar (padrão: 20)
Returns:
Lista de objetos de usuário
"""
try:
if not db_pool:
logger.error("Pool de banco de dados não inicializado")
return {"error": "Conexão com banco de dados não disponível"}
async with db_pool.acquire() as connection:
# Use uma consulta parametrizada para segurança
query = """
SELECT id, username, email, created_at
FROM users
ORDER BY created_at DESC
LIMIT $1;
"""
rows = await connection.fetch(query, limit)
# Converte para dicionários e trata serialização de datetime
users = []
for row in rows:
user = dict(row)
# Converte datetime para string formato ISO para serialização JSON
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"Buscados {len(users)} usuários recentes")
return users
except Exception as e:
logger.error(f"Erro ao buscar usuários recentes: {e}")
return {"error": f"Erro de banco de dados: {str(e)}"}
Agora, atualize seu server.py
para registrar este recurso:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users
# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")
# Criar servidor MCP
server = MCPServer()
# Registrar o recurso
@server.resource(
name="recent_users",
description="Busca os usuários mais recentes do banco de dados."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
async def main():
try:
# Inicializar o banco de dados
await init_db()
# Iniciar o servidor
logger.info("Iniciando servidor MCP...")
await server.start()
except Exception as e:
logger.error(f"Erro no servidor: {e}")
finally:
# Fechar conexões de banco de dados ao desligar
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Passo 5: Implementar capacidades de consulta avançadas
Crie um recurso que permita consultas mais flexíveis com parâmetros:
async def fetch_users_by_criteria(
department: Optional[str] = None,
role: Optional[str] = None,
active: Optional[bool] = True,
limit: int = 20
) -> List[Dict[str, Any]]:
"""
Busca usuários que correspondem a critérios específicos.
Args:
department: Filtrar por departamento (opcional)
role: Filtrar por função (opcional)
active: Filtrar por status ativo (padrão: True)
limit: Máximo de resultados a retornar (padrão: 20)
Returns:
Lista de objetos de usuário correspondentes
"""
try:
if not db_pool:
logger.error("Pool de banco de dados não inicializado")
return {"error": "Conexão com banco de dados não disponível"}
# Construir consulta dinâmica
conditions = ["active = $1"]
params = [active]
param_count = 1
if department:
param_count += 1
conditions.append(f"department = ${param_count}")
params.append(department)
if role:
param_count += 1
conditions.append(f"role = ${param_count}")
params.append(role)
# Construir a consulta final
query = f"""
SELECT id, username, email, department, role, created_at
FROM users
WHERE {' AND '.join(conditions)}
ORDER BY created_at DESC
LIMIT ${param_count + 1};
"""
params.append(limit)
async with db_pool.acquire() as connection:
rows = await connection.fetch(query, *params)
# Converter para dicionários e tratar serialização de datetime
users = []
for row in rows:
user = dict(row)
if "created_at" in user and user["created_at"]:
user["created_at"] = user["created_at"].isoformat()
users.append(user)
logger.info(f"Buscados {len(users)} usuários correspondentes aos critérios")
return users
except Exception as e:
logger.error(f"Erro ao buscar usuários por critérios: {e}")
return {"error": f"Erro de banco de dados: {str(e)}"}
Registre isso como um recurso parametrizado:
@server.resource(
name="users_by_criteria",
description="Busca usuários correspondentes a critérios específicos como departamento ou função."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
Isso permite que o LLM solicite subconjuntos específicos de usuários com base nas necessidades do negócio.
Passo 6: Criar uma ferramenta segura para inserir novos registros
Crie tools/users.py
para operações de escrita:
from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool
logger = logging.getLogger("mcp_database.tools.users")
class CreateUserRequest(BaseModel):
"""Modelo de validação para solicitações de criação de usuário."""
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
department: Optional[str] = "General"
role: Optional[str] = "User"
@validator('username')
def username_alphanumeric(cls, v):
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Nome de usuário deve ser alfanumérico')
return v
async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
"""
Cria um novo usuário no banco de dados.
Args:
data: Dados do usuário contendo username, email, etc.
Returns:
Resposta com status e informações do usuário
"""
try:
# Validar os dados de entrada com Pydantic
user_data = CreateUserRequest(**data)
if not db_pool:
logger.error("Pool de banco de dados não inicializado")
return {
"status": "error",
"message": "Conexão com banco de dados não disponível"
}
async with db_pool.acquire() as connection:
# Verificar se o usuário já existe
existing_user = await connection.fetchrow(
"SELECT id FROM users WHERE username = $1 OR email = $2",
user_data.username,
user_data.email
)
if existing_user:
return {
"status": "error",
"message": "Usuário com este nome de usuário ou email já existe"
}
# Inserir o novo usuário
query = """
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
RETURNING id;
"""
user_id = await connection.fetchval(
query,
user_data.username,
user_data.email,
user_data.department,
user_data.role
)
logger.info(f"Criado novo usuário: {user_data.username} (ID: {user_id})")
return {
"status": "success",
"message": f"Usuário {user_data.username} criado com sucesso",
"user_id": user_id
}
except Exception as e:
logger.error(f"Erro ao criar usuário: {e}")
return {
"status": "error",
"message": f"Falha ao criar usuário: {str(e)}"
}
Registre esta ferramenta em server.py
:
from tools.users import create_user
# ... código existente ...
# Registrar a ferramenta
@server.tool(
name="create_user",
description="Cria um novo usuário no banco de dados."
)
async def create_user_tool(data: dict):
return await create_user(data)
Recursos de segurança adicionados: Validação Pydantic com restrições claras, Validação de email usando EmailStr, Validação de formato de nome de usuário com regex, Verificação de duplicidade antes da inserção, Mensagens de erro claras, Logging abrangente, Consultas parametrizadas para prevenir injeção SQL.
Passo 7: Gerenciamento de transações
Para operações que requerem múltiplas alterações no banco de dados, use transações:
async def transfer_user_to_department(
user_id: int,
new_department: str
) -> Dict[str, Any]:
"""
Transfere um usuário para um novo departamento, registrando a alteração no log de auditoria.
Args:
user_id: ID do usuário a ser transferido
new_department: Nome do departamento de destino
Returns:
Status da operação
"""
try:
if not db_pool:
return {"error": "Conexão com banco de dados não disponível"}
async with db_pool.acquire() as connection:
# Iniciar uma transação
async with connection.transaction():
# Obter departamento atual
current_dept = await connection.fetchval(
"SELECT department FROM users WHERE id = $1",
user_id
)
if not current_dept:
return {"error": "Usuário não encontrado"}
# Atualizar o departamento do usuário
await connection.execute(
"UPDATE users SET department = $1 WHERE id = $2",
new_department,
user_id
)
# Registrar a alteração no log de auditoria
await connection.execute(
"""
INSERT INTO user_audit_log
(user_id, field_changed, old_value, new_value)
VALUES ($1, $2, $3, $4)
""",
user_id,
"department",
current_dept,
new_department
)
logger.info(f"Transferido usuário {user_id} de {current_dept} para {new_department}")
return {
"status": "success",
"message": f"Usuário transferido de {current_dept} para {new_department}"
}
except Exception as e:
logger.error(f"Erro ao transferir usuário: {e}")
return {"error": f"Transferência falhou: {str(e)}"}
Registre isso como uma ferramenta:
@server.tool(
name="transfer_user",
description="Transfere um usuário para um novo departamento."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "Faltando user_id ou new_department"}
return await transfer_user_to_department(user_id, new_department)
Isso garante que ambas as operações (atualizar usuário + adicionar log de auditoria) sejam bem-sucedidas ou falhem juntas.
Passo 8: Exemplo completo de código do servidor
Aqui está um exemplo completo reunindo tudo:
import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")
# Criar servidor MCP
server = MCPServer(
name="DatabaseMCPServer",
version="1.0.0",
description="Servidor MCP com acesso ao banco de dados PostgreSQL"
)
# Registrar recursos
@server.resource(
name="recent_users",
description="Busca os usuários mais recentes do banco de dados."
)
async def recent_users_resource():
return await fetch_recent_users(limit=20)
@server.resource(
name="users_by_criteria",
description="Busca usuários correspondentes a critérios específicos como departamento ou função."
)
async def users_by_criteria_resource(data: dict):
return await fetch_users_by_criteria(
department=data.get("department"),
role=data.get("role"),
active=data.get("active", True),
limit=data.get("limit", 20)
)
# Registrar ferramentas
@server.tool(
name="create_user",
description="Cria um novo usuário no banco de dados."
)
async def create_user_tool(data: dict):
return await create_user(data)
@server.tool(
name="transfer_user",
description="Transfere um usuário para um novo departamento."
)
async def transfer_user_tool(data: dict):
user_id = data.get("user_id")
new_department = data.get("new_department")
if not user_id or not new_department:
return {"error": "Faltando user_id ou new_department"}
return await transfer_user_to_department(user_id, new_department)
async def main():
try:
# Inicializar o banco de dados
await init_db()
# Iniciar o servidor
logger.info("Iniciando servidor MCP...")
await server.start()
except Exception as e:
logger.error(f"Erro no servidor: {e}")
finally:
# Fechar conexões de banco de dados ao desligar
await close_db()
if __name__ == "__main__":
asyncio.run(main())
Considerações de segurança
Ao conectar seu servidor MCP a um banco de dados, siga estas melhores práticas de segurança:
-
Use um usuário de banco de dados dedicado com permissões limitadas:
CREATE USER mcp_user WITH PASSWORD 'secure_password'; GRANT SELECT, INSERT ON users TO mcp_user; -- Conceda apenas o necessário
-
Valide todas as entradas usando modelos Pydantic com regras de validação rigorosas.
-
Use consultas parametrizadas exclusivamente para prevenir injeção SQL.
-
Implemente autenticação para seu servidor MCP:
@server.middleware async def auth_middleware(request, next_handler): token = request.headers.get("Authorization") if not token or not verify_token(token): return {"error": "Não autorizado"} return await next_handler(request)
-
Implemente limitação de taxa para prevenir abusos:
# Limitador de taxa simples em memória request_counts = {} @server.middleware async def rate_limit_middleware(request, next_handler): client_id = request.client.host current_time = time.time() # Limpar entradas antigas request_counts = {k: v for k, v in request_counts.items() if v["timestamp"] > current_time - 60} if client_id in request_counts: if request_counts[client_id]["count"] > 100: # 100 requisições por minuto return {"error": "Limite de taxa excedido"} request_counts[client_id]["count"] += 1 else: request_counts[client_id] = {"count": 1, "timestamp": current_time} return await next_handler(request)
-
Implemente registro de requisições para auditoria:
@server.middleware async def logging_middleware(request, next_handler): logger.info(f"Requisição: {request.method} {request.path} de {request.client.host}") response = await next_handler(request) return response
-
Configure timeouts de consulta de banco de dados para evitar que consultas de longa duração afetem o desempenho do servidor.
Otimização de desempenho
-
Pool de conexões já está implementado, mas ajuste os parâmetros com base na sua carga de trabalho:
db_pool = await asyncpg.create_pool( **DB_CONFIG, min_size=5, # Defina mais alto para cenários de alta carga max_size=20, # Ajuste com base na capacidade do seu servidor de banco de dados statement_cache_size=100, # Cache de declarações preparadas max_inactive_connection_lifetime=300 # Segundos antes de reciclar conexões ociosas )
-
Crie índices de banco de dados para campos frequentemente consultados:
CREATE INDEX idx_users_department ON users(department); CREATE INDEX idx_users_created_at ON users(created_at DESC);
-
Implemente cache de resultados para dados frequentemente acessados e raramente alterados:
from functools import lru_cache from datetime import datetime, timedelta # Cache que expira após 5 minutos cache_time = None cached_result = None async def fetch_departments_with_caching(): global cache_time, cached_result # Verificar se o cache é válido current_time = datetime.now() if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5): return cached_result # Cache miss - buscar do banco de dados async with db_pool.acquire() as connection: result = await connection.fetch("SELECT * FROM departments") # Atualizar cache cache_time = current_time cached_result = [dict(row) for row in result] return cached_result
-
Use JSONB para dados complexos que não precisam ser consultados extensivamente:
CREATE TABLE user_preferences ( user_id INTEGER PRIMARY KEY, preferences JSONB NOT NULL );
-
Paginação para grandes conjuntos de resultados:
async def fetch_paginated_users(page: int = 1, page_size: int = 20): offset = (page - 1) * page_size async with db_pool.acquire() as connection: # Obter contagem total total = await connection.fetchval("SELECT COUNT(*) FROM users") # Obter resultados paginados rows = await connection.fetch( "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2", page_size, offset ) return { "users": [dict(row) for row in rows], "pagination": { "total": total, "page": page, "page_size": page_size, "pages": (total + page_size - 1) // page_size } }
Testando interações com o banco de dados
Crie um diretório tests
com um arquivo de teste:
# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg
# Load test environment variables
load_dotenv(".env.test")
# Testing database configuration
TEST_DB_CONFIG = {
"user": os.getenv("TEST_DB_USER"),
"password": os.getenv("TEST_DB_PASSWORD"),
"database": os.getenv("TEST_DB_NAME"),
"host": os.getenv("TEST_DB_HOST", "localhost"),
"port": int(os.getenv("TEST_DB_PORT", "5432")),
}
@pytest.fixture
async def db_pool():
"""Create a test database pool."""
pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
yield pool
await pool.close()
@pytest.fixture
async def setup_test_data(db_pool):
"""Set up test data before tests and clean up after."""
async with db_pool.acquire() as conn:
# Create test tables
await conn.execute("""
CREATE TEMPORARY TABLE users (
id SERIAL PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
department TEXT NOT NULL DEFAULT 'General',
role TEXT NOT NULL DEFAULT 'User',
active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)
""")
# Insert test users
await conn.executemany(
"""
INSERT INTO users (username, email, department, role)
VALUES ($1, $2, $3, $4)
""",
[
("test1", "test1@example.com", "Engineering", "Developer"),
("test2", "test2@example.com", "Marketing", "Manager"),
("test3", "test3@example.com", "Sales", "Representative")
]
)
yield
# Cleanup happens automatically for temporary tables
@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
"""Test that fetching recent users works correctly."""
from resources.users import fetch_recent_users
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Execute the function
result = await fetch_recent_users(limit=10)
# Assertions
assert isinstance(result, list)
assert len(result) == 3
assert result[0]["username"] in ["test1", "test2", "test3"]
assert "email" in result[0]
assert "department" in result[0]
@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
"""Test that creating a user works correctly."""
from tools.users import create_user
# Monkeypatch db_pool
import database
database.db_pool = db_pool
# Test data
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"department": "Finance",
"role": "Analyst"
}
# Execute the function
result = await create_user(user_data)
# Assertions
assert result["status"] == "success"
# Verify in database
async with db_pool.acquire() as conn:
user = await conn.fetchrow(
"SELECT * FROM users WHERE username = $1",
"newuser"
)
assert user is not None
assert user["email"] == "newuser@example.com"
assert user["department"] == "Finance"
Execute os testes com: pytest -xvs tests/
Considerações para implantação
-
Use arquivos de configuração específicos para cada ambiente:
.env.development
,.env.staging
,.env.production
-
Configure migrações de banco de dados para alterações de esquema:
pip install alembic alembic init migrations
-
Implante com Docker para consistência:
FROM python:3.10-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . CMD ["python", "server.py"]
-
Configure verificações de saúde:
@server.route("/health") async def health_check(): try: async with db_pool.acquire() as connection: await connection.fetchval("SELECT 1") return {"status": "healthy", "database": "connected"} except Exception as e: return {"status": "unhealthy", "error": str(e)}
-
Monitore o desempenho das consultas:
-- No PostgreSQL CREATE EXTENSION pg_stat_statements; -- Para analisar consultas lentas SELECT query, calls, total_time, mean_time FROM pg_stat_statements ORDER BY mean_time DESC LIMIT 10;
-
Configure backups do banco de dados:
# Script de backup diário pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump
Conclusão
Conectar seu servidor MCP a um banco de dados PostgreSQL o transforma de uma simples demonstração em um backend de IA pronto para produção. Com atenção cuidadosa à segurança, desempenho e confiabilidade, você pode expor seu banco de dados aos modelos de linguagem com segurança, permitindo fluxos de trabalho poderosos orientados por IA. Princípios-chave a lembrar: Resources para operações de leitura, Tools para operações de escrita, Validação para todas as entradas, Transações para consistência, Pooling de conexões para desempenho, Variáveis de ambiente para segurança, Tratamento estruturado de erros para confiabilidade. Seguindo essas práticas, você permite que LLMs trabalhem junto com suas aplicações de banco de dados existentes de maneira segura e controlada — desbloqueando novas possibilidades para fluxos de trabalho e análise de dados assistidos por IA. No próximo guia, mostraremos como conectar seu backend de API existente via MCP sem reescrever a lógica de negócios.
Perguntas Frequentes
Sim. Você pode criar vários manipuladores de recursos e ferramentas, cada um conectando-se a um banco de dados ou esquema diferente.
Você deve adicionar tratamento try/except para retornar um erro claro para o LLM em vez de travar.
Exponha apenas operações selecionadas de leitura ou escrita por meio de recursos e ferramentas. Nunca dê ao LLM acesso irrestrito ao banco de dados.