import pg from 'pg'; import dotenv from 'dotenv'; import { readFileSync } from 'fs'; import { fileURLToPath } from 'url'; import { dirname, join } from 'path'; import { info, error } from './logger.js'; dotenv.config(); const { Pool } = pg; let pool = null; /** * 从task-manager独立配置文件中读取数据库URI */ function getDatabaseUrlFromConfig() { try { // 获取当前文件的目录 const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(__filename); // 配置文件路径 const configPath = join(__dirname, 'config.json'); // 读取配置文件 const configFile = readFileSync(configPath, 'utf-8'); const config = JSON.parse(configFile); // 获取数据库URI const databaseUrl = config?.database?.uri; if (databaseUrl) { info('Database URL loaded from task-manager config.json: ' + databaseUrl.replace(/:[^:@]+@/, ':****@')); return databaseUrl; } else { throw new Error('database.uri not found in config.json'); } } catch (err) { error('Error reading config.json: ' + err.message); // 如果读取失败,回退到环境变量 return null; } } /** * 初始化数据库连接池 */ function initPool() { if (pool) { return pool; } // 优先从task-manager独立配置文件读取 let databaseUrl = getDatabaseUrlFromConfig(); // 如果配置文件读取失败,尝试从环境变量读取 if (!databaseUrl) { databaseUrl = process.env.DATABASE_URL || process.env.SQLALCHEMY_DATABASE_URI; if (databaseUrl) { info('Database URL loaded from environment variable'); } } if (!databaseUrl) { throw new Error('Database URL not found. Please check mcp-servers/task-manager/config.json or set DATABASE_URL environment variable'); } // 记录实际使用的数据库URI(隐藏密码) const maskedUrl = databaseUrl.replace(/:[^:@]+@/, ':****@'); info('Initializing database pool with URI: ' + maskedUrl); pool = new Pool({ connectionString: databaseUrl, max: 10, idleTimeoutMillis: 30000, connectionTimeoutMillis: 2000, }); pool.on('error', (err) => { error('Unexpected error on idle client: ' + err.message); }); return pool; } /** * 获取数据库连接池 */ export function getPool() { if (!pool) { return initPool(); } return pool; } /** * 获取所有待处理的任务 */ export async function getPendingTasks() { info('[Task Manager] Starting to read task_list table...'); const startTime = Date.now(); const client = await getPool().connect(); try { const result = await client.query( `SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, update_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC` ); const taskCount = result.rows.length; const duration = Date.now() - startTime; info(`[Task Manager] Finished reading task_list table. Found ${taskCount} pending task(s) in ${duration}ms`); return result.rows; } finally { client.release(); } } /** * 根据ID获取任务 */ export async function getTaskById(taskId) { const client = await getPool().connect(); try { const result = await client.query( `SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, update_time, create_by FROM task_list WHERE task_id = $1`, [taskId] ); return result.rows[0] || null; } finally { client.release(); } } /** * 更新任务状态 */ export async function updateTaskStatus(taskId, status, codeName = null, codePath = null) { const client = await getPool().connect(); try { const updates = ['status = $1', 'update_time = CURRENT_TIMESTAMP']; const values = [status]; let paramIndex = 1; if (codeName !== null) { paramIndex++; updates.push(`code_name = $${paramIndex}`); values.push(codeName); } if (codePath !== null) { paramIndex++; updates.push(`code_path = $${paramIndex}`); values.push(codePath); } // taskId is always the last parameter paramIndex++; values.push(taskId); const query = ` UPDATE task_list SET ${updates.join(', ')} WHERE task_id = $${paramIndex} RETURNING task_id, task_name, status, update_time `; const result = await client.query(query, values); return result.rows[0]; } finally { client.release(); } } /** * 创建新任务 */ export async function createTask(taskName, taskDescription, createBy) { const client = await getPool().connect(); try { const result = await client.query( `INSERT INTO task_list (task_name, task_description, status, create_by) VALUES ($1, $2, 'pending', $3) RETURNING task_id, task_name, task_description, status, create_time, create_by`, [taskName, taskDescription, createBy] ); return result.rows[0]; } finally { client.release(); } } /** * 获取所有任务(用于调试) */ export async function getAllTasks(limit = 100) { const client = await getPool().connect(); try { const result = await client.query( `SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, update_time, create_by FROM task_list ORDER BY create_time DESC LIMIT $1`, [limit] ); return result.rows; } finally { client.release(); } } /** * 关闭数据库连接池 */ export async function closePool() { if (pool) { await pool.end(); pool = null; } }