#!/usr/bin/env python3 """ 自动任务执行核心调度脚本 工作流程: 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务 2. 生成 .cursor/task_execute_instructions.md 执行指令文件 3. 更新任务状态为 processing,并维护 .cursor/pending_tasks.json 4. 更新 .cursor/task_trigger.txt 触发器文件 5. (可选)向 Cursor Chat 发送执行提醒 6. Cursor 完成任务后,将 pending_tasks.json 中的状态改为 completed 7. 调度脚本将 completed 状态的任务同步回数据库 使用方式: # 执行一次任务检查 python scripts/auto_execute_tasks.py --once # 循环模式(每 5 分钟检查) python scripts/auto_execute_tasks.py --interval 300 # 刷新触发器文件(用于现有 processing 任务) python scripts/auto_execute_tasks.py --refresh-trigger # 立即发送 Chat 消息触发 Cursor 执行 python scripts/auto_execute_tasks.py --send-chat-now # 启动 Chat 自动触发循环(每 60 秒检查,有任务时自动发送消息) python scripts/auto_execute_tasks.py --chat-loop --chat-interval 60 """ from __future__ import annotations import json import time import argparse import logging from pathlib import Path from datetime import datetime from typing import Any, Dict, List, Optional, Tuple # ============================================================================ # 日志配置 # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ============================================================================ # Windows GUI 自动化依赖(可选) # ============================================================================ HAS_CURSOR_GUI = False HAS_PYPERCLIP = False try: import win32gui import win32con import pyautogui pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 HAS_CURSOR_GUI = True try: import pyperclip HAS_PYPERCLIP = True except ImportError: pass except ImportError: logger.info( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui)," "将禁用自动 Cursor Chat 功能。" ) # ============================================================================ # 全局配置 # ============================================================================ WORKSPACE_ROOT = Path(__file__).parent.parent CURSOR_DIR = WORKSPACE_ROOT / ".cursor" PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json" INSTRUCTIONS_FILE = CURSOR_DIR / "task_execute_instructions.md" TRIGGER_FILE = CURSOR_DIR / "task_trigger.txt" # 命令行参数控制的全局变量 ENABLE_CHAT: bool = False CHAT_MESSAGE: str = "请阅读 .cursor/task_execute_instructions.md 并执行任务。" CHAT_INPUT_POS: Optional[Tuple[int, int]] = None # ============================================================================ # 数据库操作 # ============================================================================ def get_db_connection(): """获取数据库连接(使用 production 环境配置)""" try: import psycopg2 import sys sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config # 强制使用 production 环境的数据库配置 app_config = config['production'] db_uri = app_config.SQLALCHEMY_DATABASE_URI return psycopg2.connect(db_uri) except ImportError as e: logger.error(f"导入依赖失败: {e}") return None except Exception as e: logger.error(f"连接数据库失败: {e}") return None def get_pending_tasks() -> List[Dict[str, Any]]: """从数据库获取所有 pending 任务""" try: from psycopg2.extras import RealDictCursor conn = get_db_connection() if not conn: return [] cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute(""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """) tasks = cursor.fetchall() cursor.close() conn.close() return [dict(task) for task in tasks] except Exception as e: logger.error(f"获取 pending 任务失败: {e}") return [] def update_task_status( task_id: int, status: str, code_name: Optional[str] = None, code_path: Optional[str] = None, ) -> bool: """更新任务状态""" try: conn = get_db_connection() if not conn: return False cursor = conn.cursor() if code_name and code_path: cursor.execute(""" UPDATE task_list SET status = %s, code_name = %s, code_path = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, code_name, code_path, task_id)) else: cursor.execute(""" UPDATE task_list SET status = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, task_id)) conn.commit() updated = cursor.rowcount > 0 cursor.close() conn.close() if updated: logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}") return updated except Exception as e: logger.error(f"更新任务状态失败: {e}") return False # ============================================================================ # 触发器文件管理 # ============================================================================ def update_trigger_file( task_count: int = 0, status: str = "有待执行任务", task_ids: Optional[List[int]] = None, ) -> None: """ 更新触发器文件 .cursor/task_trigger.txt Cursor Rules 会检测此文件的更新时间,如果在 5 分钟内更新, 则触发自动任务执行。 """ CURSOR_DIR.mkdir(parents=True, exist_ok=True) gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') task_ids_str = ", ".join(str(tid) for tid in (task_ids or [])) content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER 生成时间: {gen_time} 状态: {status} 待处理任务数: {task_count} 任务ID列表: [{task_ids_str}] 此文件用于触发Cursor自动执行任务。 当此文件更新时间在5分钟内时,Cursor应该检查 .cursor/task_execute_instructions.md 并执行其中的任务。 【重要】Cursor AI 请立即执行以下操作: 1. 读取 .cursor/task_execute_instructions.md 文件 2. 按照文件中的任务描述,依次完成每个任务 3. 完成后更新 .cursor/pending_tasks.json 中对应任务的状态为 completed """ with TRIGGER_FILE.open("w", encoding="utf-8") as f: f.write(content) logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}") def get_processing_task_ids() -> List[int]: """从 pending_tasks.json 获取所有 processing 状态的任务 ID""" if not PENDING_TASKS_FILE.exists(): return [] try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) return [ t.get("task_id") for t in tasks if t.get("status") == "processing" and t.get("task_id") ] except Exception: return [] # ============================================================================ # 任务文件生成 # ============================================================================ def write_pending_tasks_json(tasks: List[Dict[str, Any]]) -> None: """将任务列表写入 .cursor/pending_tasks.json""" CURSOR_DIR.mkdir(parents=True, exist_ok=True) # 读取现有任务 existing_tasks = [] if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: existing_tasks = json.load(f) except Exception: existing_tasks = [] existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t} # 添加新任务 for task in tasks: if task["task_id"] not in existing_ids: task_info = { "task_id": task["task_id"], "task_name": task["task_name"], "code_path": task.get("code_path", ""), "code_name": task.get("code_name", ""), "status": "processing", "notified_at": datetime.now().isoformat(), "code_file": task.get("code_file", ""), } existing_tasks.append(task_info) with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(existing_tasks, f, indent=2, ensure_ascii=False) logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}") def create_execute_instructions(tasks: List[Dict[str, Any]]) -> None: """生成任务执行指令文件 .cursor/task_execute_instructions.md""" CURSOR_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# 🤖 Cursor 自动任务执行指令\n\n") f.write("**⚠️ 重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 📋 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task['task_id'] task_name = task['task_name'] task_desc = task['task_description'] create_time = task.get("create_time", "") if hasattr(create_time, "strftime"): create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") f.write(f"## 🔴 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **创建时间**: {create_time}\n") f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n") f.write(f"### 📝 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}") # ============================================================================ # 状态同步 # ============================================================================ def sync_completed_tasks_to_db() -> int: """将 pending_tasks.json 中 completed 的任务同步到数据库""" if not PENDING_TASKS_FILE.exists(): return 0 try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) except Exception as e: logger.error(f"读取 pending_tasks.json 失败: {e}") return 0 if not isinstance(tasks, list): return 0 updated = 0 remaining_tasks = [] for t in tasks: if t.get("status") == "completed": task_id = t.get("task_id") if not task_id: continue code_name = t.get("code_name") code_path = t.get("code_path") if update_task_status(task_id, "completed", code_name, code_path): updated += 1 logger.info(f"已同步任务 {task_id} 为 completed") else: remaining_tasks.append(t) else: remaining_tasks.append(t) if updated > 0: with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(remaining_tasks, f, indent=2, ensure_ascii=False) logger.info(f"本次共同步 {updated} 个 completed 任务到数据库") return updated # ============================================================================ # Cursor Chat 自动化 # ============================================================================ def find_cursor_window() -> Optional[int]: """查找 Cursor 主窗口句柄""" if not HAS_CURSOR_GUI: return None cursor_windows: List[Dict[str, Any]] = [] def enum_windows_callback(hwnd, _extra): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) or "" class_name = win32gui.GetClassName(hwnd) or "" is_cursor = "cursor" in title.lower() if class_name and "chrome_widgetwin" in class_name.lower(): is_cursor = True if is_cursor: left, top, right, bottom = win32gui.GetWindowRect(hwnd) area = (right - left) * (bottom - top) cursor_windows.append({"hwnd": hwnd, "area": area}) return True win32gui.EnumWindows(enum_windows_callback, None) if not cursor_windows: logger.warning("未找到 Cursor 窗口") return None cursor_windows.sort(key=lambda x: x["area"], reverse=True) return cursor_windows[0]["hwnd"] def send_chat_message( message: str, input_pos: Optional[Tuple[int, int]] ) -> bool: """在 Cursor Chat 中发送消息""" if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化") return False hwnd = find_cursor_window() if not hwnd: return False try: win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) time.sleep(0.3) win32gui.SetForegroundWindow(hwnd) time.sleep(0.5) except Exception as e: logger.error(f"激活 Cursor 窗口失败: {e}") return False # 点击输入框或使用快捷键 if input_pos: x, y = input_pos pyautogui.click(x, y) time.sleep(0.4) else: pyautogui.hotkey("ctrl", "l") time.sleep(1.0) pyautogui.hotkey("ctrl", "a") time.sleep(0.2) # 输入消息 if HAS_PYPERCLIP: try: pyperclip.copy(message) pyautogui.hotkey("ctrl", "v") time.sleep(0.5) except Exception: pyautogui.write(message, interval=0.03) else: pyautogui.write(message, interval=0.03) time.sleep(0.3) pyautogui.press("enter") logger.info("✅ 消息已发送到 Cursor Chat") return True def send_chat_for_tasks(force: bool = False) -> bool: """ 向 Cursor Chat 发送任务提醒 Args: force: 强制发送,忽略 ENABLE_CHAT 设置 Returns: 是否成功发送 """ if not force and not ENABLE_CHAT: return False if not HAS_CURSOR_GUI: logger.warning("未安装 GUI 自动化依赖,无法发送 Chat 消息") return False processing_ids = get_processing_task_ids() if not processing_ids: logger.info("没有 processing 任务,跳过发送") return False logger.info(f"📤 发送任务提醒到 Cursor Chat({len(processing_ids)} 个任务)...") return send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS) def chat_trigger_loop(interval: int = 60) -> None: """ 循环模式:定期检查 processing 任务并发送 Chat 消息 Args: interval: 检查间隔(秒),默认 60 秒 """ logger.info("=" * 60) logger.info("🚀 Cursor Chat 自动触发服务已启动") logger.info(f"⏰ 检查间隔: {interval} 秒") logger.info(f"📝 发送消息: {CHAT_MESSAGE}") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) last_sent_time = 0 min_send_interval = 120 # 最小发送间隔(秒),避免频繁打扰 try: while True: try: # 1. 先同步已完成任务 sync_completed_tasks_to_db() # 2. 检查是否有 processing 任务 processing_ids = get_processing_task_ids() if processing_ids: current_time = time.time() time_since_last = current_time - last_sent_time if time_since_last >= min_send_interval: logger.info(f"📋 发现 {len(processing_ids)} 个待处理任务") # 更新触发器文件 update_trigger_file( task_count=len(processing_ids), status="有待执行任务", task_ids=processing_ids, ) # 发送 Chat 消息 if send_chat_for_tasks(force=True): last_sent_time = current_time logger.info("✅ 已发送执行提醒") else: logger.warning("⚠️ 发送失败,将在下次重试") else: remaining = int(min_send_interval - time_since_last) logger.info( f"⏳ 距离上次发送不足 {min_send_interval}s," f"还需等待 {remaining}s" ) else: logger.info("✅ 没有待处理任务") logger.info(f"⏳ {interval} 秒后再次检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") time.sleep(interval) except KeyboardInterrupt: logger.info("\n⛔ 服务已停止") # ============================================================================ # 主执行流程 # ============================================================================ def auto_execute_tasks_once() -> int: """执行一次任务检查和处理""" # 1. 先同步已完成任务到数据库 sync_completed_tasks_to_db() # 2. 获取 pending 任务 logger.info("🔍 检查 pending 任务...") tasks = get_pending_tasks() # 3. 检查是否有现有的 processing 任务 existing_processing_ids = get_processing_task_ids() if not tasks and not existing_processing_ids: logger.info("✅ 没有 pending 或 processing 任务") # 更新触发器文件为"已完成"状态 update_trigger_file(0, "所有任务已完成", []) return 0 if tasks: logger.info(f"📋 找到 {len(tasks)} 个 pending 任务") # 4. 更新任务状态为 processing for task in tasks: update_task_status(task["task_id"], "processing") # 5. 写入 pending_tasks.json write_pending_tasks_json(tasks) # 6. 生成执行指令文件 create_execute_instructions(tasks) # 7. 更新触发器文件(关键:触发 Cursor 自动执行) all_processing_ids = get_processing_task_ids() if all_processing_ids: update_trigger_file( task_count=len(all_processing_ids), status="有待执行任务", task_ids=all_processing_ids, ) logger.info(f"🔔 已更新触发器,等待 Cursor 执行 {len(all_processing_ids)} 个任务") return len(tasks) def auto_execute_tasks_loop(interval: int = 300) -> None: """循环执行任务检查""" logger.info("=" * 60) logger.info("🚀 自动任务执行服务已启动") logger.info(f"⏰ 检查间隔: {interval} 秒") logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) try: while True: try: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks() logger.info(f"✅ 已处理 {count} 个任务") logger.info(f"⏳ {interval} 秒后再次检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") time.sleep(interval) except KeyboardInterrupt: logger.info("\n⛔ 服务已停止") def refresh_trigger_only() -> None: """仅刷新触发器文件(用于现有 processing 任务)""" processing_ids = get_processing_task_ids() if not processing_ids: logger.info("没有 processing 状态的任务") update_trigger_file(0, "所有任务已完成", []) return logger.info(f"📋 发现 {len(processing_ids)} 个 processing 任务") # 重新生成指令文件(从 pending_tasks.json 读取) if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: all_tasks = json.load(f) processing_tasks = [ t for t in all_tasks if t.get("status") == "processing" ] if processing_tasks: # 重新生成执行指令文件 create_execute_instructions_from_json(processing_tasks) except Exception as e: logger.error(f"读取任务文件失败: {e}") # 更新触发器文件 update_trigger_file( task_count=len(processing_ids), status="有待执行任务", task_ids=processing_ids, ) logger.info("✅ 触发器已刷新,等待 Cursor 执行任务") def create_execute_instructions_from_json(tasks: List[Dict[str, Any]]) -> None: """从 pending_tasks.json 格式的任务列表生成执行指令文件""" CURSOR_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# 🤖 Cursor 自动任务执行指令\n\n") f.write("**⚠️ 重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 📋 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task.get('task_id', 'N/A') task_name = task.get('task_name', '未命名任务') task_desc = task.get('task_description', '无描述') notified_at = task.get('notified_at', '') f.write(f"## 🔴 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **通知时间**: {notified_at}\n") f.write(f"- **代码路径**: {task.get('code_path', 'N/A')}\n\n") f.write(f"### 📝 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已更新: {INSTRUCTIONS_FILE}") def main() -> None: """主函数""" parser = argparse.ArgumentParser( description="自动任务执行调度脚本", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--once", action="store_true", help="只执行一次" ) parser.add_argument( "--interval", type=int, default=300, help="检查间隔(秒)" ) parser.add_argument( "--enable-chat", action="store_true", help="启用自动 Cursor Chat" ) parser.add_argument( "--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"' ) parser.add_argument( "--chat-message", type=str, default="请阅读 .cursor/task_execute_instructions.md 并执行任务。", help="发送到 Chat 的消息" ) parser.add_argument( "--refresh-trigger", action="store_true", help="仅刷新触发器文件(用于现有 processing 任务)" ) parser.add_argument( "--chat-loop", action="store_true", help="启动 Chat 自动触发循环(定期发送执行消息)" ) parser.add_argument( "--chat-interval", type=int, default=60, help="Chat 触发循环的检查间隔(秒),默认 60" ) parser.add_argument( "--send-chat-now", action="store_true", help="立即发送一次 Chat 消息(用于手动触发)" ) args = parser.parse_args() global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE ENABLE_CHAT = bool(args.enable_chat) CHAT_MESSAGE = args.chat_message if args.chat_input_pos: try: x, y = args.chat_input_pos.split(",") CHAT_INPUT_POS = (int(x.strip()), int(y.strip())) except Exception: pass # 仅刷新触发器 if args.refresh_trigger: refresh_trigger_only() return # 立即发送一次 Chat 消息 if args.send_chat_now: if send_chat_for_tasks(force=True): logger.info("✅ 消息已发送") else: logger.error("❌ 发送失败") return # Chat 自动触发循环模式 if args.chat_loop: chat_trigger_loop(interval=args.chat_interval) return if args.once: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks() logger.info(f"✅ 完成!处理了 {count} 个任务") else: auto_execute_tasks_loop(interval=args.interval) if __name__ == "__main__": main()