| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- import pg from 'pg';
- import dotenv from 'dotenv';
- import { readFileSync } from 'fs';
- import { fileURLToPath } from 'url';
- import { dirname, join } from 'path';
- import { info, error } from './logger.js';
- dotenv.config();
- const { Pool } = pg;
- let pool = null;
- /**
- * 从task-manager独立配置文件中读取数据库URI
- */
- function getDatabaseUrlFromConfig() {
- try {
- // 获取当前文件的目录
- const __filename = fileURLToPath(import.meta.url);
- const __dirname = dirname(__filename);
-
- // 配置文件路径
- const configPath = join(__dirname, 'config.json');
-
- // 读取配置文件
- const configFile = readFileSync(configPath, 'utf-8');
- const config = JSON.parse(configFile);
-
- // 获取数据库URI
- const databaseUrl = config?.database?.uri;
-
- if (databaseUrl) {
- info('Database URL loaded from task-manager config.json: ' + databaseUrl.replace(/:[^:@]+@/, ':****@'));
- return databaseUrl;
- } else {
- throw new Error('database.uri not found in config.json');
- }
- } catch (err) {
- error('Error reading config.json: ' + err.message);
- // 如果读取失败,回退到环境变量
- return null;
- }
- }
- /**
- * 初始化数据库连接池
- */
- function initPool() {
- if (pool) {
- return pool;
- }
- // 优先从task-manager独立配置文件读取
- let databaseUrl = getDatabaseUrlFromConfig();
-
- // 如果配置文件读取失败,尝试从环境变量读取
- if (!databaseUrl) {
- databaseUrl = process.env.DATABASE_URL || process.env.SQLALCHEMY_DATABASE_URI;
- if (databaseUrl) {
- info('Database URL loaded from environment variable');
- }
- }
-
- if (!databaseUrl) {
- throw new Error('Database URL not found. Please check mcp-servers/task-manager/config.json or set DATABASE_URL environment variable');
- }
- // 记录实际使用的数据库URI(隐藏密码)
- const maskedUrl = databaseUrl.replace(/:[^:@]+@/, ':****@');
- info('Initializing database pool with URI: ' + maskedUrl);
- pool = new Pool({
- connectionString: databaseUrl,
- max: 10,
- idleTimeoutMillis: 30000,
- connectionTimeoutMillis: 2000,
- });
- pool.on('error', (err) => {
- error('Unexpected error on idle client: ' + err.message);
- });
- return pool;
- }
- /**
- * 获取数据库连接池
- */
- export function getPool() {
- if (!pool) {
- return initPool();
- }
- return pool;
- }
- /**
- * 获取所有待处理的任务
- */
- export async function getPendingTasks() {
- info('[Task Manager] Starting to read task_list table...');
- const startTime = Date.now();
-
- const client = await getPool().connect();
- try {
- const result = await client.query(
- `SELECT task_id, task_name, task_description, status, code_name, code_path,
- create_time, update_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC`
- );
-
- const taskCount = result.rows.length;
- const duration = Date.now() - startTime;
- info(`[Task Manager] Finished reading task_list table. Found ${taskCount} pending task(s) in ${duration}ms`);
-
- return result.rows;
- } finally {
- client.release();
- }
- }
- /**
- * 根据ID获取任务
- */
- export async function getTaskById(taskId) {
- const client = await getPool().connect();
- try {
- const result = await client.query(
- `SELECT task_id, task_name, task_description, status, code_name, code_path,
- create_time, update_time, create_by
- FROM task_list
- WHERE task_id = $1`,
- [taskId]
- );
- return result.rows[0] || null;
- } finally {
- client.release();
- }
- }
- /**
- * 更新任务状态
- */
- export async function updateTaskStatus(taskId, status, codeName = null, codePath = null) {
- const client = await getPool().connect();
- try {
- const updates = ['status = $1', 'update_time = CURRENT_TIMESTAMP'];
- const values = [status];
- let paramIndex = 1;
- if (codeName !== null) {
- paramIndex++;
- updates.push(`code_name = $${paramIndex}`);
- values.push(codeName);
- }
- if (codePath !== null) {
- paramIndex++;
- updates.push(`code_path = $${paramIndex}`);
- values.push(codePath);
- }
- // taskId is always the last parameter
- paramIndex++;
- values.push(taskId);
- const query = `
- UPDATE task_list
- SET ${updates.join(', ')}
- WHERE task_id = $${paramIndex}
- RETURNING task_id, task_name, status, update_time
- `;
- const result = await client.query(query, values);
- return result.rows[0];
- } finally {
- client.release();
- }
- }
- /**
- * 创建新任务
- */
- export async function createTask(taskName, taskDescription, createBy) {
- const client = await getPool().connect();
- try {
- const result = await client.query(
- `INSERT INTO task_list (task_name, task_description, status, create_by)
- VALUES ($1, $2, 'pending', $3)
- RETURNING task_id, task_name, task_description, status, create_time, create_by`,
- [taskName, taskDescription, createBy]
- );
- return result.rows[0];
- } finally {
- client.release();
- }
- }
- /**
- * 获取所有任务(用于调试)
- */
- export async function getAllTasks(limit = 100) {
- const client = await getPool().connect();
- try {
- const result = await client.query(
- `SELECT task_id, task_name, task_description, status, code_name, code_path,
- create_time, update_time, create_by
- FROM task_list
- ORDER BY create_time DESC
- LIMIT $1`,
- [limit]
- );
- return result.rows;
- } finally {
- client.release();
- }
- }
- /**
- * 关闭数据库连接池
- */
- export async function closePool() {
- if (pool) {
- await pool.end();
- pool = null;
- }
- }
|