import type { Logger } from '../logger.js';
import type { McpToolDefinition, ToolContext } from '../types.js';
import { Buffer } from 'node:buffer';
import process from 'node:process';
import PQueue from 'p-queue';

/** Character sequence that separates a frame's header block from its body. */
const HEADER_DELIMITER = '\r\n\r\n';

/** Incoming JSON-RPC 2.0 message; a message without `id` is a notification. */
interface JsonRpcRequest {
  jsonrpc: '2.0';
  id?: null | number | string;
  method: string;
  params?: Record<string, unknown>;
}

/** Successful JSON-RPC 2.0 response. */
interface JsonRpcSuccess {
  jsonrpc: '2.0';
  id: null | number | string;
  result: unknown;
}

/** Failed JSON-RPC 2.0 response. */
interface JsonRpcError {
  jsonrpc: '2.0';
  id: null | number | string;
  error: {
    code: number;
    message: string;
  };
}

/** Construction options for {@link McpServer}. */
export interface McpServerOptions {
  name: string;
  version: string;
  description: string;
  tools: McpToolDefinition[];
  logger: Logger;
  /** Maximum number of tool handlers running at once; defaults to 4. */
  concurrency?: number;
}

/** Turns any thrown value into a message suitable for logs and error replies. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * JSON-RPC 2.0 server that reads `Content-Length`-framed messages from stdin
 * and writes framed responses to stdout.
 *
 * Tool invocations (`tools/call`) are executed through a bounded queue; all
 * other methods are answered inline.
 */
export class McpServer {
  /** Raw bytes received from stdin that have not been consumed yet. */
  private buffer: Buffer = Buffer.alloc(0);
  /** Body size in bytes of the frame being read, or `null` while reading headers. */
  private expectedLength: null | number = null;
  private initialized = false;
  /** Tool descriptions returned by `tools/list`. */
  private readonly metadata: Array<Record<string, unknown>>;
  private readonly options: McpServerOptions;
  private readonly queue: PQueue;
  private readonly tools: Map<string, McpToolDefinition>;

  /**
   * @param options Server identity, tool definitions, logger and concurrency.
   * @throws Error when two tools share the same name.
   */
  constructor(options: McpServerOptions) {
    this.options = options;
    this.tools = new Map();
    this.metadata = [];

    for (const tool of options.tools) {
      if (this.tools.has(tool.name)) {
        throw new Error(`Duplicate MCP tool name: ${tool.name}`);
      }

      this.tools.set(tool.name, tool);
      this.metadata.push({
        name: tool.name,
        description: tool.description,
        inputSchema: tool.inputSchema,
        ...(tool.outputSchema ? { outputSchema: tool.outputSchema } : {}),
      });
    }

    this.queue = new PQueue({ concurrency: options.concurrency ?? 4 });
  }

  /** Attaches the stdin listeners and starts consuming framed messages. */
  start(): void {
    // stdin is deliberately left in binary mode: Content-Length counts bytes,
    // so frames must be sliced on byte offsets and decoded only afterwards.
    process.stdin.on('data', (chunk: Buffer | string) => {
      // A string chunk only appears if an encoding was set on stdin elsewhere.
      const bytes =
        typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk;
      this.buffer = Buffer.concat([this.buffer, bytes]);

      this.drain().catch((error: unknown) => {
        this.log(`Unexpected error: ${describeError(error)}`);
      });
    });
    process.stdin.on('end', () => {
      this.log('stdin ended');
    });
    process.stdin.on('close', () => {
      this.log('stdin closed');
    });
    process.stdin.resume();
    this.log('MCP service ready');
  }

  /**
   * Guards methods that are only valid after `initialize`.
   *
   * @throws Error when `initialize` has not been processed yet.
   */
  private assertInitialized(method: string): void {
    if (!this.initialized) {
      throw new Error(`Received ${method} before initialize`);
    }
  }

