Back

Como estender seu servidor MCP com acesso a banco de dados

Como estender seu servidor MCP com acesso a banco de dados

Conectar seu servidor MCP a um banco de dados real o transforma de uma simples demonstração em um backend de IA pronto para produção. Neste guia, você verá exatamente como integrar o PostgreSQL ao seu servidor MCP e expô-lo com segurança para um LLM.

Principais aprendizados

  • Você pode conectar o PostgreSQL ao seu servidor MCP com asyncpg
  • Os recursos permitem que LLMs busquem dados estruturados em tempo real
  • As ferramentas permitem que LLMs insiram, atualizem ou excluam registros com segurança
  • Validação de entrada e gerenciamento de transações são essenciais para uso em produção
  • A configuração do ambiente mantém suas credenciais seguras

Por que conectar um banco de dados ao MCP

Sem acesso ao banco de dados, seu LLM está cego para os dados reais de sua aplicação. Ao conectá-lo, você pode: Permitir que a IA responda perguntas baseadas em usuários reais, pedidos, tickets, etc; Automatizar ações como criar entradas ou atualizar registros; Construir agentes internos inteligentes sem precisar de APIs separadas; Habilitar respostas de IA sensíveis ao contexto usando o estado da sua aplicação; Criar análises baseadas em IA sobre seus dados de negócios. Este é o primeiro passo para transformar um modelo em um assistente de aplicação real que fornece valor genuíno ao negócio.

O que você precisa antes de começar

  • Um banco de dados PostgreSQL em execução (v12+)
  • Um servidor Python MCP (implementação básica)
  • Python 3.10+ com suporte assíncrono
  • A biblioteca asyncpg para acesso ao banco de dados
  • O pacote mcp-server (SDK oficial Python)
  • Python-dotenv para configuração de ambiente
  • Conhecimento básico de SQL e Python assíncrono

Visão geral da arquitetura

A arquitetura envolve vários componentes: (1) Cliente LLM: Claude ou outro LLM que se comunica via protocolo MCP, (2) Servidor MCP: Seu servidor Python expondo recursos e ferramentas, (3) Pool de Conexões: Gerencia conexões de banco de dados de forma eficiente, (4) PostgreSQL: O banco de dados subjacente armazenando os dados da sua aplicação.

Esta configuração segue uma separação limpa de responsabilidades: Recursos fornecem acesso somente leitura para consultas, Ferramentas permitem operações de escrita controladas, Pooling de conexões otimiza o desempenho, Configuração de ambiente mantém as credenciais seguras.

Passo 1: Instalar e configurar dependências do banco de dados

Primeiro, instale os pacotes necessários:

pip install asyncpg python-dotenv mcp-server

Crie uma estrutura de projeto:

mcp-db-server/
├── .env                  # Variáveis de ambiente (nunca comitar para o git)
├── requirements.txt      # Dependências
├── server.py             # Arquivo principal do servidor
├── database.py           # Módulo de conexão com o banco de dados
├── resources/            # Recursos do banco de dados
│   ├── __init__.py
│   └── users.py          # Recursos relacionados a usuários
└── tools/                # Ferramentas do banco de dados
    ├── __init__.py
    └── users.py          # Ferramentas relacionadas a usuários

Passo 2: Configurar as variáveis de ambiente

Crie um arquivo .env para suas credenciais de banco de dados:

DB_USER=your_db_user
DB_PASSWORD=your_db_password
DB_NAME=your_db_name
DB_HOST=localhost
DB_PORT=5432

Nunca comite este arquivo para controle de versão. Adicione-o ao .gitignore:

# .gitignore
.env
__pycache__/
*.py[cod]
*$py.class

Crie um arquivo database.py para carregar essas variáveis de ambiente:

import os
import asyncpg
from dotenv import load_dotenv

# Carrega variáveis de ambiente
load_dotenv()

# Configuração do banco de dados
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

Passo 3: Criar pool de conexões do banco de dados

Estenda seu arquivo database.py para incluir o pool de conexões:

import os
import asyncpg
import logging
from dotenv import load_dotenv

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_database")

# Carregar variáveis de ambiente
load_dotenv()

# Configuração do banco de dados
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "database": os.getenv("DB_NAME"),
    "host": os.getenv("DB_HOST", "localhost"),
    "port": int(os.getenv("DB_PORT", "5432")),
}

# Variável global do pool
db_pool = None

