How to expose your internal REST API to your MCP server

If you already have APIs running, you do not need to rebuild everything for MCP. You can bridge your REST endpoints to your MCP server by creating simple tools and resources. This guide will show you how to do it properly, with working Python examples.
Key takeaways
- You can connect your REST API to MCP servers without rewriting business logic
- Use resources for safe data fetching and tools for action-based endpoints
- Always validate inputs and handle REST failures cleanly
Why connect an existing REST API
Many companies already have:
- User services
- Order management APIs
- CRM or support ticket systems
- Inventory and stock APIs
MCP allows your LLM to read and act on these existing systems without needing direct database access. This protects your internal systems and allows fast, controlled AI integrations.
What you need
- A running REST API (public or internal)
- MCP server SDK installed
- HTTPX (or another async HTTP client for Python)
Install HTTPX and the MCP server SDK:
pip install httpx mcp-server
You should have basic familiarity with making API requests in Python.
Step 1: Set up the HTTP client
Instead of rewriting logic, you call your REST API from within MCP handlers.
Example base setup:
import os
import httpx
# Load from environment variables for security
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
Use a shared async client for performance:
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    timeout=30.0,  # Set a reasonable timeout
)
This client will be reused in all MCP handlers.
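The fallback values in the setup above are placeholders, so a misconfigured deployment could silently send `your_api_key` to the API. One way to catch that early is to fail at startup when a setting is missing. The sketch below assumes both variables are mandatory in production; `ApiConfig` and `load_config` are hypothetical names, not part of HTTPX or the MCP SDK.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ApiConfig:
    base_url: str
    token: str


def load_config(env: Optional[Mapping[str, str]] = None) -> ApiConfig:
    """Read API settings from the environment and fail fast if any is missing."""
    env = os.environ if env is None else env
    base_url = env.get("API_BASE_URL", "").strip()
    token = env.get("API_TOKEN", "").strip()
    missing = [
        name
        for name, value in (("API_BASE_URL", base_url), ("API_TOKEN", token))
        if not value
    ]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    # Strip a trailing slash so the stored value has one canonical form.
    return ApiConfig(base_url=base_url.rstrip("/"), token=token)
```

The returned values could then be passed to `httpx.AsyncClient` in place of the module-level constants.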
Step 2: Expose API endpoints as resources
Resources fetch data without side effects. Example: fetching a list of active users.
from mcp_server import MCPServer
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_api")
server = MCPServer()
@server.resource(name="active_users", description="Get a list of active users.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
This implementation properly handles errors and provides informative messages when the API call fails.
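Returning the upstream response body verbatim can hand large or sensitive text to the model. A minimal sketch of a shared helper that gives every failure the same shape and truncates the details is shown here. `build_error` and `MAX_DETAIL_CHARS` are hypothetical names, the 200-character limit is an assumption, and the helper takes plain values so it does not depend on any HTTP client.

```python
from typing import Any, Dict, Optional

MAX_DETAIL_CHARS = 200  # assumed limit; tune it for your API


def build_error(
    message: str, status_code: Optional[int] = None, details: str = ""
) -> Dict[str, Any]:
    """Build a consistent error payload with truncated upstream details."""
    error: Dict[str, Any] = {"error": message}
    if status_code is not None:
        error["status_code"] = status_code
        # 5xx and 429 are usually worth retrying; other 4xx usually are not.
        error["retryable"] = status_code >= 500 or status_code == 429
    if details:
        truncated = details[:MAX_DETAIL_CHARS]
        if len(details) > MAX_DETAIL_CHARS:
            truncated += "..."
        error["details"] = truncated
    return error
```

Inside an `except httpx.HTTPStatusError as e:` branch, a handler could return `build_error("API error", e.response.status_code, e.response.text)`.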
Step 3: Expose API actions as tools
Tools perform actions that modify state. Example: creating a new support ticket.
@server.tool(name="create_support_ticket", description="Create a new support ticket for a user.")
async def create_ticket(data: dict):
    # Input validation
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    # Validate priority
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    # Prepare payload
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
Good practices in this example:
- Input validation before hitting the API
- Validation of allowed values (priorities)
- Structured error handling with informative messages
- Returning the created ticket ID for reference
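The validation checks listed above can also be pulled out of the handler into a pure function, which makes them testable without an HTTP client or a running MCP server. This is a sketch only: `validate_ticket_input` is a hypothetical helper, and the field names and priority values are taken from the ticket example.

```python
from typing import Any, Dict, Optional, Tuple

VALID_PRIORITIES = ("low", "medium", "high", "critical")
REQUIRED_FIELDS = ("user_id", "subject", "description")


def validate_ticket_input(
    data: Dict[str, Any],
) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Return (payload, None) when the input is valid, or (None, error_message)."""
    missing = [field for field in REQUIRED_FIELDS if not data.get(field)]
    if missing:
        return None, f"Missing required fields: {', '.join(missing)}"
    priority = data.get("priority", "medium")
    if priority not in VALID_PRIORITIES:
        return None, f"Invalid priority. Must be one of: {', '.join(VALID_PRIORITIES)}"
    # Copy only known fields so unexpected keys never reach the REST API.
    payload = {field: data[field] for field in REQUIRED_FIELDS}
    payload["priority"] = priority
    return payload, None
```

A handler would call it first and return `{"error": error}` when `error` is not `None`, then post `payload` as before.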
Step 4: Full working server example
Here’s the full version bringing it all together:
import asyncio
import httpx
import os
import logging
from mcp_server import MCPServer
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("mcp_api_server")
# Load configuration from environment
API_BASE_URL = os.environ.get("API_BASE_URL", "https://your.api.internal/v1")
API_TOKEN = os.environ.get("API_TOKEN", "your_api_key")
# Create server
server = MCPServer(
    name="REST API MCP Server",
    version="1.0.0",
    description="Connects LLMs to internal REST APIs"
)
# Create shared HTTP client
client = httpx.AsyncClient(
    base_url=API_BASE_URL,
    headers={
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    },
    timeout=30.0
)
# Resources: Safe data fetching
@server.resource(name="active_users", description="Get active users from the CRM system.")
async def get_active_users():
    try:
        response = await client.get("/users", params={"status": "active"})
        response.raise_for_status()
        users = response.json()
        logger.info(f"Retrieved {len(users)} active users")
        return users
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        return {"error": f"API error: {e.response.status_code}", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
@server.resource(name="user_details", description="Get detailed information about a specific user.")
async def get_user_details(data: dict):
    user_id = data.get("user_id")
    if not user_id:
        return {"error": "Missing user_id parameter"}
    try:
        response = await client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()
    except httpx.HTTPStatusError as e:
        logger.error(f"API error: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 404:
            return {"error": f"User with ID {user_id} not found"}
        return {"error": f"API error: {e.response.status_code}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Failed to connect to API"}
# Tools: Actions that may modify state
@server.tool(name="create_support_ticket", description="Create a support ticket.")
async def create_ticket(data: dict):
    # Input validation
    user_id = data.get("user_id")
    subject = data.get("subject")
    description = data.get("description")
    priority = data.get("priority", "medium")
    if not all([user_id, subject, description]):
        return {"error": "Missing required fields: user_id, subject, and description are required"}
    # Validate priority
    valid_priorities = ["low", "medium", "high", "critical"]
    if priority not in valid_priorities:
        return {"error": f"Invalid priority. Must be one of: {', '.join(valid_priorities)}"}
    # Prepare payload
    payload = {
        "user_id": user_id,
        "subject": subject,
        "description": description,
        "priority": priority
    }
    try:
        response = await client.post("/tickets", json=payload)
        response.raise_for_status()
        ticket_data = response.json()
        logger.info(f"Created ticket #{ticket_data.get('id')} for user {user_id}")
        return {
            "status": "success",
            "message": "Ticket created successfully",
            "ticket_id": ticket_data.get("id")
        }
    except httpx.HTTPStatusError as e:
        logger.error(f"Failed to create ticket: {e.response.status_code} - {e.response.text}")
        return {"error": "Failed to create ticket", "details": e.response.text}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
@server.tool(name="update_ticket_status", description="Update the status of an existing support ticket.")
async def update_ticket_status(data: dict):
    ticket_id = data.get("ticket_id")
    new_status = data.get("status")
    if not ticket_id or not new_status:
        return {"error": "Missing ticket_id or status"}
    valid_statuses = ["open", "in_progress", "pending", "resolved", "closed"]
    if new_status not in valid_statuses:
        return {"error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}"}
    try:
        response = await client.patch(
            f"/tickets/{ticket_id}",
            json={"status": new_status}
        )
        response.raise_for_status()
        logger.info(f"Updated ticket #{ticket_id} status to {new_status}")
        return {"status": "success", "message": f"Ticket status updated to {new_status}"}
    except httpx.HTTPStatusError as e:
        status_code = e.response.status_code
        if status_code == 404:
            return {"error": f"Ticket {ticket_id} not found"}
        logger.error(f"API error: {status_code} - {e.response.text}")
        return {"error": f"Failed to update ticket: {e.response.text}"}
    except httpx.RequestError as e:
        logger.error(f"Request error: {str(e)}")
        return {"error": "Connection error", "details": str(e)}
# Server lifecycle management
async def startup():
    logger.info("Starting MCP server...")
    # Any initialization code
async def shutdown():
    logger.info("Shutting down MCP server...")
    await client.aclose()  # Close HTTP client connections
async def main():
    try:
        await startup()
        await server.start()
    finally:
        await shutdown()
if __name__ == "__main__":
    asyncio.run(main())
Improved features:
- Proper client shutdown using aclose()
- Structured logging throughout
- Environment variable configuration
- Server lifecycle management
- Additional example endpoints
- Comprehensive error handling
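The startup and shutdown pairing in `main()` can also be written as an async context manager, which ties cleanup to a `with` block instead of a hand-written `try`/`finally`. The sketch below uses only the standard library. `FakeClient` is a stand-in for `httpx.AsyncClient` so the example runs without network access, and `managed_client` is a hypothetical helper name.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeClient:
    """Stand-in for httpx.AsyncClient; it only tracks whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


@asynccontextmanager
async def managed_client(client):
    """Yield the client and always close it, even if the body raises."""
    try:
        yield client
    finally:
        await client.aclose()


async def run_and_fail(client) -> None:
    async with managed_client(client):
        raise RuntimeError("simulated crash while serving")


def demo() -> bool:
    """Return True if the client was closed despite the simulated crash."""
    client = FakeClient()
    try:
        asyncio.run(run_and_fail(client))
    except RuntimeError:
        pass
    return client.closed
```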
Best practices for real-world setups
- Retry failed requests: Implement exponential backoff for transient failures:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
async def fetch_with_retry(endpoint, params=None):
    response = await client.get(endpoint, params=params)
    response.raise_for_status()
    return response.json()
- Respect rate limits: Implement a rate limiter to avoid API throttling:
from aiolimiter import AsyncLimiter
# 10 requests per second maximum
rate_limiter = AsyncLimiter(10, 1)
async def rate_limited_request(method, url, **kwargs):
    async with rate_limiter:
        return await client.request(method, url, **kwargs)
- Secure credentials: Use a proper secrets management solution:
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Or use a cloud-based solution like AWS Secrets Manager
# (named secrets_client so it does not shadow the shared HTTP client)
# import boto3
# secrets_client = boto3.client('secretsmanager')
# response = secrets_client.get_secret_value(SecretId='api-credentials')
- Handle timeouts: Configure client timeouts appropriately:
client = httpx.AsyncClient(
    timeout=httpx.Timeout(
        connect=5.0,  # Connection timeout
        read=30.0,  # Read timeout
        write=30.0,  # Write timeout
        pool=60.0,  # Pool timeout
    )
)
- Audit access: Implement structured logging with context:
import contextvars
import uuid
# Holds the current request ID so downstream code can include it in logs
request_id_var = contextvars.ContextVar("request_id", default=None)
@server.middleware
async def audit_middleware(request, next_handler):
    # Generate a request ID
    request_id = str(uuid.uuid4())
    # Log the incoming request
    logger.info(f"Request {request_id}: {request.method} {request.path}")
    # Expose the request ID for the duration of this request
    token = request_id_var.set(request_id)
    try:
        response = await next_handler(request)
    finally:
        request_id_var.reset(token)
    # Log the outcome
    logger.info(f"Request {request_id} completed with status {response.status_code}")
    return response
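A retry decorator applied without a filter retries on every exception, including 4xx responses that will not succeed on a second attempt. One way to limit retries to transient failures is sketched below with only the standard library. `TransientError`, `backoff_delays`, and `retry_transient` are hypothetical names. The caller is assumed to raise `TransientError` for timeouts, 429, and 5xx responses, and the `sleep` parameter is injectable so tests do not have to wait.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Raised by the caller for failures worth retrying (timeouts, 429, 5xx)."""


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 10.0) -> List[float]:
    """Delays between attempts: base, 2*base, 4*base, ... capped at `cap`."""
    return [min(cap, base * (2 ** i)) for i in range(attempts - 1)]


async def retry_transient(
    call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run `call`, retrying only on TransientError with exponential backoff."""
    delays = backoff_delays(attempts)
    for attempt in range(attempts):
        try:
            return await call()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last failure
            await sleep(delays[attempt])
    raise RuntimeError("attempts must be at least 1")
```

Any other exception, such as a validation failure or a 404 mapped to a different error type, propagates on the first attempt.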
Conclusion
Connecting your existing REST APIs to MCP servers lets you unlock real-world data and actions without rewriting backends. With careful design and validation, you can build powerful, production-ready AI agents safely and quickly.
In the next guide, we’ll show how to trigger background jobs through your MCP server with asynchronous tool execution.
FAQs
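Can one MCP server connect to multiple REST APIs?
Yes. Each resource or tool can call a different API if needed.
Is it safe to let an LLM call internal APIs?
Only if you strictly validate inputs and carefully control which endpoints are accessible. Assume the LLM will try everything.
Should I use a synchronous or asynchronous HTTP client?
Always prefer asynchronous (httpx.AsyncClient) inside MCP servers to avoid blocking the event loop.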
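The advice above about controlling which endpoints are accessible can be sketched as an explicit allowlist that is checked before any request leaves the MCP server. The routes are the ones used in this guide. `ALLOWED_ROUTES`, `safe_id`, and `is_allowed` are hypothetical names, and the ID format (letters, digits, hyphens, underscores) is an assumption about your API.

```python
import re
from typing import Optional

# Method plus a full-match pattern for each path the MCP server may call.
ALLOWED_ROUTES = [
    ("GET", re.compile(r"/users")),
    ("GET", re.compile(r"/users/[A-Za-z0-9_-]+")),
    ("POST", re.compile(r"/tickets")),
    ("PATCH", re.compile(r"/tickets/[A-Za-z0-9_-]+")),
]

_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def safe_id(value: object) -> Optional[str]:
    """Return the ID as a string if it is safe to place in a URL path, else None."""
    text = str(value)
    return text if _ID_PATTERN.fullmatch(text) else None


def is_allowed(method: str, path: str) -> bool:
    """True only when the method and path match an allowlisted route exactly."""
    return any(
        method.upper() == allowed_method and pattern.fullmatch(path) is not None
        for allowed_method, pattern in ALLOWED_ROUTES
    )
```

Checking IDs with `safe_id` before building paths such as `f"/users/{user_id}"` keeps values like `../admin` from changing which endpoint is called.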