  /**
   * Routes one validated request to its handler.
   *
   * `request.id` is forwarded untouched so that notifications (no `id`) never
   * produce a response.
   */
  private async dispatch(request: JsonRpcRequest): Promise<void> {
    const { id } = request;

    switch (request.method) {
      case 'initialize': {
        if (this.initialized) {
          this.respondError(id, -32_600, 'Already initialized');
          return;
        }

        this.initialized = true;
        this.respond(id, {
          protocolVersion: '2024-10-07',
          capabilities: { tools: { list: true, call: true } },
          service: {
            name: this.options.name,
            version: this.options.version,
            description: this.options.description,
          },
        });
        this.notify('notifications/ready', {});
        return;
      }

      case 'ping': {
        this.respond(id, 'pong');
        return;
      }

      case 'shutdown': {
        this.respond(id, null);
        process.exitCode = 0;
        // Stop reading on the next tick, after the acknowledgement above has
        // been handed to stdout.
        process.nextTick(() => {
          process.stdin.pause();
          this.log('shutdown signal received');
        });
        return;
      }

      case 'tools/call': {
        this.assertInitialized('tools/call');

        const params = request.params ?? {};
        const toolName = params.name;
        if (typeof toolName !== 'string' || toolName === '') {
          this.respondError(id, -32_602, 'Tool name is required');
          return;
        }

        const tool = this.tools.get(toolName);
        if (!tool) {
          this.respondError(id, -32_601, `Unknown tool: ${toolName}`);
          return;
        }

        // The queue caps how many handlers run concurrently.
        await this.queue.add(async () => {
          try {
            const args = (params.arguments ?? {}) as Record<string, unknown>;
            const context: ToolContext = { logger: this.options.logger };
            const result = await tool.handler(args, context);
            this.respond(id, result);
          } catch (error) {
            this.respondError(id, -32_001, describeError(error));
          }
        });
        return;
      }

      case 'tools/list': {
        this.assertInitialized('tools/list');
        this.respond(id, { tools: this.metadata });
        return;
      }

      default: {
        this.respondError(id, -32_601, `Method not found: ${request.method}`);
      }
    }
  }

  /**
   * Extracts every complete frame currently buffered and handles it.
   *
   * All buffer bookkeeping happens synchronously before the `await`, so a
   * chunk arriving while a handler is pending can safely start another pass.
   * Frames extracted within one pass are handled one after another.
   */
  private async drain(): Promise<void> {
    while (true) {
      if (this.expectedLength === null) {
        const headerEnd = this.buffer.indexOf(HEADER_DELIMITER);
        if (headerEnd === -1) return;

        const header = this.buffer.subarray(0, headerEnd).toString('utf8');
        this.buffer = this.buffer.subarray(headerEnd + HEADER_DELIMITER.length);

        const lengthHeader = /content-length:\s*(\d+)/i.exec(header)?.[1];
        // A header block without Content-Length is discarded and scanning resumes.
        if (!lengthHeader) continue;

        this.expectedLength = Number.parseInt(lengthHeader, 10);
      }

      const bodyLength = this.expectedLength;
      // Buffer#length is a byte count, matching the unit of Content-Length.
      if (this.buffer.length < bodyLength) return;

      const body = this.buffer.subarray(0, bodyLength).toString('utf8');
      this.buffer = this.buffer.subarray(bodyLength);
      this.expectedLength = null;

      await this.handleMessage(body);
    }
  }

  /**
   * Parses and validates one frame body, then dispatches it.
   *
   * Parse failures and malformed requests are answered with `id: null`, since
   * no trustworthy id is available. Errors thrown while dispatching are logged
   * and reported with code -32000.
   */
  private async handleMessage(payload: string): Promise<void> {
    let request: JsonRpcRequest | null = null;
    try {
      request = JSON.parse(payload) as JsonRpcRequest | null;
    } catch {
      this.respondError(null, -32_700, 'Parse error');
      return;
    }

    if (
      !request ||
      request.jsonrpc !== '2.0' ||
      typeof request.method !== 'string'
    ) {
      this.respondError(request?.id ?? null, -32_600, 'Invalid Request');
      return;
    }

    try {
      await this.dispatch(request);
    } catch (error) {
      const message = describeError(error);
      this.log(`Unexpected error: ${message}`);
      this.respondError(request.id, -32_000, message);
    }
  }

  /** Emits a debug log entry tagged with this server's scope. */
  private log(message: string): void {
    this.options.logger.debug({ scope: 'mcp-server' }, message);
  }

  /** Sends a server-initiated notification (a message without an id). */
  private notify(method: string, params: Record<string, unknown>): void {
    this.write({ jsonrpc: '2.0', method, params });
  }

  /** Sends a success response; a no-op when the request was a notification. */
  private respond(id: JsonRpcRequest['id'], result: unknown): void {
    if (id === undefined) return;
    this.write({ jsonrpc: '2.0', id, result });
  }

  /** Sends an error response; a no-op when the request was a notification. */
  private respondError(
    id: JsonRpcRequest['id'],
    code: number,
    message: string,
  ): void {
    if (id === undefined) return;
    this.write({ jsonrpc: '2.0', id, error: { code, message } });
  }

  /** Serializes a message and writes it to stdout with a byte-accurate header. */
  private write(payload: JsonRpcError | JsonRpcRequest | JsonRpcSuccess): void {
    const json = JSON.stringify(payload);
    const frame = `Content-Length: ${Buffer.byteLength(json, 'utf8')}\r\n\r\n${json}`;
    process.stdout.write(frame);
  }
}