
Python MCP server asyncio — concurrent tools, semaphores, and async libraries

FastMCP runs on a single asyncio event loop. Every tool call is an awaited coroutine — if it blocks the event loop, all other concurrent calls stall until it returns. Getting the concurrency model right determines whether your Python MCP server handles parallel agent requests smoothly or serializes them through a bottleneck. This guide covers the core asyncio patterns for MCP tools: parallel sub-calls with asyncio.gather, resource limits with Semaphore, timeout enforcement with asyncio.wait_for, async I/O libraries for HTTP and database access, and how to safely offload CPU-bound work to a thread pool without blocking the event loop.

TL;DR

Use async def for all tool functions. For parallel sub-calls within a tool, use asyncio.gather(). Cap concurrent external API calls with a module-level asyncio.Semaphore. Enforce timeouts with asyncio.wait_for(coro, timeout=30). Use aiohttp.ClientSession for HTTP requests and aiosqlite or asyncpg for database access. For CPU-bound work (image processing, crypto, heavy computation), use asyncio.to_thread() or a concurrent.futures.ProcessPoolExecutor to avoid blocking the event loop. Never call time.sleep() in a coroutine; use await asyncio.sleep() instead.

The event loop model

FastMCP runs all tool handlers in a single asyncio event loop. When an MCP client sends a tools/call request, FastMCP awaits your tool coroutine. If multiple clients or a multi-agent orchestrator sends concurrent tool calls, FastMCP queues them and processes them cooperatively — each await point is a chance for another coroutine to run.

The implication: one blocking operation (a synchronous library call, time.sleep(), heavy computation) blocks all concurrent tool calls. Async-first code throughout your tool handlers is not just a style preference — it is a correctness requirement for a server that handles multiple concurrent sessions.

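Do not rely on a plain def tool being moved off the event loop automatically. Whether FastMCP runs synchronous (non-async) tool functions in a worker thread depends on the implementation and version you use; where they are called directly, a blocking def tool stalls the loop just like a blocking call inside an async def. Check your version's behavior, or offload explicitly with asyncio.to_thread(). In every version, synchronous library calls inside an async def block the event loop directly.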
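
To make the cost of a blocking call concrete, the following sketch compares a coroutine that blocks with time.sleep() against one that yields with asyncio.sleep(). It uses only the standard library; heartbeat, blocking_tool, cooperative_tool, and count_ticks are illustrative names, not FastMCP APIs. The heartbeat stands in for any other concurrent tool call.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    """Record a timestamp every 10 ms for as long as the event loop lets us run."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_tool() -> str:
    time.sleep(0.2)  # blocks the whole event loop for 200 ms
    return "done"


async def cooperative_tool() -> str:
    await asyncio.sleep(0.2)  # yields to the event loop while waiting
    return "done"


async def count_ticks(tool: Callable[[], Awaitable[str]]) -> int:
    """Run the tool next to a heartbeat and count how often the heartbeat ran."""
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat record its first tick
    await tool()
    stop.set()
    await beat
    return len(ticks)


if __name__ == "__main__":
    print("ticks during blocking tool:", asyncio.run(count_ticks(blocking_tool)))
    print("ticks during cooperative tool:", asyncio.run(count_ticks(cooperative_tool)))
```

While blocking_tool runs, the heartbeat records only its initial tick; while cooperative_tool runs, it keeps ticking roughly every 10 ms.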

Parallel sub-calls with asyncio.gather

asyncio.gather() runs multiple coroutines concurrently and waits for all to complete. Use it when a tool needs to fetch data from multiple sources simultaneously:

import asyncio
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("data-aggregator")

@mcp.tool()
async def get_dashboard_data(user_id: str) -> dict:
    """Fetch all dashboard data in one call."""
    # Sequential (slow): ~3 × 200ms = 600ms
    # profile = await fetch_profile(user_id)
    # orders = await fetch_recent_orders(user_id)
    # metrics = await fetch_metrics(user_id)

    # Parallel (fast): max(200ms, 200ms, 200ms) = ~200ms
    profile, orders, metrics = await asyncio.gather(
        fetch_profile(user_id),
        fetch_recent_orders(user_id),
        fetch_metrics(user_id)
    )
    return {"profile": profile, "orders": orders, "metrics": metrics}

By default, asyncio.gather() propagates the first exception to the caller, but it does not cancel the other awaitables: they keep running in the background and their results are discarded. To handle partial failures, returning whatever succeeded and reporting what failed, use return_exceptions=True:

@mcp.tool()
async def aggregate_data(ids: list[str]) -> dict:
    """Fetch data for multiple IDs, returning partial results on failure."""
    results = await asyncio.gather(
        *(fetch_item(item_id) for item_id in ids),
        return_exceptions=True
    )
    # BaseException also covers CancelledError, which is not an Exception subclass
    data = [r for r in results if not isinstance(r, BaseException)]
    errors = [str(r) for r in results if isinstance(r, BaseException)]
    return {
        "data": data,
        "errors": errors,
        "total": len(ids),
        "succeeded": len(data)
    }
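
The default behavior of asyncio.gather() is easy to misread, so here is a small standard-library sketch of it. The coroutine names are illustrative. It shows that after gather() raises, the sibling coroutine still runs to completion rather than being cancelled.

```python
import asyncio


async def fails_fast() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def slow_ok(log: list[str]) -> None:
    await asyncio.sleep(0.05)
    log.append("slow_ok finished")


async def demo() -> list[str]:
    log: list[str] = []
    try:
        await asyncio.gather(fails_fast(), slow_ok(log))
    except ValueError:
        log.append("gather raised")
    # gather() has already raised, but slow_ok is still running in the background
    await asyncio.sleep(0.1)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the remaining work must stop when one sub-call fails, cancel it explicitly or use a task group.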

Resource limits with asyncio.Semaphore

Without limits, a single asyncio.gather() call with a large list fans out to as many concurrent requests as the list has items. This can exhaust connection pools, hit external API rate limits, or overwhelm a downstream service. Use asyncio.Semaphore to cap the number of calls in flight:

import asyncio
import aiohttp

# Module-level semaphore limits concurrent external API calls
# across all tool calls in all concurrent sessions
_api_semaphore = asyncio.Semaphore(10)  # max 10 concurrent calls

async def fetch_with_limit(url: str) -> dict:
    async with _api_semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.json()

@mcp.tool()
async def bulk_fetch(urls: list[str]) -> list[dict]:
    """Fetch multiple URLs concurrently with a concurrency limit."""
    return await asyncio.gather(*[fetch_with_limit(url) for url in urls])

The semaphore is module-level, so its limit applies across all concurrent sessions — not per tool call. A semaphore of 10 means at most 10 concurrent external API calls at any time regardless of how many MCP clients are connected.

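Size the semaphore based on your external service's rate limit and your connection pool size. If the external API allows 100 req/s and each request takes ~100ms, about 10 requests are in flight when you run at the rate limit (100 req/s × 0.1s = 10). Set the semaphore to 8–9 to leave headroom. Note that a semaphore caps concurrency, not request rate: if responses come back faster than 100ms, 10 concurrent slots can exceed 100 req/s, so add an explicit rate limiter when the limit is strict.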
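
The sizing arithmetic and the cap itself can be checked with a short standard-library sketch. The helper names and the 0.85 headroom factor are assumptions chosen for illustration, and asyncio.sleep() stands in for the external request.

```python
import asyncio
import math


def semaphore_size(rate_per_s: float, latency_s: float, headroom: float = 0.85) -> int:
    """In-flight requests at the rate limit (rate x latency), scaled down for headroom."""
    return max(1, math.floor(rate_per_s * latency_s * headroom))


async def measure_peak(n_jobs: int, limit: int) -> int:
    """Run n_jobs simulated calls behind a semaphore and report peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def simulated_call() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the external request
            in_flight -= 1

    await asyncio.gather(*(simulated_call() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    limit = semaphore_size(rate_per_s=100, latency_s=0.1)
    print("semaphore size:", limit)
    print("peak concurrency:", asyncio.run(measure_peak(50, limit)))
```

With 50 queued jobs and a limit of 8, the observed peak never exceeds 8, regardless of how many callers share the semaphore.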

Timeout enforcement

Wrap external calls with asyncio.wait_for() to enforce a maximum duration. Without timeouts, a stalled external service stalls the entire tool call indefinitely — and the MCP client may wait forever:

import asyncio

@mcp.tool()
async def call_external_api(endpoint: str, payload: dict) -> dict:
    """Call an external API with a 30-second timeout."""
    try:
        result = await asyncio.wait_for(
            _do_api_call(endpoint, payload),
            timeout=30.0
        )
        return result
    except asyncio.TimeoutError:
        raise RuntimeError(
            f"External API timed out after 30 seconds: {endpoint}. "
            "Try again or check the service status."
        )

async def _do_api_call(endpoint: str, payload: dict) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.post(endpoint, json=payload) as resp:
            resp.raise_for_status()
            return await resp.json()

The RuntimeError raised by the timeout handler is caught by FastMCP and returned as isError: true — the LLM receives the message and can decide to retry or inform the user.

Set timeout budgets relative to your MCP client's patience. Client behavior varies: some clients wait a long time for a tool response, while others, and many agent orchestrators, enforce their own request timeouts. A 30-second timeout on individual external calls is a reasonable starting point for most integrations.
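
It helps to know what asyncio.wait_for() does to the inner coroutine when the timeout fires: it cancels it and waits for that cancellation to finish before raising. A minimal standard-library sketch, with stalled_service as a hypothetical stand-in for a hung external call:

```python
import asyncio


async def stalled_service(events: list[str]) -> None:
    try:
        await asyncio.sleep(10)  # simulates a service that never answers in time
    finally:
        # Runs when wait_for() cancels this coroutine, so cleanup code belongs here
        events.append("inner call cancelled")


async def call_with_timeout(timeout: float) -> list[str]:
    events: list[str] = []
    try:
        await asyncio.wait_for(stalled_service(events), timeout=timeout)
    except asyncio.TimeoutError:
        events.append("timed out")
    return events


if __name__ == "__main__":
    print(asyncio.run(call_with_timeout(0.05)))
```

Because the inner coroutine is cancelled, connections opened with async with inside it are released rather than left dangling.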

Async HTTP with aiohttp

The requests library is synchronous — calling it in an async def tool blocks the event loop for the entire HTTP round trip. Use aiohttp for all HTTP calls from MCP tools:

import aiohttp
import asyncio

# Reuse a single ClientSession for the server's lifetime
# (creating a session per call is expensive — creates a new connection pool each time)
_http_session: aiohttp.ClientSession | None = None

async def get_http_session() -> aiohttp.ClientSession:
    global _http_session
    if _http_session is None or _http_session.closed:
        _http_session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30, connect=5),
            headers={"User-Agent": "my-mcp-server/1.0"}
        )
    return _http_session

@mcp.tool()
async def http_get(url: str, headers: dict[str, str] | None = None) -> dict:
    """Make an HTTP GET request and return the JSON response."""
    session = await get_http_session()
    try:
        async with session.get(url, headers=headers) as resp:
            resp.raise_for_status()
            content_type = resp.headers.get("Content-Type", "")
            if "json" in content_type:
                return await resp.json()
            return {"text": await resp.text(), "status": resp.status}
    except aiohttp.ClientResponseError as exc:
        raise RuntimeError(f"HTTP {exc.status} from {url}: {exc.message}") from exc
    except aiohttp.ClientError as exc:
        raise RuntimeError(f"HTTP request failed: {exc}") from exc

Use a module-level session to avoid creating a new connection pool on every tool call. Close it in a shutdown hook to cleanly drain connections.
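
One way to structure the shutdown hook is an async context manager that owns the session for the server's lifetime. The sketch below shows only the pattern: StandInSession is a hypothetical placeholder for aiohttp.ClientSession so the example runs without third-party packages, and http_lifespan is an illustrative name. The MCP Python SDK documents a lifespan= argument on FastMCP for this kind of hook; verify its exact signature against your installed version before wiring it in.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class StandInSession:
    """Hypothetical placeholder for aiohttp.ClientSession (same close() idea)."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def http_lifespan(server: object = None) -> AsyncIterator[dict]:
    """Create the shared session at startup and always close it at shutdown."""
    session = StandInSession()
    try:
        yield {"http_session": session}
    finally:
        await session.close()  # runs on normal shutdown and on errors


async def demo() -> tuple[bool, bool]:
    async with http_lifespan() as ctx:
        session = ctx["http_session"]
        open_while_running = not session.closed
    return open_while_running, session.closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the close() call in a finally block means the session is drained even if the server exits because of an exception.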

Async database access

Use async database drivers that do not block the event loop:

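| Database | Sync (avoid in async def) | Async (use this) |
| --- | --- | --- |
| SQLite | sqlite3 | aiosqlite |
| PostgreSQL | psycopg2 | asyncpg |
| MySQL/MariaDB | mysqlclient | aiomysql |
| Redis | redis (sync) | redis.asyncio |
| MongoDB | pymongo | motor |

import aiosqlite

DB_PATH = "data.db"

# Table names cannot be bound as SQL parameters, so validate them against an allowlist.
# These names are placeholders; list the tables your server actually exposes.
ALLOWED_TABLES = {"records", "events"}

@mcp.tool()
async def query_records(table: str, limit: int = 50) -> list[dict]:
    """Query records from a SQLite table."""
    if table not in ALLOWED_TABLES:
        raise ValueError(f"Unknown table: {table}")
    # aiosqlite runs SQLite calls on a background thread, so the event loop stays free
    async with aiosqlite.connect(DB_PATH) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            f'SELECT * FROM "{table}" LIMIT ?',
            (limit,)
        ) as cursor:
            rows = await cursor.fetchall()
    return [dict(row) for row in rows]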

For production PostgreSQL, use a connection pool with asyncpg:

import os

import asyncpg

_pg_pool: asyncpg.Pool | None = None

async def get_pool() -> asyncpg.Pool:
    global _pg_pool
    if _pg_pool is None:
        _pg_pool = await asyncpg.create_pool(
            dsn=os.environ["DATABASE_URL"],
            min_size=2,
            max_size=10
        )
    return _pg_pool

@mcp.tool()
async def get_user(user_id: str) -> dict:
    """Fetch user by ID from PostgreSQL."""
    pool = await get_pool()
    row = await pool.fetchrow("SELECT id, name, email FROM users WHERE id=$1", user_id)
    if row is None:
        raise KeyError(f"User not found: {user_id}")
    return dict(row)
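
One caveat with lazy initialization like get_pool(): if two tool calls arrive before the pool exists, both can pass the None check and each create a pool, because the creation step awaits. A lock with a second check avoids that. The sketch below is standard library only; LazyPool and fake_create_pool are hypothetical names, and the factory stands in for asyncpg.create_pool(...).

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


class LazyPool:
    """Create a shared resource once, even when many coroutines ask at the same time."""

    def __init__(self, factory: Callable[[], Awaitable[Any]]) -> None:
        self._factory = factory
        self._lock = asyncio.Lock()
        self._pool: Any = None

    async def get(self) -> Any:
        if self._pool is None:
            async with self._lock:
                # Re-check: another coroutine may have created it while we waited
                if self._pool is None:
                    self._pool = await self._factory()
        return self._pool


async def demo() -> tuple[int, bool]:
    created = 0

    async def fake_create_pool() -> object:
        nonlocal created
        created += 1
        await asyncio.sleep(0.01)  # simulates connection setup time
        return object()

    lazy = LazyPool(fake_create_pool)
    pools = await asyncio.gather(*(lazy.get() for _ in range(5)))
    return created, all(p is pools[0] for p in pools)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Five concurrent callers end up sharing a single pool instead of creating five.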

Offloading CPU-bound work

Some tool operations are CPU-intensive: image processing, PDF generation, heavy regex on untrusted input, cryptographic hashing. Running these in the event loop blocks all other coroutines for their entire duration. Use asyncio.to_thread() to run them in a thread pool:

import asyncio
import hashlib
from pathlib import Path

@mcp.tool()
async def hash_file(path: str) -> dict:
    """Compute SHA-256 hash of a file (offloaded to thread pool)."""
    def _hash(path: str) -> str:
        data = Path(path).read_bytes()
        return hashlib.sha256(data).hexdigest()

    # run_in_executor / to_thread: CPU work in thread, event loop stays free
    digest = await asyncio.to_thread(_hash, path)
    return {"path": path, "sha256": digest}

For truly parallel CPU work (multiple cores), use concurrent.futures.ProcessPoolExecutor. Thread pools in Python are limited by the GIL — they allow I/O concurrency but not true parallel CPU execution. Process pools bypass the GIL but have higher startup cost and cannot share memory directly.

from concurrent.futures import ProcessPoolExecutor
import asyncio

_proc_pool = ProcessPoolExecutor(max_workers=4)

@mcp.tool()
async def render_pdf(html_content: str) -> bytes:
    """Render HTML to PDF using multiple CPU cores."""
    loop = asyncio.get_running_loop()
    pdf_bytes = await loop.run_in_executor(_proc_pool, _render_sync, html_content)
    return pdf_bytes

def _render_sync(html: str) -> bytes:
    # weasyprint, wkhtmltopdf, etc.
    import weasyprint
    return weasyprint.HTML(string=html).write_pdf()

Background tasks

Some tools trigger work that should continue after the tool returns — sending a notification, running a cleanup job, logging to a slow external sink. Use asyncio.create_task() to fire and continue:

import asyncio
import aiohttp

# NOTIFY_URL is assumed to be defined elsewhere (for example, loaded from configuration)

async def _send_notification(user_id: str, message: str) -> None:
    # Slow external call, so the tool handler does not await it
    async with aiohttp.ClientSession() as s:
        await s.post(NOTIFY_URL, json={"user": user_id, "msg": message})

@mcp.tool()
async def process_data(user_id: str, payload: dict) -> dict:
    """Process data and notify the user asynchronously."""
    result = await process(payload)

    # Fire notification without blocking the tool response
    asyncio.create_task(
        _send_notification(user_id, f"Processing complete: {result['id']}")
    )

    return result

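Tasks created with asyncio.create_task() run on the same event loop. The event loop keeps only a weak reference to each task, so a task with no other references, like the one created above, can be garbage-collected before it finishes. Keep a strong reference until the task completes: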

_background_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro):
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
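
A fire-and-forget task also has nobody awaiting it, so its exceptions go unnoticed unless something retrieves them. The following standard-library sketch extends the helper above with a done callback that logs failures; _on_done and the logger setup are illustrative choices, not part of FastMCP.

```python
import asyncio
import logging
from collections.abc import Coroutine
from typing import Any

logger = logging.getLogger(__name__)
_background_tasks: set[asyncio.Task] = set()


def _on_done(task: asyncio.Task) -> None:
    """Drop the strong reference and report any failure."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()  # retrieving it marks the exception as handled
    if exc is not None:
        logger.error("Background task %s failed", task.get_name(), exc_info=exc)


def fire_and_forget(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    task = asyncio.create_task(coro)
    _background_tasks.add(task)  # strong reference until the task finishes
    task.add_done_callback(_on_done)
    return task


async def demo() -> tuple[int, int]:
    async def ok() -> None:
        await asyncio.sleep(0)

    async def bad() -> None:
        raise RuntimeError("notification failed")

    fire_and_forget(ok())
    fire_and_forget(bad())
    pending_before = len(_background_tasks)
    await asyncio.sleep(0.05)  # give both tasks time to finish
    return pending_before, len(_background_tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both tasks are tracked while running and removed once done, and the failing one produces a log entry instead of disappearing silently.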

Related questions

Can I use anyio instead of asyncio directly in MCP tools?

Yes. FastMCP uses anyio internally, and your tool functions can use anyio primitives (anyio.sleep(), anyio.create_task_group()) interchangeably with asyncio equivalents when running under the asyncio backend. anyio's create_task_group() is particularly useful as a safer alternative to asyncio.gather() — it cancels all tasks if one fails by default, and raises an ExceptionGroup that you can handle with except*.

What happens when a tool raises an exception inside asyncio.gather?

With default settings, asyncio.gather() re-raises the first exception in the awaiting tool, but it does not cancel the remaining awaitables; they keep running unless the gather itself is cancelled. FastMCP catches the exception at the tool boundary and returns isError: true. With return_exceptions=True, exceptions are returned as values in the result list and FastMCP does not see them, so you must check for and re-raise them if you want isError: true behavior on partial failure.

How do I handle asyncio.CancelledError in tool handlers?

If an MCP client disconnects mid-session, FastMCP may cancel pending tool coroutines. asyncio.CancelledError (a subclass of BaseException) will propagate through await points. Do not catch it in your tool handlers unless you need to do cleanup — if you do catch it, re-raise it immediately: except asyncio.CancelledError: raise. Suppressing CancelledError breaks asyncio's cancellation mechanism.
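
The cleanup-then-re-raise pattern looks like this in a minimal standard-library sketch. The disconnect is simulated by cancelling the task directly; long_running_tool and simulate_disconnect are illustrative names.

```python
import asyncio


async def long_running_tool(cleanup_log: list[str]) -> str:
    try:
        await asyncio.sleep(10)  # stands in for slow work
        return "finished"
    except asyncio.CancelledError:
        cleanup_log.append("released resources")  # do cleanup here
        raise  # re-raise so the task is actually marked as cancelled


async def simulate_disconnect() -> tuple[list[str], bool]:
    cleanup_log: list[str] = []
    task = asyncio.create_task(long_running_tool(cleanup_log))
    await asyncio.sleep(0.01)  # let the tool start
    task.cancel()  # what a framework might do when the client goes away
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: we requested the cancellation ourselves
    return cleanup_log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(simulate_disconnect()))
```

If the handler swallowed CancelledError instead of re-raising it, task.cancelled() would be False and the caller could not tell that the work was abandoned.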

Is asyncio safe for concurrent writes to a SQLite database?

SQLite with aiosqlite is safe for concurrent reads, but concurrent writes are serialized by SQLite's writer lock. For write-heavy workloads, enable WAL mode (PRAGMA journal_mode=WAL) to allow concurrent reads alongside a single writer. For very high write throughput, switch to PostgreSQL with asyncpg. See MCP server SQLite patterns for WAL configuration and write-serialization strategies.
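
Enabling WAL is a one-time pragma on a file-backed database. The sketch below uses the standard-library sqlite3 module inside asyncio.to_thread() so it runs without extra packages; this is a swap-in for illustration, and the same PRAGMA statement can be executed through an aiosqlite connection. The function names and the temporary file path are assumptions.

```python
import asyncio
import os
import sqlite3
import tempfile


def _enable_wal(db_path: str) -> str:
    """Switch the database to WAL mode and return the journal mode SQLite reports."""
    conn = sqlite3.connect(db_path)
    try:
        (mode,) = conn.execute("PRAGMA journal_mode=WAL").fetchone()
        return mode
    finally:
        conn.close()


async def enable_wal(db_path: str) -> str:
    # sqlite3 is synchronous, so run it in a worker thread
    return await asyncio.to_thread(_enable_wal, db_path)


def demo() -> str:
    with tempfile.TemporaryDirectory() as tmp:
        return asyncio.run(enable_wal(os.path.join(tmp, "example.db")))


if __name__ == "__main__":
    print(demo())
```

The journal mode is stored in the database file, so the pragma only needs to run once, for example at server startup. WAL does not apply to in-memory databases.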

Further reading