#!/usr/bin/env python3 """ 自动任务执行核心调度脚本 (Agent 模式) 工作流程: 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务 2. 生成 tasks/task_execute_instructions.md 执行指令文件 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json 4. 更新 tasks/task_trigger.txt 触发器文件 5. 启动新的 Cursor Agent 并发送执行指令 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent 使用方式: # Agent 单次执行(执行一次任务后退出) python scripts/auto_execute_tasks.py --agent-run # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务) python scripts/auto_execute_tasks.py --agent-loop # Agent 循环模式 + 禁用自动部署 python scripts/auto_execute_tasks.py --agent-loop --no-deploy # 设置 Agent 超时时间(默认 3600 秒) python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200 # 任务完成后不自动关闭 Agent python scripts/auto_execute_tasks.py --agent-run --no-auto-close # 立即部署指定任务ID的脚本到生产服务器 python scripts/auto_execute_tasks.py --deploy-now 123 # 测试到生产服务器的 SSH 连接 python scripts/auto_execute_tasks.py --test-connection """ from __future__ import annotations import argparse import json import logging import sys import time from datetime import datetime from pathlib import Path from typing import Any # ============================================================================ # 日志配置 # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ============================================================================ # Windows GUI 自动化依赖(可选) # ============================================================================ HAS_CURSOR_GUI = False HAS_PYPERCLIP = False try: import pyautogui import win32con import win32gui pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 HAS_CURSOR_GUI = True try: import pyperclip HAS_PYPERCLIP = True except ImportError: pass except ImportError: logger.info( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui)," "将禁用自动 Cursor Agent 功能。" ) # 
# ============================================================================
# Global configuration
# ============================================================================
WORKSPACE_ROOT = Path(__file__).parent.parent
TASKS_DIR = WORKSPACE_ROOT / "tasks"
PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"

# Production server configuration.
# NOTE(security): the SSH password is committed in plain text. It is kept here
# unchanged so existing deployments keep working, but it should be moved to an
# environment variable / secret store and rotated.
PRODUCTION_SERVER = {
    "host": "192.168.3.143",
    "port": 22,
    "username": "ubuntu",
    "password": "citumxl2357",
    "script_path": "/opt/dataops-platform/datafactory/scripts",
    "workflow_path": "/opt/dataops-platform/n8n/workflows",
}

# Message typed into the Agent prompt.
AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"

# Global toggled by command-line flags / the interactive menu.
ENABLE_AUTO_DEPLOY: bool = True  # automatic deployment is enabled by default


def _ensure_workspace_on_path() -> None:
    """Make the workspace root importable without growing sys.path on every call."""
    root = str(WORKSPACE_ROOT)
    if root not in sys.path:
        sys.path.insert(0, root)


def _load_pending_tasks() -> list[dict[str, Any]]:
    """
    Load tasks/pending_tasks.json as a list of task dicts.

    Returns an empty list when the file is missing, unreadable, not valid JSON
    or not a JSON list. Entries that are not JSON objects are ignored.
    """
    if not PENDING_TASKS_FILE.exists():
        return []

    try:
        with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # json.JSONDecodeError is a ValueError
        logger.warning(f"读取 pending_tasks.json 失败: {e}")
        return []

    if not isinstance(data, list):
        return []
    return [entry for entry in data if isinstance(entry, dict)]


# ============================================================================
# Database operations
# ============================================================================
def get_db_connection():
    """
    Open a psycopg2 connection using the ``production`` configuration.

    The SQLAlchemy URI from ``app.config.config`` is split into the keyword
    arguments psycopg2 expects. User name and password are percent-decoded,
    matching how SQLAlchemy itself interprets the URI.

    Returns:
        The open connection, or ``None`` when a dependency is missing or the
        connection attempt fails (the error is logged).
    """
    try:
        from urllib.parse import unquote, urlparse

        import psycopg2

        _ensure_workspace_on_path()
        from app.config.config import config

        # Always use the production database configuration.
        app_config = config["production"]
        db_uri = app_config.SQLALCHEMY_DATABASE_URI

        parsed = urlparse(db_uri)
        database = parsed.path.lstrip("/")
        conn = psycopg2.connect(
            host=parsed.hostname,
            port=parsed.port or 5432,
            database=database,
            # urlparse leaves credentials percent-encoded (e.g. "%40" for "@").
            user=unquote(parsed.username) if parsed.username else parsed.username,
            password=unquote(parsed.password) if parsed.password else parsed.password,
        )
        logger.debug(f"数据库连接成功: {parsed.hostname}:{parsed.port}/{database}")
        return conn

    except ImportError as e:
        logger.error(f"导入依赖失败: {e}")
        return None
    except Exception as e:
        logger.error(f"连接数据库失败: {e}")
        import traceback

        logger.error(traceback.format_exc())
        return None


def get_pending_tasks() -> list[dict[str, Any]]:
    """
    Fetch every task with status ``pending`` from the ``task_list`` table.

    The database is queried on each call, ordered by ``create_time``.

    Returns:
        One dict per row; an empty list when there is no connection or the
        query fails (the error is logged).
    """
    try:
        from psycopg2.extras import RealDictCursor

        logger.info("📡 正在连接数据库...")
        conn = get_db_connection()
        if not conn:
            logger.error("❌ 无法获取数据库连接")
            return []

        logger.info("✅ 数据库连接成功,正在查询 pending 任务...")
        # Release cursor and connection even when the query raises.
        try:
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            try:
                cursor.execute(
                    """
                    SELECT task_id, task_name, task_description, status,
                           code_name, code_path, create_time, create_by
                    FROM task_list
                    WHERE status = 'pending'
                    ORDER BY create_time ASC
                    """
                )
                rows = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        task_list = [dict(row) for row in rows]
        logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
        for task in task_list:
            logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")

        return task_list

    except Exception as e:
        logger.error(f"获取 pending 任务失败: {e}")
        import traceback

        logger.error(traceback.format_exc())
        return []


