Background Jobs for Long-Running MCP Tools — BullMQ, pg-boss, and progress notifications
Some MCP tool calls cannot complete in the time a synchronous response allows: generating a large PDF, running an LLM inference chain over thousands of documents, exporting a database snapshot, or processing uploaded files. Returning a tool response after 30 seconds works in theory, but in practice it ties up an MCP server connection, risks timeout at every layer, and gives the AI agent no visibility into progress. The correct pattern is to enqueue the work as a background job and immediately return a job ID; the agent polls a resource or receives progress notifications as the job runs. This guide covers how to implement this pattern with BullMQ (Redis-backed) and pg-boss (PostgreSQL-backed), how to stream progress notifications, and how to monitor the full pipeline — including worker health — with AliveMCP.
TL;DR
For long-running MCP tools: immediately enqueue the work and return { job_id: "..." }. Expose job status via a resources/read endpoint at job:{id}. For incremental progress, emit notifications/progress during execution. Use BullMQ if you already have Redis; use pg-boss if you only have PostgreSQL. Deduplicate jobs via jobId matching the input's idempotency key so agent retries don't create duplicate work. Monitor with AliveMCP: a canary tool call that enqueues a sentinel job and verifies it completes within threshold validates the full worker pipeline, not just the MCP protocol layer.
When a synchronous tool call is not enough
MCP tool calls are request/response: the AI agent sends a tools/call request and waits for the response. The MCP specification imposes no hard timeout on tool calls, but practical limits exist at every layer:
- The AI agent's MCP client typically enforces a request timeout and marks the tool call as failed once it elapses
- HTTP proxies and load balancers commonly have 30–120 second read timeouts
- The MCP server process may have per-request timeouts configured
- Holding an open connection and a database pool slot for 60+ seconds blocks other tool calls
Operations that regularly exceed 10–15 seconds should be delegated to background jobs. Common examples:
| Tool type | Typical duration | Why it's slow |
|---|---|---|
| LLM inference over large corpus | 30s–5m | Sequential LLM calls for chunked processing |
| PDF/report generation | 5s–60s | Rendering, image embedding, page count |
| Large data export | 10s–5m | Sequential DB reads + serialization |
| Web scraping pipeline | 20s–3m | Network latency × page count |
| Email/notification send to list | 5s–60s | Rate-limited external API |
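One way to apply the 10–15 second rule of thumb is to compare a tool's observed slow-path latency with the tightest timeout on the request path. The sketch below is illustrative only: the layer names, the timeout values, and the 50% safety margin are assumptions, not measurements from any particular deployment.

```typescript
// Hypothetical helper: decide whether a tool should run as a background job.
// All numbers here are illustrative assumptions.
interface LayerTimeouts {
  clientMs: number; // MCP client request timeout
  proxyMs: number;  // load balancer / proxy read timeout
  serverMs: number; // per-request timeout configured in the MCP server
}

// The request fails as soon as the shortest timeout on the path elapses.
function tightestTimeoutMs(t: LayerTimeouts): number {
  return Math.min(t.clientMs, t.proxyMs, t.serverMs);
}

// Delegate to a background job when the slow-path latency (for example p95)
// would consume more than `margin` of the tightest timeout.
function shouldRunInBackground(p95Ms: number, t: LayerTimeouts, margin = 0.5): boolean {
  return p95Ms > tightestTimeoutMs(t) * margin;
}

const timeouts: LayerTimeouts = { clientMs: 60_000, proxyMs: 30_000, serverMs: 120_000 };
console.log(shouldRunInBackground(2_000, timeouts));  // false: 2s is under the 15s budget
console.log(shouldRunInBackground(20_000, timeouts)); // true: 20s exceeds the 15s budget
```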
The async job pattern for MCP tools
The pattern has three components: an enqueue tool, a status resource, and a worker process.
Step 1: The enqueue tool
The tool handler enqueues the work and returns immediately with a job ID. The agent receives the ID and can track the job without holding an open connection.
import { randomUUID } from 'node:crypto';
import { Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { z } from 'zod';

// `server` is the McpServer instance created elsewhere in the application.
// The local Redis fallback URL is an assumption for development setups.
const redis = new Redis(process.env.REDIS_URL ?? 'redis://127.0.0.1:6379');
const exportQueue = new Queue('export', { connection: redis });

const ExportSchema = z.object({
  format: z.enum(['csv', 'json', 'parquet']),
  date_from: z.string().datetime(),
  date_to: z.string().datetime(),
  idempotency_key: z.string().optional(),
});

server.tool(
  'export_data',
  ExportSchema.shape,
  async (args) => {
    const parsed = ExportSchema.parse(args);
    // Use idempotency_key as jobId so agent retries don't create duplicate exports
    const jobId = parsed.idempotency_key ?? randomUUID();
    await exportQueue.add('export', parsed, {
      jobId,                            // deduplicate by this ID
      removeOnComplete: { age: 3600 },  // keep completed jobs for 1 hour
      removeOnFail: { age: 86400 },     // keep failed jobs for 24 hours
    });
    return {
      content: [{
        type: 'text',
        text: JSON.stringify({
          job_id: jobId,
          status: 'queued',
          poll_resource: `job:${jobId}`,
          estimated_seconds: 30,
        }),
      }],
    };
  }
);
Step 2: The status resource
Expose job status via the MCP resources protocol. The agent calls resources/read with the URI returned by the tool to poll for completion.
// Register a resource template for job status
import { ResourceTemplate } from '@modelcontextprotocol/sdk/server/mcp.js';

server.resource(
  'job-status',
  new ResourceTemplate('job:{jobId}', { list: undefined }),
  async (uri, { jobId }) => {
    // Template variables can be string or string[]; normalize to a single string
    const id = Array.isArray(jobId) ? jobId[0] : jobId;
    const job = await exportQueue.getJob(id);
    if (!job) {
      return {
        contents: [{ uri: uri.href, mimeType: 'application/json', text: JSON.stringify({ status: 'not_found' }) }],
      };
    }
    // Usually 'waiting' | 'active' | 'completed' | 'failed'; BullMQ can also report 'delayed' and others
    const state = await job.getState();
    const payload: Record<string, unknown> = {
      job_id: id,
      status: state,
      progress: job.progress,
    };
    if (state === 'completed') {
      payload.result = job.returnvalue;
      payload.completed_at = new Date(job.finishedOn!).toISOString();
    }
    if (state === 'failed') {
      payload.error = job.failedReason;
      payload.failed_at = new Date(job.finishedOn!).toISOString();
    }
    return {
      contents: [{
        uri: uri.href,
        mimeType: 'application/json',
        text: JSON.stringify(payload),
      }],
    };
  }
);
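BullMQ's getState() can report more states than the four most common ones, so it may help to collapse them into a small vocabulary before returning them to the agent. The following is a minimal sketch: the AgentStatus names and the toAgentStatus and parseJobUri helpers are hypothetical and not part of BullMQ or the MCP SDK.

```typescript
// States that BullMQ's Job#getState() can return.
type BullState =
  | 'waiting' | 'active' | 'completed' | 'failed'
  | 'delayed' | 'prioritized' | 'waiting-children' | 'unknown';

// Hypothetical, smaller vocabulary exposed to the agent.
type AgentStatus = 'queued' | 'running' | 'completed' | 'failed' | 'unknown';

function toAgentStatus(state: BullState): AgentStatus {
  switch (state) {
    case 'waiting':
    case 'delayed':
    case 'prioritized':
    case 'waiting-children':
      return 'queued'; // not started yet, for whatever reason
    case 'active':
      return 'running';
    case 'completed':
      return 'completed';
    case 'failed':
      return 'failed';
    default:
      return 'unknown';
  }
}

// Extract the job ID from a `job:{jobId}` URI; returns null for anything else.
function parseJobUri(uri: string): string | null {
  const match = /^job:([^\s/?#]+)$/.exec(uri);
  return match?.[1] ?? null;
}

console.log(toAgentStatus('delayed'));  // "queued"
console.log(parseJobUri('job:abc123')); // "abc123"
```

With a mapping like this, the agent only needs to handle a handful of status values regardless of which queue backend produced them.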
Step 3: The worker process
The worker runs in a separate process (either a dedicated worker script or a second instance of the same codebase started in worker mode). Separating workers from the MCP server means a slow job does not block tool call handling and the worker can be scaled independently.
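// worker.ts (separate process)
import { Worker } from 'bullmq';
import { Redis } from 'ioredis';

// BullMQ workers require maxRetriesPerRequest: null on their Redis connection.
// The local fallback URL is an assumption for development setups.
const connection = new Redis(process.env.REDIS_URL ?? 'redis://127.0.0.1:6379', {
  maxRetriesPerRequest: null,
});

// `db`, `formatData`, and `uploadToStorage` are application helpers defined elsewhere.
const worker = new Worker('export', async (job) => {
  const { format, date_from, date_to } = job.data;

  // Count the rows first so progress can be reported as a percentage
  const countResult = await db.query(
    'SELECT COUNT(*) AS total FROM events WHERE created_at BETWEEN $1 AND $2',
    [date_from, date_to]
  );
  const total = Number(countResult.rows[0].total);

  const rows = [];
  let page = 0;
  while (true) {
    // ORDER BY keeps LIMIT/OFFSET pages stable (assumes an `id` column as tiebreaker)
    const batch = await db.query(
      'SELECT * FROM events WHERE created_at BETWEEN $1 AND $2 ORDER BY created_at, id LIMIT 1000 OFFSET $3',
      [date_from, date_to, page * 1000]
    );
    if (batch.rows.length === 0) break;
    rows.push(...batch.rows);
    // Report progress; the AI agent can read this via resources/read
    const percent = total > 0 ? Math.min(100, Math.round((rows.length / total) * 100)) : 0;
    await job.updateProgress(percent);
    page++;
  }
  const output = formatData(rows, format);
  const url = await uploadToStorage(output, `exports/${job.id}.${format}`);
  return { download_url: url, row_count: rows.length };
}, { connection, concurrency: 3 });

worker.on('failed', (job, err) => {
  console.error(JSON.stringify({ event: 'job_failed', jobId: job?.id, error: err.message }));
});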
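Each progress update is a write to the queue backend, so a worker that processes many small batches may want to report only when the visible percentage actually changes. The sketch below shows one way to do that; makeProgressReporter is a hypothetical helper, and the emit callback stands in for a call such as job.updateProgress.

```typescript
// Returns a function that forwards progress to `emit` only when the integer
// percentage changes, and never reports more than 100.
function makeProgressReporter(total: number, emit: (percent: number) => void) {
  let lastPercent = -1;
  return (done: number): void => {
    // Guard against a zero or unknown total to avoid NaN / Infinity.
    const percent = total > 0 ? Math.min(100, Math.floor((done * 100) / total)) : 0;
    if (percent !== lastPercent) {
      lastPercent = percent;
      emit(percent);
    }
  };
}

// Usage with an array standing in for the queue backend:
const reported: number[] = [];
const report = makeProgressReporter(2_500, (p) => reported.push(p));
for (const done of [1_000, 1_010, 2_000, 2_500]) report(done);
console.log(reported); // [40, 80, 100]
```

The second call (1,010 rows) still rounds down to 40%, so it is dropped rather than written again.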
Progress notifications via the MCP protocol
For AI agents that need live updates (not just final results), MCP supports notifications/progress: the server pushes incremental progress events over the open connection while the tool call is running, so the agent receives updates without polling.
Progress notifications require the agent to include a _meta.progressToken in the tool call. The MCP server sends notifications referencing that token.
// Tool handler with progress notifications (low-level Server API)
import { CallToolRequestSchema } from '@modelcontextprotocol/sdk/types.js';

server.setRequestHandler(CallToolRequestSchema, async (request, context) => {
  // A request handler must return a result or throw; it cannot return undefined
  if (request.params.name !== 'export_data') {
    throw new Error(`Unknown tool: ${request.params.name}`);
  }
  const progressToken = request.params._meta?.progressToken;

  // For jobs that run in-process (not via a separate worker):
  const sendProgress = async (progress: number, total: number) => {
    // Compare with undefined: a numeric token of 0 is valid but falsy
    if (progressToken === undefined) return;
    await context.sendNotification({
      method: 'notifications/progress',
      params: { progressToken, progress, total },
    });
  };

  // Long-running operation with progress reporting
  // (`countBatches` and `fetchBatch` are application helpers defined elsewhere)
  const rows = [];
  const batches = await countBatches(request.params.arguments);
  for (let i = 0; i < batches; i++) {
    const batch = await fetchBatch(i, request.params.arguments);
    rows.push(...batch);
    await sendProgress(i + 1, batches);
  }
  return {
    content: [{ type: 'text', text: JSON.stringify({ row_count: rows.length, data: rows }) }],
  };
});
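Progress notifications travel over the same transport as the request, so the tool call must stay open while they are sent. With Streamable HTTP, that means the server answers the request with an SSE stream rather than a single JSON body. If the client does not send a progressToken, or cannot keep a request open for the duration of the job, fall back to the resource polling pattern.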
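On the wire, the progress token links two JSON-RPC messages: the client's tools/call request and each notification the server sends back. The sketch below builds both shapes as plain objects to make that link visible. buildToolCall and buildProgress are hypothetical helpers for illustration; in practice the MCP SDK constructs these messages.

```typescript
type ProgressToken = string | number;

interface ProgressNotification {
  jsonrpc: '2.0';
  method: 'notifications/progress';
  params: { progressToken: ProgressToken; progress: number; total?: number };
}

// Client side: a tools/call request that opts in to progress updates
// by placing a token under params._meta.
function buildToolCall(
  id: number,
  name: string,
  args: Record<string, unknown>,
  progressToken: ProgressToken,
) {
  return {
    jsonrpc: '2.0' as const,
    id,
    method: 'tools/call' as const,
    params: { name, arguments: args, _meta: { progressToken } },
  };
}

// Server side: a notification that echoes the caller's token.
// Notifications carry no `id` because they expect no response.
function buildProgress(
  progressToken: ProgressToken,
  progress: number,
  total?: number,
): ProgressNotification {
  return {
    jsonrpc: '2.0',
    method: 'notifications/progress',
    params: total === undefined ? { progressToken, progress } : { progressToken, progress, total },
  };
}

const call = buildToolCall(1, 'export_data', { format: 'csv' }, 'export-42');
const update = buildProgress('export-42', 3, 10);
console.log(JSON.stringify(call));
console.log(JSON.stringify(update));
```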
pg-boss: background jobs with PostgreSQL only
If your MCP server already uses PostgreSQL and you want to avoid adding Redis as a dependency, pg-boss provides a PostgreSQL-backed job queue with comparable features: job deduplication, retries, TTL, and concurrency control.
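import PgBoss from 'pg-boss';

// Written against the pg-boss v9 API. v10 changed several of these calls
// (queues must be created first, teamSize was removed, getJobById takes the
// queue name), so confirm each call against the version you run.
const boss = new PgBoss(process.env.DATABASE_URL);
await boss.start();

// Enqueue a job (plays the same role as exportQueue.add in the tool handler).
// `jobData` and `idempotencyKey` come from the parsed tool arguments.
// pg-boss job IDs are UUIDs, so the idempotency key must be a UUID to be used here.
const jobId = await boss.send('export', jobData, {
  id: idempotencyKey, // deduplication key
  retryLimit: 3,
  retryDelay: 30,
  expireInSeconds: 3600,
});

// Worker (can run in same process or separate)
await boss.work('export', { teamSize: 3 }, async (job) => {
  // `runExport` is a hypothetical application function that processes job.data
  const { url, count } = await runExport(job.data);
  return { download_url: url, row_count: count };
});

// Poll job status
const job = await boss.getJobById(jobId);
// job.state: 'created' | 'retry' | 'active' | 'completed' | 'expired' | 'cancelled' | 'failed'
// job.output: return value when completed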
pg-boss stores jobs in a pgboss schema in your PostgreSQL database. The job table is indexed for efficient polling. The tradeoff vs BullMQ: PostgreSQL-backed queues have higher per-job latency (1–10ms vs Redis's sub-millisecond) but eliminate Redis as an operational dependency. For jobs that run over seconds, the queue latency difference is irrelevant.
| Criteria | BullMQ (Redis) | pg-boss (PostgreSQL) |
|---|---|---|
| Job enqueue latency | <1ms | 1–10ms |
| Dependencies | Redis required | PostgreSQL only |
| Job retention | TTL-based (in-memory) | Rows in DB (persistent) |
| Query job history | Limited (Redis TTL) | Full SQL access |
| Job deduplication | jobId uniqueness | id field uniqueness |
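Because both backends offer the same two operations to the tool handler (enqueue with a deduplication key, then look up status), the handler can be written against a small interface and the backend chosen at startup. The sketch below is an assumption about how such a seam might look: JobQueue and InMemoryQueue are hypothetical names, the in-memory class only stands in for a real BullMQ or pg-boss adapter, and a real adapter would return promises.

```typescript
type JobState = 'queued' | 'active' | 'completed' | 'failed';

interface JobRecord<T> {
  id: string;
  data: T;
  state: JobState;
}

// Hypothetical seam between the tool handler and the queue backend.
interface JobQueue<T> {
  enqueue(data: T, idempotencyKey?: string): string;
  status(id: string): JobRecord<T> | undefined;
}

// In-memory stand-in used here only to show the deduplication contract.
class InMemoryQueue<T> implements JobQueue<T> {
  private jobs = new Map<string, JobRecord<T>>();
  private seq = 0;

  enqueue(data: T, idempotencyKey?: string): string {
    const id = idempotencyKey ?? `job-${++this.seq}`;
    // A second enqueue with the same key is a no-op and returns the same ID.
    if (!this.jobs.has(id)) {
      this.jobs.set(id, { id, data, state: 'queued' });
    }
    return id;
  }

  status(id: string): JobRecord<T> | undefined {
    return this.jobs.get(id);
  }

  get size(): number {
    return this.jobs.size;
  }
}

const queue = new InMemoryQueue<{ format: string }>();
const first = queue.enqueue({ format: 'csv' }, 'export-2024-q1');
const retry = queue.enqueue({ format: 'csv' }, 'export-2024-q1');
console.log(first === retry, queue.size); // true 1
```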
Monitoring the worker pipeline with AliveMCP
The MCP protocol probe (initialize → tools/list) confirms the MCP server is running. It does not confirm the worker process is running, the Redis/PostgreSQL queue is draining, or that jobs complete within expected windows. A worker process crash is invisible to the protocol probe.
The two monitoring patterns for background job pipelines:
1. Canary job tool call
Add a health_check_job tool that enqueues a sentinel job and polls for completion:
// health_check_job tool: validates the full worker pipeline
// `healthQueue` is a BullMQ Queue with its own worker that completes sentinel jobs.
server.tool('health_check_job', {}, async () => {
  const jobId = `health-${Date.now()}`;
  await healthQueue.add('sentinel', { sentinel: true }, { jobId, priority: 1 });

  // Poll for up to 30 seconds
  const deadline = Date.now() + 30_000;
  while (Date.now() < deadline) {
    await new Promise(r => setTimeout(r, 1000));
    const job = await healthQueue.getJob(jobId);
    if (!job) continue;
    const state = await job.getState();
    if (state === 'completed') {
      return { content: [{ type: 'text', text: JSON.stringify({ ok: true, queue_healthy: true }) }] };
    }
    if (state === 'failed') {
      throw new Error(`sentinel_job_failed: ${job.failedReason}`);
    }
  }
  throw new Error('sentinel_job_timeout: worker not processing within 30s');
});
Configure AliveMCP to call health_check_job on each probe cycle. If the canary job doesn't complete within 30 seconds, AliveMCP alerts — the worker process may have crashed, Redis may be unreachable, or the queue may be backlogged.
2. Health endpoint with queue depth
// Queue health in /health endpoint
app.get('/health', async (req, res) => {
  const counts = await exportQueue.getJobCounts('waiting', 'active', 'failed');
  if (counts.failed > 50) {
    return res.status(503).json({
      status: 'degraded',
      reason: 'high_failure_rate',
      queue: counts,
    });
  }
  if (counts.waiting > 200) {
    return res.status(503).json({
      status: 'degraded',
      reason: 'queue_backlog',
      queue: counts,
    });
  }
  res.json({ status: 'ok', queue: counts });
});
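The threshold logic in the health endpoint can be pulled out into a pure function, which makes the limits configurable and easy to unit test without Redis or an HTTP server. This is a sketch under assumptions: classifyQueueHealth and the HealthThresholds type are hypothetical, and the defaults simply mirror the 50 and 200 used above.

```typescript
interface QueueCounts {
  waiting: number;
  active: number;
  failed: number;
}

interface HealthThresholds {
  maxFailed: number;
  maxWaiting: number;
}

type QueueHealth =
  | { httpStatus: 200; status: 'ok' }
  | { httpStatus: 503; status: 'degraded'; reason: 'high_failure_rate' | 'queue_backlog' };

// Same checks, in the same order, as the /health handler above.
function classifyQueueHealth(
  counts: QueueCounts,
  thresholds: HealthThresholds = { maxFailed: 50, maxWaiting: 200 },
): QueueHealth {
  if (counts.failed > thresholds.maxFailed) {
    return { httpStatus: 503, status: 'degraded', reason: 'high_failure_rate' };
  }
  if (counts.waiting > thresholds.maxWaiting) {
    return { httpStatus: 503, status: 'degraded', reason: 'queue_backlog' };
  }
  return { httpStatus: 200, status: 'ok' };
}

console.log(classifyQueueHealth({ waiting: 3, active: 2, failed: 0 }));
console.log(classifyQueueHealth({ waiting: 500, active: 3, failed: 1 }));
```

The route handler then reduces to fetching the counts, calling this function, and sending the returned status code.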
Frequently asked questions
How does the AI agent know when a background job is done?
Two patterns. First: the tool returns a poll_resource URI like job:abc123; the agent calls resources/read on that URI every few seconds until status === 'completed'. Second: the server emits notifications/progress events while the work runs and sends the final result in the tool response after the job completes. The polling resource pattern is simpler and does not require the client to keep a request open; progress notifications require a progressToken in the initial call and a connection that stays open for the duration of the job.
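For the polling pattern, a client may prefer a capped backoff over a fixed interval so that short jobs are noticed quickly and long jobs do not generate a read every second. The sketch below computes such a schedule. pollDelays is a hypothetical client-side helper, and the specific numbers are assumptions rather than anything MCP prescribes.

```typescript
// Build a list of wait times (ms) between resources/read calls:
// start at `initialMs`, multiply by `factor` each time, cap at `maxMs`,
// and stop before the cumulative wait would exceed `budgetMs`.
function pollDelays(initialMs: number, factor: number, maxMs: number, budgetMs: number): number[] {
  const delays: number[] = [];
  let delay = initialMs;
  let elapsed = 0;
  while (elapsed + delay <= budgetMs) {
    delays.push(delay);
    elapsed += delay;
    delay = Math.min(maxMs, delay * factor);
  }
  return delays;
}

console.log(pollDelays(1_000, 2, 8_000, 30_000)); // [1000, 2000, 4000, 8000, 8000]
```

With these values the client polls five times within 23 seconds, then gives up or reports that the job is still running.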
What happens if the agent retries the tool call while the original job is still running?
This is why job deduplication via jobId is critical. Set jobId to the input's idempotency_key (or a deterministic hash of the inputs). BullMQ ignores an add whose jobId already exists in the queue, so no second job is created; the agent gets the same job_id back and can continue polling the same resource. Deduplication only lasts while the original job record is retained (see removeOnComplete and removeOnFail). Without deduplication, each retry creates a new job, running the expensive operation N times in parallel.
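When the agent does not supply an idempotency key, a deterministic hash of the inputs can serve as the job ID. The sketch below is one possible approach, not a prescribed one: canonicalJson and deriveJobId are hypothetical helpers, SHA-256 truncated to 32 hex characters is an arbitrary choice, and the separator is a hyphen because BullMQ builds its Redis keys with colons.

```typescript
import { createHash } from 'node:crypto';

// Serialize with sorted object keys so that key order does not change the hash.
function canonicalJson(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map(canonicalJson).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const obj = value as Record<string, unknown>;
    const entries = Object.keys(obj)
      .filter((key) => obj[key] !== undefined) // match JSON.stringify, which drops undefined
      .sort()
      .map((key) => `${JSON.stringify(key)}:${canonicalJson(obj[key])}`);
    return `{${entries.join(',')}}`;
  }
  // JSON.stringify returns undefined for undefined input; treat that as null.
  return JSON.stringify(value) ?? 'null';
}

// Same tool name + same arguments => same job ID.
function deriveJobId(toolName: string, args: unknown): string {
  const digest = createHash('sha256')
    .update(`${toolName}\n${canonicalJson(args)}`)
    .digest('hex');
  return `${toolName}-${digest.slice(0, 32)}`;
}

const a = deriveJobId('export_data', { format: 'csv', date_from: '2024-01-01T00:00:00Z' });
const b = deriveJobId('export_data', { date_from: '2024-01-01T00:00:00Z', format: 'csv' });
console.log(a === b); // true: key order does not matter
```

Note that hashing the inputs treats two intentionally separate requests with identical arguments as one job, so an explicit idempotency_key remains the better option when the agent can provide one.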
Should the worker run in the same process as the MCP server or a separate process?
Separate process for anything CPU-intensive or long-running. A BullMQ worker running in the same Node.js process as the MCP server can block the event loop during CPU-intensive operations, delaying MCP protocol responses. Run job execution outside the server's event loop: in a separate Node.js script, in a separate container, or at minimum in a worker thread. The MCP server only enqueues the job and reads job status, and both are fast async operations.
How long should I retain completed jobs?
Retain completed jobs long enough for the agent to poll the result: 1–24 hours is typical. The agent retrieves the result via the job:{id} resource; once it has the result, it no longer needs the job record. Set removeOnComplete: { age: 3600 } (1 hour) for most cases. Retain failed jobs longer (24–72 hours) to allow post-incident debugging. Do not retain indefinitely: accumulated job records consume Redis memory under BullMQ and grow the job table under pg-boss, which slows status queries.
What should a job do when its underlying operation is idempotent vs non-idempotent?
For idempotent operations (read-only exports, report generation): job deduplication with a stable jobId is safe, because running the job twice produces the same result. For non-idempotent operations (sending emails, making API writes): add a completed-check guard at the start of the worker. Check whether the job output already exists in the database (using the idempotency_key as the lookup key) and return early if so. This prevents duplicate side effects even if the job is run again by BullMQ's retry logic after a partial failure.
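The completed-check guard can be sketched as a small wrapper around the side effect. In this illustration a Map stands in for the database table, and runOnce is a hypothetical helper; a real implementation would use a table with a unique constraint on the key and would need to handle the window between performing the effect and recording it.

```typescript
// Stand-in for a database table keyed by idempotency key (assumption).
const completedEffects = new Map<string, unknown>();

// Run `effect` at most once per key; later calls return the recorded result.
function runOnce<T>(idempotencyKey: string, effect: () => T): T {
  if (completedEffects.has(idempotencyKey)) {
    return completedEffects.get(idempotencyKey) as T;
  }
  const result = effect();
  completedEffects.set(idempotencyKey, result);
  return result;
}

// Usage: a retried job calls the guard again with the same key.
let emailsSent = 0;
const sendEmail = () => {
  emailsSent += 1;
  return { sent: true };
};

runOnce('welcome-email-42', sendEmail);
runOnce('welcome-email-42', sendEmail); // simulated retry, skipped by the guard
console.log(emailsSent); // 1
```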
Further reading
- Redis Patterns for MCP Servers — caching, pub/sub, and session state
- MCP Progress Notifications — streaming job status to the AI agent
- MCP Server Idempotency — preventing duplicate side effects across agent retries
- MCP Resources API — exposing structured data for AI agent consumption
- Worker Threads for MCP Servers — CPU-intensive tools without blocking the event loop
- Graceful Shutdown — draining in-flight jobs before process exit