Guide · Database & Event Architecture

Event-Driven Architecture for MCP Servers — pub/sub, webhooks, and real-time tool responses

Many MCP servers need to reflect the current state of the world: the latest GitHub commits, the current deployment status, live sensor readings, incoming chat messages, or inventory levels updated by another system. The naive pattern — query the database on every tool call — works at low scale but misses a critical failure mode: the data source (the event producer) can fail independently of the MCP server. An event-driven MCP server subscribes to a stream of changes, maintains local state, and serves tool calls from that state. This guide covers three event ingestion patterns (Redis pub/sub, PostgreSQL LISTEN/NOTIFY, webhook receivers), event sourcing for MCP audit trails, and how to monitor event pipeline health — not just protocol availability — with AliveMCP.

TL;DR

For real-time tool data: subscribe to Redis pub/sub or PostgreSQL LISTEN/NOTIFY and maintain an in-memory or Redis-cached state that tool handlers read from directly — fast reads with no per-call DB query. For event-driven MCP servers, a protocol probe that says "server is up" doesn't tell you the event pipeline is healthy. Add a staleness check to your /health endpoint: if last_event_received_at is older than threshold, report degraded. AliveMCP polling that endpoint catches silent event pipeline failures before agents get stale data.

Why event-driven MCP servers have different failure modes

A CRUD-style MCP server is stateless between tool calls: each call queries the database, returns the result, and forgets everything. If the database is down, the tool call fails immediately with a clear error. The failure is synchronous and visible.

An event-driven MCP server is stateful across tool calls: it maintains a continuously-updated view of the world built from an event stream. If the event stream stops (Redis crash, producer service restart, network partition), the MCP server continues to operate normally — it returns stale data from its last-known state without any errors. The failure is asynchronous and silent.

This creates a new monitoring requirement: the MCP server can be "up" (protocol probe passes, tool calls return results) while simultaneously "wrong" (the data in those results is hours out of date). Protocol monitoring and data freshness monitoring are two separate concerns.

Pattern 1: Redis pub/sub subscriber state

Redis pub/sub allows one process (the producer) to publish events and one or more processes (subscribers) to receive them instantly. The MCP server subscribes to the relevant channels at startup and updates in-memory or Redis-cached state as events arrive.

import { createClient } from 'redis';

// Subscriber client (dedicated — subscribed clients cannot run regular commands)
const subscriber = createClient({ url: process.env.REDIS_URL });
await subscriber.connect();

// Shared cache for tool handlers to read from
const deploymentState: Map = new Map();
let lastEventAt: Date | null = null;

// Subscribe to deployment status channel
await subscriber.subscribe('deployments:status', (message, channel) => {
  const event = JSON.parse(message) as DeploymentEvent;
  deploymentState.set(event.deployment_id, event.status);
  lastEventAt = new Date();

  console.log(JSON.stringify({
    event: 'state_updated',
    channel,
    deployment_id: event.deployment_id,
    status: event.status,
  }));
});

// Tool handler reads from in-memory state — no DB query per call
server.tool('get_deployment_status', { deployment_id: z.string() }, async ({ deployment_id }) => {
  const status = deploymentState.get(deployment_id);
  if (!status) {
    return { content: [{ type: 'text', text: JSON.stringify({ error: 'deployment_not_found' }) }], isError: true };
  }
  return { content: [{ type: 'text', text: JSON.stringify({ deployment_id, status, as_of: lastEventAt }) }] };
});

Redis pub/sub limitations

Redis pub/sub is fire-and-forget: if the subscriber is disconnected when a message is published, the message is lost. For MCP servers that must not miss events (billing events, security alerts, state transitions), use Redis Streams instead, which maintains a log and allows consumers to replay from a position:

// Redis Streams — consumer group, reads from last-acknowledged position
const streamKey = 'deployments:events';
const groupName = 'mcp-server';
const consumerName = `instance-${process.env.HOSTNAME}`;

// Create consumer group (idempotent — safe to call on every startup)
await redis.xGroupCreate(streamKey, groupName, '0', { MKSTREAM: true }).catch(() => {});

async function consumeEvents() {
  while (true) {
    const results = await redis.xReadGroup(
      groupName, consumerName,
      [{ key: streamKey, id: '>' }],  // '>' = unacknowledged messages
      { COUNT: 10, BLOCK: 5000 }      // block 5s if no messages
    );

    for (const { messages } of results ?? []) {
      for (const { id, message } of messages) {
        const event = JSON.parse(message.data);
        deploymentState.set(event.deployment_id, event.status);
        lastEventAt = new Date();
        await redis.xAck(streamKey, groupName, id);  // acknowledge processed
      }
    }
  }
}

Pattern 2: PostgreSQL LISTEN/NOTIFY

PostgreSQL LISTEN/NOTIFY provides event-driven notifications without requiring an additional message broker. A producer calls NOTIFY channel, payload; subscribers that have called LISTEN channel receive the notification instantly via their open connection.

This is particularly useful for MCP servers that already use PostgreSQL and want to react to database changes without adding Redis or a message queue:

import { Client } from 'pg';

// LISTEN requires a dedicated, persistent connection (not a pool client)
const listenClient = new Client({ connectionString: process.env.DATABASE_URL });
await listenClient.connect();

await listenClient.query('LISTEN inventory_changes');

// In-memory cache of inventory levels
const inventoryCache: Map = new Map();
let lastNotificationAt: Date | null = null;

listenClient.on('notification', (msg) => {
  if (msg.channel !== 'inventory_changes') return;
  const payload = JSON.parse(msg.payload ?? '{}');
  inventoryCache.set(payload.product_id, payload.quantity);
  lastNotificationAt = new Date();
});

// Handle disconnection — LISTEN connection must be re-established
listenClient.on('error', async (err) => {
  console.error(JSON.stringify({ event: 'listen_connection_error', error: err.message }));
  await reconnectWithBackoff();
});

// The notification trigger — fires when inventory_levels table is updated
// CREATE OR REPLACE FUNCTION notify_inventory_change()
// RETURNS TRIGGER AS $$
// BEGIN
//   PERFORM pg_notify('inventory_changes',
//     json_build_object('product_id', NEW.product_id, 'quantity', NEW.quantity)::text);
//   RETURN NEW;
// END;
// $$ LANGUAGE plpgsql;
//
// CREATE TRIGGER inventory_change_trigger
// AFTER INSERT OR UPDATE ON inventory_levels
// FOR EACH ROW EXECUTE FUNCTION notify_inventory_change();

LISTEN/NOTIFY caveats

Notifications are not persistent — if the LISTEN connection is down when a NOTIFY fires, that notification is lost. On reconnection, the MCP server must re-sync state from the database (a full read of the relevant table) before relying on notifications again. Build a startup sync that reads the current state and sets lastNotificationAt before marking the server ready.

Pattern 3: Webhook receivers

External systems (GitHub, Stripe, PagerDuty, Slack) push events to HTTP endpoints rather than publishing to a message broker. An event-driven MCP server exposes a webhook receiver that persists incoming events and makes them available via tools.

// Webhook ingestion endpoint
app.post('/webhooks/github', express.raw({ type: 'application/json' }), async (req, res) => {
  // Validate webhook signature (HMAC-SHA256)
  const signature = req.headers['x-hub-signature-256'] as string;
  const expected = `sha256=${crypto
    .createHmac('sha256', process.env.GITHUB_WEBHOOK_SECRET!)
    .update(req.body)
    .digest('hex')}`;

  if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expected))) {
    return res.status(401).json({ error: 'invalid_signature' });
  }

  const event = JSON.parse(req.body.toString()) as GitHubWebhookPayload;
  const eventType = req.headers['x-github-event'] as string;

  // Persist event atomically — never process synchronously in the webhook handler
  await db.query(
    'INSERT INTO github_events (id, type, payload, received_at) VALUES ($1, $2, $3, now())',
    [req.headers['x-github-delivery'], eventType, event]
  );

  res.status(202).json({ queued: true });  // acknowledge immediately
});

// Tool reads persisted events — agent gets structured, queryable event history
server.tool(
  'list_recent_github_events',
  { event_type: z.string().optional(), limit: z.number().int().min(1).max(100).default(20) },
  async ({ event_type, limit }) => {
    const rows = await db.query(
      `SELECT id, type, payload, received_at FROM github_events
       WHERE ($1::text IS NULL OR type = $1)
       ORDER BY received_at DESC LIMIT $2`,
      [event_type ?? null, limit]
    );
    return { content: [{ type: 'text', text: JSON.stringify(rows.rows) }] };
  }
);

Always acknowledge the webhook immediately (HTTP 202) and process asynchronously. External webhook senders retry on non-2xx responses or timeouts, creating duplicate deliveries. The x-github-delivery ID (or equivalent per provider) serves as the deduplication key — use it as the primary key of the events table to make inserts idempotent.

Event sourcing for MCP audit trails

For MCP servers where tool calls mutate state, event sourcing maintains an append-only log of every change. The current state is reconstructed by replaying events. This pattern provides a complete audit trail and enables temporal queries ("what was the state at time T?").

// Append-only event log
interface McpToolEvent {
  id: string;
  occurred_at: Date;
  tool_name: string;
  agent_id: string;
  input_hash: string;       // SHA-256 of tool inputs (not PII)
  outcome: 'success' | 'error';
  duration_ms: number;
}

// Write-through event logging middleware
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const start = Date.now();
  const eventId = crypto.randomUUID();

  let result: CallToolResult;
  let outcome: 'success' | 'error' = 'success';

  try {
    result = await handleToolCall(request);
    if (result.isError) outcome = 'error';
  } catch (err) {
    outcome = 'error';
    throw err;
  } finally {
    // Append to event log — never update or delete
    await db.query(
      `INSERT INTO mcp_tool_events (id, occurred_at, tool_name, agent_id, input_hash, outcome, duration_ms)
       VALUES ($1, now(), $2, $3, $4, $5, $6)`,
      [
        eventId,
        request.params.name,
        request.params._meta?.agent_id ?? 'unknown',
        hashInputs(request.params.arguments),
        outcome,
        Date.now() - start,
      ]
    );
  }

  return result;
});