def update_task_status(
    task_id: int,
    status: str,
    code_name: str | None = None,
    code_path: str | None = None,
) -> bool:
    """
    Update the status of one row in ``task_list``.

    ``code_name`` and ``code_path`` are written only when both are truthy.

    Returns:
        ``True`` when a row was updated, ``False`` when no row matched, there
        was no connection, or the statement failed.
    """
    try:
        conn = get_db_connection()
        if not conn:
            return False

        # Release cursor and connection even when the statement raises.
        try:
            cursor = conn.cursor()
            try:
                if code_name and code_path:
                    cursor.execute(
                        """
                        UPDATE task_list
                        SET status = %s, code_name = %s, code_path = %s,
                            update_time = CURRENT_TIMESTAMP
                        WHERE task_id = %s
                        """,
                        (status, code_name, code_path, task_id),
                    )
                else:
                    cursor.execute(
                        """
                        UPDATE task_list
                        SET status = %s, update_time = CURRENT_TIMESTAMP
                        WHERE task_id = %s
                        """,
                        (status, task_id),
                    )
                conn.commit()
                updated = cursor.rowcount > 0
            finally:
                cursor.close()
        finally:
            conn.close()

        if updated:
            logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
        return updated

    except Exception as e:
        logger.error(f"更新任务状态失败: {e}")
        return False


# ============================================================================
# Trigger file management
# ============================================================================
def update_trigger_file(
    task_count: int = 0,
    status: str = "有待执行任务",
    task_ids: list[int] | None = None,
) -> None:
    """
    Rewrite tasks/task_trigger.txt.

    Args:
        task_count: Number of tasks waiting to be processed.
        status: Free-text status line written into the file.
        task_ids: IDs listed in the file; ``None`` is treated as empty.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))

    content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
生成时间: {gen_time}
状态: {status}
待处理任务数: {task_count}
任务ID列表: [{task_ids_str}]

此文件用于触发Cursor自动执行任务。

【重要】Cursor AI 请立即执行以下操作:
1. 读取 tasks/task_execute_instructions.md 文件
2. 按照文件中的任务描述,依次完成每个任务
3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
"""

    with TRIGGER_FILE.open("w", encoding="utf-8") as f:
        f.write(content)

    logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")


def get_processing_task_ids() -> list[int]:
    """
    Return the IDs of all tasks marked ``processing`` in pending_tasks.json.

    A missing, unreadable or malformed file yields an empty list; read
    failures are logged instead of being silently ignored.
    """
    return [
        t["task_id"]
        for t in _load_pending_tasks()
        if t.get("status") == "processing" and t.get("task_id")
    ]


# ============================================================================
# Task file generation
# ============================================================================
def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
    """
    Merge ``tasks`` into tasks/pending_tasks.json with status ``processing``.

    Tasks whose ``task_id`` is already present in the file (or appears twice
    in ``tasks``) are not added again. If the existing file is unreadable or
    does not contain a JSON list it is replaced.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    existing_tasks = _load_pending_tasks()
    existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}

    for task in tasks:
        task_id = task["task_id"]
        if task_id in existing_ids:
            continue
        existing_tasks.append(
            {
                "task_id": task_id,
                "task_name": task["task_name"],
                "code_path": task.get("code_path", ""),
                "code_name": task.get("code_name", ""),
                "status": "processing",
                "notified_at": datetime.now().isoformat(),
                "code_file": task.get("code_file", ""),
            }
        )
        # Track the id so a duplicate later in the same batch is skipped too.
        existing_ids.add(task_id)

    with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
        json.dump(existing_tasks, f, indent=2, ensure_ascii=False)

    logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")


def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
    """
    Write tasks/task_execute_instructions.md for the given tasks.

    The file starts with fixed instructions for the Agent, followed by one
    section per task (name, ID, creation time, creator, description).
    Each task dict must provide ``task_id``, ``task_name`` and
    ``task_description``.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
        f.write("# Cursor 自动任务执行指令\n\n")
        f.write("**重要:请立即执行以下任务!**\n\n")
        gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"**生成时间**: {gen_time}\n\n")
        f.write(f"**待执行任务数量**: {len(tasks)}\n\n")

        f.write("## 任务完成后的操作\n\n")
        f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
        f.write("对应任务的 `status` 为 `completed`,\n")
        f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
        f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")

        f.write("## 任务约束要求\n\n")
        f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
        f.write("- 不要创建任何 summary、report、总结类的文档文件\n")
        f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n")
        f.write("- 只需创建任务要求的功能脚本文件\n")
        f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
        f.write("---\n\n")

        for idx, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            task_name = task["task_name"]
            task_desc = task["task_description"]

            # Database rows carry a datetime; other callers may pass a string.
            create_time = task.get("create_time", "")
            if hasattr(create_time, "strftime"):
                create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")

            f.write(f"## 任务 {idx}: {task_name}\n\n")
            f.write(f"- **任务ID**: `{task_id}`\n")
            f.write(f"- **创建时间**: {create_time}\n")
            f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
            f.write(f"### 任务描述\n\n{task_desc}\n\n")
            f.write("---\n\n")

    logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")