async def init_db():
    """Inicializa o pool de conexões do banco de dados."""
    global db_pool
    try:
        db_pool = await asyncpg.create_pool(
            **DB_CONFIG,
            min_size=1,
            max_size=10,
            command_timeout=60,
            timeout=10,  # Tempo limite para aquisição de conexão
        )
        logger.info("Pool de conexões do banco de dados estabelecido")
        # Testar a conexão
        async with db_pool.acquire() as connection:
            version = await connection.fetchval("SELECT version();")
            logger.info(f"Conectado ao PostgreSQL: {version}")
        return db_pool
    except Exception as e:
        logger.error(f"Falha ao criar pool de banco de dados: {e}")
        raise

async def close_db():
    """Fecha o pool de conexões do banco de dados."""
    global db_pool
    if db_pool:
        await db_pool.close()
        logger.info("Pool de conexões do banco de dados fechado")

Isso fornece: Um pool de conexões configurado adequadamente, Uso de variáveis de ambiente para segurança, Logging para monitoramento, Otimização do tamanho do pool, Tratamento de timeout de conexão, Função explícita de fechamento para desligamentos limpos.

Passo 4: Expor um recurso somente leitura

Crie resources/users.py para expor dados de usuários:

from typing import List, Dict, Any, Optional
import logging
from database import db_pool

logger = logging.getLogger("mcp_database.resources.users")

async def fetch_recent_users(limit: int = 20) -> List[Dict[str, Any]]:
    """
    Busca os usuários mais recentes do banco de dados.
    
    Args:
        limit: Número máximo de usuários a retornar (padrão: 20)
        
    Returns:
        Lista de objetos de usuário
    """
    try:
        if not db_pool:
            logger.error("Pool de banco de dados não inicializado")
            return {"error": "Conexão com banco de dados não disponível"}
            
        async with db_pool.acquire() as connection:
            # Use uma consulta parametrizada para segurança
            query = """
                SELECT id, username, email, created_at 
                FROM users 
                ORDER BY created_at DESC 
                LIMIT $1;
            """
            rows = await connection.fetch(query, limit)
            
            # Converte para dicionários e trata serialização de datetime
            users = []
            for row in rows:
                user = dict(row)
                # Converte datetime para string formato ISO para serialização JSON
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Buscados {len(users)} usuários recentes")
            return users
    except Exception as e:
        logger.error(f"Erro ao buscar usuários recentes: {e}")
        return {"error": f"Erro de banco de dados: {str(e)}"}

Agora, atualize seu server.py para registrar este recurso:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db
from resources.users import fetch_recent_users

# Configurar logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")

# Criar servidor MCP
server = MCPServer()

