#!/usr/bin/env python3 """ 自动任务执行核心调度脚本 工作流程: 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务 2. 生成 tasks/task_execute_instructions.md 执行指令文件 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json 4. 更新 tasks/task_trigger.txt 触发器文件 5. 启动新的 Cursor Agent 并发送执行指令 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent 使用方式: # 执行一次任务检查 python scripts/auto_execute_tasks.py --once # 循环模式(每 5 分钟检查) python scripts/auto_execute_tasks.py --interval 300 # 刷新触发器文件(用于现有 processing 任务) python scripts/auto_execute_tasks.py --refresh-trigger # 【推荐】使用 Agent 模式执行任务(自动打开/关闭 Agent) python scripts/auto_execute_tasks.py --agent-run # 【推荐】启动 Agent 循环模式(有任务时自动启动 Agent,完成后自动关闭) python scripts/auto_execute_tasks.py --chat-loop --use-agent # 启动传统 Chat 模式(不使用 Agent) python scripts/auto_execute_tasks.py --chat-loop --no-agent # 立即发送 Chat 消息触发 Cursor 执行 python scripts/auto_execute_tasks.py --send-chat-now # 设置 Agent 超时时间(默认 3600 秒) python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200 # 任务完成后不自动关闭 Agent python scripts/auto_execute_tasks.py --agent-run --no-auto-close # 禁用自动部署功能 python scripts/auto_execute_tasks.py --chat-loop --use-agent --no-deploy # 立即部署指定任务ID的脚本到生产服务器 python scripts/auto_execute_tasks.py --deploy-now 123 # 测试到生产服务器的 SSH 连接 python scripts/auto_execute_tasks.py --test-connection """ from __future__ import annotations import argparse import json import logging import time from datetime import datetime from pathlib import Path from typing import Any # ============================================================================ # 日志配置 # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ============================================================================ # Windows GUI 自动化依赖(可选) # ============================================================================ HAS_CURSOR_GUI = False HAS_PYPERCLIP = False try: import pyautogui import win32con import win32gui pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 HAS_CURSOR_GUI = True try: import pyperclip HAS_PYPERCLIP = True except ImportError: pass except ImportError: logger.info( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui)," "将禁用自动 Cursor Chat 功能。" ) # ============================================================================ # 全局配置 # ============================================================================ WORKSPACE_ROOT = Path(__file__).parent.parent TASKS_DIR = WORKSPACE_ROOT / "tasks" PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json" INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md" TRIGGER_FILE = TASKS_DIR / "task_trigger.txt" # 生产服务器配置 PRODUCTION_SERVER = { "host": "192.168.3.143", "port": 22, "username": "ubuntu", "password": "citumxl2357", "script_path": "/opt/dataops-platform/datafactory/scripts", "workflow_path": "/opt/dataops-platform/n8n/workflows", } # 命令行参数控制的全局变量 ENABLE_CHAT: bool = False CHAT_MESSAGE: str = "请阅读 tasks/task_execute_instructions.md 并执行任务。" CHAT_INPUT_POS: tuple[int, int] | None = None ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署 # ============================================================================ # 数据库操作 # ============================================================================ def get_db_connection(): """获取数据库连接(使用 production 环境配置)""" try: import sys import psycopg2 sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config # 强制使用 production 环境的数据库配置 app_config = config["production"] db_uri = app_config.SQLALCHEMY_DATABASE_URI return psycopg2.connect(db_uri) except ImportError as e: logger.error(f"导入依赖失败: {e}") return None except Exception as e: logger.error(f"连接数据库失败: {e}") return None def get_pending_tasks() -> list[dict[str, Any]]: """从数据库获取所有 pending 任务""" try: from psycopg2.extras import RealDictCursor conn = get_db_connection() if not conn: return [] cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute(""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """) tasks = cursor.fetchall() cursor.close() conn.close() return [dict(task) for task in tasks] except Exception as e: logger.error(f"获取 pending 任务失败: {e}") return [] def update_task_status( task_id: int, status: str, code_name: str | None = None, code_path: str | None = None, ) -> bool: """更新任务状态""" try: conn = get_db_connection() if not conn: return False cursor = conn.cursor() if code_name and code_path: cursor.execute( """ UPDATE task_list SET status = %s, code_name = %s, code_path = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, code_name, code_path, task_id), ) else: cursor.execute( """ UPDATE task_list SET status = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, task_id), ) conn.commit() updated = cursor.rowcount > 0 cursor.close() conn.close() if updated: logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}") return updated except Exception as e: logger.error(f"更新任务状态失败: {e}") return False # ============================================================================ # 触发器文件管理 # ============================================================================ def update_trigger_file( task_count: int = 0, status: str = "有待执行任务", task_ids: list[int] | None = None, ) -> None: """ 更新触发器文件 tasks/task_trigger.txt Cursor Rules 会检测此文件的更新时间,如果在 5 分钟内更新, 则触发自动任务执行。 """ TASKS_DIR.mkdir(parents=True, exist_ok=True) gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") task_ids_str = ", ".join(str(tid) for tid in (task_ids or [])) content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER 生成时间: {gen_time} 状态: {status} 待处理任务数: {task_count} 任务ID列表: [{task_ids_str}] 此文件用于触发Cursor自动执行任务。 当此文件更新时间在5分钟内时,Cursor应该检查 tasks/task_execute_instructions.md 并执行其中的任务。 【重要】Cursor AI 请立即执行以下操作: 1. 读取 tasks/task_execute_instructions.md 文件 2. 按照文件中的任务描述,依次完成每个任务 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed """ with TRIGGER_FILE.open("w", encoding="utf-8") as f: f.write(content) logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}") def get_processing_task_ids() -> list[int]: """从 pending_tasks.json 获取所有 processing 状态的任务 ID""" if not PENDING_TASKS_FILE.exists(): return [] try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) return [ t.get("task_id") for t in tasks if t.get("status") == "processing" and t.get("task_id") ] except Exception: return [] # ============================================================================ # 任务文件生成 # ============================================================================ def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None: """将任务列表写入 tasks/pending_tasks.json""" TASKS_DIR.mkdir(parents=True, exist_ok=True) # 读取现有任务 existing_tasks = [] if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: existing_tasks = json.load(f) except Exception: existing_tasks = [] existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t} # 添加新任务 for task in tasks: if task["task_id"] not in existing_ids: task_info = { "task_id": task["task_id"], "task_name": task["task_name"], "code_path": task.get("code_path", ""), "code_name": task.get("code_name", ""), "status": "processing", "notified_at": datetime.now().isoformat(), "code_file": task.get("code_file", ""), } existing_tasks.append(task_info) with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(existing_tasks, f, indent=2, ensure_ascii=False) logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}") def create_execute_instructions(tasks: list[dict[str, Any]]) -> None: """生成任务执行指令文件 tasks/task_execute_instructions.md""" TASKS_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# 🤖 Cursor 自动任务执行指令\n\n") f.write("**⚠️ 重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 📋 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("## ⚠️ 任务约束要求\n\n") f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n") f.write("- ❌ 不要创建任何 summary、report、总结类的文档文件\n") f.write("- ❌ 不要生成 task_summary.md、execution_report.md 等总结文件\n") f.write("- ✅ 只需创建任务要求的功能脚本文件\n") f.write("- ✅ 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task["task_id"] task_name = task["task_name"] task_desc = task["task_description"] create_time = task.get("create_time", "") if hasattr(create_time, "strftime"): create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") f.write(f"## 🔴 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **创建时间**: {create_time}\n") f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n") f.write(f"### 📝 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}") # ============================================================================ # Neo4j 独立连接(不依赖 Flask 应用上下文) # ============================================================================ def get_neo4j_driver(): """获取 Neo4j 驱动(独立于 Flask 应用上下文)""" try: import sys from neo4j import GraphDatabase sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config # 强制使用 production 环境的配置 app_config = config["production"] uri = app_config.NEO4J_URI user = app_config.NEO4J_USER password = app_config.NEO4J_PASSWORD driver = GraphDatabase.driver(uri, auth=(user, password)) return driver except ImportError as e: logger.error(f"导入 Neo4j 驱动失败: {e}") return None except Exception as e: logger.error(f"连接 Neo4j 失败: {e}") return None # ============================================================================ # 状态同步 # ============================================================================ def update_dataflow_script_path(task_name: str, script_path: str) -> bool: """ 更新 DataFlow 节点的 script_path 字段(独立于 Flask 应用上下文) Args: task_name: 任务名称(对应 DataFlow 的 name_zh) script_path: Python 脚本的完整路径 Returns: 是否更新成功 """ try: from datetime import datetime driver = get_neo4j_driver() if not driver: logger.error("无法获取 Neo4j 驱动") return False query = """ MATCH (n:DataFlow {name_zh: $name_zh}) SET n.script_path = $script_path, n.updated_at = $updated_at RETURN n """ updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with driver.session() as session: result = session.run( query, name_zh=task_name, script_path=script_path, updated_at=updated_at, ).single() driver.close() if result: logger.info(f"成功更新 DataFlow 脚本路径: {task_name} -> {script_path}") return True else: logger.warning(f"未找到 DataFlow 节点: {task_name}") return False except Exception as e: logger.error(f"更新 DataFlow script_path 失败: {e}") return False def sync_completed_tasks_to_db() -> int: """将 pending_tasks.json 中 completed 的任务同步到数据库""" if not PENDING_TASKS_FILE.exists(): return 0 try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) except Exception as e: logger.error(f"读取 pending_tasks.json 失败: {e}") return 0 if not isinstance(tasks, list): return 0 updated = 0 remaining_tasks = [] for t in tasks: if t.get("status") == "completed": task_id = t.get("task_id") if not task_id: continue task_name = t.get("task_name") code_name = t.get("code_name") code_path = t.get("code_path") # 统一处理:code_path 始终为 "datafactory/scripts" # code_name 保持原样(可能是新建的脚本名或 import_resource_data.py) code_path = "datafactory/scripts" # 只处理 Python 脚本文件,跳过 JSON 等其他文件 is_python_script = code_name and code_name.endswith(".py") if is_python_script: logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_name}") else: logger.info( f"任务 {task_id} 的 code_name ({code_name}) 不是 Python 脚本,跳过 DataFlow 更新" ) if update_task_status(task_id, "completed", code_name, code_path): updated += 1 logger.info(f"已同步任务 {task_id} 为 completed") # 只有 Python 脚本才更新 DataFlow 节点的 script_path if task_name and is_python_script: full_script_path = f"{code_path}/{code_name}" if update_dataflow_script_path(task_name, full_script_path): logger.info( f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}" ) else: logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}") # 自动部署到生产服务器(如果启用) if ENABLE_AUTO_DEPLOY: logger.info(f"🚀 开始自动部署任务 {task_id} 到生产服务器...") if auto_deploy_completed_task(t): logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器") else: logger.warning(f"⚠️ 任务 {task_id} 部署到生产服务器失败") else: logger.info(f"ℹ️ 自动部署已禁用,跳过任务 {task_id} 的部署") else: remaining_tasks.append(t) else: remaining_tasks.append(t) if updated > 0: with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(remaining_tasks, f, indent=2, ensure_ascii=False) logger.info(f"本次共同步 {updated} 个 completed 任务到数据库") return updated # ============================================================================ # 生产服务器部署功能 # ============================================================================ def get_ssh_connection(): """获取 SSH 连接到生产服务器""" try: import paramiko # type: ignore ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.info( f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@" f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..." ) ssh.connect( hostname=PRODUCTION_SERVER["host"], port=PRODUCTION_SERVER["port"], username=PRODUCTION_SERVER["username"], password=PRODUCTION_SERVER["password"], timeout=10, ) logger.info("✅ SSH 连接成功") return ssh except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return None except Exception as e: logger.error(f"SSH 连接失败: {e}") return None def test_ssh_connection() -> bool: """ 测试 SSH 连接到生产服务器 Returns: 连接是否成功 """ logger.info("=" * 60) logger.info("🔍 测试生产服务器连接") logger.info("=" * 60) ssh = get_ssh_connection() if not ssh: logger.error("❌ SSH 连接测试失败") return False try: # 测试执行命令 _, stdout, _ = ssh.exec_command("echo 'Connection test successful'") output = stdout.read().decode().strip() logger.info(f"✅ 命令执行成功: {output}") # 检查目标目录是否存在 _, stdout, _ = ssh.exec_command( f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'" ) result = stdout.read().decode().strip() if result == "exists": logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}") else: logger.warning(f"⚠️ 脚本目录不存在: {PRODUCTION_SERVER['script_path']}") logger.info("将在首次部署时自动创建") ssh.close() logger.info("=" * 60) logger.info("✅ 连接测试完成") logger.info("=" * 60) return True except Exception as e: logger.error(f"❌ 测试执行命令失败: {e}") ssh.close() return False def deploy_script_to_production( local_script_path: str, remote_filename: str | None = None ) -> bool: """ 部署脚本文件到生产服务器 Args: local_script_path: 本地脚本文件路径(相对或绝对) remote_filename: 远程文件名(可选,默认使用本地文件名) Returns: 是否部署成功 """ try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False # 转换为绝对路径 local_path = Path(local_script_path) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"本地文件不存在: {local_path}") return False # 确定远程文件名 if not remote_filename: remote_filename = local_path.name remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["script_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['script_path']}" ) stdout.channel.recv_exit_status() # 上传文件 logger.info(f"正在上传: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) # 设置文件权限为可执行 sftp.chmod(remote_path, 0o755) logger.info(f"✅ 脚本部署成功: {remote_path}") sftp.close() ssh.close() return True except Exception as e: logger.error(f"文件传输失败: {e}") ssh.close() return False except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False except Exception as e: logger.error(f"部署脚本失败: {e}") return False def deploy_n8n_workflow_to_production(workflow_file: str) -> bool: """ 部署 n8n 工作流文件到生产服务器 Args: workflow_file: 本地工作流 JSON 文件路径 Returns: 是否部署成功 """ try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False # 转换为绝对路径 local_path = Path(workflow_file) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"工作流文件不存在: {local_path}") return False remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["workflow_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['workflow_path']}" ) stdout.channel.recv_exit_status() # 上传工作流文件 logger.info(f"正在上传工作流: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) logger.info(f"✅ 工作流部署成功: {remote_path}") sftp.close() ssh.close() return True except Exception as e: logger.error(f"工作流传输失败: {e}") ssh.close() return False except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False except Exception as e: logger.error(f"部署工作流失败: {e}") return False def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool: """ 自动部署已完成任务的脚本和工作流到生产服务器 Args: task_info: 任务信息字典,包含 code_name, code_path 等 Returns: 是否部署成功 """ code_name = task_info.get("code_name") code_path = task_info.get("code_path") task_name = task_info.get("task_name", "未知任务") if not code_name or not code_path: logger.warning(f"任务 {task_name} 缺少代码文件信息,跳过部署") return False logger.info("=" * 60) logger.info(f"🚀 开始自动部署任务: {task_name}") logger.info("=" * 60) deploy_success = True # 1. 部署 Python 脚本 if code_name.endswith(".py"): script_path = f"{code_path}/{code_name}" logger.info(f"📦 部署 Python 脚本: {script_path}") if deploy_script_to_production(script_path): logger.info(f"✅ 脚本 {code_name} 部署成功") else: logger.error(f"❌ 脚本 {code_name} 部署失败") deploy_success = False # 2. 查找并部署相关的 n8n 工作流文件 # 工作流文件通常与脚本在同一目录或 n8n_workflows 目录 workflow_files = [] # 查找模式1: 与脚本同目录的工作流文件 script_dir = WORKSPACE_ROOT / code_path if script_dir.exists() and script_dir.is_dir(): for wf_file in script_dir.glob("n8n_workflow_*.json"): if wf_file.is_file(): workflow_files.append(wf_file) # 查找模式2: datafactory/n8n_workflows 目录 n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows" if n8n_workflows_dir.exists(): for wf_file in n8n_workflows_dir.glob("*.json"): if wf_file.is_file() and wf_file not in workflow_files: workflow_files.append(wf_file) # 查找模式3: 根据任务名称匹配工作流文件 if task_name and task_name != "未知任务": # 将任务名称转换为可能的文件名模式 task_name_pattern = task_name.replace(" ", "_").lower() for wf_file in (WORKSPACE_ROOT / "datafactory").rglob( f"*{task_name_pattern}*.json" ): if ( wf_file.is_file() and "n8n" in wf_file.name.lower() and wf_file not in workflow_files ): workflow_files.append(wf_file) if workflow_files: logger.info(f"📦 发现 {len(workflow_files)} 个工作流文件") for wf_file in workflow_files: logger.info(f"📦 部署工作流: {wf_file.name}") if deploy_n8n_workflow_to_production(str(wf_file)): logger.info(f"✅ 工作流 {wf_file.name} 部署成功") else: logger.error(f"❌ 工作流 {wf_file.name} 部署失败") deploy_success = False else: logger.info("ℹ️ 未发现相关工作流文件") logger.info("=" * 60) if deploy_success: logger.info(f"✅ 任务 {task_name} 部署完成") else: logger.warning(f"⚠️ 任务 {task_name} 部署过程中出现错误") logger.info("=" * 60) return deploy_success # ============================================================================ # Cursor Chat 自动化(Agent 模式) # ============================================================================ # Agent 会话状态 AGENT_SESSION_ACTIVE: bool = False AGENT_START_TIME: float = 0 def get_all_cursor_windows() -> list[dict[str, Any]]: """获取所有 Cursor 窗口信息""" if not HAS_CURSOR_GUI: return [] cursor_windows: list[dict[str, Any]] = [] def enum_windows_callback(hwnd, _extra): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) or "" class_name = win32gui.GetClassName(hwnd) or "" is_cursor = "cursor" in title.lower() if class_name and "chrome_widgetwin" in class_name.lower(): is_cursor = True if is_cursor: left, top, right, bottom = win32gui.GetWindowRect(hwnd) area = (right - left) * (bottom - top) cursor_windows.append( { "hwnd": hwnd, "title": title, "class_name": class_name, "area": area, } ) return True win32gui.EnumWindows(enum_windows_callback, None) return cursor_windows def find_cursor_window() -> int | None: """查找 Cursor 主窗口句柄""" if not HAS_CURSOR_GUI: return None cursor_windows = get_all_cursor_windows() if not cursor_windows: logger.warning("未找到 Cursor 窗口") return None # 按面积排序,返回最大的窗口(主窗口) cursor_windows.sort(key=lambda x: x["area"], reverse=True) return cursor_windows[0]["hwnd"] def activate_window(hwnd: int) -> bool: """激活指定窗口""" if not HAS_CURSOR_GUI: return False try: win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) time.sleep(0.3) win32gui.SetForegroundWindow(hwnd) time.sleep(0.5) return True except Exception as e: logger.error(f"激活窗口失败: {e}") return False def open_new_agent() -> bool: """ 在 Cursor 中打开新的 Agent 窗口 使用快捷键 Ctrl+Shift+I 打开新的 Composer/Agent """ global AGENT_SESSION_ACTIVE, AGENT_START_TIME if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化") return False hwnd = find_cursor_window() if not hwnd: return False if not activate_window(hwnd): return False try: # 使用 Ctrl+Shift+I 打开新的 Agent/Composer # 这是 Cursor 默认的快捷键 logger.info("🚀 正在打开新的 Agent...") pyautogui.hotkey("ctrl", "shift", "i") time.sleep(2.0) # 等待 Agent 窗口打开 AGENT_SESSION_ACTIVE = True AGENT_START_TIME = time.time() logger.info("✅ 新的 Agent 已打开") return True except Exception as e: logger.error(f"打开 Agent 失败: {e}") return False def close_current_agent() -> bool: """ 关闭当前的 Agent 会话 使用 Escape 键关闭 Agent 或使用快捷键 """ global AGENT_SESSION_ACTIVE if not HAS_CURSOR_GUI: return False if not AGENT_SESSION_ACTIVE: logger.info("没有活动的 Agent 会话") return True hwnd = find_cursor_window() if not hwnd: return False if not activate_window(hwnd): return False try: logger.info("🔄 正在关闭 Agent...") # 方法1: 按 Escape 键关闭 Agent pyautogui.press("escape") time.sleep(0.5) # 再按一次确保关闭 pyautogui.press("escape") time.sleep(0.3) AGENT_SESSION_ACTIVE = False logger.info("✅ Agent 已关闭") return True except Exception as e: logger.error(f"关闭 Agent 失败: {e}") return False def type_message_to_agent(message: str) -> bool: """ 向 Agent 输入消息 Args: message: 要发送的消息内容 Returns: 是否成功发送 """ if not HAS_CURSOR_GUI: return False try: # 等待 Agent 输入框获得焦点 time.sleep(0.5) # 使用剪贴板粘贴(更可靠地处理中文和特殊字符) if HAS_PYPERCLIP: try: pyperclip.copy(message) pyautogui.hotkey("ctrl", "v") time.sleep(0.5) except Exception: # 回退到逐字符输入 pyautogui.write(message, interval=0.03) else: pyautogui.write(message, interval=0.03) time.sleep(0.3) # 按 Enter 发送消息 pyautogui.press("enter") logger.info("✅ 消息已发送到 Agent") return True except Exception as e: logger.error(f"发送消息到 Agent 失败: {e}") return False def wait_for_agent_completion( timeout: int = 3600, check_interval: int = 30, ) -> bool: """ 等待 Agent 完成任务 通过检查 pending_tasks.json 中的任务状态来判断是否完成 Args: timeout: 超时时间(秒),默认 1 小时 check_interval: 检查间隔(秒),默认 30 秒 Returns: 是否所有任务都已完成 """ start_time = time.time() logger.info(f"⏳ 等待 Agent 完成任务(超时: {timeout}s)...") while time.time() - start_time < timeout: processing_ids = get_processing_task_ids() if not processing_ids: elapsed = int(time.time() - start_time) logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s") return True remaining = len(processing_ids) elapsed = int(time.time() - start_time) logger.info( f"⏳ 仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)" ) time.sleep(check_interval) logger.warning("⚠️ 等待超时,仍有未完成的任务") return False def run_agent_session( message: str, wait_completion: bool = True, timeout: int = 3600, auto_close: bool = True, ) -> bool: """ 运行完整的 Agent 会话 流程: 1. 打开新的 Agent 2. 发送任务消息 3. (可选)等待任务完成 4. (可选)关闭 Agent Args: message: 要发送给 Agent 的消息 wait_completion: 是否等待任务完成 timeout: 等待超时时间(秒) auto_close: 任务完成后是否自动关闭 Agent Returns: 是否成功完成会话 """ logger.info("=" * 50) logger.info("🤖 开始 Agent 会话") logger.info("=" * 50) # 1. 打开新的 Agent if not open_new_agent(): logger.error("❌ 无法打开 Agent") return False # 2. 发送消息 if not type_message_to_agent(message): logger.error("❌ 无法发送消息到 Agent") close_current_agent() return False logger.info(f"📤 已发送消息: {message[:50]}...") # 3. 等待任务完成 if wait_completion: completed = wait_for_agent_completion(timeout=timeout) # 同步已完成的任务到数据库 sync_completed_tasks_to_db() if completed: logger.info("✅ Agent 已完成所有任务") else: logger.warning("⚠️ Agent 未能在超时时间内完成所有任务") # 4. 关闭 Agent if auto_close: close_current_agent() logger.info("=" * 50) logger.info("🏁 Agent 会话结束") logger.info("=" * 50) return True def send_chat_message(message: str, input_pos: tuple[int, int] | None) -> bool: """ 在 Cursor Chat 中发送消息(传统方式,保留向后兼容) 推荐使用 run_agent_session() 替代此函数 """ if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化") return False hwnd = find_cursor_window() if not hwnd: return False if not activate_window(hwnd): return False # 点击输入框或使用快捷键 if input_pos: x, y = input_pos pyautogui.click(x, y) time.sleep(0.4) else: pyautogui.hotkey("ctrl", "l") time.sleep(1.0) pyautogui.hotkey("ctrl", "a") time.sleep(0.2) # 输入消息 if HAS_PYPERCLIP: try: pyperclip.copy(message) pyautogui.hotkey("ctrl", "v") time.sleep(0.5) except Exception: pyautogui.write(message, interval=0.03) else: pyautogui.write(message, interval=0.03) time.sleep(0.3) pyautogui.press("enter") logger.info("✅ 消息已发送到 Cursor Chat") return True def send_chat_for_tasks(force: bool = False, use_agent: bool = True) -> bool: """ 向 Cursor Chat 发送任务提醒 Args: force: 强制发送,忽略 ENABLE_CHAT 设置 use_agent: 是否使用新 Agent 模式(推荐) Returns: 是否成功发送 """ if not force and not ENABLE_CHAT: return False if not HAS_CURSOR_GUI: logger.warning("未安装 GUI 自动化依赖,无法发送 Chat 消息") return False processing_ids = get_processing_task_ids() if not processing_ids: logger.info("没有 processing 任务,跳过发送") return False logger.info(f"📤 发送任务提醒({len(processing_ids)} 个任务)...") if use_agent: # 使用新的 Agent 模式 return run_agent_session( message=CHAT_MESSAGE, wait_completion=False, # 在循环模式中不阻塞等待 auto_close=False, # 保持 Agent 打开,让其完成任务 ) else: # 传统模式 return send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS) def chat_trigger_loop( interval: int = 60, use_agent: bool = True, auto_close_on_complete: bool = True, ) -> None: """ 循环模式:定期检查 processing 任务并发送 Chat 消息 Args: interval: 检查间隔(秒),默认 60 秒 use_agent: 是否使用新 Agent 模式 auto_close_on_complete: 任务完成后是否自动关闭 Agent """ global AGENT_SESSION_ACTIVE logger.info("=" * 60) logger.info("🚀 Cursor Chat 自动触发服务已启动") logger.info(f"⏰ 检查间隔: {interval} 秒") logger.info(f"🤖 Agent 模式: {'已启用' if use_agent else '未启用'}") logger.info(f"📝 发送消息: {CHAT_MESSAGE}") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) last_sent_time = 0 min_send_interval = 120 # 最小发送间隔(秒),避免频繁打扰 try: while True: try: # 1. 先同步已完成任务 sync_completed_tasks_to_db() # 2. 从数据库拉取 pending 任务并转为 processing pending_tasks = get_pending_tasks() if pending_tasks: logger.info(f"📥 从数据库发现 {len(pending_tasks)} 个 pending 任务") # 更新任务状态为 processing for task in pending_tasks: update_task_status(task["task_id"], "processing") # 写入 pending_tasks.json write_pending_tasks_json(pending_tasks) # 生成执行指令文件 create_execute_instructions(pending_tasks) logger.info("✅ 已将 pending 任务转为 processing") # 3. 检查是否有 processing 任务 processing_ids = get_processing_task_ids() if processing_ids: current_time = time.time() time_since_last = current_time - last_sent_time # 如果有活动的 Agent 会话,不需要重新发送 if AGENT_SESSION_ACTIVE: logger.info( f"🤖 Agent 正在执行中,剩余 {len(processing_ids)} 个任务" ) elif time_since_last >= min_send_interval: logger.info(f"📋 发现 {len(processing_ids)} 个待处理任务") # 更新触发器文件 update_trigger_file( task_count=len(processing_ids), status="有待执行任务", task_ids=processing_ids, ) # 发送 Chat 消息(启动 Agent) if use_agent: if open_new_agent(): if type_message_to_agent(CHAT_MESSAGE): last_sent_time = current_time logger.info("✅ 已启动 Agent 并发送执行提醒") else: logger.warning("⚠️ 发送消息失败") close_current_agent() else: logger.warning("⚠️ 启动 Agent 失败") else: if send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS): last_sent_time = current_time logger.info("✅ 已发送执行提醒") else: logger.warning("⚠️ 发送失败,将在下次重试") else: remaining = int(min_send_interval - time_since_last) logger.info( f"⏳ 距离上次发送不足 {min_send_interval}s," f"还需等待 {remaining}s" ) else: # 没有待处理任务 if AGENT_SESSION_ACTIVE and auto_close_on_complete: logger.info("✅ 所有任务已完成,正在关闭 Agent...") close_current_agent() logger.info("✅ Agent 已关闭") else: logger.info("✅ 没有待处理任务") logger.info(f"⏳ {interval} 秒后再次检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") time.sleep(interval) except KeyboardInterrupt: # 退出时关闭 Agent if AGENT_SESSION_ACTIVE: logger.info("正在关闭 Agent...") close_current_agent() logger.info("\n⛔ 服务已停止") # ============================================================================ # 主执行流程 # ============================================================================ def auto_execute_tasks_once() -> int: """执行一次任务检查和处理""" # 1. 先同步已完成任务到数据库 sync_completed_tasks_to_db() # 2. 获取 pending 任务 logger.info("🔍 检查 pending 任务...") tasks = get_pending_tasks() # 3. 检查是否有现有的 processing 任务 existing_processing_ids = get_processing_task_ids() if not tasks and not existing_processing_ids: logger.info("✅ 没有 pending 或 processing 任务") # 更新触发器文件为"已完成"状态 update_trigger_file(0, "所有任务已完成", []) return 0 if tasks: logger.info(f"📋 找到 {len(tasks)} 个 pending 任务") # 4. 更新任务状态为 processing for task in tasks: update_task_status(task["task_id"], "processing") # 5. 写入 pending_tasks.json write_pending_tasks_json(tasks) # 6. 生成执行指令文件 create_execute_instructions(tasks) # 7. 更新触发器文件(关键:触发 Cursor 自动执行) all_processing_ids = get_processing_task_ids() if all_processing_ids: update_trigger_file( task_count=len(all_processing_ids), status="有待执行任务", task_ids=all_processing_ids, ) logger.info( f"🔔 已更新触发器,等待 Cursor 执行 {len(all_processing_ids)} 个任务" ) return len(tasks) def auto_execute_tasks_loop(interval: int = 300) -> None: """循环执行任务检查""" logger.info("=" * 60) logger.info("🚀 自动任务执行服务已启动") logger.info(f"⏰ 检查间隔: {interval} 秒") logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}") logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) try: while True: try: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks() logger.info(f"✅ 已处理 {count} 个任务") logger.info(f"⏳ {interval} 秒后再次检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") time.sleep(interval) except KeyboardInterrupt: logger.info("\n⛔ 服务已停止") def refresh_trigger_only() -> None: """仅刷新触发器文件(用于现有 processing 任务)""" processing_ids = get_processing_task_ids() if not processing_ids: logger.info("没有 processing 状态的任务") update_trigger_file(0, "所有任务已完成", []) return logger.info(f"📋 发现 {len(processing_ids)} 个 processing 任务") # 重新生成指令文件(从 pending_tasks.json 读取) if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: all_tasks = json.load(f) processing_tasks = [t for t in all_tasks if t.get("status") == "processing"] if processing_tasks: # 重新生成执行指令文件 create_execute_instructions_from_json(processing_tasks) except Exception as e: logger.error(f"读取任务文件失败: {e}") # 更新触发器文件 update_trigger_file( task_count=len(processing_ids), status="有待执行任务", task_ids=processing_ids, ) logger.info("✅ 触发器已刷新,等待 Cursor 执行任务") def create_execute_instructions_from_json(tasks: list[dict[str, Any]]) -> None: """从 pending_tasks.json 格式的任务列表生成执行指令文件""" TASKS_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# 🤖 Cursor 自动任务执行指令\n\n") f.write("**⚠️ 重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 📋 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("## ⚠️ 任务约束要求\n\n") f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n") f.write("- ❌ 不要创建任何 summary、report、总结类的文档文件\n") f.write("- ❌ 不要生成 task_summary.md、execution_report.md 等总结文件\n") f.write("- ✅ 只需创建任务要求的功能脚本文件\n") f.write("- ✅ 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task.get("task_id", "N/A") task_name = task.get("task_name", "未命名任务") task_desc = task.get("task_description", "无描述") notified_at = task.get("notified_at", "") f.write(f"## 🔴 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **通知时间**: {notified_at}\n") f.write(f"- **代码路径**: {task.get('code_path', 'N/A')}\n\n") f.write(f"### 📝 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已更新: {INSTRUCTIONS_FILE}") def main() -> None: """主函数""" parser = argparse.ArgumentParser( description="自动任务执行调度脚本", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 执行一次任务检查 python scripts/auto_execute_tasks.py --once # 使用 Agent 模式执行任务(自动部署) python scripts/auto_execute_tasks.py --agent-run # 启动 Agent 循环模式(推荐,自动部署) python scripts/auto_execute_tasks.py --chat-loop --use-agent # 禁用自动部署功能 python scripts/auto_execute_tasks.py --chat-loop --use-agent --no-deploy # 立即部署指定任务到生产服务器 python scripts/auto_execute_tasks.py --deploy-now 123 # 测试生产服务器连接 python scripts/auto_execute_tasks.py --test-connection """, ) parser.add_argument("--once", action="store_true", help="只执行一次") parser.add_argument("--interval", type=int, default=300, help="检查间隔(秒)") parser.add_argument( "--enable-chat", action="store_true", help="启用自动 Cursor Chat" ) parser.add_argument("--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"') parser.add_argument( "--chat-message", type=str, default="请阅读 tasks/task_execute_instructions.md 并执行任务。", help="发送到 Chat 的消息", ) parser.add_argument( "--refresh-trigger", action="store_true", help="仅刷新触发器文件(用于现有 processing 任务)", ) parser.add_argument( "--chat-loop", action="store_true", help="启动 Chat 自动触发循环(定期发送执行消息)", ) parser.add_argument( "--chat-interval", type=int, default=60, help="Chat 触发循环的检查间隔(秒),默认 60", ) parser.add_argument( "--send-chat-now", action="store_true", help="立即发送一次 Chat 消息(用于手动触发)", ) # Agent 模式相关参数 parser.add_argument( "--use-agent", action="store_true", default=True, help="使用新 Agent 模式(默认启用)", ) parser.add_argument( "--no-agent", action="store_true", help="禁用 Agent 模式,使用传统 Chat 方式", ) parser.add_argument( "--agent-run", action="store_true", help="立即启动 Agent 会话执行任务", ) parser.add_argument( "--agent-timeout", type=int, default=3600, help="Agent 等待任务完成的超时时间(秒),默认 3600", ) parser.add_argument( "--no-auto-close", action="store_true", help="任务完成后不自动关闭 Agent", ) # 自动部署相关参数 parser.add_argument( "--enable-deploy", action="store_true", default=True, help="启用自动部署到生产服务器(默认启用)", ) parser.add_argument( "--no-deploy", action="store_true", help="禁用自动部署功能", ) parser.add_argument( "--deploy-now", type=str, metavar="TASK_ID", help="立即部署指定任务ID的脚本到生产服务器", ) parser.add_argument( "--test-connection", action="store_true", help="测试到生产服务器的 SSH 连接", ) args = parser.parse_args() global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE, ENABLE_AUTO_DEPLOY ENABLE_CHAT = bool(args.enable_chat) CHAT_MESSAGE = args.chat_message ENABLE_AUTO_DEPLOY = args.enable_deploy and not args.no_deploy # 确定是否使用 Agent 模式 use_agent = args.use_agent and not args.no_agent auto_close = not args.no_auto_close if args.chat_input_pos: try: x, y = args.chat_input_pos.split(",") CHAT_INPUT_POS = (int(x.strip()), int(y.strip())) except Exception: pass # 测试 SSH 连接 if args.test_connection: if test_ssh_connection(): logger.info("✅ 连接测试成功") else: logger.error("❌ 连接测试失败") return # 立即部署指定任务 if args.deploy_now: try: task_id = int(args.deploy_now) logger.info(f"🚀 开始部署任务 {task_id}...") # 从 pending_tasks.json 查找任务信息 if PENDING_TASKS_FILE.exists(): with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) task_found = None for t in tasks: if t.get("task_id") == task_id: task_found = t break if task_found: if auto_deploy_completed_task(task_found): logger.info(f"✅ 任务 {task_id} 部署成功") else: logger.error(f"❌ 任务 {task_id} 部署失败") else: logger.error(f"未找到任务 {task_id}") else: logger.error("pending_tasks.json 文件不存在") except ValueError: logger.error(f"无效的任务ID: {args.deploy_now}") return # 仅刷新触发器 if args.refresh_trigger: refresh_trigger_only() return # 立即启动 Agent 会话 if args.agent_run: processing_ids = get_processing_task_ids() if not processing_ids: # 先检查是否有 pending 任务 auto_execute_tasks_once() processing_ids = get_processing_task_ids() if not processing_ids: logger.info("没有待执行的任务") return logger.info(f"🚀 启动 Agent 执行 {len(processing_ids)} 个任务...") success = run_agent_session( message=CHAT_MESSAGE, wait_completion=True, timeout=args.agent_timeout, auto_close=auto_close, ) if success: logger.info("✅ Agent 会话完成") else: logger.error("❌ Agent 会话失败") return # 立即发送一次 Chat 消息 if args.send_chat_now: if use_agent: # 使用 Agent 模式 processing_ids = get_processing_task_ids() if not processing_ids: auto_execute_tasks_once() processing_ids = get_processing_task_ids() if processing_ids: if open_new_agent(): if type_message_to_agent(CHAT_MESSAGE): logger.info("✅ Agent 已启动并发送消息") else: logger.error("❌ 发送消息失败") else: logger.error("❌ 启动 Agent 失败") else: logger.info("没有待执行的任务") else: if send_chat_for_tasks(force=True, use_agent=False): logger.info("✅ 消息已发送") else: logger.error("❌ 发送失败") return # Chat 自动触发循环模式 if args.chat_loop: chat_trigger_loop( interval=args.chat_interval, use_agent=use_agent, auto_close_on_complete=auto_close, ) return if args.once: count = auto_execute_tasks_once() if count > 0: send_chat_for_tasks(use_agent=use_agent) logger.info(f"✅ 完成!处理了 {count} 个任务") else: auto_execute_tasks_loop(interval=args.interval) if __name__ == "__main__": main()