Como expor sua API REST interna para seu servidor MCP

Se você já tem APIs em execução, não precisa reconstruir tudo para o MCP. Você pode conectar seus endpoints REST ao seu servidor MCP criando ferramentas e recursos simples. Este guia mostrará como fazer isso corretamente, com exemplos funcionais em Python.
Principais pontos
- Você pode conectar sua API REST aos servidores MCP sem reescrever a lógica de negócios
- Use recursos para busca segura de dados e ferramentas para endpoints baseados em ações
- Sempre valide as entradas e trate as falhas REST de forma limpa
Por que conectar uma API REST existente
Muitas empresas já possuem: Serviços de usuário, APIs de gerenciamento de pedidos, CRM ou sistemas de tickets de suporte, APIs de inventário e estoque. O MCP permite que seu LLM leia e atue nesses sistemas existentes sem precisar de acesso direto ao banco de dados. Isso protege seus sistemas internos e permite integrações rápidas e controladas com IA.
O que você precisa
- Uma API REST em execução (pública ou interna)
- SDK do servidor MCP instalado
- HTTPX (ou outro cliente HTTP assíncrono para Python)
Instale o HTTPX:
pip install httpx mcp-server
Você deve ter familiaridade básica com a realização de requisições API em Python.
Passo 1: Instale bibliotecas cliente HTTP
Em vez de reescrever a lógica, você chama sua API REST de dentro dos manipuladores MCP.
Exemplo de configuração básica:
import os
import httpx
# Carregue de variáveis de ambiente para segurança
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Use um cliente assíncrono compartilhado para melhor desempenho:
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={"Authorization": f"Bearer {API_TOKEN}"},
timeout=30.0 # Defina um timeout razoável
)
Este cliente será reutilizado em todos os manipuladores MCP.
Passo 2: Exponha endpoints da API como recursos
Recursos buscam dados sem efeitos colaterais. Exemplo: buscar uma lista de usuários ativos.
from mcp_server import MCPServer
import logging
# Configure o logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Obter uma lista de usuários ativos.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Recuperados {len(users)} usuários ativos")
return users
except httpx.HTTPStatusError as e:
logger.error(f"Erro de API: {e.response.status_code} - {e.response.text}")
return {"error": f"Erro de API: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Falha ao conectar à API"}
Esta implementação trata adequadamente os erros e fornece mensagens informativas quando a chamada da API falha.
Passo 3: Exponha ações da API como ferramentas
Ferramentas executam ações que modificam o estado. Exemplo: criar um novo ticket de suporte.
@server.tool(name="create_support_ticket", description="Criar um novo ticket de suporte para um usuário.")
async def create_ticket(data: dict):
# Validação de entrada
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Campos obrigatórios ausentes: user_id, subject e description são obrigatórios"}
# Validar prioridade
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Prioridade inválida. Deve ser uma das seguintes: {', '.join(valid_priorities)}"}
# Preparar payload
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Ticket #{ticket_data.get('id')} criado para o usuário {user_id}")
return {
"status": "success",
"message": "Ticket criado com sucesso",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Falha ao criar ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Falha ao criar ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Erro de conexão", "details": str(e)}
Boas práticas neste exemplo:
- Validação de entrada antes de acessar a API
- Validação de valores permitidos (prioridades)
- Tratamento estruturado de erros com mensagens informativas
- Retorno do ID do ticket criado para referência
Passo 4: Exemplo completo de servidor funcional
Aqui está a versão completa reunindo tudo:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Configure o logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Carregue a configuração do ambiente
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Crie o servidor
server = MCPServer(
name="REST API MCP Server",
version="1.0.0",
description="Conecta LLMs a APIs REST internas"
)
# Crie um cliente HTTP compartilhado
client = httpx.AsyncClient(
base_url=API_BASE_URL,
headers={
"Authorization": f"Bearer {API_TOKEN}",
"Content-Type": "application/json",
"Accept": "application/json"
},
timeout=30.0
)
# Recursos: Busca segura de dados
@server.resource(name="active_users", description="Obter usuários ativos do sistema CRM.")
async def get_active_users():
try:
response = await client.get("/users", params={"status": "active"})
response.raise_for_status()
users = response.json()
logger.info(f"Recuperados {len(users)} usuários ativos")
return users
except httpx.HTTPStatusError as e:
logger.error(f"Erro de API: {e.response.status_code} - {e.response.text}")
return {"error": f"Erro de API: {e.response.status_code}", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Falha ao conectar à API"}
@server.resource(name="user_details", description="Obter informações detalhadas sobre um usuário específico.")
async def get_user_details(data: dict):
user_id = data.get("user_id")
if not user_id:
return {"error": "Parâmetro user_id ausente"}
try:
response = await client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"Erro de API: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 404:
return {"error": f"Usuário com ID {user_id} não encontrado"}
return {"error": f"Erro de API: {e.response.status_code}"}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Falha ao conectar à API"}
# Ferramentas: Ações que podem modificar o estado
@server.tool(name="create_support_ticket", description="Criar um ticket de suporte.")
async def create_ticket(data: dict):
# Validação de entrada
user_id = data.get("user_id")
subject = data.get("subject")
description = data.get("description")
priority = data.get("priority", "medium")
if not all([user_id, subject, description]):
return {"error": "Campos obrigatórios ausentes: user_id, subject e description são obrigatórios"}
# Validar prioridade
valid_priorities = ["low", "medium", "high", "critical"]
if priority not in valid_priorities:
return {"error": f"Prioridade inválida. Deve ser uma das seguintes: {', '.join(valid_priorities)}"}
# Preparar payload
payload = {
"user_id": user_id,
"subject": subject,
"description": description,
"priority": priority
}
try:
response = await client.post("/tickets", json=payload)
response.raise_for_status()
ticket_data = response.json()
logger.info(f"Ticket #{ticket_data.get('id')} criado para o usuário {user_id}")
return {
"status": "success",
"message": "Ticket criado com sucesso",
"ticket_id": ticket_data.get("id")
}
except httpx.HTTPStatusError as e:
logger.error(f"Falha ao criar ticket: {e.response.status_code} - {e.response.text}")
return {"error": "Falha ao criar ticket", "details": e.response.text}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Erro de conexão", "details": str(e)}
@server.tool(name="update_ticket_status", description="Atualizar o status de um ticket de suporte existente.")
async def update_ticket_status(data: dict):
ticket_id = data.get("ticket_id")
new_status = data.get("status")
if not ticket_id or not new_status:
return {"error": "ticket_id ou status ausente"}
valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
if new_status not in valid_statuses:
return {"error": f"Status inválido. Deve ser um dos seguintes: {', '.join(valid_statuses)}"}
try:
response = await client.patch(
f"/tickets/{ticket_id}",
json={"status": new_status}
)
response.raise_for_status()
logger.info(f"Status do ticket #{ticket_id} atualizado para {new_status}")
return {"status": "success", "message": f"Status do ticket atualizado para {new_status}"}
except httpx.HTTPStatusError as e:
status_code = e.response.status_code
if status_code == 404:
return {"error": f"Ticket {ticket_id} não encontrado"}
logger.error(f"Erro de API: {status_code} - {e.response.text}")
return {"error": f"Falha ao atualizar ticket: {e.response.text}"}
except httpx.RequestError as e:
logger.error(f"Erro de requisição: {str(e)}")
return {"error": "Erro de conexão", "details": str(e)}
# Gerenciamento do ciclo de vida do servidor
async def startup():
logger.info("Iniciando servidor MCP...")
# Qualquer código de inicialização
async def shutdown():
logger.info("Desligando servidor MCP...")
await client.aclose() # Feche as conexões do cliente HTTP
async def main():
try:
await startup()
await server.start()
finally:
await shutdown()
if __name__ == "__main__":
asyncio.run(main())
Recursos aprimorados:
- Encerramento adequado do cliente usando
aclose()
- Logging estruturado em todo o código
- Configuração por variáveis de ambiente
- Gerenciamento do ciclo de vida do servidor
- Endpoints de exemplo adicionais
- Tratamento abrangente de erros
Melhores práticas para configurações do mundo real
-
Repetir requisições com falha: Implemente recuo exponencial para falhas transitórias:
from tenacity import retry, stop_after_attempt, wait_exponential @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10)) async def fetch_with_retry(endpoint, params=None): response = await client.get(endpoint, params=params) response.raise_for_status() return response.json()
-
Respeitar limites de taxa: Implemente um limitador de taxa para evitar throttling da API:
from aiolimiter import AsyncLimiter # Máximo de 10 requisições por segundo rate_limiter = AsyncLimiter(10, 1) async def rate_limited_request(method, url, **kwargs): async with rate_limiter: return await client.request(method, url, **kwargs)
-
Credenciais seguras: Use uma solução adequada de gerenciamento de segredos:
from dotenv import load_dotenv # Carregue variáveis de ambiente do arquivo .env load_dotenv() # Ou use uma solução baseada em nuvem como AWS Secrets Manager # import boto3 # client = boto3.client('secretsmanager') # response = client.get_secret_value(SecretId='api-credentials')
-
Lidar com timeouts: Configure os timeouts do cliente adequadamente:
client = httpx.AsyncClient( timeout=httpx.Timeout( connect=5.0, # Timeout de conexão read=30.0, # Timeout de leitura write=30.0, # Timeout de escrita pool=60.0 # Timeout de pool ) )
-
Auditoria de acesso: Implemente logging estruturado com contexto:
@server.middleware async def audit_middleware(request, next_handler): # Gere um ID de requisição request_id = str(uuid.uuid4()) # Registre a requisição recebida logger.info(f"Requisição {request_id}: {request.method} {request.path}") # Adicione contexto ao logger with logging.contextvars.ContextVar("request_id", default=request_id): response = await next_handler(request) # Registre o resultado logger.info(f"Requisição {request_id} concluída com status {response.status_code}") return response
Conclusão
Conectar suas APIs REST existentes aos servidores MCP permite que você desbloqueie dados e ações do mundo real sem reescrever backends. Com um design cuidadoso e validação, você pode construir agentes de IA poderosos e prontos para produção — de forma segura e rápida.
No próximo guia, mostraremos como acionar jobs em segundo plano através do seu servidor MCP com execução assíncrona de ferramentas.
Perguntas Frequentes
Sim. Cada recurso ou ferramenta pode chamar uma API diferente, se necessário.
Apenas se você validar estritamente as entradas e controlar cuidadosamente quais endpoints são acessíveis. Assuma que o LLM tentará tudo.
Sempre prefira assíncronas (httpx.AsyncClient) dentro dos servidores MCP para evitar bloquear o loop de eventos.