#!/usr/bin/env python3 """ 自动任务执行核心调度脚本(简化版) 工作流程: 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务 2. 生成 .cursor/task_execute_instructions.md 执行指令文件 3. 更新任务状态为 processing,并维护 .cursor/pending_tasks.json 4. (可选)向 Cursor Chat 发送执行提醒 5. Cursor 完成任务后,将 pending_tasks.json 中的状态改为 completed 6. 调度脚本将 completed 状态的任务同步回数据库 使用方式: python scripts/auto_execute_tasks.py --once python scripts/auto_execute_tasks.py --interval 300 python scripts/auto_execute_tasks.py --once --enable-chat """ from __future__ import annotations import json import time import argparse import logging from pathlib import Path from datetime import datetime from typing import Any, Dict, List, Optional, Tuple # ============================================================================ # 日志配置 # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ============================================================================ # Windows GUI 自动化依赖(可选) # ============================================================================ HAS_CURSOR_GUI = False HAS_PYPERCLIP = False try: import win32gui import win32con import pyautogui pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 HAS_CURSOR_GUI = True try: import pyperclip HAS_PYPERCLIP = True except ImportError: pass except ImportError: logger.info( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui)," "将禁用自动 Cursor Chat 功能。" ) # ============================================================================ # 全局配置 # ============================================================================ WORKSPACE_ROOT = Path(__file__).parent.parent CURSOR_DIR = WORKSPACE_ROOT / ".cursor" PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json" INSTRUCTIONS_FILE = CURSOR_DIR / "task_execute_instructions.md" # 命令行参数控制的全局变量 ENABLE_CHAT: bool = False CHAT_MESSAGE: str = "请阅读 .cursor/task_execute_instructions.md 并执行任务。" CHAT_INPUT_POS: Optional[Tuple[int, int]] = None # ============================================================================ # 数据库操作 # ============================================================================ def get_db_connection(): """获取数据库连接""" try: import psycopg2 import sys sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config, current_env app_config = config.get(current_env, config['default']) db_uri = app_config.SQLALCHEMY_DATABASE_URI return psycopg2.connect(db_uri) except ImportError as e: logger.error(f"导入依赖失败: {e}") return None except Exception as e: logger.error(f"连接数据库失败: {e}") return None def get_pending_tasks() -> List[Dict[str, Any]]: """从数据库获取所有 pending 任务""" try: from psycopg2.extras import RealDictCursor conn = get_db_connection() if not conn: return [] cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute(""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """) tasks = cursor.fetchall() cursor.close() conn.close() return [dict(task) for task in tasks] except Exception as e: logger.error(f"获取 pending 任务失败: {e}") return [] def update_task_status( task_id: int, status: str, code_name: Optional[str] = None, code_path: Optional[str] = None, ) -> bool: """更新任务状态""" try: conn = get_db_connection() if not conn: return False cursor = conn.cursor() if code_name and code_path: cursor.execute(""" UPDATE task_list SET status = %s, code_name = %s, code_path = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, code_name, code_path, task_id)) else: cursor.execute(""" UPDATE task_list SET status = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, task_id)) conn.commit() updated = cursor.rowcount > 0 cursor.close() conn.close() if updated: logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}") return updated except Exception as e: logger.error(f"更新任务状态失败: {e}") return False # ============================================================================ # 任务文件生成 # ============================================================================ def write_pending_tasks_json(tasks: List[Dict[str, Any]]) -> None: """将任务列表写入 .cursor/pending_tasks.json""" CURSOR_DIR.mkdir(parents=True, exist_ok=True) # 读取现有任务 existing_tasks = [] if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: existing_tasks = json.load(f) except Exception: existing_tasks = [] existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t} # 添加新任务 for task in tasks: if task["task_id"] not in existing_ids: create_time = task.get("create_time", "") if hasattr(create_time, "isoformat"): create_time = create_time.isoformat() task_info = { "task_id": task["task_id"], "task_name": task["task_name"], "task_description": task["task_description"], "code_path": task.get("code_path", ""), "code_name": task.get("code_name", ""), "status": "processing", "notified_at": datetime.now().isoformat(), } existing_tasks.append(task_info) with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(existing_tasks, f, indent=2, ensure_ascii=False) logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}") def create_execute_instructions(tasks: List[Dict[str, Any]]) -> None: """生成任务执行指令文件 .cursor/task_execute_instructions.md""" CURSOR_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# 🤖 Cursor 自动任务执行指令\n\n") f.write("**⚠️ 重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 📋 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task['task_id'] task_name = task['task_name'] task_desc = task['task_description'] create_time = task.get("create_time", "") if hasattr(create_time, "strftime"): create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") f.write(f"## 🔴 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **创建时间**: {create_time}\n") f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n") f.write(f"### 📝 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}") # ============================================================================ # 状态同步 # ============================================================================ def sync_completed_tasks_to_db() -> int: """将 pending_tasks.json 中 completed 的任务同步到数据库""" if not PENDING_TASKS_FILE.exists(): return 0 try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) except Exception as e: logger.error(f"读取 pending_tasks.json 失败: {e}") return 0 if not isinstance(tasks, list): return 0 updated = 0 remaining_tasks = [] for t in tasks: if t.get("status") == "completed": task_id = t.get("task_id") if not task_id: continue code_name = t.get("code_name") code_path = t.get("code_path") if update_task_status(task_id, "completed", code_name, code_path): updated += 1 logger.info(f"已同步任务 {task_id} 为 completed") else: remaining_tasks.append(t) else: remaining_tasks.append(t) if updated > 0: with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(remaining_tasks, f, indent=2, ensure_ascii=False) logger.info(f"本次共同步 {updated} 个 completed 任务到数据库") return updated # ============================================================================ # Cursor Chat 自动化 # ============================================================================ def find_cursor_window() -> Optional[int]: """查找 Cursor 主窗口句柄""" if not HAS_CURSOR_GUI: return None cursor_windows: List[Dict[str, Any]] = [] def enum_windows_callback(hwnd, _extra): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) or "" class_name = win32gui.GetClassName(hwnd) or "" is_cursor = "cursor" in title.lower() if class_name and "chrome_widgetwin" in class_name.lower(): is_cursor = True if is_cursor: left, top, right, bottom = win32gui.GetWindowRect(hwnd) area = (right - left) * (bottom - top) cursor_windows.append({"hwnd": hwnd, "area": area}) return True win32gui.EnumWindows(enum_windows_callback, None) if not cursor_windows: logger.warning("未找到 Cursor 窗口") return None cursor_windows.sort(key=lambda x: x["area"], reverse=True) return cursor_windows[0]["hwnd"] def send_chat_message( message: str, input_pos: Optional[Tuple[int, int]] ) -> bool: """在 Cursor Chat 中发送消息""" if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化") return False hwnd = find_cursor_window() if not hwnd: return False try: win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) time.sleep(0.3) win32gui.SetForegroundWindow(hwnd) time.sleep(0.5) except Exception as e: logger.error(f"激活 Cursor 窗口失败: {e}") return False # 点击输入框或使用快捷键 if input_pos: x, y = input_pos pyautogui.click(x, y) time.sleep(0.4) else: pyautogui.hotkey("ctrl", "l") time.sleep(1.0) pyautogui.hotkey("ctrl", "a") time.sleep(0.2) # 输入消息 if HAS_PYPERCLIP: try: pyperclip.copy(message) pyautogui.hotkey("ctrl", "v") time.sleep(0.5) except Exception: pyautogui.write(message, interval=0.03) else: pyautogui.write(message, interval=0.03) time.sleep(0.3) pyautogui.press("enter") logger.info("✅ 消息已发送到 Cursor Chat") return True def send_chat_for_tasks() -> None: """向 Cursor Chat 发送任务提醒""" if not ENABLE_CHAT: return if not PENDING_TASKS_FILE.exists(): return try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: data = json.load(f) if not any(t.get("status") == "processing" for t in data): return except Exception: return logger.info("发送任务提醒到 Cursor Chat...") send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS) # ============================================================================ # 主执行流程 # ============================================================================ def auto_execute_tasks_once() -> int: """执行一次任务检查和处理""" # 1. 先同步已完成任务到数据库 sync_completed_tasks_to_db() # 2. 获取 pending 任务 logger.info("🔍 检查 pending 任务...") tasks = get_pending_tasks() if not tasks: logger.info("✅ 没有 pending 任务") return 0 logger.info(f"📋 找到 {len(tasks)} 个 pending 任务") # 3. 更新任务状态为 processing for task in tasks: update_task_status(task["task_id"], "processing") # 4. 写入 pending_tasks.json write_pending_tasks_json(tasks) # 5. 生成执行指令文件 create_execute_instructions(tasks) return len(tasks) def auto_execute_tasks_loop(interval: int = 300) -> None: """循环执行任务检查""" logger.info("=" * 60) logger.info("🚀 自动任务执行服务已启动") logger.info(f"⏰ 检查间隔: {interval} 秒") logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) try: while True: try: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks() logger.info(f"✅ 已处理 {count} 个任务") logger.info(f"⏳ {interval} 秒后再次检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") time.sleep(interval) except KeyboardInterrupt: logger.info("\n⛔ 服务已停止") def main() -> None: """主函数""" parser = argparse.ArgumentParser( description="自动任务执行调度脚本", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--once", action="store_true", help="只执行一次" ) parser.add_argument( "--interval", type=int, default=300, help="检查间隔(秒)" ) parser.add_argument( "--enable-chat", action="store_true", help="启用自动 Cursor Chat" ) parser.add_argument( "--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"' ) parser.add_argument( "--chat-message", type=str, default="请阅读 .cursor/task_execute_instructions.md 并执行任务。", help="发送到 Chat 的消息" ) args = parser.parse_args() global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE ENABLE_CHAT = bool(args.enable_chat) CHAT_MESSAGE = args.chat_message if args.chat_input_pos: try: x, y = args.chat_input_pos.split(",") CHAT_INPUT_POS = (int(x.strip()), int(y.strip())) except Exception: pass if args.once: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks() logger.info(f"✅ 完成!处理了 {count} 个任务") else: auto_execute_tasks_loop(interval=args.interval) if __name__ == "__main__": main()