| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276 |
- #!/usr/bin/env python3
- """
- Cursor任务执行Agent
- 这个脚本会定期检查task-manager MCP中的pending任务,
- 并自动通过Cursor CLI或API触发任务执行。
- 使用方法:
- 1. 直接运行:python scripts/cursor_task_agent.py
- 2. 作为后台服务运行:python scripts/cursor_task_agent.py --daemon
- 3. 单次检查:python scripts/cursor_task_agent.py --once
- """
- import json
- import time
- import argparse
- import logging
- import sys
- from pathlib import Path
- from datetime import datetime
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler('logs/cursor_task_agent.log'),
- logging.StreamHandler(sys.stdout)
- ]
- )
- logger = logging.getLogger('CursorTaskAgent')
- class CursorTaskAgent:
- """
- Cursor任务执行Agent
-
- 负责:
- 1. 定期从MCP获取pending任务
- 2. 为每个任务创建Cursor执行指令
- 3. 触发Cursor执行任务(通过创建临时提示文件)
- """
-
- def __init__(self, check_interval=300, workspace_path=None):
- """
- 初始化Agent
-
- Args:
- check_interval: 检查间隔(秒),默认300秒(5分钟)
- workspace_path: 工作区路径
- """
- self.check_interval = check_interval
- self.workspace_path = workspace_path or Path(__file__).parent.parent
- self.prompt_dir = self.workspace_path / '.cursor' / 'task_prompts'
- self.prompt_dir.mkdir(parents=True, exist_ok=True)
-
- logger.info(f"Cursor Task Agent initialized")
- logger.info(f"Workspace: {self.workspace_path}")
- logger.info(f"Prompt directory: {self.prompt_dir}")
- logger.info(f"Check interval: {self.check_interval}s")
-
- def get_pending_tasks_from_db(self):
- """
- 从数据库直接获取pending任务
-
- 注意:这里我们绕过MCP,直接连接数据库
- 因为MCP的轮询机制有限制
- """
- try:
- import psycopg2
- from psycopg2.extras import RealDictCursor
-
- # 从配置文件读取数据库连接信息
- config_file = self.workspace_path / 'mcp-servers' / 'task-manager' / 'config.json'
- with open(config_file, 'r', encoding='utf-8') as f:
- config = json.load(f)
-
- db_uri = config['database']['uri']
-
- # 连接数据库
- conn = psycopg2.connect(db_uri)
- cursor = conn.cursor(cursor_factory=RealDictCursor)
-
- # 查询pending任务
- cursor.execute("""
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """)
-
- tasks = cursor.fetchall()
-
- cursor.close()
- conn.close()
-
- logger.info(f"Found {len(tasks)} pending tasks")
- return [dict(task) for task in tasks]
-
- except Exception as e:
- logger.error(f"Failed to get pending tasks from database: {e}")
- return []
-
- def create_task_prompt(self, task):
- """
- 为任务创建Cursor提示文件
-
- 这个文件会告诉用户有新任务需要执行
- """
- prompt_file = self.prompt_dir / f"task_{task['task_id']}.md"
-
- prompt_content = f"""# 🔔 新任务通知
- **任务ID**: {task['task_id']}
- **任务名称**: {task['task_name']}
- **创建时间**: {task['create_time']}
- **创建者**: {task['create_by']}
- ---
- ## 📋 任务描述
- {task['task_description']}
- ---
- ## 🚀 执行指令
- 请在Cursor中执行以下操作来完成此任务:
- ### 方式1:使用MCP工具(推荐)
- 在Cursor Chat中输入:
- ```
- @task-manager 请执行task_id={task['task_id']}的任务
- ```
- 或者直接使用工具:
- ```
- 调用工具: execute_task
- 参数: {{
- "task_id": {task['task_id']},
- "auto_complete": true
- }}
- ```
- ### 方式2:手动执行
- 1. 阅读上述任务描述
- 2. 根据描述开发相应的Python代码
- 3. 完成后调用update_task_status工具更新状态
- ---
- **⚠️ 注意**:任务完成后,此提示文件将自动删除。
- """
-
- # 写入提示文件
- with open(prompt_file, 'w', encoding='utf-8') as f:
- f.write(prompt_content)
-
- logger.info(f"Created task prompt: {prompt_file}")
- return prompt_file
-
- def check_and_notify_tasks(self):
- """
- 检查pending任务并创建通知
- """
- logger.info("Checking for pending tasks...")
-
- tasks = self.get_pending_tasks_from_db()
-
- if not tasks:
- logger.info("No pending tasks found")
- return 0
-
- # 为每个任务创建提示文件
- for task in tasks:
- try:
- prompt_file = self.create_task_prompt(task)
- logger.info(f"Task {task['task_id']} ({task['task_name']}) - prompt created at {prompt_file}")
- except Exception as e:
- logger.error(f"Failed to create prompt for task {task['task_id']}: {e}")
-
- return len(tasks)
-
- def run_once(self):
- """
- 执行一次检查
- """
- logger.info("=" * 60)
- logger.info("Running single check...")
- count = self.check_and_notify_tasks()
- logger.info(f"Check completed. Found {count} pending tasks.")
- logger.info("=" * 60)
- return count
-
- def run_daemon(self):
- """
- 作为守护进程运行,定期检查任务
- """
- logger.info("=" * 60)
- logger.info("Starting Cursor Task Agent in daemon mode...")
- logger.info(f"Will check for new tasks every {self.check_interval} seconds")
- logger.info("Press Ctrl+C to stop")
- logger.info("=" * 60)
-
- try:
- while True:
- try:
- count = self.check_and_notify_tasks()
- logger.info(f"Next check in {self.check_interval} seconds...")
- time.sleep(self.check_interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"Error during check: {e}")
- logger.info(f"Retrying in {self.check_interval} seconds...")
- time.sleep(self.check_interval)
- except KeyboardInterrupt:
- logger.info("\n" + "=" * 60)
- logger.info("Cursor Task Agent stopped by user")
- logger.info("=" * 60)
- def main():
- """
- 主函数
- """
- parser = argparse.ArgumentParser(
- description='Cursor任务执行Agent - 自动检查并通知pending任务'
- )
- parser.add_argument(
- '--once',
- action='store_true',
- help='只执行一次检查,不持续运行'
- )
- parser.add_argument(
- '--daemon',
- action='store_true',
- help='作为守护进程运行(与--once互斥)'
- )
- parser.add_argument(
- '--interval',
- type=int,
- default=300,
- help='检查间隔(秒),默认300秒(5分钟)'
- )
-
- args = parser.parse_args()
-
- # 创建logs目录
- logs_dir = Path(__file__).parent.parent / 'logs'
- logs_dir.mkdir(exist_ok=True)
-
- # 创建Agent实例
- agent = CursorTaskAgent(check_interval=args.interval)
-
- # 根据参数运行
- if args.once:
- agent.run_once()
- else:
- # 默认或明确指定daemon模式
- agent.run_daemon()
- if __name__ == '__main__':
- main()
|