
MCP server tool composition

Agents can call tools in sequence, but complex workflows benefit from server-side composition — the server assembles a pipeline of internal operations, reducing round-trips and keeping the agent's context window clean. Tool composition patterns let you build higher-order tools from primitive ones without exposing every intermediate step.

TL;DR

Server-side tool composition reduces the number of MCP round-trips by assembling pipelines of primitive operations inside a single tool handler, returning a single composed result. Use a sequential pipeline for ordered transforms, map-reduce for parallelizing a sub-operation across a list, async generators for streaming partial results, and typed error accumulators for collect-all-errors behavior. For calling another MCP server from within a tool handler, create a per-request Client instance — not a shared singleton — to avoid session cross-contamination. Monitor composed pipelines with per-step latency histograms so AliveMCP probe regressions can be traced to the specific step that slowed down. See also: multi-agent topologies for orchestrator-side composition and error handling for propagating structured errors from composed steps.

Why compose server-side vs. agent-side

When an agent needs to run a multi-step process — fetch data, transform it, validate it, and store it — it has two options: call each step as a separate MCP tool and coordinate the pipeline itself, or call a single composed tool that runs the whole pipeline server-side. Both approaches work, but they have very different tradeoffs.

Agent-side composition (the agent orchestrates): the agent calls fetch_data, receives the result, calls transform_data with it, receives that result, calls validate_data, and finally calls store_result. Each step is a separate MCP round-trip. The intermediate results flow through the agent's context window.

Server-side composition (the server orchestrates): the agent calls a single process_document tool. The server runs fetch, transform, validate, and store internally, then returns the final outcome. The agent never sees the intermediates.

| Concern | Agent-side | Server-side |
| --- | --- | --- |
| Round-trip count | N (one per step) | 1 |
| Context window usage | High: intermediates fill the context | Low: only the final result |
| Atomicity | None: partial progress if the agent fails mid-pipeline | Enforced via server-side transactions |
| Step visibility to agent | Full: agent sees and can branch on intermediates | None by default (can add partial progress events) |
| Debuggability | Easy: each step is a distinct tool call in the log | Requires server-side step tracing |
| Reusability of steps | High: each step is a standalone callable tool | Lower: steps may be internal functions only |

The decision rule: compose server-side when the intermediate results are not meaningful to the agent (the agent would just pass them through to the next call unchanged), when atomicity matters (you want all steps to succeed or none), or when round-trip latency is a significant fraction of the task's total time. Keep steps agent-side when the agent genuinely needs to reason about intermediate results, branch conditionally, or retry individual steps independently.

A practical heuristic: if you find yourself writing agent prompt logic that says "call tool A, take the output, pass it directly to tool B, take the output, pass it directly to tool C," that is a signal to compose server-side. The agent gains nothing from seeing the intermediates.

Sequential pipeline pattern

The simplest composition pattern is a sequential chain: each step takes the output of the previous step as its input. Implement this as a typed async function pipeline where each step is a separate function with clear input and output types:

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

const server = new McpServer({ name: 'pipeline-server', version: '1.0.0' });

// --- Step functions: pure transforms with typed I/O ---

interface RawDocument {
  url: string;
  fetchedAt: number;
  rawHtml: string;
}

interface ParsedDocument {
  url: string;
  title: string;
  bodyText: string;
  wordCount: number;
}

interface ScoredDocument {
  url: string;
  title: string;
  relevanceScore: number;
  summary: string;
}

async function fetchStep(url: string): Promise<RawDocument> {
  const res = await fetch(url, { signal: AbortSignal.timeout(10_000) });
  if (!res.ok) throw new StepError('fetch', `HTTP ${res.status}`, { url });
  return { url, fetchedAt: Date.now(), rawHtml: await res.text() };
}

async function parseStep(doc: RawDocument): Promise<ParsedDocument> {
  // Simplified — real impl would use a proper HTML parser
  const title = doc.rawHtml.match(/<title>([^<]+)<\/title>/i)?.[1] ?? '';
  const bodyText = doc.rawHtml.replace(/<[^>]+>/g, ' ').trim();
  return {
    url: doc.url,
    title,
    bodyText,
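    // ''.split(/\s+/) yields [''], so an empty body must be counted as 0 words explicitly
    wordCount: bodyText ? bodyText.split(/\s+/).length : 0,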
  };
}

async function scoreStep(
  doc: ParsedDocument,
  query: string
): Promise<ScoredDocument> {
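  // Drop empty terms produced by leading, trailing, or repeated whitespace
  const queryTerms = query.toLowerCase().split(/\s+/).filter(Boolean);
  const text = `${doc.title} ${doc.bodyText}`.toLowerCase();
  const hits = queryTerms.filter(term => text.includes(term)).length;
  // Guard against a whitespace-only query, which would otherwise divide by zero
  const score = queryTerms.length > 0 ? hits / queryTerms.length : 0;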

  return {
    url: doc.url,
    title: doc.title,
    relevanceScore: score,
    summary: doc.bodyText.slice(0, 300),
  };
}

// --- Composed tool: runs the full pipeline in one call ---

server.tool(
  'fetch_and_score',
  'Fetch a URL, parse its content, and score its relevance to a query',
  {
    url: z.string().url(),
    query: z.string().min(1),
  },
  async ({ url, query }) => {
    const raw = await fetchStep(url);
    const parsed = await parseStep(raw);
    const scored = await scoreStep(parsed, query);

    return {
      content: [{ type: 'text', text: JSON.stringify(scored) }],
    };
  }
);

Notice that each step function is independently testable — unit tests for parseStep do not need a live HTTP server. The composed tool handler is thin: it calls the steps in order and returns the final result. This separation keeps the individual steps reusable; a different tool handler might call only parseStep and scoreStep on already-fetched content. See the MCP server testing guide for patterns to test each pipeline step in isolation.
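The chaining that the handler does by hand can also be captured in a small generic helper. The following is a minimal sketch, not part of the server above: `Step`, `pipe2`, `trimStep`, and `countWordsStep` are hypothetical names introduced only for illustration, and the toy steps stand in for functions like parseStep and scoreStep.

```typescript
// A step is any async function from one type to another.
type Step<I, O> = (input: I) => Promise<O>;

// Compose two steps into one. The compiler checks that the output type
// of the first step matches the input type of the second.
function pipe2<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return async (input: A) => second(await first(input));
}

// Hypothetical toy steps, used only to exercise the helper.
const trimStep: Step<string, string> = async (s) => s.trim();
const countWordsStep: Step<string, number> = async (s) =>
  s === '' ? 0 : s.split(/\s+/).length;

// The composed step has type Step<string, number>.
const trimAndCount = pipe2(trimStep, countWordsStep);
```

Longer chains can be built by nesting calls, for example `pipe2(pipe2(a, b), c)`. Whether a helper like this is worth having over three explicit `await` lines is a judgment call; the explicit form is easier to step through in a debugger.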

Map-reduce over tool results

When the pipeline needs to apply the same operation to a list of inputs and aggregate the results, the map-reduce pattern combines fan-out parallelism with a sequential reduce step. The map phase runs sub-operations in parallel (bounded by a concurrency limit); the reduce phase combines results.

import pLimit from 'p-limit';

interface DocumentSource {
  url: string;
  weight: number;
}

interface AggregatedReport {
  query: string;
  totalSources: number;
  successCount: number;
  failureCount: number;
  topResults: ScoredDocument[];
  processingMs: number;
}

server.tool(
  'research_topic',
  'Fetch and score multiple URLs, then aggregate into a relevance-ranked report',
  {
    query: z.string().min(1),
    sources: z.array(z.object({
      url: z.string().url(),
      weight: z.number().min(0).max(1).default(1),
    })).min(1).max(20),
  },
  async ({ query, sources }) => {
    const start = Date.now();

    // Map phase: fetch and score all sources in parallel, max 5 concurrent
    const limit = pLimit(5);
    const mapResults = await Promise.allSettled(
      sources.map(source =>
        limit(async () => {
          const raw = await fetchStep(source.url);
          const parsed = await parseStep(raw);
          const scored = await scoreStep(parsed, query);
          // Apply source weight to relevance score
          return { ...scored, relevanceScore: scored.relevanceScore * source.weight };
        })
      )
    );

    // Reduce phase: separate successes from failures, sort by score
    const successes: ScoredDocument[] = [];
    let failureCount = 0;

    for (const result of mapResults) {
      if (result.status === 'fulfilled') {
        successes.push(result.value);
      } else {
        failureCount++;
        // Log failures but do not abort — partial results are useful
        console.error({ event: 'map_step_failed', reason: result.reason?.message });
      }
    }

    const topResults = successes
      .sort((a, b) => b.relevanceScore - a.relevanceScore)
      .slice(0, 5);

    const report: AggregatedReport = {
      query,
      totalSources: sources.length,
      successCount: successes.length,
      failureCount,
      topResults,
      processingMs: Date.now() - start,
    };

    return {
      content: [{ type: 'text', text: JSON.stringify(report) }],
    };
  }
);

Promise.allSettled (rather than Promise.all) is critical for the map phase of a composed pipeline: it collects results from all branches whether or not individual branches failed. A composed tool that calls Promise.all across 10 sources will fail completely if any single source returns a non-200. Promise.allSettled gives you a partial success response — 9 out of 10 sources processed — which is almost always more useful to the agent than an all-or-nothing failure. See the error handling guide for patterns to express partial success in MCP tool responses.
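The reduce-phase split between successes and failures can be isolated into a small reusable function. This is a sketch under the assumption that failure reasons only need to be kept as opaque values; `partitionSettled`, `Partitioned`, and `demo` are illustrative names, not part of any library.

```typescript
interface Partitioned<T> {
  fulfilled: T[];
  rejected: unknown[];
}

// Split the output of Promise.allSettled into values and failure reasons.
function partitionSettled<T>(results: PromiseSettledResult<T>[]): Partitioned<T> {
  const fulfilled: T[] = [];
  const rejected: unknown[] = [];
  for (const r of results) {
    if (r.status === 'fulfilled') {
      fulfilled.push(r.value);
    } else {
      rejected.push(r.reason);
    }
  }
  return { fulfilled, rejected };
}

// One branch fails, the other two still contribute to the result.
async function demo(): Promise<Partitioned<number>> {
  const settled = await Promise.allSettled([
    Promise.resolve(1),
    Promise.reject(new Error('HTTP 500')),
    Promise.resolve(3),
  ]);
  return partitionSettled(settled);
}
```

With `Promise.all` in place of `Promise.allSettled`, the same three inputs would reject as a whole and the two successful values would be lost.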

Streaming composition

For long-running pipelines where partial results are useful before the whole pipeline completes, use an async generator pipeline to produce intermediate outputs as they become available. A tool call still returns its result as a single response, so today the benefit is mostly server-side structure; progress notifications can report status to the client while the call runs.

// Streaming pipeline: each source is processed and yielded as it completes
// The agent receives results incrementally rather than waiting for all sources

async function* streamResearchPipeline(
  query: string,
  sources: DocumentSource[]
): AsyncGenerator<ScoredDocument, void, unknown> {
  const limit = pLimit(3); // process 3 sources concurrently

  // Create a channel: each worker pushes to a queue as it completes
  const queue: Array<ScoredDocument | Error> = [];
  let pending = sources.length;
  let resolve: (() => void) | null = null;

  const notify = () => { if (resolve) { resolve(); resolve = null; } };

  // Kick off all workers — they push results into the queue as they finish
  for (const source of sources) {
    limit(async () => {
      try {
        const raw = await fetchStep(source.url);
        const parsed = await parseStep(raw);
        const scored = await scoreStep(parsed, query);
        queue.push(scored);
      } catch (err) {
        queue.push(err instanceof Error ? err : new Error(String(err)));
      } finally {
        pending--;
        notify();
      }
    });
  }

  // Yield results as they arrive
  while (pending > 0 || queue.length > 0) {
    if (queue.length === 0) {
      // Wait for the next worker to push a result
      await new Promise<void>(r => { resolve = r; });
    }
    const item = queue.shift()!;
    if (!(item instanceof Error)) {
      yield item; // emit successful result immediately
    }
    // Errors are silently skipped — callers can wrap to log them
  }
}

// MCP tool wrapping the streaming generator
// Note: the tool result is returned as one response, so this handler collects
// everything before returning; if incremental delivery becomes available,
// only this handler needs to change
server.tool(
  'stream_research',
  'Research a topic across multiple sources, returning results as they complete',
  {
    query: z.string().min(1),
    sources: z.array(z.object({ url: z.string().url(), weight: z.number().default(1) }))
      .min(1).max(10),
  },
  async ({ query, sources }) => {
    const results: ScoredDocument[] = [];

    for await (const result of streamResearchPipeline(query, sources)) {
      results.push(result);
    }

    // Sort final collected results by score
    results.sort((a, b) => b.relevanceScore - a.relevanceScore);

    return {
      content: [{ type: 'text', text: JSON.stringify(results) }],
    };
  }
);

The async generator pattern is valuable even when the MCP transport does not yet deliver streaming results to the client, because it structures the server-side pipeline cleanly: each source is processed independently, results are emitted as they complete rather than batched, and the generator can be tested and reused by other server-side code outside of the tool handler. When MCP streaming transports (content delta events) become widely available, wrapping a generator-based pipeline becomes straightforward — the outer tool handler switches from collecting to forwarding deltas.
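One way to get some incremental visibility today is to report progress each time the generator yields. The sketch below is deliberately transport-agnostic: `drainWithProgress`, `ProgressFn`, and `fakeSource` are hypothetical names, and how the callback turns into an actual progress notification depends on the SDK version in use and is not shown here.

```typescript
// Hypothetical callback shape; a real handler might forward this
// to the client as a progress notification.
type ProgressFn = (done: number, total: number) => void | Promise<void>;

// Consume any async iterable, reporting progress after each item.
async function drainWithProgress<T>(
  source: AsyncIterable<T>,
  total: number,
  onProgress: ProgressFn
): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
    await onProgress(items.length, total);
  }
  return items;
}

// Stand-in for a generator such as streamResearchPipeline.
async function* fakeSource(): AsyncGenerator<string> {
  yield 'a';
  yield 'b';
}
```

Because the generator skips failed sources, the reported `done` count may never reach `total`; a client should treat the final tool result, not the last progress value, as the completion signal.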

Error propagation across steps

Sequential pipelines face a fundamental design question: when a step fails, should the pipeline abort immediately (short-circuit) or continue and collect all errors (collect-all)? The right answer depends on whether downstream steps can run independently of the failed step's output.

For a strict pipeline where each step needs the previous step's output, short-circuit is the only sensible behavior: a failed fetch means there is no HTML to parse. For a validation pipeline where each step checks an independent constraint, collect-all is better: it reports every validation failure at once rather than making the agent call the tool repeatedly to discover each one.

// Typed step errors — each step declares its own error type
class StepError extends Error {
  constructor(
    public readonly step: string,
    message: string,
    public readonly context: Record<string, unknown> = {}
  ) {
    super(`[${step}] ${message}`);
    this.name = 'StepError';
  }
}

// Collect-all validation pipeline
interface ValidationResult {
  valid: boolean;
  errors: Array<{ step: string; message: string; field?: string }>;
  warnings: Array<{ step: string; message: string }>;
}

async function validateDocument(doc: ParsedDocument): Promise<ValidationResult> {
  const errors: ValidationResult['errors'] = [];
  const warnings: ValidationResult['warnings'] = [];

  // Run all validators; do NOT short-circuit on first failure
  const outcomes = await Promise.allSettled([
    (async () => {
      if (doc.wordCount < 100) {
        errors.push({ step: 'length_check', message: 'Document too short', field: 'wordCount' });
      }
    })(),
    (async () => {
      if (!doc.title) {
        errors.push({ step: 'title_check', message: 'Missing title tag', field: 'title' });
      } else if (doc.title.length > 200) {
        warnings.push({ step: 'title_check', message: 'Title exceeds 200 characters' });
      }
    })(),
    (async () => {
      // Async validation step: check against an allowlist
      // (isAllowedDomain is assumed to be defined elsewhere)
      const allowed = await isAllowedDomain(doc.url);
      if (!allowed) {
        errors.push({ step: 'domain_check', message: 'URL domain not in allowlist', field: 'url' });
      }
    })(),
  ]);

  // A validator that throws must not pass silently: record it as an error
  for (const outcome of outcomes) {
    if (outcome.status === 'rejected') {
      const reason = outcome.reason instanceof Error ? outcome.reason.message : String(outcome.reason);
      errors.push({ step: 'validator_crash', message: reason });
    }
  }

  return { valid: errors.length === 0, errors, warnings };
}

// Short-circuit pipeline: abort on first failure
async function processWithShortCircuit(url: string, query: string): Promise<ScoredDocument> {
  let raw: RawDocument;
  try {
    raw = await fetchStep(url);
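  } catch (err) {
    // fetchStep already throws StepError for non-2xx responses; re-wrapping
    // would produce a doubled "[fetch] [fetch]" prefix in the message
    if (err instanceof StepError) throw err;
    throw new StepError('fetch', err instanceof Error ? err.message : String(err), { url });
  }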

  let parsed: ParsedDocument;
  try {
    parsed = await parseStep(raw);
  } catch (err) {
    throw new StepError('parse', err instanceof Error ? err.message : String(err), { url });
  }

  const validation = await validateDocument(parsed);
  if (!validation.valid) {
    throw new StepError('validate', 'Document failed validation', {
      errors: validation.errors,
    });
  }

  return scoreStep(parsed, query);
}

// MCP tool: returns a structured error result (not an exception) when a step fails
server.tool(
  'process_document',
  'Fetch, validate, and score a document for relevance to a query',
  { url: z.string().url(), query: z.string() },
  async ({ url, query }) => {
    try {
      const result = await processWithShortCircuit(url, query);
      return { content: [{ type: 'text', text: JSON.stringify({ success: true, result }) }] };
    } catch (err) {
      if (err instanceof StepError) {
        // Return a structured error response — not an exception
        // The agent can read the step name and decide whether to retry or skip
        return {
          content: [{
            type: 'text',
            text: JSON.stringify({
              success: false,
              failedStep: err.step,
              error: err.message,
              context: err.context,
            }),
          }],
          isError: true,
        };
      }
      throw err; // re-throw unexpected errors
    }
  }
);

The isError: true flag in the MCP tool response signals to the agent that the result represents a failure state, without throwing an exception that would produce a generic error message. The agent receives a structured JSON object with the exact step that failed and the context needed to decide how to proceed — retry the whole pipeline, skip this URL, or escalate to a human. This is far more useful than a generic InternalServerError. See the error handling guide for the full error type taxonomy.
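To make the "retry, skip, or escalate" decision concrete, here is a sketch of how a caller might act on the structured payload. The `PipelineFailure` shape mirrors the JSON built by process_document above; the policy itself, the `decideNextAction` name, and the `maxAttempts` default are assumptions chosen for illustration, not a recommended rule.

```typescript
// Mirrors the failure JSON returned by process_document.
interface PipelineFailure {
  success: false;
  failedStep: string;
  error: string;
  context: Record<string, unknown>;
}

type NextAction = 'retry' | 'skip' | 'escalate';

// Hypothetical policy: fetch failures are often transient, so retry a few
// times; parse and validate failures are deterministic for the same URL,
// so skip; anything unrecognized goes to a human.
function decideNextAction(
  failure: PipelineFailure,
  attempt: number,
  maxAttempts = 3
): NextAction {
  switch (failure.failedStep) {
    case 'fetch':
      return attempt < maxAttempts ? 'retry' : 'skip';
    case 'parse':
    case 'validate':
      return 'skip';
    default:
      return 'escalate';
  }
}
```

The point is less the specific policy than the fact that the step name makes such a policy possible at all; a generic error string would force the caller to guess.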

Composing with external MCP servers

A tool handler can itself be an MCP client — calling another MCP server as a sub-call within its implementation. This is the server-side equivalent of the orchestrator-dispatcher pattern: your MCP server acts as an orchestrator for downstream MCP servers, composing their tools into higher-level operations.

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';

// Call a downstream MCP server from within a tool handler
// IMPORTANT: create a fresh Client per tool call — do NOT share a single Client
// across concurrent requests, as session state would be cross-contaminated

async function callDownstreamTool(
  serverUrl: string,
  toolName: string,
  args: Record<string, unknown>
): Promise<unknown> {
  const transport = new SSEClientTransport(new URL(serverUrl));
  const client = new Client(
    { name: 'mcp-server-sub-client', version: '1.0.0' },
    { capabilities: {} }
  );

  await client.connect(transport);
  try {
    const result = await client.callTool({ name: toolName, arguments: args });
    return result;
  } finally {
    await client.close(); // always close — do not leak sub-sessions
  }
}

server.tool(
  'summarize_and_translate',
  'Summarize a document and translate the summary to a target language',
  {
    text: z.string().min(1),
    targetLanguage: z.string().length(2), // ISO 639-1 code
  },
  async ({ text, targetLanguage }) => {
    // Step 1: call the summarization MCP server
    const summaryResult = await callDownstreamTool(
      process.env.SUMMARIZER_MCP_URL!,
      'summarize',
      { text, maxWords: 100 }
    ) as { content: Array<{ type: string; text: string }> };

    const summary = summaryResult.content
      .filter(c => c.type === 'text')
      .map(c => c.text)
      .join('');

    // Step 2: call the translation MCP server with the summary
    const translationResult = await callDownstreamTool(
      process.env.TRANSLATOR_MCP_URL!,
      'translate',
      { text: summary, targetLanguage }
    ) as { content: Array<{ type: string; text: string }> };

    const translated = translationResult.content
      .filter(c => c.type === 'text')
      .map(c => c.text)
      .join('');

    return {
      content: [{ type: 'text', text: JSON.stringify({ summary, translated }) }],
    };
  }
);

Two pitfalls when composing with external MCP servers:

Connection lifecycle. Create a fresh Client per tool call invocation, not a long-lived singleton shared across requests. A shared client's session can be in an unexpected state if a previous call timed out or errored, causing the next call on that session to behave incorrectly. The cost of a fresh connect/handshake per call is a few milliseconds over a local network — acceptable for infrequent composed calls. For high-frequency composition, maintain a pool of pre-connected sub-clients with health checks. See the connection pooling guide.

Error propagation across server boundaries. When a downstream MCP server returns an error, it arrives at your tool handler as either an exception (transport error) or a tool result with isError: true (tool-level error). Handle both cases: wrap the downstream call in a try/catch for transport errors, and inspect result.isError for tool-level errors. Do not let downstream errors produce confusing generic messages — translate them into StepError instances with the downstream server name and tool name as context fields. This makes debugging composed pipelines from the agent's perspective much easier.
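A sketch of handling both failure channels in one place follows. It takes the downstream call as a function argument so that it stays independent of any particular client; `callAndTranslate` and `ToolResultLike` are hypothetical names, the step labels `downstream_transport` and `downstream_tool` are arbitrary, and StepError is repeated here only so the example is self-contained.

```typescript
class StepError extends Error {
  readonly step: string;
  readonly context: Record<string, unknown>;
  constructor(step: string, message: string, context: Record<string, unknown> = {}) {
    super(`[${step}] ${message}`);
    this.name = 'StepError';
    this.step = step;
    this.context = context;
  }
}

// Minimal view of a tool result: only the fields this sketch reads.
interface ToolResultLike {
  content: Array<{ type: string; text?: string }>;
  isError?: boolean;
}

async function callAndTranslate(
  serverName: string,
  toolName: string,
  call: () => Promise<ToolResultLike>
): Promise<string> {
  let result: ToolResultLike;
  try {
    result = await call();
  } catch (err) {
    // Channel 1: the call itself failed (connection, timeout, protocol)
    const message = err instanceof Error ? err.message : String(err);
    throw new StepError('downstream_transport', message, { serverName, toolName });
  }

  const text = result.content
    .filter(c => c.type === 'text')
    .map(c => c.text ?? '')
    .join('');

  // Channel 2: the call succeeded but the tool reported a failure
  if (result.isError) {
    throw new StepError('downstream_tool', text || 'Downstream tool reported an error', {
      serverName,
      toolName,
    });
  }
  return text;
}
```

In the summarize_and_translate handler, the `call` argument would be something like `() => callDownstreamTool(url, 'summarize', args)` with a suitable cast, which also removes the duplicated filter/map/join code.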

Monitor sub-client call latency separately from your own tool handler logic. If a downstream MCP server is slow, your composed tool's p99 will degrade even if your own code is fast. Add per-downstream-server latency histograms and wire them into a circuit breaker so a slow downstream server does not cascade into your server's overall health.
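A circuit breaker for a downstream server can be quite small. The following is a minimal sketch that counts consecutive failures, assuming slow calls are turned into failures by a timeout inside the wrapped function; the `CircuitBreaker` class, its thresholds, and the injectable clock are all illustrative choices rather than an existing library API.

```typescript
class CircuitBreaker {
  private failures = 0;
  private openedAt: number | null = null;
  private readonly threshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;

  constructor(threshold: number, cooldownMs: number, now: () => number = Date.now) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
  }

  async run<T>(fn: () => Promise<T>): Promise<T> {
    // While open and still cooling down, fail fast without calling downstream
    if (this.openedAt !== null && this.now() - this.openedAt < this.cooldownMs) {
      throw new Error('circuit open');
    }
    // Otherwise the circuit is closed, or the cooldown elapsed and this is a trial call
    try {
      const result = await fn();
      this.failures = 0;
      this.openedAt = null;
      return result;
    } catch (err) {
      this.failures++;
      if (this.failures >= this.threshold) {
        this.openedAt = this.now(); // open (or re-open) the circuit
      }
      throw err;
    }
  }
}
```

One breaker instance per downstream server keeps a slow summarizer from also blocking calls to a healthy translator.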

Monitoring composed pipelines with AliveMCP

Composed pipelines are harder to monitor than single-step tools because a regression can be isolated to one step without making the overall tool appear failed. An AliveMCP external probe that calls fetch_and_score and checks for a 200 response will not tell you whether the fetch step or the score step is degrading — only that the pipeline completed.

Instrument each step of every composed pipeline individually:

import { Histogram, Counter } from 'prom-client';

const stepDuration = new Histogram({
  name: 'mcp_pipeline_step_duration_seconds',
  help: 'Duration of individual pipeline steps',
  labelNames: ['pipeline', 'step'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
});

const stepErrors = new Counter({
  name: 'mcp_pipeline_step_errors_total',
  help: 'Number of pipeline step failures',
  labelNames: ['pipeline', 'step', 'error_type'],
});

// Instrumented wrapper — wraps any step function with metrics
function instrumentStep<TIn, TOut>(
  pipelineName: string,
  stepName: string,
  fn: (input: TIn) => Promise<TOut>
): (input: TIn) => Promise<TOut> {
  return async (input: TIn) => {
    const end = stepDuration.startTimer({ pipeline: pipelineName, step: stepName });
    try {
      return await fn(input);
    } catch (err) {
      // Label by error class; the failing step is already in the `step` label
      const errorType = err instanceof Error ? err.name : 'unknown';
      stepErrors.inc({ pipeline: pipelineName, step: stepName, error_type: errorType });
      throw err;
    } finally {
      end(); // record duration on both success and failure
    }
  };
}

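// Wrap each step once at startup; per call the wrapper adds only a timer start/stop.
// instrumentedScore builds a new wrapper per query because scoreStep takes the query as a second argument.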
const instrumentedFetch = instrumentStep('research', 'fetch', fetchStep);
const instrumentedParse = instrumentStep('research', 'parse', parseStep);
const instrumentedScore = (query: string) =>
  instrumentStep('research', 'score', (doc: ParsedDocument) => scoreStep(doc, query));

With per-step metrics in place, alert on each step's latency and error rate rather than only on the pipeline as a whole.

AliveMCP external probes catch the user-visible symptom: the composed tool returns an error or takes too long. Pair external probing with the per-step metrics above so you can go from "the probe is failing" to "the score step's p99 jumped from 200ms to 4s" without manual log triage. Configure the probe to assert on response structure — not just HTTP status — so it catches silent failures where the pipeline returns a 200 with success: false in the body.
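The structural assertion itself is simple to express. The sketch below shows the check in isolation, assuming the tool returns a JSON body shaped like the process_document responses above; `checkProbeBody` and `ProbeVerdict` are hypothetical names, and how a given probe product is configured to run such a check is not covered here.

```typescript
interface ProbeVerdict {
  ok: boolean;
  reason?: string;
}

// Inspect the response body instead of trusting the status code alone.
function checkProbeBody(bodyText: string): ProbeVerdict {
  let parsed: unknown;
  try {
    parsed = JSON.parse(bodyText);
  } catch {
    return { ok: false, reason: 'body is not valid JSON' };
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return { ok: false, reason: 'body is not a JSON object' };
  }
  const body = parsed as Record<string, unknown>;
  if (body.success === false) {
    // A transport-level success that carries a pipeline-level failure
    const step = typeof body.failedStep === 'string' ? body.failedStep : 'unknown';
    return { ok: false, reason: `pipeline failed at step: ${step}` };
  }
  return { ok: true };
}
```

Surfacing the failed step in the verdict gives the alert a starting point that lines up with the `step` label on the per-step metrics.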

See also: MCP server observability, structured logging for per-step trace context, and MCP server metrics for the full Prometheus setup.

Further reading