# ============================================================================
# Standalone Neo4j connection (does not depend on the Flask app context)
# ============================================================================
def get_neo4j_driver():
    """
    Create a Neo4j driver from the ``production`` configuration.

    Returns:
        The driver, or ``None`` when the neo4j package / configuration cannot
        be imported or the driver cannot be created (the error is logged).
        The caller is responsible for closing the driver.
    """
    try:
        from neo4j import GraphDatabase

        _ensure_workspace_on_path()
        from app.config.config import config

        # Always use the production configuration.
        app_config = config["production"]
        return GraphDatabase.driver(
            app_config.NEO4J_URI,
            auth=(app_config.NEO4J_USER, app_config.NEO4J_PASSWORD),
        )

    except ImportError as e:
        logger.error(f"导入 Neo4j 驱动失败: {e}")
        return None
    except Exception as e:
        logger.error(f"连接 Neo4j 失败: {e}")
        return None


# ============================================================================
# Status synchronisation
# ============================================================================
def extract_dataflow_name_from_task(task_id: int) -> str | None:
    """
    Extract the DataFlow name from a task's description stored in the database.

    The description is searched for a line of the form
    ``**DataFlow Name**: <name>``.

    Returns:
        The stripped name, or ``None`` when the task does not exist, has no
        description, contains no such line, or the lookup fails.
    """
    import re

    try:
        conn = get_db_connection()
        if not conn:
            return None

        # Release cursor and connection even when the query raises.
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "SELECT task_description FROM task_list WHERE task_id = %s",
                    (task_id,),
                )
                result = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            conn.close()

        # A NULL description would make re.search raise TypeError.
        if not result or not result[0]:
            return None

        match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", result[0])
        if match:
            dataflow_name = match.group(1).strip()
            logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
            return dataflow_name

        return None

    except Exception as e:
        logger.error(f"提取 DataFlow 名称失败: {e}")
        return None


def update_dataflow_script_path(
    task_name: str, script_path: str, task_id: int | None = None
) -> bool:
    """
    Set ``script_path`` (and ``updated_at``) on the matching DataFlow node.

    The node is matched on ``name_zh``. When ``task_id`` is given and a
    DataFlow name can be extracted from that task's description, the extracted
    name is used instead of ``task_name``.

    Returns:
        ``True`` when a node was updated; ``False`` when no node matched, no
        driver was available, or the update failed.
    """
    try:
        driver = get_neo4j_driver()
        if not driver:
            logger.error("无法获取 Neo4j 驱动")
            return False

        # Close the driver on every path, including failures.
        try:
            dataflow_name = task_name
            if task_id:
                extracted_name = extract_dataflow_name_from_task(task_id)
                if extracted_name:
                    dataflow_name = extracted_name
                    logger.info(
                        f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}"
                    )

            query = """
                MATCH (n:DataFlow {name_zh: $name_zh})
                SET n.script_path = $script_path, n.updated_at = $updated_at
                RETURN n
            """
            updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            with driver.session() as session:
                result = session.run(
                    query,
                    name_zh=dataflow_name,
                    script_path=script_path,
                    updated_at=updated_at,
                ).single()
        finally:
            driver.close()

        if result:
            logger.info(f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}")
            return True

        logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
        return False

    except Exception as e:
        logger.error(f"更新 DataFlow script_path 失败: {e}")
        return False


def sync_completed_tasks_to_db() -> int:
    """
    Sync tasks marked ``completed`` in pending_tasks.json to the database.

    For each completed task the database row is set to ``completed`` with
    ``code_path`` fixed to ``datafactory/scripts``. After a successful update
    the DataFlow node's ``script_path`` is updated (Python scripts only) and,
    when ``ENABLE_AUTO_DEPLOY`` is set, the task is deployed. Successfully
    synced tasks are removed from the file; everything else is kept.

    Returns:
        The number of tasks synced to the database.
    """
    tasks = _load_pending_tasks()

    updated = 0
    remaining_tasks = []

    for t in tasks:
        if t.get("status") != "completed":
            remaining_tasks.append(t)
            continue

        task_id = t.get("task_id")
        if not task_id:
            # Cannot be synced without an id; keep the entry instead of
            # silently dropping it when the file is rewritten.
            logger.warning(f"跳过缺少 task_id 的 completed 任务: {t.get('task_name')}")
            remaining_tasks.append(t)
            continue

        task_name = t.get("task_name")
        code_name = t.get("code_name")
        # code_path is always normalised to "datafactory/scripts".
        code_path = "datafactory/scripts"

        # Only Python script files trigger a DataFlow update.
        is_python_script = isinstance(code_name, str) and code_name.endswith(".py")
        if is_python_script:
            logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_name}")
        else:
            logger.info(
                f"任务 {task_id} 的 code_name ({code_name}) 不是 Python 脚本,跳过 DataFlow 更新"
            )

        if not update_task_status(task_id, "completed", code_name, code_path):
            # Keep the task so the next sync retries it.
            remaining_tasks.append(t)
            continue

        updated += 1
        logger.info(f"已同步任务 {task_id} 为 completed")

        if task_name and is_python_script:
            full_script_path = f"{code_path}/{code_name}"
            if update_dataflow_script_path(
                task_name, full_script_path, task_id=task_id
            ):
                logger.info(
                    f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
                )
            else:
                logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")

        # Automatic deployment to the production server (when enabled).
        if ENABLE_AUTO_DEPLOY:
            logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
            if auto_deploy_completed_task(t):
                logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
            else:
                logger.warning(f"任务 {task_id} 部署到生产服务器失败")
        else:
            logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")

    if updated > 0:
        with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
            json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
        logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")

    return updated
