| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510 |
- #!/usr/bin/env python3
- """
- 自动任务执行核心调度脚本(简化版)
- 工作流程:
- 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
- 2. 生成 .cursor/task_execute_instructions.md 执行指令文件
- 3. 更新任务状态为 processing,并维护 .cursor/pending_tasks.json
- 4. (可选)向 Cursor Chat 发送执行提醒
- 5. Cursor 完成任务后,将 pending_tasks.json 中的状态改为 completed
- 6. 调度脚本将 completed 状态的任务同步回数据库
- 使用方式:
- python scripts/auto_execute_tasks.py --once
- python scripts/auto_execute_tasks.py --interval 300
- python scripts/auto_execute_tasks.py --once --enable-chat
- """
- from __future__ import annotations
- import json
- import time
- import argparse
- import logging
- from pathlib import Path
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Tuple
- # ============================================================================
- # 日志配置
- # ============================================================================
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(levelname)s - %(message)s",
- )
- logger = logging.getLogger("AutoExecuteTasks")
- # ============================================================================
- # Windows GUI 自动化依赖(可选)
- # ============================================================================
- HAS_CURSOR_GUI = False
- HAS_PYPERCLIP = False
- try:
- import win32gui
- import win32con
- import pyautogui
- pyautogui.FAILSAFE = True
- pyautogui.PAUSE = 0.5
- HAS_CURSOR_GUI = True
- try:
- import pyperclip
- HAS_PYPERCLIP = True
- except ImportError:
- pass
- except ImportError:
- logger.info(
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
- "将禁用自动 Cursor Chat 功能。"
- )
- # ============================================================================
- # 全局配置
- # ============================================================================
- WORKSPACE_ROOT = Path(__file__).parent.parent
- CURSOR_DIR = WORKSPACE_ROOT / ".cursor"
- PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json"
- INSTRUCTIONS_FILE = CURSOR_DIR / "task_execute_instructions.md"
- # 命令行参数控制的全局变量
- ENABLE_CHAT: bool = False
- CHAT_MESSAGE: str = "请阅读 .cursor/task_execute_instructions.md 并执行任务。"
- CHAT_INPUT_POS: Optional[Tuple[int, int]] = None
- # ============================================================================
- # 数据库操作
- # ============================================================================
- def get_db_connection():
- """获取数据库连接"""
- try:
- import psycopg2
- import sys
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config, current_env
- app_config = config.get(current_env, config['default'])
- db_uri = app_config.SQLALCHEMY_DATABASE_URI
- return psycopg2.connect(db_uri)
- except ImportError as e:
- logger.error(f"导入依赖失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接数据库失败: {e}")
- return None
- def get_pending_tasks() -> List[Dict[str, Any]]:
- """从数据库获取所有 pending 任务"""
- try:
- from psycopg2.extras import RealDictCursor
- conn = get_db_connection()
- if not conn:
- return []
- cursor = conn.cursor(cursor_factory=RealDictCursor)
- cursor.execute("""
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """)
- tasks = cursor.fetchall()
- cursor.close()
- conn.close()
- return [dict(task) for task in tasks]
- except Exception as e:
- logger.error(f"获取 pending 任务失败: {e}")
- return []
- def update_task_status(
- task_id: int,
- status: str,
- code_name: Optional[str] = None,
- code_path: Optional[str] = None,
- ) -> bool:
- """更新任务状态"""
- try:
- conn = get_db_connection()
- if not conn:
- return False
- cursor = conn.cursor()
- if code_name and code_path:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, code_name = %s, code_path = %s,
- update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, code_name, code_path, task_id))
- else:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, task_id))
- conn.commit()
- updated = cursor.rowcount > 0
- cursor.close()
- conn.close()
- if updated:
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
- return updated
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- # ============================================================================
- # 任务文件生成
- # ============================================================================
- def write_pending_tasks_json(tasks: List[Dict[str, Any]]) -> None:
- """将任务列表写入 .cursor/pending_tasks.json"""
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- # 读取现有任务
- existing_tasks = []
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- existing_tasks = json.load(f)
- except Exception:
- existing_tasks = []
- existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
- # 添加新任务
- for task in tasks:
- if task["task_id"] not in existing_ids:
- create_time = task.get("create_time", "")
- if hasattr(create_time, "isoformat"):
- create_time = create_time.isoformat()
- task_info = {
- "task_id": task["task_id"],
- "task_name": task["task_name"],
- "task_description": task["task_description"],
- "code_path": task.get("code_path", ""),
- "code_name": task.get("code_name", ""),
- "status": "processing",
- "notified_at": datetime.now().isoformat(),
- }
- existing_tasks.append(task_info)
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
- def create_execute_instructions(tasks: List[Dict[str, Any]]) -> None:
- """生成任务执行指令文件 .cursor/task_execute_instructions.md"""
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# 🤖 Cursor 自动任务执行指令\n\n")
- f.write("**⚠️ 重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 📋 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task['task_id']
- task_name = task['task_name']
- task_desc = task['task_description']
- create_time = task.get("create_time", "")
- if hasattr(create_time, "strftime"):
- create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"## 🔴 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **创建时间**: {create_time}\n")
- f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
- f.write(f"### 📝 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
- # ============================================================================
- # 状态同步
- # ============================================================================
- def sync_completed_tasks_to_db() -> int:
- """将 pending_tasks.json 中 completed 的任务同步到数据库"""
- if not PENDING_TASKS_FILE.exists():
- return 0
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- except Exception as e:
- logger.error(f"读取 pending_tasks.json 失败: {e}")
- return 0
- if not isinstance(tasks, list):
- return 0
- updated = 0
- remaining_tasks = []
- for t in tasks:
- if t.get("status") == "completed":
- task_id = t.get("task_id")
- if not task_id:
- continue
- code_name = t.get("code_name")
- code_path = t.get("code_path")
- if update_task_status(task_id, "completed", code_name, code_path):
- updated += 1
- logger.info(f"已同步任务 {task_id} 为 completed")
- else:
- remaining_tasks.append(t)
- else:
- remaining_tasks.append(t)
- if updated > 0:
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
- return updated
- # ============================================================================
- # Cursor Chat 自动化
- # ============================================================================
- def find_cursor_window() -> Optional[int]:
- """查找 Cursor 主窗口句柄"""
- if not HAS_CURSOR_GUI:
- return None
- cursor_windows: List[Dict[str, Any]] = []
- def enum_windows_callback(hwnd, _extra):
- if win32gui.IsWindowVisible(hwnd):
- title = win32gui.GetWindowText(hwnd) or ""
- class_name = win32gui.GetClassName(hwnd) or ""
- is_cursor = "cursor" in title.lower()
- if class_name and "chrome_widgetwin" in class_name.lower():
- is_cursor = True
- if is_cursor:
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
- area = (right - left) * (bottom - top)
- cursor_windows.append({"hwnd": hwnd, "area": area})
- return True
- win32gui.EnumWindows(enum_windows_callback, None)
- if not cursor_windows:
- logger.warning("未找到 Cursor 窗口")
- return None
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
- return cursor_windows[0]["hwnd"]
- def send_chat_message(
- message: str, input_pos: Optional[Tuple[int, int]]
- ) -> bool:
- """在 Cursor Chat 中发送消息"""
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化")
- return False
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- try:
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
- time.sleep(0.3)
- win32gui.SetForegroundWindow(hwnd)
- time.sleep(0.5)
- except Exception as e:
- logger.error(f"激活 Cursor 窗口失败: {e}")
- return False
- # 点击输入框或使用快捷键
- if input_pos:
- x, y = input_pos
- pyautogui.click(x, y)
- time.sleep(0.4)
- else:
- pyautogui.hotkey("ctrl", "l")
- time.sleep(1.0)
- pyautogui.hotkey("ctrl", "a")
- time.sleep(0.2)
- # 输入消息
- if HAS_PYPERCLIP:
- try:
- pyperclip.copy(message)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(0.5)
- except Exception:
- pyautogui.write(message, interval=0.03)
- else:
- pyautogui.write(message, interval=0.03)
- time.sleep(0.3)
- pyautogui.press("enter")
- logger.info("✅ 消息已发送到 Cursor Chat")
- return True
- def send_chat_for_tasks() -> None:
- """向 Cursor Chat 发送任务提醒"""
- if not ENABLE_CHAT:
- return
- if not PENDING_TASKS_FILE.exists():
- return
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- data = json.load(f)
- if not any(t.get("status") == "processing" for t in data):
- return
- except Exception:
- return
- logger.info("发送任务提醒到 Cursor Chat...")
- send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS)
- # ============================================================================
- # 主执行流程
- # ============================================================================
- def auto_execute_tasks_once() -> int:
- """执行一次任务检查和处理"""
- # 1. 先同步已完成任务到数据库
- sync_completed_tasks_to_db()
- # 2. 获取 pending 任务
- logger.info("🔍 检查 pending 任务...")
- tasks = get_pending_tasks()
- if not tasks:
- logger.info("✅ 没有 pending 任务")
- return 0
- logger.info(f"📋 找到 {len(tasks)} 个 pending 任务")
- # 3. 更新任务状态为 processing
- for task in tasks:
- update_task_status(task["task_id"], "processing")
- # 4. 写入 pending_tasks.json
- write_pending_tasks_json(tasks)
- # 5. 生成执行指令文件
- create_execute_instructions(tasks)
- return len(tasks)
- def auto_execute_tasks_loop(interval: int = 300) -> None:
- """循环执行任务检查"""
- logger.info("=" * 60)
- logger.info("🚀 自动任务执行服务已启动")
- logger.info(f"⏰ 检查间隔: {interval} 秒")
- logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- try:
- while True:
- try:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks()
- logger.info(f"✅ 已处理 {count} 个任务")
- logger.info(f"⏳ {interval} 秒后再次检查...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- time.sleep(interval)
- except KeyboardInterrupt:
- logger.info("\n⛔ 服务已停止")
- def main() -> None:
- """主函数"""
- parser = argparse.ArgumentParser(
- description="自动任务执行调度脚本",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- )
- parser.add_argument(
- "--once", action="store_true", help="只执行一次"
- )
- parser.add_argument(
- "--interval", type=int, default=300, help="检查间隔(秒)"
- )
- parser.add_argument(
- "--enable-chat", action="store_true", help="启用自动 Cursor Chat"
- )
- parser.add_argument(
- "--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"'
- )
- parser.add_argument(
- "--chat-message", type=str,
- default="请阅读 .cursor/task_execute_instructions.md 并执行任务。",
- help="发送到 Chat 的消息"
- )
- args = parser.parse_args()
- global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE
- ENABLE_CHAT = bool(args.enable_chat)
- CHAT_MESSAGE = args.chat_message
- if args.chat_input_pos:
- try:
- x, y = args.chat_input_pos.split(",")
- CHAT_INPUT_POS = (int(x.strip()), int(y.strip()))
- except Exception:
- pass
- if args.once:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks()
- logger.info(f"✅ 完成!处理了 {count} 个任务")
- else:
- auto_execute_tasks_loop(interval=args.interval)
- if __name__ == "__main__":
- main()
|