#!/usr/bin/env python3 """ 自动任务执行核心调度脚本 (Agent 模式) 工作流程: 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务 2. 生成 tasks/task_execute_instructions.md 执行指令文件 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json 4. 更新 tasks/task_trigger.txt 触发器文件 5. 启动新的 Cursor Agent 并发送执行指令 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent 使用方式: # Agent 单次执行(执行一次任务后退出) python scripts/auto_execute_tasks.py --agent-run # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务) python scripts/auto_execute_tasks.py --agent-loop # Agent 循环模式 + 禁用自动部署 python scripts/auto_execute_tasks.py --agent-loop --no-deploy # 设置 Agent 超时时间(默认 3600 秒) python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200 # 任务完成后不自动关闭 Agent python scripts/auto_execute_tasks.py --agent-run --no-auto-close # 立即部署指定任务ID的脚本到生产服务器 python scripts/auto_execute_tasks.py --deploy-now 123 # 测试到生产服务器的 SSH 连接 python scripts/auto_execute_tasks.py --test-connection """ from __future__ import annotations import argparse import contextlib import json import logging import sys import time from datetime import datetime from pathlib import Path from typing import Any # ============================================================================ # 日志配置 # ============================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("AutoExecuteTasks") # ============================================================================ # Windows GUI 自动化依赖(可选) # ============================================================================ HAS_CURSOR_GUI = False HAS_PYPERCLIP = False try: import pyautogui import win32con import win32gui pyautogui.FAILSAFE = True pyautogui.PAUSE = 0.5 HAS_CURSOR_GUI = True try: import pyperclip HAS_PYPERCLIP = True except ImportError: pass except ImportError: logger.info( "未安装 Windows GUI 自动化依赖(pywin32/pyautogui)," "将禁用自动 Cursor Agent 功能。" ) # ============================================================================ # 全局配置 # ============================================================================ WORKSPACE_ROOT = Path(__file__).parent.parent TASKS_DIR = WORKSPACE_ROOT / "tasks" PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json" INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md" TRIGGER_FILE = TASKS_DIR / "task_trigger.txt" # 生产服务器配置 PRODUCTION_SERVER = { "host": "192.168.3.143", "port": 22, "username": "ubuntu", "password": "citumxl2357", "script_path": "/opt/dataops-platform/datafactory/scripts", "workflow_path": "/opt/dataops-platform/datafactory/workflows", # 工作流 JSON 文件目录 } # Agent 消息模板 AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。" # 命令行参数控制的全局变量 ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署 # ============================================================================ # 数据库操作 # ============================================================================ def get_db_connection(): """获取数据库连接(使用 production 环境配置)""" try: from urllib.parse import urlparse import psycopg2 sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config # 强制使用 production 环境的数据库配置 app_config = config["production"] db_uri = app_config.SQLALCHEMY_DATABASE_URI # 解析 SQLAlchemy URI 格式为 psycopg2 可用的格式 parsed = urlparse(db_uri) conn = psycopg2.connect( host=parsed.hostname, port=parsed.port or 5432, database=parsed.path.lstrip("/"), user=parsed.username, password=parsed.password, ) logger.debug( f"数据库连接成功: {parsed.hostname}:{parsed.port}/{parsed.path.lstrip('/')}" ) return conn except ImportError as e: logger.error(f"导入依赖失败: {e}") return None except Exception as e: logger.error(f"连接数据库失败: {e}") import traceback logger.error(traceback.format_exc()) return None def get_pending_tasks() -> list[dict[str, Any]]: """ 从 PostgreSQL task_list 表获取所有 pending 状态的任务 重要:此函数直接查询数据库,确保获取最新的任务列表 """ try: from psycopg2.extras import RealDictCursor logger.info("📡 正在连接数据库...") conn = get_db_connection() if not conn: logger.error("❌ 无法获取数据库连接") return [] logger.info("✅ 数据库连接成功,正在查询 pending 任务...") cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute( """ SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE status = 'pending' ORDER BY create_time ASC """ ) tasks = cursor.fetchall() cursor.close() conn.close() task_list = [dict(task) for task in tasks] logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务") if task_list: for task in task_list: logger.info(f" - 任务 {task['task_id']}: {task['task_name']}") return task_list except Exception as e: logger.error(f"获取 pending 任务失败: {e}") import traceback logger.error(traceback.format_exc()) return [] def update_task_status( task_id: int, status: str, code_name: str | None = None, code_path: str | None = None, ) -> bool: """更新任务状态""" try: conn = get_db_connection() if not conn: return False cursor = conn.cursor() if code_name and code_path: cursor.execute( """ UPDATE task_list SET status = %s, code_name = %s, code_path = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, code_name, code_path, task_id), ) else: cursor.execute( """ UPDATE task_list SET status = %s, update_time = CURRENT_TIMESTAMP WHERE task_id = %s """, (status, task_id), ) conn.commit() updated = cursor.rowcount > 0 cursor.close() conn.close() if updated: logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}") return updated except Exception as e: logger.error(f"更新任务状态失败: {e}") return False # ============================================================================ # 触发器文件管理 # ============================================================================ def update_trigger_file( task_count: int = 0, status: str = "有待执行任务", task_ids: list[int] | None = None, ) -> None: """更新触发器文件 tasks/task_trigger.txt""" TASKS_DIR.mkdir(parents=True, exist_ok=True) gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") task_ids_str = ", ".join(str(tid) for tid in (task_ids or [])) content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER 生成时间: {gen_time} 状态: {status} 待处理任务数: {task_count} 任务ID列表: [{task_ids_str}] 此文件用于触发Cursor自动执行任务。 【重要】Cursor AI 请立即执行以下操作: 1. 读取 tasks/task_execute_instructions.md 文件 2. 按照文件中的任务描述,依次完成每个任务 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed """ with TRIGGER_FILE.open("w", encoding="utf-8") as f: f.write(content) logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}") def get_processing_task_ids() -> list[int]: """从 pending_tasks.json 获取所有 processing 状态的任务 ID""" if not PENDING_TASKS_FILE.exists(): return [] try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) return [ t.get("task_id") for t in tasks if t.get("status") == "processing" and t.get("task_id") ] except Exception: return [] def get_tasks_by_ids(task_ids: list[int]) -> list[dict[str, Any]]: """ 根据任务 ID 列表从数据库获取任务详细信息 Args: task_ids: 任务 ID 列表 Returns: 包含任务详细信息的列表(包括 task_description) """ if not task_ids: return [] try: from psycopg2.extras import RealDictCursor conn = get_db_connection() if not conn: logger.error("无法获取数据库连接") return [] cursor = conn.cursor(cursor_factory=RealDictCursor) # 构建 IN 查询 placeholders = ", ".join(["%s"] * len(task_ids)) query = f""" SELECT task_id, task_name, task_description, status, code_name, code_path, create_time, create_by FROM task_list WHERE task_id IN ({placeholders}) ORDER BY create_time ASC """ cursor.execute(query, tuple(task_ids)) tasks = cursor.fetchall() cursor.close() conn.close() task_list = [dict(task) for task in tasks] logger.info(f"从数据库获取了 {len(task_list)} 个任务的详细信息") return task_list except Exception as e: logger.error(f"根据 ID 获取任务失败: {e}") import traceback logger.error(traceback.format_exc()) return [] def get_all_tasks_to_execute() -> list[dict[str, Any]]: """ 获取所有需要执行的任务(包括新的 pending 任务和已有的 processing 任务) 此函数确保返回的任务列表包含完整信息(特别是 task_description), 用于生成执行指令文件。 Returns: 包含所有需要执行任务的完整信息列表 """ # 1. 获取本地 pending_tasks.json 中 processing 状态的任务 ID processing_ids = get_processing_task_ids() # 2. 从数据库获取所有 pending 任务 pending_tasks = get_pending_tasks() pending_ids = [t["task_id"] for t in pending_tasks] # 3. 合并所有需要查询的任务 ID(去重) all_task_ids = list(set(processing_ids + pending_ids)) if not all_task_ids: return [] # 4. 从数据库获取这些任务的完整信息 all_tasks = get_tasks_by_ids(all_task_ids) logger.info( f"需要执行的任务: {len(all_tasks)} 个 " f"(processing: {len(processing_ids)}, pending: {len(pending_ids)})" ) return all_tasks # ============================================================================ # 任务文件生成 # ============================================================================ def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None: """将任务列表写入 tasks/pending_tasks.json""" TASKS_DIR.mkdir(parents=True, exist_ok=True) # 读取现有任务 existing_tasks = [] if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: existing_tasks = json.load(f) except Exception: existing_tasks = [] existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t} # 添加新任务 for task in tasks: if task["task_id"] not in existing_ids: task_info = { "task_id": task["task_id"], "task_name": task["task_name"], "code_path": task.get("code_path", ""), "code_name": task.get("code_name", ""), "status": "processing", "notified_at": datetime.now().isoformat(), "code_file": task.get("code_file", ""), } existing_tasks.append(task_info) with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(existing_tasks, f, indent=2, ensure_ascii=False) logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}") def create_execute_instructions(tasks: list[dict[str, Any]]) -> None: """生成任务执行指令文件 tasks/task_execute_instructions.md""" TASKS_DIR.mkdir(parents=True, exist_ok=True) with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f: f.write("# Cursor 自动任务执行指令\n\n") f.write("**重要:请立即执行以下任务!**\n\n") gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write(f"**生成时间**: {gen_time}\n\n") f.write(f"**待执行任务数量**: {len(tasks)}\n\n") f.write("## 任务完成后的操作\n\n") f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中") f.write("对应任务的 `status` 为 `completed`,\n") f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n") f.write("调度脚本会自动将完成的任务同步到数据库。\n\n") f.write("## 任务约束要求\n\n") f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n") f.write("- 不要创建任何 summary、report、总结类的文档文件\n") f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n") f.write("- 只需创建任务要求的功能脚本文件\n") f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n") f.write("---\n\n") for idx, task in enumerate(tasks, 1): task_id = task["task_id"] task_name = task["task_name"] task_desc = task["task_description"] create_time = task.get("create_time", "") if hasattr(create_time, "strftime"): create_time = create_time.strftime("%Y-%m-%d %H:%M:%S") f.write(f"## 任务 {idx}: {task_name}\n\n") f.write(f"- **任务ID**: `{task_id}`\n") f.write(f"- **创建时间**: {create_time}\n") f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n") f.write(f"### 任务描述\n\n{task_desc}\n\n") f.write("---\n\n") logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}") # ============================================================================ # Neo4j 独立连接(不依赖 Flask 应用上下文) # ============================================================================ def get_neo4j_driver(): """获取 Neo4j 驱动(独立于 Flask 应用上下文)""" try: from neo4j import GraphDatabase sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import config # 强制使用 production 环境的配置 app_config = config["production"] uri = app_config.NEO4J_URI user = app_config.NEO4J_USER password = app_config.NEO4J_PASSWORD driver = GraphDatabase.driver(uri, auth=(user, password)) return driver except ImportError as e: logger.error(f"导入 Neo4j 驱动失败: {e}") return None except Exception as e: logger.error(f"连接 Neo4j 失败: {e}") return None # ============================================================================ # 状态同步 # ============================================================================ def extract_dataflow_name_from_task(task_id: int) -> str | None: """从任务描述中提取 DataFlow 名称""" import re try: conn = get_db_connection() if not conn: return None cursor = conn.cursor() cursor.execute( "SELECT task_description FROM task_list WHERE task_id = %s", (task_id,), ) result = cursor.fetchone() cursor.close() conn.close() if not result: return None task_desc = result[0] # 从任务描述中提取 DataFlow Name match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", task_desc) if match: dataflow_name = match.group(1).strip() logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}") return dataflow_name return None except Exception as e: logger.error(f"提取 DataFlow 名称失败: {e}") return None def update_dataflow_script_path( task_name: str, script_path: str, task_id: int | None = None ) -> bool: """更新 DataFlow 节点的 script_path 字段""" try: driver = get_neo4j_driver() if not driver: logger.error("无法获取 Neo4j 驱动") return False # 如果提供了 task_id,尝试从任务描述中提取真正的 DataFlow 名称 dataflow_name = task_name if task_id: extracted_name = extract_dataflow_name_from_task(task_id) if extracted_name: dataflow_name = extracted_name logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}") query = """ MATCH (n:DataFlow {name_zh: $name_zh}) SET n.script_path = $script_path, n.updated_at = $updated_at RETURN n """ updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with driver.session() as session: result = session.run( query, name_zh=dataflow_name, script_path=script_path, updated_at=updated_at, ).single() driver.close() if result: logger.info( f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}" ) return True else: logger.warning(f"未找到 DataFlow 节点: {dataflow_name}") return False except Exception as e: logger.error(f"更新 DataFlow script_path 失败: {e}") return False def sync_completed_tasks_to_db() -> int: """将 pending_tasks.json 中 completed 的任务同步到数据库""" if not PENDING_TASKS_FILE.exists(): return 0 try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) except Exception as e: logger.error(f"读取 pending_tasks.json 失败: {e}") return 0 if not isinstance(tasks, list): return 0 updated = 0 remaining_tasks = [] for t in tasks: if t.get("status") == "completed": task_id = t.get("task_id") if not task_id: continue task_name = t.get("task_name") code_path = t.get("code_path") # 使用 code_file 字段获取实际的脚本文件名 code_file = t.get("code_file", "") # 统一处理:code_path 始终为 "datafactory/scripts" code_path = "datafactory/scripts" # 使用 code_file 判断是否为 Python 脚本 is_python_script = code_file and code_file.endswith(".py") # 修复路径重复问题:统一处理脚本路径 if is_python_script: if code_file.startswith(code_path): # code_file 已经是完整路径 full_script_path = code_file # 提取纯文件名用于数据库存储 code_file_name = Path(code_file).name elif "/" in code_file or "\\" in code_file: # code_file 包含其他路径,提取文件名 code_file_name = Path(code_file).name full_script_path = f"{code_path}/{code_file_name}" else: # code_file 只是文件名 code_file_name = code_file full_script_path = f"{code_path}/{code_file}" logger.info(f"任务 {task_id} 使用 Python 脚本: {full_script_path}") else: logger.info( f"任务 {task_id} 的 code_file ({code_file}) 不是 Python 脚本,跳过 DataFlow 更新" ) code_file_name = code_file full_script_path = "" if update_task_status(task_id, "completed", code_file_name, code_path): updated += 1 logger.info(f"已同步任务 {task_id} 为 completed") # 只有 Python 脚本才更新 DataFlow 节点的 script_path if task_name and is_python_script: if update_dataflow_script_path( task_name, full_script_path, task_id=task_id ): logger.info( f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}" ) else: logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}") # 自动部署到生产服务器(如果启用) if ENABLE_AUTO_DEPLOY: logger.info(f"开始自动部署任务 {task_id} 到生产服务器...") if auto_deploy_completed_task(t): logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器") else: logger.warning(f"任务 {task_id} 部署到生产服务器失败") else: logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署") else: remaining_tasks.append(t) else: remaining_tasks.append(t) if updated > 0: with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(remaining_tasks, f, indent=2, ensure_ascii=False) logger.info(f"本次共同步 {updated} 个 completed 任务到数据库") return updated # ============================================================================ # 生产服务器部署功能 # ============================================================================ def get_ssh_connection(): """获取 SSH 连接到生产服务器""" try: import paramiko # type: ignore ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) logger.info( f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@" f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..." ) ssh.connect( hostname=PRODUCTION_SERVER["host"], port=PRODUCTION_SERVER["port"], username=PRODUCTION_SERVER["username"], password=PRODUCTION_SERVER["password"], timeout=10, ) logger.info("✅ SSH 连接成功") return ssh except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return None except Exception as e: logger.error(f"SSH 连接失败: {e}") return None def test_ssh_connection() -> bool: """测试 SSH 连接到生产服务器""" logger.info("=" * 60) logger.info("测试生产服务器连接") logger.info("=" * 60) ssh = get_ssh_connection() if not ssh: logger.error("❌ SSH 连接测试失败") return False try: # 测试执行命令 _, stdout, _ = ssh.exec_command("echo 'Connection test successful'") output = stdout.read().decode().strip() logger.info(f"✅ 命令执行成功: {output}") # 检查目标目录是否存在 _, stdout, _ = ssh.exec_command( f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'" ) result = stdout.read().decode().strip() if result == "exists": logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}") else: logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}") logger.info("将在首次部署时自动创建") ssh.close() logger.info("=" * 60) logger.info("✅ 连接测试完成") logger.info("=" * 60) return True except Exception as e: logger.error(f"❌ 测试执行命令失败: {e}") ssh.close() return False def deploy_script_to_production( local_script_path: str, remote_filename: str | None = None ) -> bool: """部署脚本文件到生产服务器""" try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False # 转换为绝对路径 local_path = Path(local_script_path) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"本地文件不存在: {local_path}") return False # 确定远程文件名 if not remote_filename: remote_filename = local_path.name remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["script_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['script_path']}" ) stdout.channel.recv_exit_status() # 上传文件 logger.info(f"正在上传: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) # 设置文件权限为可执行 sftp.chmod(remote_path, 0o755) logger.info(f"✅ 脚本部署成功: {remote_path}") sftp.close() ssh.close() return True except Exception as e: logger.error(f"文件传输失败: {e}") ssh.close() return False except ImportError: logger.error("未安装 paramiko 库,请运行: pip install paramiko") return False except Exception as e: logger.error(f"部署脚本失败: {e}") return False def deploy_n8n_workflow_to_production(workflow_file: str) -> bool: """ 部署 n8n 工作流到 n8n 服务器 此函数执行两个步骤: 1. 通过 n8n API 创建工作流(主要步骤) 2. 通过 SFTP 备份工作流文件到生产服务器(可选) """ try: import json import requests # 转换为绝对路径 local_path = Path(workflow_file) if not local_path.is_absolute(): local_path = WORKSPACE_ROOT / local_path if not local_path.exists(): logger.error(f"工作流文件不存在: {local_path}") return False # 加载工作流 JSON with open(local_path, encoding="utf-8") as f: workflow_data = json.load(f) workflow_name = workflow_data.get("name", local_path.stem) logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}") # 获取 n8n API 配置 try: sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import BaseConfig api_url = BaseConfig.N8N_API_URL api_key = BaseConfig.N8N_API_KEY timeout = BaseConfig.N8N_API_TIMEOUT except (ImportError, AttributeError): import os api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com") api_key = os.environ.get("N8N_API_KEY", "") timeout = int(os.environ.get("N8N_API_TIMEOUT", "30")) if not api_key: logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器") return False # 准备 API 请求 headers = { "X-N8N-API-KEY": api_key, "Content-Type": "application/json", "Accept": "application/json", } # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags) workflow_payload = { "name": workflow_name, "nodes": workflow_data.get("nodes", []), "connections": workflow_data.get("connections", {}), "settings": workflow_data.get("settings", {}), } # 调用 n8n API 创建工作流 create_url = f"{api_url.rstrip('/')}/api/v1/workflows" logger.info(f"调用 n8n API: {create_url}") try: response = requests.post( create_url, headers=headers, json=workflow_payload, timeout=timeout, ) if response.status_code == 401: logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置") return False elif response.status_code == 403: logger.error("n8n API 权限不足") return False response.raise_for_status() created_workflow = response.json() workflow_id = created_workflow.get("id") logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}") # 可选:将工作流文件备份到生产服务器 try: _backup_workflow_to_server(local_path) except Exception as backup_error: logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}") return True except requests.exceptions.Timeout: logger.error("n8n API 请求超时,请检查网络连接") return False except requests.exceptions.ConnectionError: logger.error(f"无法连接到 n8n 服务器: {api_url}") return False except requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json() except Exception: error_detail = e.response.text logger.error( f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}" ) return False except Exception as e: logger.error(f"部署工作流失败: {e}") import traceback logger.error(traceback.format_exc()) return False def _backup_workflow_to_server(local_path: Path) -> bool: """备份工作流文件到生产服务器(通过 SFTP)""" try: import importlib.util if importlib.util.find_spec("paramiko") is None: logger.debug("未安装 paramiko 库,跳过文件备份") return False remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}" # 建立 SSH 连接 ssh = get_ssh_connection() if not ssh: return False try: # 创建 SFTP 客户端 sftp = ssh.open_sftp() # 确保远程目录存在 try: sftp.stat(PRODUCTION_SERVER["workflow_path"]) except FileNotFoundError: logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}") _, stdout, _ = ssh.exec_command( f"mkdir -p {PRODUCTION_SERVER['workflow_path']}" ) stdout.channel.recv_exit_status() # 上传工作流文件 logger.debug(f"备份工作流文件: {local_path} -> {remote_path}") sftp.put(str(local_path), remote_path) sftp.close() ssh.close() return True except Exception as e: logger.warning(f"工作流文件备份失败: {e}") ssh.close() return False except Exception as e: logger.warning(f"备份工作流失败: {e}") return False def find_remote_workflow_files(task_info: dict[str, Any]) -> list[str]: """ 从生产服务器查找与任务相关的 n8n 工作流文件 查找策略: 1. 列出远程 workflow_path 目录下的所有 .json 文件 2. 根据任务名称或脚本名称匹配相关工作流 Args: task_info: 任务信息字典 Returns: 远程工作流文件路径列表 """ remote_files: list[str] = [] code_file = task_info.get("code_file", "") task_name = task_info.get("task_name", "") ssh = get_ssh_connection() if not ssh: logger.warning("无法连接到生产服务器,跳过远程工作流文件查找") return remote_files try: workflow_path = PRODUCTION_SERVER["workflow_path"] # 检查目录是否存在 _, stdout, _ = ssh.exec_command(f"test -d {workflow_path} && echo 'exists'") if stdout.read().decode().strip() != "exists": logger.info(f"远程工作流目录不存在: {workflow_path}") ssh.close() return remote_files # 列出目录下所有 .json 文件 _, stdout, _ = ssh.exec_command(f"ls -1 {workflow_path}/*.json 2>/dev/null") file_list = stdout.read().decode().strip().split("\n") # 过滤有效文件路径 all_json_files = [ f.strip() for f in file_list if f.strip() and f.endswith(".json") ] if not all_json_files: logger.info(f"远程工作流目录 {workflow_path} 中没有 JSON 文件") ssh.close() return remote_files logger.info(f"远程服务器发现 {len(all_json_files)} 个工作流文件") # 根据任务信息匹配相关工作流 # 构建匹配模式 match_patterns: list[str] = [] # 基于脚本文件名匹配 if code_file and code_file.endswith(".py"): script_base = code_file[:-3] # 去掉 .py match_patterns.append(script_base.lower()) # 基于任务名称匹配(针对 DF_DO 格式的任务名) if task_name: if task_name.startswith("DF_DO"): match_patterns.append(task_name.lower()) # 对于中文任务名,尝试提取英文/数字部分 import re alphanumeric = re.sub(r"[^a-zA-Z0-9_-]", "", task_name) if alphanumeric and len(alphanumeric) >= 3: match_patterns.append(alphanumeric.lower()) # 匹配文件 for remote_file in all_json_files: file_name_lower = Path(remote_file).stem.lower() # 检查是否与任何模式匹配 matched = False for pattern in match_patterns: if pattern in file_name_lower or file_name_lower in pattern: matched = True break if matched and remote_file not in remote_files: remote_files.append(remote_file) logger.info(f" 匹配到工作流: {Path(remote_file).name}") # 如果没有匹配到任何文件,不再自动部署所有文件 # 这样可以避免误部署其他任务的工作流 if not remote_files and all_json_files: logger.info("没有精确匹配的工作流文件,跳过远程工作流部署") # 不再自动部署所有文件,避免重复部署问题 ssh.close() return remote_files except Exception as e: logger.error(f"查找远程工作流文件失败: {e}") if ssh: ssh.close() return remote_files def deploy_remote_workflow_to_n8n(remote_file_path: str) -> bool: """ 从生产服务器读取工作流 JSON 文件并部署到 n8n 系统 Args: remote_file_path: 远程服务器上的工作流文件完整路径 Returns: 是否部署成功 """ try: import requests ssh = get_ssh_connection() if not ssh: logger.error("无法连接到生产服务器") return False # 读取远程工作流文件内容 logger.info(f"从远程服务器读取工作流: {remote_file_path}") _, stdout, stderr = ssh.exec_command(f"cat {remote_file_path}") file_content = stdout.read().decode("utf-8") error_output = stderr.read().decode() if error_output: logger.error(f"读取远程文件失败: {error_output}") ssh.close() return False ssh.close() # 解析工作流 JSON try: workflow_data = json.loads(file_content) except json.JSONDecodeError as e: logger.error(f"解析工作流 JSON 失败: {e}") return False workflow_name = workflow_data.get("name", Path(remote_file_path).stem) logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}") # 获取 n8n API 配置 try: sys.path.insert(0, str(WORKSPACE_ROOT)) from app.config.config import BaseConfig api_url = BaseConfig.N8N_API_URL api_key = BaseConfig.N8N_API_KEY timeout = BaseConfig.N8N_API_TIMEOUT except (ImportError, AttributeError): import os api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com") api_key = os.environ.get("N8N_API_KEY", "") timeout = int(os.environ.get("N8N_API_TIMEOUT", "30")) if not api_key: logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器") return False # 准备 API 请求 headers = { "X-N8N-API-KEY": api_key, "Content-Type": "application/json", "Accept": "application/json", } # 准备工作流数据 workflow_payload = { "name": workflow_name, "nodes": workflow_data.get("nodes", []), "connections": workflow_data.get("connections", {}), "settings": workflow_data.get("settings", {}), } # 先检查是否已存在同名工作流 list_url = f"{api_url.rstrip('/')}/api/v1/workflows" try: list_response = requests.get( list_url, headers=headers, timeout=timeout, ) if list_response.status_code == 200: existing_workflows = list_response.json().get("data", []) existing_wf = None for wf in existing_workflows: if wf.get("name") == workflow_name: existing_wf = wf break if existing_wf: # 已存在同名工作流,跳过创建避免重复 workflow_id = existing_wf.get("id") logger.info( f"发现已存在的工作流 (ID: {workflow_id}),跳过部署避免重复" ) logger.info( "如需更新工作流,请手动在 n8n 控制台操作或删除后重新部署" ) return True # 返回成功,因为工作流已存在 except requests.exceptions.RequestException as e: logger.warning(f"检查已存在工作流时出错: {e}") # 调用 n8n API 创建工作流 create_url = f"{api_url.rstrip('/')}/api/v1/workflows" logger.info(f"调用 n8n API 创建工作流: {create_url}") try: response = requests.post( create_url, headers=headers, json=workflow_payload, timeout=timeout, ) if response.status_code == 401: logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置") return False elif response.status_code == 403: logger.error("n8n API 权限不足") return False response.raise_for_status() created_workflow = response.json() workflow_id = created_workflow.get("id") logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}") return True except requests.exceptions.Timeout: logger.error("n8n API 请求超时,请检查网络连接") return False except requests.exceptions.ConnectionError: logger.error(f"无法连接到 n8n 服务器: {api_url}") return False except requests.exceptions.HTTPError as e: error_detail = "" try: error_detail = e.response.json() except Exception: error_detail = e.response.text logger.error( f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}" ) return False except Exception as e: logger.error(f"从远程服务器部署工作流失败: {e}") import traceback logger.error(traceback.format_exc()) return False def find_related_workflow_files( task_info: dict[str, Any], ) -> list[Path]: """ 查找与任务相关的所有 n8n 工作流文件 查找策略: 1. 与脚本同目录的工作流文件 (n8n_workflow_*.json) 2. datafactory/n8n_workflows 目录下的工作流文件 3. 根据任务名称模式匹配 4. 根据脚本名称匹配 (去掉 .py 后缀) 5. 根据任务 ID 匹配 6. 最近修改的工作流文件 (在任务创建后修改的) """ workflow_files: list[Path] = [] code_name = task_info.get("code_name", "") code_path = task_info.get("code_path", "datafactory/scripts") task_name = task_info.get("task_name", "") task_id = task_info.get("task_id") # 获取任务通知时间用于判断文件是否是新创建的 notified_at_str = task_info.get("notified_at", "") notified_at = None if notified_at_str: with contextlib.suppress(ValueError, TypeError): notified_at = datetime.fromisoformat(notified_at_str.replace("Z", "+00:00")) # 查找模式1: 与脚本同目录的工作流文件 script_dir = WORKSPACE_ROOT / code_path if script_dir.exists() and script_dir.is_dir(): for wf_file in script_dir.glob("n8n_workflow_*.json"): if wf_file.is_file() and wf_file not in workflow_files: workflow_files.append(wf_file) # 也查找以 workflow_ 开头的文件 for wf_file in script_dir.glob("workflow_*.json"): if wf_file.is_file() and wf_file not in workflow_files: workflow_files.append(wf_file) # 查找模式2: datafactory/n8n_workflows 目录 n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows" if n8n_workflows_dir.exists(): for wf_file in n8n_workflows_dir.glob("*.json"): if wf_file.is_file() and wf_file not in workflow_files: workflow_files.append(wf_file) # 查找模式3: 根据任务名称匹配 if task_name and task_name != "未知任务": # 尝试多种名称变体 name_patterns = [ task_name.replace(" ", "_").lower(), task_name.replace(" ", "-").lower(), task_name.lower(), ] for pattern in name_patterns: if len(pattern) < 3: # 跳过过短的模式 continue for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*{pattern}*.json"): # 验证是文件、未添加过、且是有效的 n8n 工作流文件 if ( wf_file.is_file() and wf_file not in workflow_files and _is_n8n_workflow_file(wf_file) ): workflow_files.append(wf_file) # 查找模式4: 根据脚本名称匹配 if code_name and code_name.endswith(".py"): script_base_name = code_name[:-3] # 去掉 .py # 在 datafactory 目录下查找 for wf_file in (WORKSPACE_ROOT / "datafactory").rglob( f"*{script_base_name}*.json" ): if ( wf_file.is_file() and wf_file not in workflow_files and _is_n8n_workflow_file(wf_file) ): workflow_files.append(wf_file) # 查找模式5: 根据任务 ID 匹配 if task_id: for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*task_{task_id}*.json"): if ( wf_file.is_file() and wf_file not in workflow_files and _is_n8n_workflow_file(wf_file) ): workflow_files.append(wf_file) # 查找模式6: 最近修改的工作流文件(在任务创建后修改的) if notified_at: for wf_file in (WORKSPACE_ROOT / "datafactory").rglob("*.json"): if wf_file.is_file() and wf_file not in workflow_files: try: mtime = datetime.fromtimestamp(wf_file.stat().st_mtime) # 如果文件在任务通知后被修改,可能是相关的工作流 if mtime > notified_at.replace( tzinfo=None ) and _is_n8n_workflow_file(wf_file): workflow_files.append(wf_file) logger.debug(f"发现最近修改的工作流: {wf_file.name}") except (OSError, ValueError): pass return workflow_files def _is_n8n_workflow_file(file_path: Path) -> bool: """ 检查文件是否是有效的 n8n 工作流文件 通过检查 JSON 结构来验证 """ try: with open(file_path, encoding="utf-8") as f: data = json.load(f) # n8n 工作流文件通常包含 nodes 和 connections 字段 if isinstance(data, dict): has_nodes = "nodes" in data has_connections = "connections" in data has_name = "name" in data # 至少需要有 nodes 或符合 n8n 工作流特征 return has_nodes or (has_name and has_connections) return False except (json.JSONDecodeError, OSError): return False def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool: """ 自动部署已完成任务的脚本和工作流到生产服务器 部署流程: 1. 部署 Python 脚本到生产服务器 (通过 SFTP) 2. 查找并部署相关的 n8n 工作流 (通过 n8n API) 3. 记录部署结果 """ # 优先使用 code_file 字段,其次使用 code_name code_file = task_info.get("code_file", "") code_name = task_info.get("code_name", "") code_path = task_info.get("code_path", "datafactory/scripts") task_name = task_info.get("task_name", "未知任务") task_id = task_info.get("task_id", "N/A") # 确定实际的脚本文件名:优先使用 code_file,如果为空则尝试 code_name actual_script_file = code_file if code_file else code_name if not actual_script_file or not code_path: logger.warning(f"任务 {task_name} (ID: {task_id}) 缺少代码文件信息,跳过部署") return False logger.info("=" * 60) logger.info(f"🚀 开始自动部署任务: {task_name} (ID: {task_id})") logger.info("=" * 60) deploy_results = { "script_deployed": False, "workflows_found": 0, "workflows_deployed": 0, "workflows_failed": 0, } # 1. 部署 Python 脚本 if actual_script_file.endswith(".py"): # 修复路径重复问题:如果 actual_script_file 已经包含 code_path,则只使用 actual_script_file # 否则拼接 code_path 和 actual_script_file if actual_script_file.startswith(code_path): # actual_script_file 已经是完整路径,如 "datafactory/scripts/task_41_xxx.py" script_path = actual_script_file elif "/" in actual_script_file or "\\" in actual_script_file: # actual_script_file 包含路径分隔符但不以 code_path 开头 # 可能是其他格式的路径,提取文件名后拼接 script_filename = Path(actual_script_file).name script_path = f"{code_path}/{script_filename}" else: # actual_script_file 只是文件名,正常拼接 script_path = f"{code_path}/{actual_script_file}" logger.info(f"📦 部署 Python 脚本: {script_path}") if deploy_script_to_production(script_path): logger.info(f"✅ 脚本 {actual_script_file} 部署成功") deploy_results["script_deployed"] = True else: logger.error(f"❌ 脚本 {actual_script_file} 部署失败") # 2. 查找并部署相关的 n8n 工作流文件 # 2.1 首先从本地查找工作流文件 logger.info("🔍 查找本地 n8n 工作流文件...") workflow_files = find_related_workflow_files(task_info) if workflow_files: logger.info(f"📋 本地发现 {len(workflow_files)} 个相关工作流文件:") for wf_file in workflow_files: logger.info(f" - {wf_file.relative_to(WORKSPACE_ROOT)}") for wf_file in workflow_files: logger.info(f"🔄 部署本地工作流: {wf_file.name}") if deploy_n8n_workflow_to_production(str(wf_file)): logger.info(f"✅ 工作流 {wf_file.name} 部署成功") deploy_results["workflows_deployed"] += 1 else: logger.error(f"❌ 工作流 {wf_file.name} 部署失败") deploy_results["workflows_failed"] += 1 else: logger.info("ℹ️ 本地未发现相关工作流文件") # 2.2 然后从生产服务器查找并部署工作流文件 logger.info("🔍 查找生产服务器上的 n8n 工作流文件...") remote_workflow_files = find_remote_workflow_files(task_info) if remote_workflow_files: logger.info(f"📋 远程服务器发现 {len(remote_workflow_files)} 个相关工作流文件:") for remote_file in remote_workflow_files: logger.info(f" - {Path(remote_file).name}") for remote_file in remote_workflow_files: logger.info(f"🔄 部署远程工作流: {Path(remote_file).name}") if deploy_remote_workflow_to_n8n(remote_file): logger.info(f"✅ 远程工作流 {Path(remote_file).name} 部署成功") deploy_results["workflows_deployed"] += 1 else: logger.error(f"❌ 远程工作流 {Path(remote_file).name} 部署失败") deploy_results["workflows_failed"] += 1 else: logger.info("ℹ️ 远程服务器未发现相关工作流文件") # 更新发现的工作流总数 deploy_results["workflows_found"] = len(workflow_files) + len(remote_workflow_files) # 3. 汇总部署结果 logger.info("=" * 60) logger.info(f"📊 部署结果汇总 - 任务: {task_name} (ID: {task_id})") logger.info("-" * 40) logger.info( f" 脚本部署: {'✅ 成功' if deploy_results['script_deployed'] else '❌ 失败或跳过'}" ) logger.info(f" 发现工作流: {deploy_results['workflows_found']} 个") logger.info(f" 工作流部署成功: {deploy_results['workflows_deployed']} 个") logger.info(f" 工作流部署失败: {deploy_results['workflows_failed']} 个") # 判断整体部署是否成功 deploy_success = ( deploy_results["script_deployed"] and deploy_results["workflows_failed"] == 0 ) if deploy_success: logger.info(f"✅ 任务 {task_name} 部署完成!") elif deploy_results["script_deployed"]: if deploy_results["workflows_failed"] > 0: logger.warning(f"⚠️ 任务 {task_name} 脚本部署成功,但部分工作流部署失败") else: logger.info(f"✅ 任务 {task_name} 脚本部署成功") deploy_success = True # 脚本部署成功就认为整体成功 else: logger.error(f"❌ 任务 {task_name} 部署失败") logger.info("=" * 60) return deploy_success # ============================================================================ # Cursor Agent 自动化 # ============================================================================ # Agent 会话状态 AGENT_SESSION_ACTIVE: bool = False AGENT_START_TIME: float = 0 def get_all_cursor_windows() -> list[dict[str, Any]]: """获取所有 Cursor 窗口信息""" if not HAS_CURSOR_GUI: return [] cursor_windows: list[dict[str, Any]] = [] def enum_windows_callback(hwnd, _extra): if win32gui.IsWindowVisible(hwnd): title = win32gui.GetWindowText(hwnd) or "" class_name = win32gui.GetClassName(hwnd) or "" is_cursor = "cursor" in title.lower() if class_name and "chrome_widgetwin" in class_name.lower(): is_cursor = True if is_cursor: left, top, right, bottom = win32gui.GetWindowRect(hwnd) area = (right - left) * (bottom - top) cursor_windows.append( { "hwnd": hwnd, "title": title, "class_name": class_name, "area": area, } ) return True win32gui.EnumWindows(enum_windows_callback, None) return cursor_windows def find_cursor_window() -> int | None: """查找 Cursor 主窗口句柄""" if not HAS_CURSOR_GUI: return None cursor_windows = get_all_cursor_windows() if not cursor_windows: logger.warning("未找到 Cursor 窗口") return None # 按面积排序,返回最大的窗口(主窗口) cursor_windows.sort(key=lambda x: x["area"], reverse=True) return cursor_windows[0]["hwnd"] def activate_window(hwnd: int) -> bool: """ 激活指定窗口 Windows 对 SetForegroundWindow 有限制,只有满足以下条件之一才能成功: 1. 调用进程是前台进程 2. 调用进程由前台进程启动 3. 目标窗口属于前台进程 4. 没有其他窗口在前台 此函数使用多种技巧绕过这些限制。 """ if not HAS_CURSOR_GUI: return False try: # 方法1: 使用 AttachThreadInput 技巧绕过 SetForegroundWindow 限制 # 这是最可靠的方法,通过将当前线程附加到前台窗口的线程来获取激活权限 import ctypes user32 = ctypes.windll.user32 # 获取当前前台窗口的线程ID foreground_hwnd = user32.GetForegroundWindow() foreground_thread_id = user32.GetWindowThreadProcessId(foreground_hwnd, None) # 获取当前线程ID current_thread_id = ctypes.windll.kernel32.GetCurrentThreadId() attached = False # 如果当前线程不是前台线程,则附加到前台线程 if current_thread_id != foreground_thread_id: attached = user32.AttachThreadInput( current_thread_id, foreground_thread_id, True ) try: # 先确保窗口不是最小化状态 if win32gui.IsIconic(hwnd): win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) time.sleep(0.2) # 使用 BringWindowToTop 将窗口置顶 user32.BringWindowToTop(hwnd) # 显示窗口 win32gui.ShowWindow(hwnd, win32con.SW_SHOW) # 尝试 SetForegroundWindow result = user32.SetForegroundWindow(hwnd) if not result: # 方法2: 使用 Alt 键模拟技巧 # 发送一个 Alt 键可以让系统认为用户有交互意图 # 定义必要的常量 KEYEVENTF_EXTENDEDKEY = 0x0001 KEYEVENTF_KEYUP = 0x0002 VK_MENU = 0x12 # Alt 键 # 模拟按下和释放 Alt 键 user32.keybd_event(VK_MENU, 0, KEYEVENTF_EXTENDEDKEY, 0) user32.keybd_event( VK_MENU, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0 ) time.sleep(0.1) # 再次尝试 result = user32.SetForegroundWindow(hwnd) if not result: # 方法3: 使用 ShowWindow 配合 SW_SHOWDEFAULT win32gui.ShowWindow(hwnd, win32con.SW_SHOWDEFAULT) time.sleep(0.1) result = user32.SetForegroundWindow(hwnd) if not result: # 方法4: 使用 SetWindowPos 将窗口置于最顶层 SWP_NOMOVE = 0x0002 SWP_NOSIZE = 0x0001 SWP_SHOWWINDOW = 0x0040 HWND_TOPMOST = -1 HWND_NOTOPMOST = -2 # 先设为最顶层 user32.SetWindowPos( hwnd, HWND_TOPMOST, 0, 0, 0, 0, SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW, ) time.sleep(0.1) # 再取消最顶层(但窗口仍在前台) user32.SetWindowPos( hwnd, HWND_NOTOPMOST, 0, 0, 0, 0, SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW, ) result = user32.SetForegroundWindow(hwnd) time.sleep(0.3) # 验证是否成功 current_foreground = user32.GetForegroundWindow() if current_foreground == hwnd: logger.debug("窗口激活成功") return True else: # 即使 SetForegroundWindow 返回失败,窗口可能已经被置顶并可见 # 检查窗口是否可见且不是最小化 if win32gui.IsWindowVisible(hwnd) and not win32gui.IsIconic(hwnd): logger.warning("窗口可能未完全激活到前台,但窗口可见,继续执行...") return True else: logger.error("激活窗口失败: 窗口不在前台") return False finally: # 分离线程 if attached: user32.AttachThreadInput(current_thread_id, foreground_thread_id, False) except Exception as e: logger.error(f"激活窗口失败: {e}") # 最后的备用方案:直接尝试基本操作 try: win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) win32gui.ShowWindow(hwnd, win32con.SW_SHOW) time.sleep(0.3) # 即使失败也返回 True,让调用者继续尝试 if win32gui.IsWindowVisible(hwnd): logger.warning("使用备用方案激活窗口,继续执行...") return True except Exception: pass return False def open_new_agent() -> bool: """在 Cursor 中打开新的 Agent 窗口""" global AGENT_SESSION_ACTIVE, AGENT_START_TIME if not HAS_CURSOR_GUI: logger.warning("当前环境不支持 Cursor GUI 自动化") return False hwnd = find_cursor_window() if not hwnd: return False if not activate_window(hwnd): return False try: # 使用 Ctrl+Shift+I 打开新的 Agent/Composer logger.info("正在打开新的 Agent...") pyautogui.hotkey("ctrl", "shift", "i") time.sleep(2.0) # 等待 Agent 窗口打开 AGENT_SESSION_ACTIVE = True AGENT_START_TIME = time.time() logger.info("✅ 新的 Agent 已打开") return True except Exception as e: logger.error(f"打开 Agent 失败: {e}") return False def close_current_agent(force: bool = False, max_retries: int = 3) -> bool: """ 关闭当前的 Agent 会话 Args: force: 是否强制关闭(使用多种方法) max_retries: 最大重试次数 关闭策略: 1. 使用 Escape 键关闭 Agent 面板 2. 如果失败,尝试 Ctrl+Shift+I 切换 Agent 面板 3. 如果仍失败,尝试点击空白区域并按 Escape """ global AGENT_SESSION_ACTIVE if not HAS_CURSOR_GUI: AGENT_SESSION_ACTIVE = False return False if not AGENT_SESSION_ACTIVE and not force: logger.info("没有活动的 Agent 会话") return True logger.info("🔄 正在关闭 Agent...") for attempt in range(max_retries): try: hwnd = find_cursor_window() if not hwnd: logger.warning("未找到 Cursor 窗口") AGENT_SESSION_ACTIVE = False return False if not activate_window(hwnd): logger.warning(f"激活窗口失败 (尝试 {attempt + 1}/{max_retries})") time.sleep(0.5) continue # 方法1: 按 Escape 键关闭 Agent logger.debug(f"尝试方法1: Escape 键 (尝试 {attempt + 1}/{max_retries})") pyautogui.press("escape") time.sleep(0.3) pyautogui.press("escape") time.sleep(0.3) # 方法2: 使用 Ctrl+Shift+I 切换 Agent 面板(关闭) if force or attempt > 0: logger.debug("尝试方法2: Ctrl+Shift+I 切换") pyautogui.hotkey("ctrl", "shift", "i") time.sleep(0.5) # 方法3: 点击编辑器区域并按 Escape if force or attempt > 1: logger.debug("尝试方法3: 点击编辑器区域") # 获取窗口位置,点击中心偏左位置(编辑器区域) try: left, top, right, bottom = win32gui.GetWindowRect(hwnd) center_x = left + (right - left) // 3 # 偏左1/3位置 center_y = top + (bottom - top) // 2 pyautogui.click(center_x, center_y) time.sleep(0.2) pyautogui.press("escape") time.sleep(0.3) except Exception as click_err: logger.debug(f"点击方法失败: {click_err}") AGENT_SESSION_ACTIVE = False logger.info("✅ Agent 已关闭") return True except Exception as e: logger.warning(f"关闭 Agent 尝试 {attempt + 1} 失败: {e}") time.sleep(0.5) # 即使关闭失败,也标记为非活动状态,避免状态不一致 AGENT_SESSION_ACTIVE = False logger.warning("⚠️ Agent 关闭可能未完全成功,但已重置状态") return False def force_close_all_agents() -> bool: """ 强制关闭所有可能的 Agent 会话 用于清理可能遗留的多个 Agent 窗口 """ global AGENT_SESSION_ACTIVE if not HAS_CURSOR_GUI: return False logger.info("🔄 强制关闭所有 Agent 会话...") try: hwnd = find_cursor_window() if not hwnd: AGENT_SESSION_ACTIVE = False return True if not activate_window(hwnd): AGENT_SESSION_ACTIVE = False return False # 连续按多次 Escape 确保关闭所有面板 for _ in range(5): pyautogui.press("escape") time.sleep(0.2) # 使用快捷键关闭可能的 Agent 面板 pyautogui.hotkey("ctrl", "shift", "i") time.sleep(0.3) pyautogui.hotkey("ctrl", "shift", "i") time.sleep(0.3) AGENT_SESSION_ACTIVE = False logger.info("✅ 所有 Agent 会话已关闭") return True except Exception as e: logger.error(f"强制关闭 Agent 失败: {e}") AGENT_SESSION_ACTIVE = False return False def type_message_to_agent(message: str) -> bool: """向 Agent 输入消息""" if not HAS_CURSOR_GUI: return False try: # 等待 Agent 输入框获得焦点 time.sleep(0.5) # 使用剪贴板粘贴(更可靠地处理中文和特殊字符) if HAS_PYPERCLIP: try: pyperclip.copy(message) pyautogui.hotkey("ctrl", "v") time.sleep(0.5) except Exception: # 回退到逐字符输入 pyautogui.write(message, interval=0.03) else: pyautogui.write(message, interval=0.03) time.sleep(0.3) # 按 Enter 发送消息 pyautogui.press("enter") logger.info("✅ 消息已发送到 Agent") return True except Exception as e: logger.error(f"发送消息到 Agent 失败: {e}") return False def wait_for_agent_completion( timeout: int = 3600, check_interval: int = 30, ) -> bool: """ 等待 Agent 完成任务 通过检查 pending_tasks.json 中的任务状态来判断是否完成 """ start_time = time.time() logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...") while time.time() - start_time < timeout: processing_ids = get_processing_task_ids() if not processing_ids: elapsed = int(time.time() - start_time) logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s") return True remaining = len(processing_ids) elapsed = int(time.time() - start_time) logger.info( f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)" ) time.sleep(check_interval) logger.warning("等待超时,仍有未完成的任务") return False def run_agent_once( timeout: int = 3600, auto_close: bool = True, ) -> bool: """ 执行一次 Agent 任务 流程: 1. 同步已完成任务到数据库 2. 从数据库读取 pending 任务 3. 更新任务状态为 processing 4. 生成执行指令文件(包含所有 processing 任务) 5. 打开 Agent 并发送消息 6. 等待任务完成 7. 同步完成任务 + 自动部署 8. 关闭 Agent """ logger.info("=" * 60) logger.info("Agent 单次执行模式") logger.info("=" * 60) # 1. 先同步已完成任务 sync_completed_tasks_to_db() # 2. 从数据库获取 pending 任务 logger.info("正在从数据库查询 pending 任务...") pending_tasks = get_pending_tasks() # 3. 获取当前 processing 任务 processing_ids = get_processing_task_ids() # 4. 检查是否有任务需要执行 if not pending_tasks and not processing_ids: logger.info("✅ 没有待执行的任务") return True if pending_tasks: logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务") # 5. 更新新任务状态为 processing for task in pending_tasks: update_task_status(task["task_id"], "processing") # 6. 写入 pending_tasks.json write_pending_tasks_json(pending_tasks) if processing_ids: logger.info(f"发现 {len(processing_ids)} 个已有的 processing 任务") # 7. 获取所有需要执行的任务(包含完整信息)并生成执行指令 all_tasks_to_execute = get_all_tasks_to_execute() if all_tasks_to_execute: logger.info(f"共 {len(all_tasks_to_execute)} 个任务需要执行") # 生成包含所有任务的执行指令文件 create_execute_instructions(all_tasks_to_execute) else: logger.warning("无法获取任务详细信息,跳过生成执行指令") # 7. 更新触发器文件 all_processing_ids = get_processing_task_ids() if all_processing_ids: update_trigger_file( task_count=len(all_processing_ids), status="有待执行任务", task_ids=all_processing_ids, ) # 8. 打开 Agent 并发送消息 if not open_new_agent(): logger.error("❌ 无法打开 Agent") return False if not type_message_to_agent(AGENT_MESSAGE): logger.error("❌ 无法发送消息到 Agent") close_current_agent() return False logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...") # 9. 等待任务完成 completed = wait_for_agent_completion(timeout=timeout) # 10. 立即关闭 Agent(在同步之前) logger.info("🔄 任务执行完毕,立即关闭 Agent...") if auto_close: close_current_agent(force=True) time.sleep(1.0) # 等待关闭完成 # 11. 同步已完成的任务到数据库(触发自动部署) logger.info("🔄 开始同步和部署...") sync_completed_tasks_to_db() if completed: logger.info("✅ Agent 已完成所有任务") else: logger.warning("⚠️ Agent 未能在超时时间内完成所有任务") # 强制关闭可能遗留的 Agent force_close_all_agents() logger.info("=" * 60) logger.info("Agent 会话结束") logger.info("=" * 60) return completed def run_agent_loop( interval: int = 300, timeout: int = 3600, auto_close: bool = True, ) -> None: """ Agent 循环模式 循环执行 Agent 单次任务,直到用户按 Ctrl+C 停止 完整流程: 1. 同步已完成任务到数据库(触发自动部署) 2. 检查是否有新的 pending 任务 3. 生成执行指令文件 4. 启动 Agent 执行任务 5. 等待任务完成 6. 同步完成任务并触发自动部署 7. 循环... """ global AGENT_SESSION_ACTIVE logger.info("=" * 60) logger.info("🔄 Agent 循环模式已启动") logger.info("=" * 60) logger.info(f" 检查间隔: {interval} 秒") logger.info(f" 任务超时: {timeout} 秒") logger.info(f" 自动部署: {'✅ 已启用' if ENABLE_AUTO_DEPLOY else '❌ 已禁用'}") logger.info(f" 自动关闭 Agent: {'✅ 是' if auto_close else '❌ 否'}") logger.info("=" * 60) logger.info("按 Ctrl+C 停止服务") logger.info("=" * 60) loop_count = 0 total_tasks_completed = 0 total_deployments = 0 try: while True: try: loop_count += 1 logger.info(f"\n{'=' * 60}") logger.info(f"📍 开始第 {loop_count} 轮任务检查...") logger.info(f"{'=' * 60}") # 1. 同步已完成任务(这会触发自动部署) logger.info("🔄 检查并同步已完成的任务...") synced_count = sync_completed_tasks_to_db() if synced_count > 0: total_tasks_completed += synced_count total_deployments += synced_count logger.info( f"✅ 已同步 {synced_count} 个完成的任务(累计: {total_tasks_completed})" ) # 2. 从数据库获取 pending 任务 logger.info("📡 检查数据库中的 pending 任务...") pending_tasks = get_pending_tasks() if pending_tasks: logger.info(f"📋 发现 {len(pending_tasks)} 个新的 pending 任务:") for task in pending_tasks: logger.info(f" - [{task['task_id']}] {task['task_name']}") # 更新任务状态为 processing for task in pending_tasks: update_task_status(task["task_id"], "processing") # 写入 pending_tasks.json write_pending_tasks_json(pending_tasks) # 3. 检查是否有 processing 任务 processing_ids = get_processing_task_ids() # 4. 如果有新任务或有 processing 任务,生成包含所有任务的执行指令 if pending_tasks or processing_ids: all_tasks_to_execute = get_all_tasks_to_execute() if all_tasks_to_execute: logger.info( f"📝 生成执行指令文件,共 {len(all_tasks_to_execute)} 个任务" ) create_execute_instructions(all_tasks_to_execute) if processing_ids: # 如果有活动的 Agent 会话,不需要重新启动 if AGENT_SESSION_ACTIVE: logger.info( f"⏳ Agent 正在执行中,剩余 {len(processing_ids)} 个任务" ) else: logger.info( f"🎯 发现 {len(processing_ids)} 个待处理任务,准备启动 Agent" ) # 更新触发器文件 update_trigger_file( task_count=len(processing_ids), status="有待执行任务", task_ids=processing_ids, ) # 启动 Agent if open_new_agent(): if type_message_to_agent(AGENT_MESSAGE): logger.info("✅ 已启动 Agent 并发送执行提醒") # 等待任务完成 task_completed = wait_for_agent_completion( timeout=timeout ) # ===== 关键:任务完成后立即关闭 Agent ===== logger.info("🔄 任务执行完毕,立即关闭 Agent...") if auto_close: # 使用强制关闭,确保 Agent 被正确关闭 close_current_agent(force=True) # 等待一小段时间确保关闭完成 time.sleep(1.0) # 同步完成的任务(这会触发自动部署) logger.info("🔄 开始同步和部署...") synced = sync_completed_tasks_to_db() if synced > 0: total_tasks_completed += synced total_deployments += synced logger.info( f"✅ 本轮完成 {synced} 个任务的同步和部署" ) # 显示本轮统计 logger.info(f"📊 本轮统计: 完成任务 {synced} 个") if ENABLE_AUTO_DEPLOY: logger.info(f" 已触发自动部署: {synced} 个") # 如果任务未完成(超时),也确保关闭 Agent if not task_completed: logger.warning("⚠️ 任务超时,强制关闭 Agent") force_close_all_agents() else: logger.warning("❌ 发送消息失败") close_current_agent(force=True) else: logger.warning("❌ 启动 Agent 失败") else: logger.info("✅ 当前没有待处理任务") # 显示累计统计 logger.info( f"\n📈 累计统计: 已完成 {total_tasks_completed} 个任务, " f"已部署 {total_deployments} 个" ) logger.info(f"⏰ {interval} 秒后将进行第 {loop_count + 1} 轮检查...") time.sleep(interval) except KeyboardInterrupt: raise except Exception as e: logger.error(f"❌ 执行出错: {e}") import traceback logger.error(traceback.format_exc()) logger.info(f"⏰ {interval} 秒后重试...") time.sleep(interval) except KeyboardInterrupt: # 退出时关闭 Agent logger.info("\n" + "=" * 60) logger.info("⛔ 收到停止信号,正在退出...") if AGENT_SESSION_ACTIVE: logger.info("🔄 正在关闭 Agent...") close_current_agent() # 最后一次同步 logger.info("🔄 执行最终同步...") final_synced = sync_completed_tasks_to_db() if final_synced > 0: total_tasks_completed += final_synced logger.info(f"✅ 最终同步了 {final_synced} 个任务") logger.info("=" * 60) logger.info("📊 会话统计:") logger.info(f" 总循环次数: {loop_count}") logger.info(f" 总完成任务: {total_tasks_completed}") logger.info(f" 总部署次数: {total_deployments}") logger.info("=" * 60) logger.info("✅ Agent 循环模式已停止") # ============================================================================ # 交互式菜单 # ============================================================================ def show_interactive_menu() -> None: """显示交互式菜单并执行用户选择的操作""" global ENABLE_AUTO_DEPLOY while True: print("\n" + "=" * 60) print("自动任务执行调度脚本 - Agent 模式") print("=" * 60) print("\n请选择操作模式:\n") print(" 1. Agent 单次执行") print(" 2. Agent 循环模式(含自动部署脚本和n8n工作流)") print(" 3. Agent 循环模式(禁用部署)") print(" 4. 测试生产服务器连接") print(" 5. 查看当前任务状态") print(" 6. 手动触发任务部署") print(" 7. 强制关闭所有 Agent") print(" 0. 退出") print("\n" + "-" * 60) try: choice = input("请输入选项 [0-5]: ").strip() except (KeyboardInterrupt, EOFError): print("\n再见!") break if choice == "0": print("再见!") break elif choice == "1": print("\n启动 Agent 单次执行模式...") run_agent_once(timeout=3600, auto_close=True) input("\n按 Enter 键返回菜单...") elif choice == "2": try: interval_str = input("请输入检查间隔(秒,默认300): ").strip() interval = int(interval_str) if interval_str else 300 except ValueError: interval = 300 print("\n🚀 启动 Agent 循环模式(含自动部署)") print(f" 检查间隔: {interval} 秒") print(" 自动部署: ✅ 已启用") print("\n 任务完成后将自动:") print(" - 部署 Python 脚本到生产服务器") print(" - 查找并部署相关 n8n 工作流") print("\n按 Ctrl+C 停止服务并返回菜单\n") ENABLE_AUTO_DEPLOY = True try: run_agent_loop(interval=interval) except KeyboardInterrupt: print("\n循环已停止") elif choice == "3": try: interval_str = input("请输入检查间隔(秒,默认300): ").strip() interval = int(interval_str) if interval_str else 300 except ValueError: interval = 300 print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒") print("按 Ctrl+C 停止服务并返回菜单\n") ENABLE_AUTO_DEPLOY = False try: run_agent_loop(interval=interval) except KeyboardInterrupt: print("\n循环已停止") elif choice == "4": print("\n测试生产服务器连接...") if test_ssh_connection(): print("✅ 连接测试成功") else: print("❌ 连接测试失败") input("\n按 Enter 键返回菜单...") elif choice == "5": print("\n当前任务状态:") print("-" * 40) # 从数据库获取 pending 任务 pending_tasks = get_pending_tasks() print(f" 数据库中 pending 任务: {len(pending_tasks)} 个") for task in pending_tasks: print(f" - [{task['task_id']}] {task['task_name']}") # 从本地文件获取 processing 任务 processing_ids = get_processing_task_ids() print(f" 本地 processing 任务: {len(processing_ids)} 个") if processing_ids: print(f" 任务 ID: {processing_ids}") # 显示已完成的任务 if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: all_local_tasks = json.load(f) completed_tasks = [ t for t in all_local_tasks if t.get("status") == "completed" ] print(f" 本地 completed 任务: {len(completed_tasks)} 个") for task in completed_tasks: print( f" - [{task.get('task_id')}] {task.get('task_name')} -> {task.get('code_file', 'N/A')}" ) except Exception: pass input("\n按 Enter 键返回菜单...") elif choice == "6": print("\n手动触发任务部署") print("-" * 40) # 显示已完成的任务列表 if PENDING_TASKS_FILE.exists(): try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: all_tasks = json.load(f) completed_tasks = [ t for t in all_tasks if t.get("status") == "completed" ] if not completed_tasks: print("没有已完成的任务可供部署") input("\n按 Enter 键返回菜单...") continue print("已完成的任务:") for idx, task in enumerate(completed_tasks, 1): task_id = task.get("task_id", "N/A") task_name = task.get("task_name", "未知") code_file = task.get("code_file", "N/A") print(f" {idx}. [{task_id}] {task_name}") print(f" 代码文件: {code_file}") print("\n 0. 部署全部") print(" q. 返回菜单") try: selection = input("\n请选择要部署的任务编号: ").strip().lower() if selection == "q": continue tasks_to_deploy = [] if selection == "0": tasks_to_deploy = completed_tasks else: try: idx = int(selection) - 1 if 0 <= idx < len(completed_tasks): tasks_to_deploy = [completed_tasks[idx]] else: print("❌ 无效的编号") continue except ValueError: print("❌ 请输入有效的数字") continue if tasks_to_deploy: print(f"\n🚀 开始部署 {len(tasks_to_deploy)} 个任务...") ENABLE_AUTO_DEPLOY = True success_count = 0 for task in tasks_to_deploy: if auto_deploy_completed_task(task): success_count += 1 print( f"\n📊 部署完成: {success_count}/{len(tasks_to_deploy)} 成功" ) except (KeyboardInterrupt, EOFError): pass except Exception as e: print(f"❌ 读取任务列表失败: {e}") else: print("没有本地任务记录") input("\n按 Enter 键返回菜单...") elif choice == "7": print("\n🔄 强制关闭所有 Agent 会话...") if HAS_CURSOR_GUI: if force_close_all_agents(): print("✅ 所有 Agent 会话已关闭") else: print("⚠️ 关闭过程中可能出现问题,请检查 Cursor 窗口") else: print("❌ 当前环境不支持 GUI 自动化") input("\n按 Enter 键返回菜单...") else: print("❌ 无效的选项,请重新选择") # ============================================================================ # 主函数 # ============================================================================ def main() -> None: """主函数""" parser = argparse.ArgumentParser( description="自动任务执行调度脚本 (Agent 模式)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # Agent 单次执行 python scripts/auto_execute_tasks.py --agent-run # Agent 循环模式 python scripts/auto_execute_tasks.py --agent-loop # Agent 循环模式 + 禁用自动部署 python scripts/auto_execute_tasks.py --agent-loop --no-deploy # 设置 Agent 超时时间 python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200 # 立即部署指定任务到生产服务器 python scripts/auto_execute_tasks.py --deploy-now 123 # 测试生产服务器连接 python scripts/auto_execute_tasks.py --test-connection """, ) # Agent 模式参数 parser.add_argument( "--agent-run", action="store_true", help="Agent 单次执行模式", ) parser.add_argument( "--agent-loop", action="store_true", help="Agent 循环模式", ) parser.add_argument( "--agent-timeout", type=int, default=3600, help="Agent 等待任务完成的超时时间(秒),默认 3600", ) parser.add_argument( "--interval", type=int, default=300, help="循环模式检查间隔(秒),默认 300", ) parser.add_argument( "--no-auto-close", action="store_true", help="任务完成后不自动关闭 Agent", ) # 部署相关参数 parser.add_argument( "--no-deploy", action="store_true", help="禁用自动部署功能", ) parser.add_argument( "--deploy-now", type=str, metavar="TASK_ID", help="立即部署指定任务ID的脚本到生产服务器", ) parser.add_argument( "--test-connection", action="store_true", help="测试到生产服务器的 SSH 连接", ) args = parser.parse_args() global ENABLE_AUTO_DEPLOY ENABLE_AUTO_DEPLOY = not args.no_deploy auto_close = not args.no_auto_close # 测试 SSH 连接 if args.test_connection: if test_ssh_connection(): logger.info("✅ 连接测试成功") else: logger.error("❌ 连接测试失败") return # 立即部署指定任务 if args.deploy_now: try: task_id = int(args.deploy_now) logger.info(f"开始部署任务 {task_id}...") # 从 pending_tasks.json 查找任务信息 if PENDING_TASKS_FILE.exists(): with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: tasks = json.load(f) task_found = None for t in tasks: if t.get("task_id") == task_id: task_found = t break if task_found: if auto_deploy_completed_task(task_found): logger.info(f"✅ 任务 {task_id} 部署成功") else: logger.error(f"❌ 任务 {task_id} 部署失败") else: logger.error(f"未找到任务 {task_id}") else: logger.error("pending_tasks.json 文件不存在") except ValueError: logger.error(f"无效的任务ID: {args.deploy_now}") return # Agent 单次执行 if args.agent_run: success = run_agent_once( timeout=args.agent_timeout, auto_close=auto_close, ) if success: logger.info("✅ Agent 单次执行完成") else: logger.error("❌ Agent 单次执行失败") return # Agent 循环模式 if args.agent_loop: run_agent_loop( interval=args.interval, timeout=args.agent_timeout, auto_close=auto_close, ) return # 没有指定任何模式参数时,显示交互式菜单 if len(sys.argv) == 1: show_interactive_menu() else: # 显示帮助信息 parser.print_help() if __name__ == "__main__": main()