#!/usr/bin/env node import { Server } from '@modelcontextprotocol/sdk/server/index.js'; import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; import { CallToolRequestSchema, ListToolsRequestSchema, } from '@modelcontextprotocol/sdk/types.js'; import { getPendingTasks, getTaskById, updateTaskStatus, createTask, getAllTasks, closePool, } from './database.js'; import { processTask } from './task-processor.js'; import { info, error, setMcpServer } from './logger.js'; /** * Task Manager MCP Server * * 提供任务管理功能,从PostgreSQL数据库读取任务, * 通过MCP协议与Cursor交互执行代码开发任务。 */ class TaskManagerServer { constructor() { this.server = new Server( { name: 'task-manager', version: '1.0.0', }, { capabilities: { tools: {}, logging: {}, // 启用日志通知功能 }, } ); this.setupHandlers(); this.pollingInterval = null; this.isProcessing = false; // 将 Server 实例传递给 logger,以便通过 MCP 协议发送日志 setMcpServer(this.server); } setupHandlers() { // 列出所有可用工具 this.server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [ { name: 'get_pending_tasks', description: '获取所有待处理的任务列表', inputSchema: { type: 'object', properties: {}, }, }, { name: 'get_task_by_id', description: '根据任务ID获取任务详情', inputSchema: { type: 'object', properties: { task_id: { type: 'number', description: '任务ID', }, }, required: ['task_id'], }, }, { name: 'execute_task', description: '执行单个任务,将任务描述发送给Cursor进行代码开发', inputSchema: { type: 'object', properties: { task_id: { type: 'number', description: '要执行的任务ID', }, }, required: ['task_id'], }, }, { name: 'update_task_status', description: '更新任务状态', inputSchema: { type: 'object', properties: { task_id: { type: 'number', description: '任务ID', }, status: { type: 'string', enum: ['pending', 'processing', 'completed', 'failed'], description: '新状态', }, code_name: { type: 'string', description: '生成的代码文件名(可选)', }, code_path: { type: 'string', description: '代码文件路径(可选)', }, }, required: ['task_id', 'status'], }, }, { name: 'process_all_tasks', description: '批量处理所有待处理的任务', inputSchema: { type: 'object', properties: { auto_poll: { type: 'boolean', description: '是否启用自动轮询(每5分钟检查一次新任务)', default: false, }, }, }, }, { name: 'create_task', description: '创建新任务', inputSchema: { type: 'object', properties: { task_name: { type: 'string', description: '任务名称', }, task_description: { type: 'string', description: '任务描述(markdown格式)', }, create_by: { type: 'string', description: '创建者', }, }, required: ['task_name', 'task_description', 'create_by'], }, }, { name: 'get_all_tasks', description: '获取所有任务(用于调试)', inputSchema: { type: 'object', properties: { limit: { type: 'number', description: '返回的任务数量限制', default: 100, }, }, }, }, ], })); // 处理工具调用 this.server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; try { switch (name) { case 'get_pending_tasks': { const tasks = await getPendingTasks(); return { content: [ { type: 'text', text: JSON.stringify(tasks, null, 2), }, ], }; } case 'get_task_by_id': { const task = await getTaskById(args.task_id); if (!task) { return { content: [ { type: 'text', text: `Task with ID ${args.task_id} not found`, }, ], isError: true, }; } return { content: [ { type: 'text', text: JSON.stringify(task, null, 2), }, ], }; } case 'execute_task': { info(`[Task Manager] Executing task ${args.task_id}...`); const task = await getTaskById(args.task_id); if (!task) { error(`[Task Manager] Task ${args.task_id} not found`); return { content: [ { type: 'text', text: `Task with ID ${args.task_id} not found`, }, ], isError: true, }; } if (task.status !== 'pending') { error(`[Task Manager] Task ${args.task_id} is not in pending status. Current status: ${task.status}`); return { content: [ { type: 'text', text: `Task ${args.task_id} is not in pending status. Current status: ${task.status}`, }, ], isError: true, }; } // 更新状态为处理中 await updateTaskStatus(args.task_id, 'processing'); info(`[Task Manager] Task ${args.task_id} status updated to 'processing'`); // 处理任务 const result = await processTask(task); if (result.success) { info(`[Task Manager] Task ${args.task_id} executed successfully. Code: ${result.code_name || 'N/A'}, Path: ${result.code_path || 'N/A'}`); } else { error(`[Task Manager] Task ${args.task_id} execution failed: ${result.message || 'Unknown error'}`); } return { content: [ { type: 'text', text: JSON.stringify(result, null, 2), }, ], }; } case 'update_task_status': { const updated = await updateTaskStatus( args.task_id, args.status, args.code_name || null, args.code_path || null ); return { content: [ { type: 'text', text: JSON.stringify(updated, null, 2), }, ], }; } case 'process_all_tasks': { const result = await this.processAllTasks(args.auto_poll || false); return { content: [ { type: 'text', text: JSON.stringify(result, null, 2), }, ], }; } case 'create_task': { const task = await createTask( args.task_name, args.task_description, args.create_by ); return { content: [ { type: 'text', text: JSON.stringify(task, null, 2), }, ], }; } case 'get_all_tasks': { const tasks = await getAllTasks(args.limit || 100); return { content: [ { type: 'text', text: JSON.stringify(tasks, null, 2), }, ], }; } default: return { content: [ { type: 'text', text: `Unknown tool: ${name}`, }, ], isError: true, }; } } catch (error) { return { content: [ { type: 'text', text: `Error executing tool ${name}: ${error.message}\n${error.stack}`, }, ], isError: true, }; } }); } /** * 处理所有待处理的任务 */ async processAllTasks(autoPoll = false) { if (this.isProcessing) { info('[Task Manager] Task processing is already in progress, skipping...'); return { success: false, message: 'Task processing is already in progress', }; } info('[Task Manager] Starting to process all pending tasks...'); const processStartTime = Date.now(); this.isProcessing = true; const results = { processed: 0, succeeded: 0, failed: 0, tasks: [], }; try { const tasks = await getPendingTasks(); if (tasks.length === 0) { info('[Task Manager] No pending tasks found, skipping processing.'); return { success: true, message: 'No pending tasks to process', ...results, }; } info(`[Task Manager] Processing ${tasks.length} task(s)...`); for (const task of tasks) { try { info(`[Task Manager] Processing task ${task.task_id}: ${task.task_name}`); // 更新状态为处理中 await updateTaskStatus(task.task_id, 'processing'); // 处理任务 const result = await processTask(task); results.processed++; if (result.success) { results.succeeded++; await updateTaskStatus( task.task_id, 'completed', result.code_name, result.code_path ); info(`[Task Manager] Task ${task.task_id} completed successfully. Code: ${result.code_name || 'N/A'}, Path: ${result.code_path || 'N/A'}`); } else { results.failed++; await updateTaskStatus(task.task_id, 'failed'); error(`[Task Manager] Task ${task.task_id} failed: ${result.message || 'Unknown error'}`); } results.tasks.push({ task_id: task.task_id, task_name: task.task_name, success: result.success, message: result.message, }); } catch (err) { results.failed++; await updateTaskStatus(task.task_id, 'failed'); error(`[Task Manager] Task ${task.task_id} failed with error: ${err.message}`); results.tasks.push({ task_id: task.task_id, task_name: task.task_name, success: false, error: err.message, }); } } const processDuration = Date.now() - processStartTime; info(`[Task Manager] Task processing completed. Processed: ${results.processed}, Succeeded: ${results.succeeded}, Failed: ${results.failed}, Duration: ${processDuration}ms`); // 如果启用自动轮询,启动轮询机制 if (autoPoll && !this.pollingInterval) { this.startPolling(); } return { success: true, message: `Processed ${results.processed} tasks. Succeeded: ${results.succeeded}, Failed: ${results.failed}`, ...results, }; } finally { this.isProcessing = false; } } /** * 启动轮询机制 */ startPolling() { const pollInterval = parseInt(process.env.POLL_INTERVAL || '300000', 10); // 默认5分钟 if (this.pollingInterval) { clearInterval(this.pollingInterval); } this.pollingInterval = setInterval(async () => { if (!this.isProcessing) { try { const tasks = await getPendingTasks(); if (tasks.length > 0) { info(`Found ${tasks.length} new pending tasks, processing...`); await this.processAllTasks(false); } } catch (err) { error('Error during polling: ' + err.message); } } }, pollInterval); info(`Polling started with interval: ${pollInterval}ms (${pollInterval / 1000}s)`); } /** * 停止轮询机制 */ stopPolling() { if (this.pollingInterval) { clearInterval(this.pollingInterval); this.pollingInterval = null; info('Polling stopped'); } } /** * 启动服务器 */ async start() { const transport = new StdioServerTransport(); await this.server.connect(transport); info('Task Manager MCP Server started'); // 自动启动轮询机制(如果环境变量 AUTO_START_POLLING 为 true 或未设置) const autoStartPolling = process.env.AUTO_START_POLLING !== 'false'; if (autoStartPolling) { // 延迟启动,等待数据库连接就绪 setTimeout(() => { this.startPolling(); // 立即检查一次待处理任务 this.processAllTasks(false).catch((err) => { error('Error during initial task processing: ' + err.message); }); }, 2000); // 2秒后启动 } } /** * 关闭服务器 */ async close() { this.stopPolling(); await closePool(); // MCP SDK会自动处理连接关闭 } } // 启动服务器 const server = new TaskManagerServer(); // 处理进程退出 process.on('SIGINT', async () => { await server.close(); process.exit(0); }); process.on('SIGTERM', async () => { await server.close(); process.exit(0); }); server.start().catch((err) => { error('Failed to start server: ' + err.message); process.exit(1); });