# Registrar o recurso
@server.resource(
    name="recent_users", 
    description="Busca os usuários mais recentes do banco de dados."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

async def main():
    try:
        # Inicializar o banco de dados
        await init_db()
        
        # Iniciar o servidor
        logger.info("Iniciando servidor MCP...")
        await server.start()
    except Exception as e:
        logger.error(f"Erro no servidor: {e}")
    finally:
        # Fechar conexões de banco de dados ao desligar
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Passo 5: Implementar capacidades de consulta avançadas

Crie um recurso que permita consultas mais flexíveis com parâmetros:

async def fetch_users_by_criteria(
    department: Optional[str] = None,
    role: Optional[str] = None,
    active: Optional[bool] = True,
    limit: int = 20
) -> List[Dict[str, Any]]:
    """
    Busca usuários que correspondem a critérios específicos.
    
    Args:
        department: Filtrar por departamento (opcional)
        role: Filtrar por função (opcional)
        active: Filtrar por status ativo (padrão: True)
        limit: Máximo de resultados a retornar (padrão: 20)
        
    Returns:
        Lista de objetos de usuário correspondentes
    """
    try:
        if not db_pool:
            logger.error("Pool de banco de dados não inicializado")
            return {"error": "Conexão com banco de dados não disponível"}
            
        # Construir consulta dinâmica
        conditions = ["active = $1"]
        params = [active]
        param_count = 1
        
        if department:
            param_count += 1
            conditions.append(f"department = ${param_count}")
            params.append(department)
            
        if role:
            param_count += 1
            conditions.append(f"role = ${param_count}")
            params.append(role)
            
        # Construir a consulta final
        query = f"""
            SELECT id, username, email, department, role, created_at 
            FROM users 
            WHERE {' AND '.join(conditions)}
            ORDER BY created_at DESC 
            LIMIT ${param_count + 1};
        """
        params.append(limit)
        
        async with db_pool.acquire() as connection:
            rows = await connection.fetch(query, *params)
            
            # Converter para dicionários e tratar serialização de datetime
            users = []
            for row in rows:
                user = dict(row)
                if "created_at" in user and user["created_at"]:
                    user["created_at"] = user["created_at"].isoformat()
                users.append(user)
                
            logger.info(f"Buscados {len(users)} usuários correspondentes aos critérios")
            return users
    except Exception as e:
        logger.error(f"Erro ao buscar usuários por critérios: {e}")
        return {"error": f"Erro de banco de dados: {str(e)}"}

Registre isso como um recurso parametrizado:

@server.resource(
    name="users_by_criteria", 
    description="Busca usuários correspondentes a critérios específicos como departamento ou função."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

Isso permite que o LLM solicite subconjuntos específicos de usuários com base nas necessidades do negócio.

Passo 6: Criar uma ferramenta segura para inserir novos registros

Crie tools/users.py para operações de escrita:

from typing import Dict, Any, Optional
from pydantic import BaseModel, EmailStr, Field, validator
import logging
import re
from database import db_pool

logger = logging.getLogger("mcp_database.tools.users")

class CreateUserRequest(BaseModel):
    """Modelo de validação para solicitações de criação de usuário."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    department: Optional[str] = "General"
    role: Optional[str] = "User"
    
    @validator('username')
    def username_alphanumeric(cls, v):
        if not re.match(r'^[a-zA-Z0-9_]+$', v):
            raise ValueError('Nome de usuário deve ser alfanumérico')
        return v

async def create_user(data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Cria um novo usuário no banco de dados.
    
    Args:
        data: Dados do usuário contendo username, email, etc.
        
    Returns:
        Resposta com status e informações do usuário
    """
    try:
        # Validar os dados de entrada com Pydantic
        user_data = CreateUserRequest(**data)
        
        if not db_pool:
            logger.error("Pool de banco de dados não inicializado")
            return {
                "status": "error",
                "message": "Conexão com banco de dados não disponível"
            }
        
        async with db_pool.acquire() as connection:
            # Verificar se o usuário já existe
            existing_user = await connection.fetchrow(
                "SELECT id FROM users WHERE username = $1 OR email = $2",
                user_data.username,
                user_data.email
            )
            
            if existing_user:
                return {
                    "status": "error",
                    "message": "Usuário com este nome de usuário ou email já existe"
                }
            
            # Inserir o novo usuário
            query = """
                INSERT INTO users (username, email, department, role) 
                VALUES ($1, $2, $3, $4)
                RETURNING id;
            """
            
            user_id = await connection.fetchval(
                query,
                user_data.username,
                user_data.email,
                user_data.department,
                user_data.role
            )
            
            logger.info(f"Criado novo usuário: {user_data.username} (ID: {user_id})")
            
            return {
                "status": "success",
                "message": f"Usuário {user_data.username} criado com sucesso",
                "user_id": user_id
            }
    except Exception as e:
        logger.error(f"Erro ao criar usuário: {e}")
        return {
            "status": "error",
            "message": f"Falha ao criar usuário: {str(e)}"
        }

Registre esta ferramenta em server.py:

from tools.users import create_user

# ... código existente ...

# Registrar a ferramenta
@server.tool(
    name="create_user",
    description="Cria um novo usuário no banco de dados."
)
async def create_user_tool(data: dict):
    return await create_user(data)

Recursos de segurança adicionados: Validação Pydantic com restrições claras, Validação de email usando EmailStr, Validação de formato de nome de usuário com regex, Verificação de duplicidade antes da inserção, Mensagens de erro claras, Logging abrangente, Consultas parametrizadas para prevenir injeção SQL.

Passo 7: Gerenciamento de transações

Para operações que requerem múltiplas alterações no banco de dados, use transações:

async def transfer_user_to_department(
    user_id: int,
    new_department: str
) -> Dict[str, Any]:
    """
    Transfere um usuário para um novo departamento, registrando a alteração no log de auditoria.
    
    Args:
        user_id: ID do usuário a ser transferido
        new_department: Nome do departamento de destino
        
    Returns:
        Status da operação
    """
    try:
        if not db_pool:
            return {"error": "Conexão com banco de dados não disponível"}
            
        async with db_pool.acquire() as connection:
            # Iniciar uma transação
            async with connection.transaction():
                # Obter departamento atual
                current_dept = await connection.fetchval(
                    "SELECT department FROM users WHERE id = $1",
                    user_id
                )
                
                if not current_dept:
                    return {"error": "Usuário não encontrado"}
                
                # Atualizar o departamento do usuário
                await connection.execute(
                    "UPDATE users SET department = $1 WHERE id = $2",
                    new_department,
                    user_id
                )
                
                # Registrar a alteração no log de auditoria
                await connection.execute(
                    """
                    INSERT INTO user_audit_log 
                    (user_id, field_changed, old_value, new_value) 
                    VALUES ($1, $2, $3, $4)
                    """,
                    user_id,
                    "department",
                    current_dept,
                    new_department
                )
                
                logger.info(f"Transferido usuário {user_id} de {current_dept} para {new_department}")
                
                return {
                    "status": "success",
                    "message": f"Usuário transferido de {current_dept} para {new_department}"
                }
    except Exception as e:
        logger.error(f"Erro ao transferir usuário: {e}")
        return {"error": f"Transferência falhou: {str(e)}"}

Registre isso como uma ferramenta:

@server.tool(
    name="transfer_user",
    description="Transfere um usuário para um novo departamento."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "Faltando user_id ou new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

Isso garante que ambas as operações (atualizar usuário + adicionar log de auditoria) sejam bem-sucedidas ou falhem juntas.

Passo 8: Exemplo completo de código do servidor

Aqui está um exemplo completo reunindo tudo:

import asyncio
import logging
from mcp_server import MCPServer
from database import init_db, close_db, db_pool
from resources.users import fetch_recent_users, fetch_users_by_criteria
from tools.users import create_user, transfer_user_to_department

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_server")

# Criar servidor MCP
server = MCPServer(
    name="DatabaseMCPServer",
    version="1.0.0",
    description="Servidor MCP com acesso ao banco de dados PostgreSQL"
)

# Registrar recursos
@server.resource(
    name="recent_users", 
    description="Busca os usuários mais recentes do banco de dados."
)
async def recent_users_resource():
    return await fetch_recent_users(limit=20)

@server.resource(
    name="users_by_criteria", 
    description="Busca usuários correspondentes a critérios específicos como departamento ou função."
)
async def users_by_criteria_resource(data: dict):
    return await fetch_users_by_criteria(
        department=data.get("department"),
        role=data.get("role"),
        active=data.get("active", True),
        limit=data.get("limit", 20)
    )

# Registrar ferramentas
@server.tool(
    name="create_user",
    description="Cria um novo usuário no banco de dados."
)
async def create_user_tool(data: dict):
    return await create_user(data)

@server.tool(
    name="transfer_user",
    description="Transfere um usuário para um novo departamento."
)
async def transfer_user_tool(data: dict):
    user_id = data.get("user_id")
    new_department = data.get("new_department")
    
    if not user_id or not new_department:
        return {"error": "Faltando user_id ou new_department"}
        
    return await transfer_user_to_department(user_id, new_department)

async def main():
    try:
        # Inicializar o banco de dados
        await init_db()
        
        # Iniciar o servidor
        logger.info("Iniciando servidor MCP...")
        await server.start()
    except Exception as e:
        logger.error(f"Erro no servidor: {e}")
    finally:
        # Fechar conexões de banco de dados ao desligar
        await close_db()

if __name__ == "__main__":
    asyncio.run(main())

Considerações de segurança

Ao conectar seu servidor MCP a um banco de dados, siga estas melhores práticas de segurança:

  1. Use um usuário de banco de dados dedicado com permissões limitadas:

    CREATE USER mcp_user WITH PASSWORD 'secure_password';
    GRANT SELECT, INSERT ON users TO mcp_user;
    -- Conceda apenas o necessário
    
  2. Valide todas as entradas usando modelos Pydantic com regras de validação rigorosas.

  3. Use consultas parametrizadas exclusivamente para prevenir injeção SQL.

  4. Implemente autenticação para seu servidor MCP:

    @server.middleware
    async def auth_middleware(request, next_handler):
        token = request.headers.get("Authorization")
        if not token or not verify_token(token):
            return {"error": "Não autorizado"}
        return await next_handler(request)
    
  5. Implemente limitação de taxa para prevenir abusos:

    # Limitador de taxa simples em memória
    request_counts = {}
    
    @server.middleware
    async def rate_limit_middleware(request, next_handler):
        client_id = request.client.host
        current_time = time.time()
        
        # Limpar entradas antigas
        request_counts = {k: v for k, v in request_counts.items() 
                         if v["timestamp"] > current_time - 60}
        
        if client_id in request_counts:
            if request_counts[client_id]["count"] > 100:  # 100 requisições por minuto
                return {"error": "Limite de taxa excedido"}
            request_counts[client_id]["count"] += 1
        else:
            request_counts[client_id] = {"count": 1, "timestamp": current_time}
            
        return await next_handler(request)
    
  6. Implemente registro de requisições para auditoria:

    @server.middleware
    async def logging_middleware(request, next_handler):
        logger.info(f"Requisição: {request.method} {request.path} de {request.client.host}")
        response = await next_handler(request)
        return response
    
  7. Configure timeouts de consulta de banco de dados para evitar que consultas de longa duração afetem o desempenho do servidor.

Otimização de desempenho

  1. Pool de conexões já está implementado, mas ajuste os parâmetros com base na sua carga de trabalho:

    db_pool = await asyncpg.create_pool(
        **DB_CONFIG,
        min_size=5,       # Defina mais alto para cenários de alta carga
        max_size=20,      # Ajuste com base na capacidade do seu servidor de banco de dados
        statement_cache_size=100,  # Cache de declarações preparadas
        max_inactive_connection_lifetime=300  # Segundos antes de reciclar conexões ociosas
    )
    
  2. Crie índices de banco de dados para campos frequentemente consultados:

    CREATE INDEX idx_users_department ON users(department);
    CREATE INDEX idx_users_created_at ON users(created_at DESC);
    
  3. Implemente cache de resultados para dados frequentemente acessados e raramente alterados:

    from functools import lru_cache
    from datetime import datetime, timedelta
    
    # Cache que expira após 5 minutos
    cache_time = None
    cached_result = None
    
    async def fetch_departments_with_caching():
        global cache_time, cached_result
        
        # Verificar se o cache é válido
        current_time = datetime.now()
        if cache_time and cached_result and current_time - cache_time < timedelta(minutes=5):
            return cached_result
            
        # Cache miss - buscar do banco de dados
        async with db_pool.acquire() as connection:
            result = await connection.fetch("SELECT * FROM departments")
            
        # Atualizar cache
        cache_time = current_time
        cached_result = [dict(row) for row in result]
        
        return cached_result
    
  4. Use JSONB para dados complexos que não precisam ser consultados extensivamente:

    CREATE TABLE user_preferences (
        user_id INTEGER PRIMARY KEY,
        preferences JSONB NOT NULL
    );
    
  5. Paginação para grandes conjuntos de resultados:

    async def fetch_paginated_users(page: int = 1, page_size: int = 20):
        offset = (page - 1) * page_size
        
        async with db_pool.acquire() as connection:
            # Obter contagem total
            total = await connection.fetchval("SELECT COUNT(*) FROM users")
            
            # Obter resultados paginados
            rows = await connection.fetch(
                "SELECT * FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
                page_size, offset
            )
            
            return {
                "users": [dict(row) for row in rows],
                "pagination": {
                    "total": total,
                    "page": page,
                    "page_size": page_size,
                    "pages": (total + page_size - 1) // page_size
                }
            }
    

Testando interações com o banco de dados

Crie um diretório tests com um arquivo de teste:

# tests/test_database.py
import asyncio
import pytest
import os
from dotenv import load_dotenv
import asyncpg

# Load test environment variables
load_dotenv(".env.test")

# Testing database configuration
TEST_DB_CONFIG = {
    "user": os.getenv("TEST_DB_USER"),
    "password": os.getenv("TEST_DB_PASSWORD"),
    "database": os.getenv("TEST_DB_NAME"),
    "host": os.getenv("TEST_DB_HOST", "localhost"),
    "port": int(os.getenv("TEST_DB_PORT", "5432")),
}

@pytest.fixture
async def db_pool():
    """Create a test database pool."""
    pool = await asyncpg.create_pool(**TEST_DB_CONFIG)
    yield pool
    await pool.close()

@pytest.fixture
async def setup_test_data(db_pool):
    """Set up test data before tests and clean up after."""
    async with db_pool.acquire() as conn:
        # Create test tables
        await conn.execute("""
            CREATE TEMPORARY TABLE users (
                id SERIAL PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                email TEXT UNIQUE NOT NULL,
                department TEXT NOT NULL DEFAULT 'General',
                role TEXT NOT NULL DEFAULT 'User',
                active BOOLEAN NOT NULL DEFAULT TRUE,
                created_at TIMESTAMP NOT NULL DEFAULT NOW()
            )
        """)
        
        # Insert test users
        await conn.executemany(
            """
            INSERT INTO users (username, email, department, role) 
            VALUES ($1, $2, $3, $4)
            """,
            [
                ("test1", "test1@example.com", "Engineering", "Developer"),
                ("test2", "test2@example.com", "Marketing", "Manager"),
                ("test3", "test3@example.com", "Sales", "Representative")
            ]
        )
    
    yield
    
    # Cleanup happens automatically for temporary tables

@pytest.mark.asyncio
async def test_fetch_recent_users(db_pool, setup_test_data):
    """Test that fetching recent users works correctly."""
    from resources.users import fetch_recent_users
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Execute the function
    result = await fetch_recent_users(limit=10)
    
    # Assertions
    assert isinstance(result, list)
    assert len(result) == 3
    assert result[0]["username"] in ["test1", "test2", "test3"]
    assert "email" in result[0]
    assert "department" in result[0]

@pytest.mark.asyncio
async def test_create_user(db_pool, setup_test_data):
    """Test that creating a user works correctly."""
    from tools.users import create_user
    
    # Monkeypatch db_pool
    import database
    database.db_pool = db_pool
    
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "department": "Finance",
        "role": "Analyst"
    }
    
    # Execute the function
    result = await create_user(user_data)
    
    # Assertions
    assert result["status"] == "success"
    
    # Verify in database
    async with db_pool.acquire() as conn:
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE username = $1",
            "newuser"
        )
        
    assert user is not None
    assert user["email"] == "newuser@example.com"
    assert user["department"] == "Finance"

Execute os testes com: pytest -xvs tests/

Considerações para implantação

  1. Use arquivos de configuração específicos para cada ambiente: .env.development, .env.staging, .env.production

  2. Configure migrações de banco de dados para alterações de esquema:

    pip install alembic
    alembic init migrations
  3. Implante com Docker para consistência:

    FROM python:3.10-slim
    
    WORKDIR /app
    
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt
    
    COPY . .
    
    CMD ["python", "server.py"]
  4. Configure verificações de saúde:

    @server.route("/health")
    async def health_check():
        try:
            async with db_pool.acquire() as connection:
                await connection.fetchval("SELECT 1")
            return {"status": "healthy", "database": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}
  5. Monitore o desempenho das consultas:

    -- No PostgreSQL
    CREATE EXTENSION pg_stat_statements;
    
    -- Para analisar consultas lentas
    SELECT query, calls, total_time, mean_time
    FROM pg_stat_statements
    ORDER BY mean_time DESC
    LIMIT 10;
  6. Configure backups do banco de dados:

    # Script de backup diário
    pg_dump -U user -d database -F c -f /backups/db_$(date +%Y%m%d).dump

Conclusão

Conectar seu servidor MCP a um banco de dados PostgreSQL o transforma de uma simples demonstração em um backend de IA pronto para produção. Com atenção cuidadosa à segurança, desempenho e confiabilidade, você pode expor seu banco de dados aos modelos de linguagem com segurança, permitindo fluxos de trabalho poderosos orientados por IA. Princípios-chave a lembrar: Resources para operações de leitura, Tools para operações de escrita, Validação para todas as entradas, Transações para consistência, Pooling de conexões para desempenho, Variáveis de ambiente para segurança, Tratamento estruturado de erros para confiabilidade. Seguindo essas práticas, você permite que LLMs trabalhem junto com suas aplicações de banco de dados existentes de maneira segura e controlada — desbloqueando novas possibilidades para fluxos de trabalho e análise de dados assistidos por IA. No próximo guia, mostraremos como conectar seu backend de API existente via MCP sem reescrever a lógica de negócios.

Perguntas Frequentes

Sim. Você pode criar vários manipuladores de recursos e ferramentas, cada um conectando-se a um banco de dados ou esquema diferente.

Você deve adicionar tratamento try/except para retornar um erro claro para o LLM em vez de travar.

Exponha apenas operações selecionadas de leitura ou escrita por meio de recursos e ferramentas. Nunca dê ao LLM acesso irrestrito ao banco de dados.

Listen to your bugs 🧘, with OpenReplay

See how users use your app and resolve issues fast.
Loved by thousands of developers