#!/usr/bin/env python3 """ 自动任务执行 + Cursor Chat 工具(合并版) 本脚本整合了原来的: - auto_execute_tasks.py:检查数据库中的 pending 任务,生成本地任务文件和 pending_tasks.json - cursor_auto_chat.py:在 Windows 下自动操作 Cursor Chat,发送指定消息 合并后,一个脚本即可完成: 1. 从 task_list 中读取 pending 任务 2. 为任务生成本地 Python 占位文件 3. 维护 .cursor/pending_tasks.json 4. (可选)在 Cursor Chat 中自动发起「请检查并执行所有待处理任务。」 5. 将 .cursor/pending_tasks.json 中 status=completed 的任务状态同步到 task_list 使用方式: 1. 仅任务调度(不发 Chat): python scripts/auto_execute_tasks.py --once python scripts/auto_execute_tasks.py --interval 300 2. 任务调度 + 自动 Chat: python scripts/auto_execute_tasks.py --once --enable-chat --chat-input-pos "1180,965" python scripts/auto_execute_tasks.py --interval 300 --enable-chat --chat-input-pos "1180,965" """ from __future__ import annotations import json import time import argparse import logging import sys from pathlib import Path from datetime import datetime from typing import Any, Dict, List, Optional, Tuple # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ==== Cursor Chat 相关依赖(Windows GUI 自动化) ==== try: import win32gui import win32con import win32process import pyautogui try: import pyperclip HAS_PYPERCLIP = True except ImportError: HAS_PYPERCLIP = False HAS_CURSOR_GUI = True # pyautogui 安全 & 节奏 pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 except ImportError: HAS_CURSOR_GUI = False HAS_PYPERCLIP = False logger.warning( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui/pyperclip)," "将禁用自动 Cursor Chat 功能。" ) # 全局配置(由命令行参数控制) ENABLE_CHAT: bool = False CHAT_MESSAGE: str = "请检查并执行所有待处理任务。" CHAT_INPUT_POS: Optional[Tuple[int, int]] = None WORKSPACE_ROOT = Path(__file__).parent.parent CURSOR_DIR = WORKSPACE_ROOT / ".cursor" PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json" def get_db_connection(): """ 获取数据库连接 Returns: 数据库连接对象,如果失败返回None """ try: import psycopg2 from psycopg2.extras import RealDictCursor # 读取数据库配置 config_file = Path(__file__).parent.parent / 'mcp-servers' / 'task-manager' / 'config.json' with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) db_uri = config['database']['uri'] conn = psycopg2.connect(db_uri) return conn except Exception as e: logger.error(f"连接数据库失败: {e}") return None def get_pending_tasks(): """ 从数据库获取所有pending任务 """ try: from psycopg2.extras import RealDictCursor conn = get_db_connection() if not conn: return [] cursor = conn.cursor(cursor_factory=RealDictCursor) # 查询pending任务 cursor.execute(""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """) tasks = cursor.fetchall() cursor.close() conn.close() return [dict(task) for task in tasks] except Exception as e: logger.error(f"获取pending任务失败: {e}") return [] def update_task_status(task_id, status, code_name=None, code_path=None): """ 更新任务状态 Args: task_id: 任务ID status: 新状态('pending', 'processing', 'completed', 'failed') code_name: 代码文件名(可选) code_path: 代码文件路径(可选) Returns: 是否更新成功 """ try: conn = get_db_connection() if not conn: return False cursor = conn.cursor() # 构建更新SQL if code_name and code_path: cursor.execute(""" UPDATE task_list SET status = %s, code_name = %s, code_path = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, code_name, code_path, task_id)) else: cursor.execute(""" UPDATE task_list SET status = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, task_id)) conn.commit() updated = cursor.rowcount > 0 cursor.close() conn.close() if updated: logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}") else: logger.warning(f"⚠️ 任务 {task_id} 状态更新失败(任务不存在)") return updated except Exception as e: logger.error(f"更新任务状态失败: {e}") return False # ==== Cursor Chat 辅助函数(简化版,依赖固定输入框坐标) ==== def _find_cursor_window() -> Optional[int]: """ 查找 Cursor 主窗口句柄(简化版) 通过窗口标题 / 类名中包含 'Cursor' / 'Chrome_WidgetWin_1' 来判断。 """ if not HAS_CURSOR_GUI: return None cursor_windows: List[Dict[str, Any]] = [] def enum_windows_callback(hwnd, _extra): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) or "" class_name = win32gui.GetClassName(hwnd) or "" is_cursor = False if "cursor" in title.lower(): is_cursor = True if class_name and "chrome_widgetwin" in class_name.lower(): is_cursor = True if is_cursor: left, top, right, bottom = win32gui.GetWindowRect(hwnd) width = right - left height = bottom - top area = width * height cursor_windows.append( { "hwnd": hwnd, "title": title, "class": class_name, "width": width, "height": height, "area": area, } ) return True win32gui.EnumWindows(enum_windows_callback, None) if not cursor_windows: logger.warning("未找到 Cursor 窗口") return None # 选取面积最大的窗口作为主窗口 cursor_windows.sort(key=lambda x: x["area"], reverse=True) main = cursor_windows[0] logger.info( "找到 Cursor 主窗口: %s (%s), size=%dx%d, hwnd=%s", main["title"], main["class"], main["width"], main["height"], main["hwnd"], ) return main["hwnd"] def _activate_cursor_window(hwnd: int) -> bool: """激活 Cursor 主窗口""" if not HAS_CURSOR_GUI: return False try: win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) time.sleep(0.3) win32gui.SetForegroundWindow(hwnd) time.sleep(0.5) logger.info("Cursor 窗口已激活") return True except Exception as exc: # noqa: BLE001 logger.error("激活 Cursor 窗口失败: %s", exc) return False def _send_chat_message_once(message: str, input_pos: Optional[Tuple[int, int]]) -> bool: """ 在 Cursor Chat 中发送一条消息(单次): 1. 激活窗口 2. 如果提供了输入框坐标,则移动并点击 3. 使用剪贴板粘贴消息 4. 按 Enter 提交 """ if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化,跳过自动发 Chat") return False hwnd = _find_cursor_window() if not hwnd: return False if not _activate_cursor_window(hwnd): return False # 点击/激活输入框 if input_pos: x, y = input_pos logger.info("移动鼠标到输入框位置: (%d, %d)", x, y) pyautogui.moveTo(x, y, duration=0.3) time.sleep(0.2) pyautogui.click(x, y) time.sleep(0.4) else: # 未指定位置时,尝试快捷键打开 Chat(Ctrl+K),然后点击窗口底部中间 logger.info("未指定输入框位置,尝试使用 Ctrl+K 打开 Chat") pyautogui.hotkey("ctrl", "k") time.sleep(1.5) screen_width, screen_height = pyautogui.size() x, y = int(screen_width * 0.6), int(screen_height * 0.9) pyautogui.moveTo(x, y, duration=0.3) pyautogui.click(x, y) time.sleep(0.4) # 清空旧内容 pyautogui.hotkey("ctrl", "a") time.sleep(0.3) # 再次点击保证焦点 pyautogui.click() time.sleep(0.2) logger.info("正在向 Cursor Chat 输入消息: %s", message) # 优先使用剪贴板(兼容中文) if HAS_PYPERCLIP: try: old_clipboard = pyperclip.paste() except Exception: # noqa: BLE001 old_clipboard = None try: pyperclip.copy(message) time.sleep(0.3) pyautogui.hotkey("ctrl", "v") time.sleep(1.0) logger.info("使用剪贴板粘贴消息成功") except Exception as exc: # noqa: BLE001 logger.error("剪贴板粘贴失败: %s,退回到直接输入", exc) pyautogui.write(message, interval=0.05) time.sleep(0.8) finally: if old_clipboard is not None: try: pyperclip.copy(old_clipboard) except Exception: pass else: pyautogui.write(message, interval=0.05) time.sleep(0.8) # 提交(Enter) logger.info("按 Enter 提交消息") pyautogui.press("enter") time.sleep(1.0) logger.info("消息已提交") return True def send_chat_for_pending_tasks() -> None: """ 如果启用了 Chat 功能且存在 pending 任务,则向 Cursor Chat 发送一次统一消息。 """ if not ENABLE_CHAT: return # 仅检查 .cursor/pending_tasks.json 中是否存在 status 为 processing 的任务 if not PENDING_TASKS_FILE.exists(): logger.info("未找到 .cursor/pending_tasks.json,跳过自动 Chat") return try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, list) or not any( t.get("status") == "processing" for t in data ): logger.info("pending_tasks.json 中没有 processing 任务,跳过自动 Chat") return except Exception as exc: # noqa: BLE001 logger.error("读取 pending_tasks.json 失败: %s", exc) return logger.info("检测到 processing 任务,准备自动向 Cursor Chat 发送提醒消息") _send_chat_message_once(CHAT_MESSAGE, CHAT_INPUT_POS) def sync_completed_tasks_from_pending_file() -> int: """ 将 .cursor/pending_tasks.json 中 status == 'completed' 的任务,同步到数据库。 Returns: 成功更新的任务数量 """ if not PENDING_TASKS_FILE.exists(): return 0 try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) except Exception as exc: # noqa: BLE001 logger.error("读取 pending_tasks.json 失败: %s", exc) return 0 if not isinstance(tasks, list): return 0 updated = 0 for t in tasks: if t.get("status") != "completed": continue task_id = t.get("task_id") if not task_id: continue code_name = t.get("code_name") code_path = t.get("code_path") if update_task_status(task_id, "completed", code_name, code_path): updated += 1 logger.info( "已根据 pending_tasks.json 将任务 %s 同步为 completed (code_name=%s, code_path=%s)", task_id, code_name, code_path, ) if updated: logger.info("本次共同步 %d 个 completed 任务到数据库", updated) return updated def print_task_for_cursor_execution(task, task_file_path=None): """ 打印任务信息,供Cursor识别并执行 这个函数会以特定格式输出任务,Cursor可以识别并自动执行 """ print("\n" + "=" * 80) print(f"🤖 [AUTO-EXECUTE-TASK] Task ID: {task['task_id']}") print("=" * 80) print(f"\n**任务名称**: {task['task_name']}") print(f"**任务ID**: {task['task_id']}") print(f"**状态**: processing(已更新)") print(f"**创建时间**: {task['create_time']}") print(f"**创建者**: {task['create_by']}") if task_file_path: print(f"**任务文件**: {task_file_path}") print(f"\n## 任务描述\n") print(task['task_description']) print(f"\n## 执行指令") print(f"\n请Cursor AI根据上述任务描述,自动完成以下步骤:") print(f"1. 打开并查看任务文件: {task_file_path or '未创建'}") print(f"2. 根据任务描述实现具体功能") print(f"3. 确保代码符合项目规范") print(f"4. 完成后调用MCP工具更新任务状态:") print(f" 工具: update_task_status") print(f" 参数: {{") print(f" \"task_id\": {task['task_id']},") if task_file_path: import os file_name = os.path.basename(task_file_path) file_dir = os.path.dirname(task_file_path).replace(str(Path(__file__).parent.parent), '').strip('\\').strip('/') print(f" \"code_name\": \"{file_name}\",") print(f" \"code_path\": \"{file_dir}\",") print(f" \"status\": \"completed\"") print(f" }}") print(f"\n任务文件保存路径:{task.get('code_path', 'app/core/data_flow')}") print("\n" + "=" * 80) print(f"🔚 [END-AUTO-EXECUTE-TASK]") print("=" * 80 + "\n") def create_task_file(task): """ 在指定目录创建任务文件 Args: task: 任务字典 Returns: 生成的文件路径,如果失败返回None """ try: workspace = Path(__file__).parent.parent code_path = task.get('code_path', 'app/core/data_flow') target_dir = workspace / code_path # 确保目录存在 target_dir.mkdir(parents=True, exist_ok=True) # 生成文件名(从任务名称或代码名称) code_name = task.get('code_name') if not code_name: # 从任务名称生成文件名 import re task_name = task['task_name'] # 清理文件名:去除特殊字符,替换为下划线 safe_name = re.sub(r'[^\w\u4e00-\u9fff]+', '_', task_name) safe_name = re.sub(r'_+', '_', safe_name).strip('_') code_name = f"{safe_name}.py" # 确保是.py文件 if not code_name.endswith('.py'): code_name = f"{code_name}.py" file_path = target_dir / code_name # 如果文件已存在,添加时间戳 if file_path.exists(): timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') base_name = file_path.stem file_path = target_dir / f"{base_name}_{timestamp}.py" code_name = file_path.name # 生成任务文件内容 file_content = f'''#!/usr/bin/env python3 """ {task['task_name']} 任务ID: {task['task_id']} 创建时间: {task['create_time']} 创建者: {task['create_by']} 任务描述: {task['task_description']} 注意:此文件为任务占位符,需要根据任务描述实现具体功能。 """ # TODO: 根据任务描述实现功能 # {task['task_description'][:100]}... if __name__ == '__main__': print("任务文件已创建,请根据任务描述实现具体功能") pass ''' # 写入文件 with open(file_path, 'w', encoding='utf-8') as f: f.write(file_content) logger.info(f"✅ 任务文件已创建: {file_path}") # 更新数据库中的code_name和code_path update_task_status( task['task_id'], 'processing', # 状态改为processing code_name=code_name, code_path=code_path ) return str(file_path) except Exception as e: logger.error(f"创建任务文件失败: {e}") # 即使文件创建失败,也要更新状态为processing update_task_status(task['task_id'], 'processing') return None def notify_cursor_to_execute_task(task, task_file_path=None): """ 通知Cursor执行任务 通过创建一个标记文件,让Cursor知道有新任务需要执行 """ workspace = Path(__file__).parent.parent task_trigger_file = workspace / '.cursor' / 'pending_tasks.json' task_trigger_file.parent.mkdir(parents=True, exist_ok=True) # 读取现有的pending tasks pending_tasks = [] if task_trigger_file.exists(): try: with open(task_trigger_file, 'r', encoding='utf-8') as f: pending_tasks = json.load(f) except: pending_tasks = [] # 检查任务是否已存在 task_exists = any(t['task_id'] == task['task_id'] for t in pending_tasks) if not task_exists: task_info = { 'task_id': task['task_id'], 'task_name': task['task_name'], 'task_description': task['task_description'], 'code_path': task.get('code_path', 'app/core/data_flow'), 'code_name': task.get('code_name'), 'status': 'processing', # 标记为processing 'notified_at': datetime.now().isoformat() } if task_file_path: task_info['task_file'] = task_file_path pending_tasks.append(task_info) # 写入文件 with open(task_trigger_file, 'w', encoding='utf-8') as f: json.dump(pending_tasks, f, indent=2, ensure_ascii=False) logger.info(f"✅ Task {task['task_id']} added to pending_tasks.json") def auto_execute_tasks_once(): """ 执行一次任务检查和通知 """ # 先尝试把本地 completed 状态同步到数据库 sync_completed_tasks_from_pending_file() logger.info("🔍 检查pending任务...") tasks = get_pending_tasks() if not tasks: logger.info("✅ 没有pending任务") return 0 logger.info(f"📋 找到 {len(tasks)} 个pending任务") processed_count = 0 for task in tasks: logger.info(f"\n{'='*80}") logger.info(f"处理任务: [{task['task_id']}] {task['task_name']}") logger.info(f"{'='*80}") try: # 1. 创建任务文件(同时更新状态为processing) task_file_path = create_task_file(task) if not task_file_path: logger.warning(f"⚠️ 任务 {task['task_id']} 文件创建失败,但状态已更新为processing") # 即使文件创建失败,也继续通知Cursor # 2. 打印任务详情,供Cursor识别 print_task_for_cursor_execution(task, task_file_path) # 3. 创建通知文件 notify_cursor_to_execute_task(task, task_file_path) processed_count += 1 logger.info(f"✅ 任务 {task['task_id']} 处理完成") except Exception as e: logger.error(f"❌ 处理任务 {task['task_id']} 时出错: {e}") # 标记任务为failed update_task_status(task['task_id'], 'failed') return processed_count def auto_execute_tasks_loop(interval=300): """ 循环执行任务检查 Args: interval: 检查间隔(秒),默认300秒(5分钟) """ logger.info("=" * 80) logger.info("🚀 自动任务执行服务已启动") logger.info(f"⏰ 检查间隔: {interval}秒 ({interval//60}分钟)") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 80 + "\n") try: while True: try: count = auto_execute_tasks_once() # 如果有新任务且启用了自动 Chat,则向 Cursor 发送提醒 if count > 0: send_chat_for_pending_tasks() if count > 0: logger.info(f"\n✅ 已通知 {count} 个任务") logger.info(f"\n⏳ 下次检查时间: {interval}秒后...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") logger.info(f"⏳ {interval}秒后重试...") time.sleep(interval) except KeyboardInterrupt: logger.info("\n" + "=" * 80) logger.info("⛔ 用户停止了自动任务执行服务") logger.info("=" * 80) def main(): """ 主函数 """ parser = argparse.ArgumentParser( description='自动任务执行脚本(含可选Cursor Chat)- 定期检查并执行pending任务' ) parser.add_argument( '--once', action='store_true', help='只执行一次,不循环' ) parser.add_argument( '--interval', type=int, default=300, help='检查间隔(秒),默认300秒(5分钟)' ) parser.add_argument( '--enable-chat', action='store_true', help='启用自动 Cursor Chat,在有pending任务时自动向 Cursor 发送提醒消息' ) parser.add_argument( '--chat-input-pos', type=str, default=None, help='Cursor Chat 输入框位置,格式 "x,y"(例如 "1180,965"),不指定则自动尝试定位' ) args = parser.parse_args() # 设置全局 Chat 配置 global ENABLE_CHAT, CHAT_INPUT_POS # noqa: PLW0603 ENABLE_CHAT = bool(args.enable_chat) CHAT_INPUT_POS = None if args.chat_input_pos: try: x_str, y_str = args.chat_input_pos.split(',') CHAT_INPUT_POS = (int(x_str.strip()), int(y_str.strip())) logger.info("使用指定的 Chat 输入框位置: %s", CHAT_INPUT_POS) except Exception as exc: # noqa: BLE001 logger.warning("解析 --chat-input-pos 失败 %r: %s", args.chat_input_pos, exc) if args.once: # 执行一次 count = auto_execute_tasks_once() if count > 0: send_chat_for_pending_tasks() logger.info(f"\n✅ 完成!处理了 {count} 个任务") else: # 循环执行 auto_execute_tasks_loop(interval=args.interval) if __name__ == '__main__': main()