The event log is queryable by agents as a tool: search_audit_trail can retrieve all tool calls by a specific agent, all calls to a specific tool, or all error outcomes in a time window. This gives AI agents (and human operators) a structured history of what actions were taken.

Monitoring event pipeline health

The event pipeline has failure modes independent of the MCP server process:

A health endpoint that includes event freshness:

// Event pipeline staleness in /health
app.get('/health', async (req, res) => {
  const checks: Record = {};
  let overallStatus = 'ok';

  // Check 1: last event received via pub/sub or LISTEN
  if (lastEventAt !== null) {
    const lagSeconds = (Date.now() - lastEventAt.getTime()) / 1000;
    checks.event_lag_seconds = lagSeconds;

    if (lagSeconds > 300) {  // 5 minutes without an event = stale
      checks.event_pipeline = 'stale';
      overallStatus = 'degraded';
    } else {
      checks.event_pipeline = 'fresh';
    }
  } else {
    checks.event_pipeline = 'no_events_received';
    overallStatus = 'degraded';
  }

  // Check 2: subscriber connection (if Redis pub/sub)
  try {
    await subscriber.ping();
    checks.subscriber_connection = 'ok';
  } catch (err) {
    checks.subscriber_connection = 'disconnected';
    overallStatus = 'unhealthy';
  }

  res.status(overallStatus === 'ok' ? 200 : 503).json({
    status: overallStatus,
    checks,
    last_event_at: lastEventAt?.toISOString() ?? null,
  });
});

Point AliveMCP's custom health check URL at this endpoint. When the event pipeline stalls and lastEventAt exceeds the staleness threshold, AliveMCP alerts — even though the MCP server process is healthy and the protocol probe passes. This is the only way to detect the most dangerous failure mode of an event-driven MCP server: serving stale data without errors.

Frequently asked questions

How do I handle events that arrive while the MCP server is restarting?

For Redis Streams or Kafka (persistent, replayable): reconnect and read from the last-acknowledged offset. No events are lost — the stream retains messages. For Redis pub/sub or PostgreSQL LISTEN/NOTIFY (ephemeral): maintain a "last_synced_at" timestamp. On reconnection, query the source database for all changes since last_synced_at to rebuild state. This is the startup-sync pattern: before serving tool calls, load the current database state; then switch to event-driven updates. Only mark the server ready after the sync completes.

What is the difference between Redis pub/sub and PostgreSQL LISTEN/NOTIFY?

Both deliver real-time notifications but differ in persistence and scale. Redis pub/sub: no persistence (missed messages are lost), supported across all Redis clients, no payload size limit in practice. PostgreSQL LISTEN/NOTIFY: no persistence (missed notifications are lost during disconnect), notifications are sent when the notifying transaction commits (no phantom notifications from rolled-back transactions), payload limited to ~8000 bytes. Use LISTEN/NOTIFY when you already have PostgreSQL and the events originate from database writes (via a trigger or explicit NOTIFY call). Use Redis pub/sub when the producer is an application-level service, not the database.

Should I process webhook events synchronously or asynchronously?

Always asynchronously. Acknowledge the webhook immediately (HTTP 202) and write the raw payload to a persistent store (database, Redis Stream, Kafka). Process the payload in a separate worker. Reasons: (1) external senders retry on slow responses, causing duplicates; (2) processing logic may be slow (parsing, validation, state update); (3) if processing fails, you want the raw payload preserved for retry — acknowledgment before processing means you lose the event on failure. The idempotency key (delivery ID from the webhook sender) prevents duplicate processing on worker retry.

How does the SSE transport relate to event-driven MCP servers?

There is a natural alignment: SSE transport sends server-initiated messages over a persistent HTTP connection, which is the same channel used by notifications/progress and other server push messages. An event-driven MCP server can forward events from its subscriber (Redis pub/sub, LISTEN/NOTIFY) to connected AI agents as MCP notifications: when a deployment status changes, the server immediately notifies any connected agents via the SSE channel without waiting for a poll. This pattern requires the Streamable HTTP or SSE transport — stdio does not support server-initiated push.

What is the right staleness threshold for the /health event check?

Set the threshold based on the expected event arrival rate. For a deployment status stream where events arrive every few minutes in a busy system, 5–15 minutes is a reasonable staleness threshold. For a real-time sensor stream where events arrive every second, 30–60 seconds is the threshold. The goal is to distinguish "the producer is quiet but healthy" from "the pipeline is broken." Check your event arrival rate over a week and set the threshold at 3–5× the typical maximum quiet period. Avoid setting it too low — false alerts from quiet periods erode trust in monitoring.

Further reading

Monitor event pipeline health, not just protocol availability

AliveMCP polls your custom health endpoint and alerts when your event pipeline goes stale — before AI agents start returning answers built on hours-old data.

Start monitoring free