============================================================================ # 生产服务器部署功能 # ============================================================================ def get_ssh_connection(): """获取 SSH 连接到生产服务器""" try: import paramiko # type: ignore ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.info( f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@" f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..." ) ssh.connect( hostname=PRODUCTION_SERVER["host"], port=PRODUCTION_SERVER["port"], username=PRODUCTION_SERVER["username"], password=PRODUCTION_SERVER["password"], timeout=10, ) logger.info("✅ SSH 连接成功") return ssh except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return None except Exception as e: logger.error(f"SSH 连接失败: {e}") return None def test_ssh_connection() -> bool: """测试 SSH 连接到生产服务器""" logger.info("=" * 60) logger.info("测试生产服务器连接") logger.info("=" * 60) ssh = get_ssh_connection() if not ssh: logger.error("❌ SSH 连接测试失败") return False try: # 测试执行命令 _, stdout, _ = ssh.exec_command("echo 'Connection test successful'") output = stdout.read().decode().strip() logger.info(f"✅ 命令执行成功: {output}") # 检查目标目录是否存在 _, stdout, _ = ssh.exec_command( f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'" ) result = stdout.read().decode().strip() if result == "exists": logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}") else: logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}") logger.info("将在首次部署时自动创建") ssh.close() logger.info("=" * 60) logger.info("✅ 连接测试完成") logger.info("=" * 60) return True except Exception as e: logger.error(f"❌ 测试执行命令失败: {e}") ssh.close() return False def deploy_script_to_production( local_script_path: str, remote_filename: str | None = None ) -> bool: """部署脚本文件到生产服务器""" try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False # 转换为绝对路径 
local_path = Path(local_script_path) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"本地文件不存在: {local_path}") return False # 确定远程文件名 if not remote_filename: remote_filename = local_path.name remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["script_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['script_path']}" ) stdout.channel.recv_exit_status() # 上传文件 logger.info(f"正在上传: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) # 设置文件权限为可执行 sftp.chmod(remote_path, 0o755) logger.info(f"✅ 脚本部署成功: {remote_path}") sftp.close() ssh.close() return True except Exception as e: logger.error(f"文件传输失败: {e}") ssh.close() return False except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False except Exception as e: logger.error(f"部署脚本失败: {e}") return False def deploy_n8n_workflow_to_production(workflow_file: str) -> bool: """ 部署 n8n 工作流到 n8n 服务器 此函数执行两个步骤: 1. 通过 n8n API 创建工作流(主要步骤) 2. 
通过 SFTP 备份工作流文件到生产服务器(可选) """ try: import json import requests # 转换为绝对路径 local_path = Path(workflow_file) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"工作流文件不存在: {local_path}") return False # 加载工作流 JSON with open(local_path, encoding="utf-8") as f: workflow_data = json.load(f) workflow_name = workflow_data.get("name", local_path.stem) logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}") # 获取 n8n API 配置 try: sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import BaseConfig api_url = BaseConfig.N8N_API_URL api_key = BaseConfig.N8N_API_KEY timeout = BaseConfig.N8N_API_TIMEOUT except (ImportError, AttributeError): import os api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com") api_key = os.environ.get("N8N_API_KEY", "") timeout = int(os.environ.get("N8N_API_TIMEOUT", "30")) if not api_key: logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器") return False # 准备 API 请求 headers = { "X-N8N-API-KEY": api_key, "Content-Type": "application/json", "Accept": "application/json", } # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags) workflow_payload = { "name": workflow_name, "nodes": workflow_data.get("nodes", []), "connections": workflow_data.get("connections", {}), "settings": workflow_data.get("settings", {}), } # 调用 n8n API 创建工作流 create_url = f"{api_url.rstrip('/')}/api/v1/workflows" logger.info(f"调用 n8n API: {create_url}") try: response = requests.post( create_url, headers=headers, json=workflow_payload, timeout=timeout, ) if response.status_code == 401: logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置") return False elif response.status_code == 403: logger.error("n8n API 权限不足") return False response.raise_for_status() created_workflow = response.json() workflow_id = created_workflow.get("id") logger.info(f"✅ 工作流创建成功! 
ID: {workflow_id}, 名称: {workflow_name}") # 可选:将工作流文件备份到生产服务器 try: _backup_workflow_to_server(local_path) except Exception as backup_error: logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}") return True except requests.exceptions.Timeout: logger.error("n8n API 请求超时,请检查网络连接") return False except requests.exceptions.ConnectionError: logger.error(f"无法连接到 n8n 服务器: {api_url}") return False except requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json() except Exception: error_detail = e.response.text logger.error( f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}" ) return False except Exception as e: logger.error(f"部署工作流失败: {e}") import traceback logger.error(traceback.format_exc()) return False def _backup_workflow_to_server(local_path: Path) -> bool: """备份工作流文件到生产服务器(通过 SFTP)""" try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.debug("未安装 paramiko 库,跳过文件备份") return False remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["workflow_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['workflow_path']}" ) stdout.channel.recv_exit_status() # 上传工作流文件 logger.debug(f"备份工作流文件: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) sftp.close() ssh.close() return True except Exception as e: logger.warning(f"工作流文件备份失败: {e}") ssh.close() return False except Exception as e: logger.warning(f"备份工作流失败: {e}") return False def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool: """自动部署已完成任务的脚本和工作流到生产服务器""" code_name = task_info.get("code_name") code_path = task_info.get("code_path") task_name = task_info.get("task_name", "未知任务") if not code_name or not code_path: logger.warning(f"任务 {task_name} 
缺少代码文件信息,跳过部署") return False logger.info("=" * 60) logger.info(f"开始自动部署任务: {task_name}") logger.info("=" * 60) deploy_success = True # 1. 部署 Python 脚本 if code_name.endswith(".py"): script_path = f"{code_path}/{code_name}" logger.info(f"部署 Python 脚本: {script_path}") if deploy_script_to_production(script_path): logger.info(f"✅ 脚本 {code_name} 部署成功") else: logger.error(f"❌ 脚本 {code_name} 部署失败") deploy_success = False # 2. 查找并部署相关的 n8n 工作流文件 workflow_files = [] # 查找模式1: 与脚本同目录的工作流文件 script_dir = WORKSPACE_ROOT / code_path if script_dir.exists() and script_dir.is_dir(): for wf_file in script_dir.glob("n8n_workflow_*.json"): if wf_file.is_file(): workflow_files.append(wf_file) # 查找模式2: datafactory/n8n_workflows 目录 n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows" if n8n_workflows_dir.exists(): for wf_file in n8n_workflows_dir.glob("*.json"): if wf_file.is_file() and wf_file not in workflow_files: workflow_files.append(wf_file) # 查找模式3: 根据任务名称匹配工作流文件 if task_name and task_name != "未知任务": task_name_pattern = task_name.replace(" ", "_").lower() for wf_file in (WORKSPACE_ROOT / "datafactory").rglob( f"*{task_name_pattern}*.json" ): if ( wf_file.is_file() and "n8n" in wf_file.name.lower() and wf_file not in workflow_files ): workflow_files.append(wf_file) if workflow_files: logger.info(f"发现 {len(workflow_files)} 个工作流文件") for wf_file in workflow_files: logger.info(f"部署工作流: {wf_file.name}") if deploy_n8n_workflow_to_production(str(wf_file)): logger.info(f"✅ 工作流 {wf_file.name} 部署成功") else: logger.error(f"❌ 工作流 {wf_file.name} 部署失败") deploy_success = False else: logger.info("未发现相关工作流文件") logger.info("=" * 60) if deploy_success: logger.info(f"✅ 任务 {task_name} 部署完成") else: logger.warning(f"任务 {task_name} 部署过程中出现错误") logger.info("=" * 60) return deploy_success # ============================================================================ # Cursor Agent 自动化 # ============================================================================ # Agent 会话状态 
AGENT_SESSION_ACTIVE: bool = False
AGENT_START_TIME: float = 0


def get_all_cursor_windows() -> list[dict[str, Any]]:
    """
    List visible top-level windows that may belong to Cursor.

    A window qualifies when its title contains "cursor" or its window class
    contains "chrome_widgetwin" (case-insensitive). The class check also
    matches other windows sharing that class, so the result can include
    non-Cursor windows.

    Returns:
        Dicts with ``hwnd``, ``title``, ``class_name`` and ``area``; an empty
        list when GUI automation is unavailable.
    """
    if not HAS_CURSOR_GUI:
        return []

    cursor_windows: list[dict[str, Any]] = []

    def enum_windows_callback(hwnd, _extra):
        if win32gui.IsWindowVisible(hwnd):
            title = win32gui.GetWindowText(hwnd) or ""
            class_name = win32gui.GetClassName(hwnd) or ""

            is_cursor = "cursor" in title.lower()
            if class_name and "chrome_widgetwin" in class_name.lower():
                is_cursor = True

            if is_cursor:
                left, top, right, bottom = win32gui.GetWindowRect(hwnd)
                cursor_windows.append(
                    {
                        "hwnd": hwnd,
                        "title": title,
                        "class_name": class_name,
                        "area": (right - left) * (bottom - top),
                    }
                )
        # Returning True tells EnumWindows to continue enumerating.
        return True

    win32gui.EnumWindows(enum_windows_callback, None)
    return cursor_windows


def find_cursor_window() -> int | None:
    """
    Return the handle of the Cursor main window.

    Windows whose title contains "cursor" are preferred; only when none
    exists are the class-matched candidates considered. Among the candidates
    the one with the largest area is returned.

    Returns:
        The window handle, or ``None`` when GUI automation is unavailable or
        no candidate window was found.
    """
    if not HAS_CURSOR_GUI:
        return None

    cursor_windows = get_all_cursor_windows()
    if not cursor_windows:
        logger.warning("未找到 Cursor 窗口")
        return None

    # Prefer real title matches so a larger window of another application
    # sharing the window class is not chosen by mistake.
    titled = [w for w in cursor_windows if "cursor" in (w.get("title") or "").lower()]
    candidates = titled or cursor_windows
    return max(candidates, key=lambda w: w["area"])["hwnd"]


def activate_window(hwnd: int) -> bool:
    """Restore the given window and bring it to the foreground."""
    if not HAS_CURSOR_GUI:
        return False

    try:
        win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
        time.sleep(0.3)
        win32gui.SetForegroundWindow(hwnd)
        time.sleep(0.5)
        return True
    except Exception as e:
        logger.error(f"激活窗口失败: {e}")
        return False


def open_new_agent() -> bool:
    """
    Open a new Agent in Cursor by sending Ctrl+Shift+I to its main window.

    On success the module-level session flag and start time are updated.

    Returns:
        ``True`` when the hotkey was sent; ``False`` when GUI automation is
        unavailable, the window cannot be found/activated, or sending fails.
    """
    global AGENT_SESSION_ACTIVE, AGENT_START_TIME

    if not HAS_CURSOR_GUI:
        logger.warning("当前环境不支持 Cursor GUI 自动化")
        return False

    hwnd = find_cursor_window()
    if not hwnd:
        return False

    if not activate_window(hwnd):
        return False

    try:
        logger.info("正在打开新的 Agent...")
        pyautogui.hotkey("ctrl", "shift", "i")
        time.sleep(2.0)  # give the Agent panel time to open

        AGENT_SESSION_ACTIVE = True
        AGENT_START_TIME = time.time()
        logger.info("✅ 新的 Agent 已打开")
        return True

    except Exception as e:
        logger.error(f"打开 Agent 失败: {e}")
        return False


def close_current_agent() -> bool:
    """
    Close the current Agent session by pressing Escape twice.

    Returns:
        ``True`` when there was no active session or it was closed;
        ``False`` when GUI automation is unavailable, the window cannot be
        found/activated, or sending the keys fails.
    """
    global AGENT_SESSION_ACTIVE

    if not HAS_CURSOR_GUI:
        return False

    if not AGENT_SESSION_ACTIVE:
        logger.info("没有活动的 Agent 会话")
        return True

    hwnd = find_cursor_window()
    if not hwnd:
        return False

    if not activate_window(hwnd):
        return False

    try:
        logger.info("正在关闭 Agent...")
        pyautogui.press("escape")
        time.sleep(0.5)
        # Press once more to make sure the panel is closed.
        pyautogui.press("escape")
        time.sleep(0.3)

        AGENT_SESSION_ACTIVE = False
        logger.info("✅ Agent 已关闭")
        return True

    except Exception as e:
        logger.error(f"关闭 Agent 失败: {e}")
        return False


def type_message_to_agent(message: str) -> bool:
    """
    Type ``message`` into the focused Agent input box and press Enter.

    The clipboard (pyperclip + Ctrl+V) is used when available because it
    handles Chinese and special characters; otherwise, or when pasting
    fails, the text is typed character by character.

    Returns:
        ``True`` when the message was sent, ``False`` otherwise.
    """
    if not HAS_CURSOR_GUI:
        return False

    try:
        # Wait for the Agent input box to receive focus.
        time.sleep(0.5)

        if HAS_PYPERCLIP:
            try:
                pyperclip.copy(message)
                pyautogui.hotkey("ctrl", "v")
                time.sleep(0.5)
            except Exception:
                # Fall back to typing character by character.
                pyautogui.write(message, interval=0.03)
        else:
            pyautogui.write(message, interval=0.03)

        time.sleep(0.3)
        pyautogui.press("enter")
        logger.info("✅ 消息已发送到 Agent")
        return True

    except Exception as e:
        logger.error(f"发送消息到 Agent 失败: {e}")
        return False


def wait_for_agent_completion(
    timeout: int = 3600,
    check_interval: int = 30,
) -> bool:
    """
    Wait until pending_tasks.json contains no ``processing`` task.

    Args:
        timeout: Maximum number of seconds to wait.
        check_interval: Seconds between two checks; the final sleep is
            shortened so the function does not run past ``timeout``.

    Returns:
        ``True`` when no processing task remains, ``False`` on timeout.
    """
    start_time = time.time()
    deadline = start_time + timeout
    logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")

    while time.time() < deadline:
        processing_ids = get_processing_task_ids()
        elapsed = int(time.time() - start_time)

        if not processing_ids:
            logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
            return True

        remaining = len(processing_ids)
        logger.info(
            f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
        )
        # Never sleep beyond the deadline.
        time.sleep(max(0.0, min(check_interval, deadline - time.time())))

    logger.warning("等待超时,仍有未完成的任务")
    return False


def run_agent_once(
    timeout: int = 3600,
    auto_close: bool = True,
) -> bool:
    """
    Run a single Agent pass.

    Steps:
    1. Sync already completed tasks to the database
    2. Read pending tasks from the database
    3. Mark them as processing and record them in pending_tasks.json
    4. Generate the instruction file and update the trigger file
    5. Open an Agent and send the instruction message
    6. Wait for completion
    7. Sync completed tasks (including automatic deployment)
    8. Close the Agent (when ``auto_close`` is set)

    Returns:
        ``True`` when there was nothing to do or all tasks finished within
        ``timeout``; ``False`` when the Agent could not be started/messaged
        or the wait timed out.
    """
    logger.info("=" * 60)
    logger.info("Agent 单次执行模式")
    logger.info("=" * 60)

    # 1. Sync tasks that are already completed.
    sync_completed_tasks_to_db()

    # 2. Fetch pending tasks from the database.
    logger.info("正在从数据库查询 pending 任务...")
    pending_tasks = get_pending_tasks()

    # 3. Decide whether there is anything to execute.
    if not pending_tasks:
        processing_ids = get_processing_task_ids()
        if not processing_ids:
            logger.info("✅ 没有待执行的任务")
            return True
        logger.info(f"发现 {len(processing_ids)} 个 processing 任务,继续执行")
    else:
        logger.info(f"发现 {len(pending_tasks)} 个 pending 任务")

        # 4. Mark the tasks as processing.
        for task in pending_tasks:
            update_task_status(task["task_id"], "processing")

        # 5. Record them in pending_tasks.json.
        write_pending_tasks_json(pending_tasks)

        # 6. Generate the instruction file.
        create_execute_instructions(pending_tasks)

    # 7. Update the trigger file.
    all_processing_ids = get_processing_task_ids()
    if all_processing_ids:
        update_trigger_file(
            task_count=len(all_processing_ids),
            status="有待执行任务",
            task_ids=all_processing_ids,
        )

    # 8. Open the Agent and send the message.
    if not open_new_agent():
        logger.error("❌ 无法打开 Agent")
        return False

    if not type_message_to_agent(AGENT_MESSAGE):
        logger.error("❌ 无法发送消息到 Agent")
        close_current_agent()
        return False

    logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")

    # 9. Wait for the tasks to complete.
    completed = wait_for_agent_completion(timeout=timeout)

    # 10. Sync whatever has been completed.
    sync_completed_tasks_to_db()

    if completed:
        logger.info("✅ Agent 已完成所有任务")
    else:
        logger.warning("Agent 未能在超时时间内完成所有任务")

    # 11. Close the Agent.
    if auto_close:
        close_current_agent()

    logger.info("=" * 60)
    logger.info("Agent 会话结束")
    logger.info("=" * 60)

    return completed


def run_agent_loop(
    interval: int = 300,
    timeout: int = 3600,
    auto_close: bool = True,
) -> None:
    """
    Run Agent passes in a loop until interrupted with Ctrl+C.

    Args:
        interval: Seconds to sleep between two checks.
        timeout: Maximum seconds to wait for the Agent in each pass.
        auto_close: Close the Agent after each pass.

    Errors inside one iteration are logged and the loop continues after
    ``interval`` seconds. On Ctrl+C an active Agent session is closed.
    """
    logger.info("=" * 60)
    logger.info("Agent 循环模式已启动")
    logger.info(f"检查间隔: {interval} 秒")
    logger.info(f"任务超时: {timeout} 秒")
    logger.info(f"自动部署: {'已启用' if ENABLE_AUTO_DEPLOY else '已禁用'}")
    logger.info("按 Ctrl+C 停止服务")
    logger.info("=" * 60)

    try:
        while True:
            try:
                logger.info("开始新一轮任务检查...")

                # 1. Sync completed tasks.
                sync_completed_tasks_to_db()

                # 2. Fetch pending tasks from the database.
                pending_tasks = get_pending_tasks()

                if pending_tasks:
                    logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")

                    for task in pending_tasks:
                        update_task_status(task["task_id"], "processing")

                    write_pending_tasks_json(pending_tasks)
                    create_execute_instructions(pending_tasks)

                # 3. Check for processing tasks.
                processing_ids = get_processing_task_ids()

                if processing_ids:
                    if AGENT_SESSION_ACTIVE:
                        # A session left open is assumed to be still working.
                        logger.info(
                            f"Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
                        )
                    else:
                        logger.info(f"发现 {len(processing_ids)} 个待处理任务")

                        update_trigger_file(
                            task_count=len(processing_ids),
                            status="有待执行任务",
                            task_ids=processing_ids,
                        )

                        if open_new_agent():
                            if type_message_to_agent(AGENT_MESSAGE):
                                logger.info("✅ 已启动 Agent 并发送执行提醒")

                                wait_for_agent_completion(timeout=timeout)
                                sync_completed_tasks_to_db()

                                if auto_close:
                                    close_current_agent()
                            else:
                                logger.warning("发送消息失败")
                                close_current_agent()
                        else:
                            logger.warning("启动 Agent 失败")
                else:
                    logger.info("✅ 没有待处理任务")

                logger.info(f"{interval} 秒后将重新检查任务列表...")
                time.sleep(interval)

            except KeyboardInterrupt:
                raise
            except Exception as e:
                logger.error(f"❌ 执行出错: {e}")
                import traceback

                logger.error(traceback.format_exc())
                time.sleep(interval)

    except KeyboardInterrupt:
        # Close the Agent on exit.
        if AGENT_SESSION_ACTIVE:
            logger.info("正在关闭 Agent...")
            close_current_agent()
        logger.info("\n⛔ 服务已停止")


# ============================================================================
# Interactive menu
# ============================================================================
def _menu_run_loop(enable_deploy: bool) -> None:
    """Ask for the check interval, set the deploy flag and run the Agent loop."""
    global ENABLE_AUTO_DEPLOY

    try:
        interval_str = input("请输入检查间隔(秒,默认300): ").strip()
        interval = int(interval_str) if interval_str else 300
    except ValueError:
        interval = 300
    if interval <= 0:
        # time.sleep rejects negatives and 0 would busy-loop; use the default.
        interval = 300

    if enable_deploy:
        print(f"\n启动 Agent 循环模式,检查间隔: {interval} 秒")
    else:
        print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
    print("按 Ctrl+C 停止服务并返回菜单\n")

    ENABLE_AUTO_DEPLOY = enable_deploy
    try:
        run_agent_loop(interval=interval)
    except KeyboardInterrupt:
        print("\n循环已停止")


def show_interactive_menu() -> None:
    """Show the interactive menu and run the selected action until the user exits."""
    while True:
        print("\n" + "=" * 60)
        print("自动任务执行调度脚本 - Agent 模式")
        print("=" * 60)
        print("\n请选择操作模式:\n")
        print(" 1. Agent 单次执行")
        print(" 2. Agent 循环模式")
        print(" 3. Agent 循环模式(禁用部署)")
        print(" 4. 测试生产服务器连接")
        print(" 5. 查看当前任务状态")
        print(" 0. 退出")
        print("\n" + "-" * 60)

        try:
            choice = input("请输入选项 [0-5]: ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\n再见!")
            break

        if choice == "0":
            print("再见!")
            break

        elif choice == "1":
            print("\n启动 Agent 单次执行模式...")
            run_agent_once(timeout=3600, auto_close=True)
            input("\n按 Enter 键返回菜单...")

        elif choice == "2":
            _menu_run_loop(enable_deploy=True)

        elif choice == "3":
            _menu_run_loop(enable_deploy=False)

        elif choice == "4":
            print("\n测试生产服务器连接...")
            if test_ssh_connection():
                print("✅ 连接测试成功")
            else:
                print("❌ 连接测试失败")
            input("\n按 Enter 键返回菜单...")

        elif choice == "5":
            print("\n当前任务状态:")
            print("-" * 40)

            # Pending tasks come from the database.
            pending_tasks = get_pending_tasks()
            print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
            for task in pending_tasks:
                print(f" - [{task['task_id']}] {task['task_name']}")

            # Processing tasks come from the local JSON file.
            processing_ids = get_processing_task_ids()
            print(f" 本地 processing 任务: {len(processing_ids)} 个")
            if processing_ids:
                print(f" 任务 ID: {processing_ids}")

            input("\n按 Enter 键返回菜单...")

        else:
            print("❌ 无效的选项,请重新选择")


# ============================================================================
# Entry point
# ============================================================================
def _deploy_task_now(raw_task_id: str) -> None:
    """Deploy the task with the given ID using its entry in pending_tasks.json."""
    # Only the int() conversion may report an invalid ID; json.JSONDecodeError
    # is a ValueError too and must not be confused with it.
    try:
        task_id = int(raw_task_id)
    except ValueError:
        logger.error(f"无效的任务ID: {raw_task_id}")
        return

    logger.info(f"开始部署任务 {task_id}...")

    if not PENDING_TASKS_FILE.exists():
        logger.error("pending_tasks.json 文件不存在")
        return

    try:
        with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
            tasks = json.load(f)
    except (OSError, ValueError) as e:
        logger.error(f"读取 pending_tasks.json 失败: {e}")
        return

    if not isinstance(tasks, list):
        tasks = []

    task_found = next(
        (t for t in tasks if isinstance(t, dict) and t.get("task_id") == task_id),
        None,
    )
    if task_found is None:
        logger.error(f"未找到任务 {task_id}")
        return

    if auto_deploy_completed_task(task_found):
        logger.info(f"✅ 任务 {task_id} 部署成功")
    else:
        logger.error(f"❌ 任务 {task_id} 部署失败")


def main() -> None:
    """
    Parse the command line and run the selected mode.

    Modes are checked in this order: ``--test-connection``, ``--deploy-now``,
    ``--agent-run``, ``--agent-loop``. Without any argument the interactive
    menu is shown; with only non-mode arguments the help text is printed.
    """
    global ENABLE_AUTO_DEPLOY

    parser = argparse.ArgumentParser(
        description="自动任务执行调度脚本 (Agent 模式)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  # Agent 单次执行
  python scripts/auto_execute_tasks.py --agent-run

  # Agent 循环模式
  python scripts/auto_execute_tasks.py --agent-loop

  # Agent 循环模式 + 禁用自动部署
  python scripts/auto_execute_tasks.py --agent-loop --no-deploy

  # 设置 Agent 超时时间
  python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200

  # 立即部署指定任务到生产服务器
  python scripts/auto_execute_tasks.py --deploy-now 123

  # 测试生产服务器连接
  python scripts/auto_execute_tasks.py --test-connection
        """,
    )

    # Agent mode arguments
    parser.add_argument(
        "--agent-run",
        action="store_true",
        help="Agent 单次执行模式",
    )
    parser.add_argument(
        "--agent-loop",
        action="store_true",
        help="Agent 循环模式",
    )
    parser.add_argument(
        "--agent-timeout",
        type=int,
        default=3600,
        help="Agent 等待任务完成的超时时间(秒),默认 3600",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=300,
        help="循环模式检查间隔(秒),默认 300",
    )
    parser.add_argument(
        "--no-auto-close",
        action="store_true",
        help="任务完成后不自动关闭 Agent",
    )

    # Deployment arguments
    parser.add_argument(
        "--no-deploy",
        action="store_true",
        help="禁用自动部署功能",
    )
    parser.add_argument(
        "--deploy-now",
        type=str,
        metavar="TASK_ID",
        help="立即部署指定任务ID的脚本到生产服务器",
    )
    parser.add_argument(
        "--test-connection",
        action="store_true",
        help="测试到生产服务器的 SSH 连接",
    )

    args = parser.parse_args()

    ENABLE_AUTO_DEPLOY = not args.no_deploy
    auto_close = not args.no_auto_close

    # Test the SSH connection.
    if args.test_connection:
        if test_ssh_connection():
            logger.info("✅ 连接测试成功")
        else:
            logger.error("❌ 连接测试失败")
        return

    # Deploy one task immediately.
    if args.deploy_now:
        _deploy_task_now(args.deploy_now)
        return

    # Agent single run.
    if args.agent_run:
        success = run_agent_once(
            timeout=args.agent_timeout,
            auto_close=auto_close,
        )
        if success:
            logger.info("✅ Agent 单次执行完成")
        else:
            logger.error("❌ Agent 单次执行失败")
        return

    # Agent loop mode.
    if args.agent_loop:
        run_agent_loop(
            interval=args.interval,
            timeout=args.agent_timeout,
            auto_close=auto_close,
        )
        return

    # No mode selected: show the menu when called without any argument,
    # otherwise print the help text.
    if len(sys.argv) == 1:
        show_interactive_menu()
    else:
        parser.print_help()


if __name__ == "__main__":
    main()