#!/usr/bin/env python3 """ Cursor任务执行Agent 这个脚本会定期检查task-manager MCP中的pending任务, 并自动通过Cursor CLI或API触发任务执行。 使用方法: 1. 直接运行:python scripts/cursor_task_agent.py 2. 作为后台服务运行:python scripts/cursor_task_agent.py --daemon 3. 单次检查:python scripts/cursor_task_agent.py --once """ import json import time import argparse import logging import sys from pathlib import Path from datetime import datetime # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/cursor_task_agent.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger('CursorTaskAgent') class CursorTaskAgent: """ Cursor任务执行Agent 负责: 1. 定期从MCP获取pending任务 2. 为每个任务创建Cursor执行指令 3. 触发Cursor执行任务(通过创建临时提示文件) """ def __init__(self, check_interval=300, workspace_path=None): """ 初始化Agent Args: check_interval: 检查间隔(秒),默认300秒(5分钟) workspace_path: 工作区路径 """ self.check_interval = check_interval self.workspace_path = workspace_path or Path(__file__).parent.parent self.prompt_dir = self.workspace_path / '.cursor' / 'task_prompts' self.prompt_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Cursor Task Agent initialized") logger.info(f"Workspace: {self.workspace_path}") logger.info(f"Prompt directory: {self.prompt_dir}") logger.info(f"Check interval: {self.check_interval}s") def get_pending_tasks_from_db(self): """ 从数据库直接获取pending任务 注意:这里我们绕过MCP,直接连接数据库 因为MCP的轮询机制有限制 """ try: import psycopg2 from psycopg2.extras import RealDictCursor # 从配置文件读取数据库连接信息 config_file = self.workspace_path / 'mcp-servers' / 'task-manager' / 'config.json' with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) db_uri = config['database']['uri'] # 连接数据库 conn = psycopg2.connect(db_uri) cursor = conn.cursor(cursor_factory=RealDictCursor) # 查询pending任务 cursor.execute(""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """) tasks = cursor.fetchall() cursor.close() conn.close() logger.info(f"Found {len(tasks)} pending tasks") return [dict(task) for task in tasks] except Exception as e: logger.error(f"Failed to get pending tasks from database: {e}") return [] def create_task_prompt(self, task): """ 为任务创建Cursor提示文件 这个文件会告诉用户有新任务需要执行 """ prompt_file = self.prompt_dir / f"task_{task['task_id']}.md" prompt_content = f"""# 🔔 新任务通知 **任务ID**: {task['task_id']} **任务名称**: {task['task_name']} **创建时间**: {task['create_time']} **创建者**: {task['create_by']} --- ## 📋 任务描述 {task['task_description']} --- ## 🚀 执行指令 请在Cursor中执行以下操作来完成此任务: ### 方式1:使用MCP工具(推荐) 在Cursor Chat中输入: ``` @task-manager 请执行task_id={task['task_id']}的任务 ``` 或者直接使用工具: ``` 调用工具: execute_task 参数: {{ "task_id": {task['task_id']}, "auto_complete": true }} ``` ### 方式2:手动执行 1. 阅读上述任务描述 2. 根据描述开发相应的Python代码 3. 完成后调用update_task_status工具更新状态 --- **⚠️ 注意**:任务完成后,此提示文件将自动删除。 """ # 写入提示文件 with open(prompt_file, 'w', encoding='utf-8') as f: f.write(prompt_content) logger.info(f"Created task prompt: {prompt_file}") return prompt_file def check_and_notify_tasks(self): """ 检查pending任务并创建通知 """ logger.info("Checking for pending tasks...") tasks = self.get_pending_tasks_from_db() if not tasks: logger.info("No pending tasks found") return 0 # 为每个任务创建提示文件 for task in tasks: try: prompt_file = self.create_task_prompt(task) logger.info(f"Task {task['task_id']} ({task['task_name']}) - prompt created at {prompt_file}") except Exception as e: logger.error(f"Failed to create prompt for task {task['task_id']}: {e}") return len(tasks) def run_once(self): """ 执行一次检查 """ logger.info("=" * 60) logger.info("Running single check...") count = self.check_and_notify_tasks() logger.info(f"Check completed. Found {count} pending tasks.") logger.info("=" * 60) return count def run_daemon(self): """ 作为守护进程运行,定期检查任务 """ logger.info("=" * 60) logger.info("Starting Cursor Task Agent in daemon mode...") logger.info(f"Will check for new tasks every {self.check_interval} seconds") logger.info("Press Ctrl+C to stop") logger.info("=" * 60) try: while True: try: count = self.check_and_notify_tasks() logger.info(f"Next check in {self.check_interval} seconds...") time.sleep(self.check_interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"Error during check: {e}") logger.info(f"Retrying in {self.check_interval} seconds...") time.sleep(self.check_interval) except KeyboardInterrupt: logger.info("\n" + "=" * 60) logger.info("Cursor Task Agent stopped by user") logger.info("=" * 60) def main(): """ 主函数 """ parser = argparse.ArgumentParser( description='Cursor任务执行Agent - 自动检查并通知pending任务' ) parser.add_argument( '--once', action='store_true', help='只执行一次检查,不持续运行' ) parser.add_argument( '--daemon', action='store_true', help='作为守护进程运行(与--once互斥)' ) parser.add_argument( '--interval', type=int, default=300, help='检查间隔(秒),默认300秒(5分钟)' ) args = parser.parse_args() # 创建logs目录 logs_dir = Path(__file__).parent.parent / 'logs' logs_dir.mkdir(exist_ok=True) # 创建Agent实例 agent = CursorTaskAgent(check_interval=args.interval) # 根据参数运行 if args.once: agent.run_once() else: # 默认或明确指定daemon模式 agent.run_daemon() if __name__ == '__main__': main()