|
|
@@ -1,2597 +0,0 @@
|
|
|
-#!/usr/bin/env python3
|
|
|
-"""
|
|
|
-自动任务执行核心调度脚本 (Agent 模式)
|
|
|
-
|
|
|
-工作流程:
|
|
|
-1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
|
|
|
-2. 生成 tasks/task_execute_instructions.md 执行指令文件
|
|
|
-3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
|
|
|
-4. 更新 tasks/task_trigger.txt 触发器文件
|
|
|
-5. 启动新的 Cursor Agent 并发送执行指令
|
|
|
-6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
|
|
|
-7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
|
|
|
-
|
|
|
-使用方式:
|
|
|
- # Agent 单次执行(执行一次任务后退出)
|
|
|
- python scripts/auto_execute_tasks.py --agent-run
|
|
|
-
|
|
|
- # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务)
|
|
|
- python scripts/auto_execute_tasks.py --agent-loop
|
|
|
-
|
|
|
- # Agent 循环模式 + 禁用自动部署
|
|
|
- python scripts/auto_execute_tasks.py --agent-loop --no-deploy
|
|
|
-
|
|
|
- # 设置 Agent 超时时间(默认 3600 秒)
|
|
|
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
|
|
|
-
|
|
|
- # 任务完成后不自动关闭 Agent
|
|
|
- python scripts/auto_execute_tasks.py --agent-run --no-auto-close
|
|
|
-
|
|
|
- # 立即部署指定任务ID的脚本到生产服务器
|
|
|
- python scripts/auto_execute_tasks.py --deploy-now 123
|
|
|
-
|
|
|
- # 测试到生产服务器的 SSH 连接
|
|
|
- python scripts/auto_execute_tasks.py --test-connection
|
|
|
-"""
|
|
|
-
|
|
|
-from __future__ import annotations
|
|
|
-
|
|
|
-import argparse
|
|
|
-import contextlib
|
|
|
-import json
|
|
|
-import logging
|
|
|
-import sys
|
|
|
-import time
|
|
|
-from datetime import datetime
|
|
|
-from pathlib import Path
|
|
|
-from typing import Any
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 日志配置
|
|
|
-# ============================================================================
|
|
|
-logging.basicConfig(
|
|
|
- level=logging.INFO,
|
|
|
- format="%(asctime)s - %(levelname)s - %(message)s",
|
|
|
-)
|
|
|
-logger = logging.getLogger("AutoExecuteTasks")
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# Windows GUI 自动化依赖(可选)
|
|
|
-# ============================================================================
|
|
|
-HAS_CURSOR_GUI = False
|
|
|
-HAS_PYPERCLIP = False
|
|
|
-
|
|
|
-try:
|
|
|
- import pyautogui
|
|
|
- import win32con
|
|
|
- import win32gui
|
|
|
-
|
|
|
- pyautogui.FAILSAFE = True
|
|
|
- pyautogui.PAUSE = 0.5
|
|
|
- HAS_CURSOR_GUI = True
|
|
|
-
|
|
|
- try:
|
|
|
- import pyperclip
|
|
|
-
|
|
|
- HAS_PYPERCLIP = True
|
|
|
- except ImportError:
|
|
|
- pass
|
|
|
-except ImportError:
|
|
|
- logger.info(
|
|
|
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
|
|
|
- "将禁用自动 Cursor Agent 功能。"
|
|
|
- )
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 全局配置
|
|
|
-# ============================================================================
|
|
|
-WORKSPACE_ROOT = Path(__file__).parent.parent
|
|
|
-TASKS_DIR = WORKSPACE_ROOT / "tasks"
|
|
|
-PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
|
|
|
-INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
|
|
|
-TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"
|
|
|
-
|
|
|
-# 生产服务器配置
|
|
|
-PRODUCTION_SERVER = {
|
|
|
- "host": "192.168.3.143",
|
|
|
- "port": 22,
|
|
|
- "username": "ubuntu",
|
|
|
- "password": "citumxl2357",
|
|
|
- "script_path": "/opt/dataops-platform/datafactory/scripts",
|
|
|
- "workflow_path": "/opt/dataops-platform/datafactory/workflows", # 工作流 JSON 文件目录
|
|
|
-}
|
|
|
-
|
|
|
-# Agent 消息模板
|
|
|
-AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"
|
|
|
-
|
|
|
-# 命令行参数控制的全局变量
|
|
|
-ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 数据库操作
|
|
|
-# ============================================================================
|
|
|
-def get_db_connection():
|
|
|
- """获取数据库连接(使用 production 环境配置)"""
|
|
|
- try:
|
|
|
- from urllib.parse import urlparse
|
|
|
-
|
|
|
- import psycopg2
|
|
|
-
|
|
|
- sys.path.insert(0, str(WORKSPACE_ROOT))
|
|
|
- from app.config.config import config
|
|
|
-
|
|
|
- # 强制使用 production 环境的数据库配置
|
|
|
- app_config = config["production"]
|
|
|
- db_uri = app_config.SQLALCHEMY_DATABASE_URI
|
|
|
-
|
|
|
- # 解析 SQLAlchemy URI 格式为 psycopg2 可用的格式
|
|
|
- parsed = urlparse(db_uri)
|
|
|
-
|
|
|
- conn = psycopg2.connect(
|
|
|
- host=parsed.hostname,
|
|
|
- port=parsed.port or 5432,
|
|
|
- database=parsed.path.lstrip("/"),
|
|
|
- user=parsed.username,
|
|
|
- password=parsed.password,
|
|
|
- )
|
|
|
-
|
|
|
- logger.debug(
|
|
|
- f"数据库连接成功: {parsed.hostname}:{parsed.port}/{parsed.path.lstrip('/')}"
|
|
|
- )
|
|
|
- return conn
|
|
|
-
|
|
|
- except ImportError as e:
|
|
|
- logger.error(f"导入依赖失败: {e}")
|
|
|
- return None
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"连接数据库失败: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-def get_pending_tasks() -> list[dict[str, Any]]:
|
|
|
- """
|
|
|
- 从 PostgreSQL task_list 表获取所有 pending 状态的任务
|
|
|
-
|
|
|
- 重要:此函数直接查询数据库,确保获取最新的任务列表
|
|
|
- """
|
|
|
- try:
|
|
|
- from psycopg2.extras import RealDictCursor
|
|
|
-
|
|
|
- logger.info("📡 正在连接数据库...")
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- logger.error("❌ 无法获取数据库连接")
|
|
|
- return []
|
|
|
-
|
|
|
- logger.info("✅ 数据库连接成功,正在查询 pending 任务...")
|
|
|
- cursor = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
- cursor.execute(
|
|
|
- """
|
|
|
- SELECT task_id, task_name, task_description, status,
|
|
|
- code_name, code_path, create_time, create_by
|
|
|
- FROM task_list
|
|
|
- WHERE status = 'pending'
|
|
|
- ORDER BY create_time ASC
|
|
|
- """
|
|
|
- )
|
|
|
-
|
|
|
- tasks = cursor.fetchall()
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
-
|
|
|
- task_list = [dict(task) for task in tasks]
|
|
|
- logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
|
|
|
-
|
|
|
- if task_list:
|
|
|
- for task in task_list:
|
|
|
- logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")
|
|
|
-
|
|
|
- return task_list
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"获取 pending 任务失败: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-def update_task_status(
|
|
|
- task_id: int,
|
|
|
- status: str,
|
|
|
- code_name: str | None = None,
|
|
|
- code_path: str | None = None,
|
|
|
-) -> bool:
|
|
|
- """更新任务状态"""
|
|
|
- try:
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- return False
|
|
|
-
|
|
|
- cursor = conn.cursor()
|
|
|
-
|
|
|
- if code_name and code_path:
|
|
|
- cursor.execute(
|
|
|
- """
|
|
|
- UPDATE task_list
|
|
|
- SET status = %s, code_name = %s, code_path = %s,
|
|
|
- update_time = CURRENT_TIMESTAMP
|
|
|
- WHERE task_id = %s
|
|
|
- """,
|
|
|
- (status, code_name, code_path, task_id),
|
|
|
- )
|
|
|
- else:
|
|
|
- cursor.execute(
|
|
|
- """
|
|
|
- UPDATE task_list
|
|
|
- SET status = %s, update_time = CURRENT_TIMESTAMP
|
|
|
- WHERE task_id = %s
|
|
|
- """,
|
|
|
- (status, task_id),
|
|
|
- )
|
|
|
-
|
|
|
- conn.commit()
|
|
|
- updated = cursor.rowcount > 0
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
-
|
|
|
- if updated:
|
|
|
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
|
|
|
- return updated
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"更新任务状态失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 触发器文件管理
|
|
|
-# ============================================================================
|
|
|
-def update_trigger_file(
|
|
|
- task_count: int = 0,
|
|
|
- status: str = "有待执行任务",
|
|
|
- task_ids: list[int] | None = None,
|
|
|
-) -> None:
|
|
|
- """更新触发器文件 tasks/task_trigger.txt"""
|
|
|
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
- task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
|
|
|
-
|
|
|
- content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
|
|
|
-生成时间: {gen_time}
|
|
|
-状态: {status}
|
|
|
-待处理任务数: {task_count}
|
|
|
-任务ID列表: [{task_ids_str}]
|
|
|
-
|
|
|
-此文件用于触发Cursor自动执行任务。
|
|
|
-
|
|
|
-【重要】Cursor AI 请立即执行以下操作:
|
|
|
-1. 读取 tasks/task_execute_instructions.md 文件
|
|
|
-2. 按照文件中的任务描述,依次完成每个任务
|
|
|
-3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
|
|
|
-
|
|
|
-"""
|
|
|
-
|
|
|
- with TRIGGER_FILE.open("w", encoding="utf-8") as f:
|
|
|
- f.write(content)
|
|
|
-
|
|
|
- logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
|
|
|
-
|
|
|
-
|
|
|
-def get_processing_task_ids() -> list[int]:
|
|
|
- """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
|
|
|
- if not PENDING_TASKS_FILE.exists():
|
|
|
- return []
|
|
|
-
|
|
|
- try:
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- tasks = json.load(f)
|
|
|
- return [
|
|
|
- t.get("task_id")
|
|
|
- for t in tasks
|
|
|
- if t.get("status") == "processing" and t.get("task_id")
|
|
|
- ]
|
|
|
- except Exception:
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-def get_tasks_by_ids(task_ids: list[int]) -> list[dict[str, Any]]:
|
|
|
- """
|
|
|
- 根据任务 ID 列表从数据库获取任务详细信息
|
|
|
-
|
|
|
- Args:
|
|
|
- task_ids: 任务 ID 列表
|
|
|
-
|
|
|
- Returns:
|
|
|
- 包含任务详细信息的列表(包括 task_description)
|
|
|
- """
|
|
|
- if not task_ids:
|
|
|
- return []
|
|
|
-
|
|
|
- try:
|
|
|
- from psycopg2.extras import RealDictCursor
|
|
|
-
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- logger.error("无法获取数据库连接")
|
|
|
- return []
|
|
|
-
|
|
|
- cursor = conn.cursor(cursor_factory=RealDictCursor)
|
|
|
-
|
|
|
- # 构建 IN 查询
|
|
|
- placeholders = ", ".join(["%s"] * len(task_ids))
|
|
|
- query = f"""
|
|
|
- SELECT task_id, task_name, task_description, status,
|
|
|
- code_name, code_path, create_time, create_by
|
|
|
- FROM task_list
|
|
|
- WHERE task_id IN ({placeholders})
|
|
|
- ORDER BY create_time ASC
|
|
|
- """
|
|
|
-
|
|
|
- cursor.execute(query, tuple(task_ids))
|
|
|
- tasks = cursor.fetchall()
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
-
|
|
|
- task_list = [dict(task) for task in tasks]
|
|
|
- logger.info(f"从数据库获取了 {len(task_list)} 个任务的详细信息")
|
|
|
-
|
|
|
- return task_list
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"根据 ID 获取任务失败: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return []
|
|
|
-
|
|
|
-
|
|
|
-def get_all_tasks_to_execute() -> list[dict[str, Any]]:
|
|
|
- """
|
|
|
- 获取所有需要执行的任务(包括新的 pending 任务和已有的 processing 任务)
|
|
|
-
|
|
|
- 此函数确保返回的任务列表包含完整信息(特别是 task_description),
|
|
|
- 用于生成执行指令文件。
|
|
|
-
|
|
|
- Returns:
|
|
|
- 包含所有需要执行任务的完整信息列表
|
|
|
- """
|
|
|
- # 1. 获取本地 pending_tasks.json 中 processing 状态的任务 ID
|
|
|
- processing_ids = get_processing_task_ids()
|
|
|
-
|
|
|
- # 2. 从数据库获取所有 pending 任务
|
|
|
- pending_tasks = get_pending_tasks()
|
|
|
- pending_ids = [t["task_id"] for t in pending_tasks]
|
|
|
-
|
|
|
- # 3. 合并所有需要查询的任务 ID(去重)
|
|
|
- all_task_ids = list(set(processing_ids + pending_ids))
|
|
|
-
|
|
|
- if not all_task_ids:
|
|
|
- return []
|
|
|
-
|
|
|
- # 4. 从数据库获取这些任务的完整信息
|
|
|
- all_tasks = get_tasks_by_ids(all_task_ids)
|
|
|
-
|
|
|
- logger.info(
|
|
|
- f"需要执行的任务: {len(all_tasks)} 个 "
|
|
|
- f"(processing: {len(processing_ids)}, pending: {len(pending_ids)})"
|
|
|
- )
|
|
|
-
|
|
|
- return all_tasks
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 任务文件生成
|
|
|
-# ============================================================================
|
|
|
-def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
|
|
|
- """将任务列表写入 tasks/pending_tasks.json"""
|
|
|
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- # 读取现有任务
|
|
|
- existing_tasks = []
|
|
|
- if PENDING_TASKS_FILE.exists():
|
|
|
- try:
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- existing_tasks = json.load(f)
|
|
|
- except Exception:
|
|
|
- existing_tasks = []
|
|
|
-
|
|
|
- existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
|
|
|
-
|
|
|
- # 添加新任务
|
|
|
- for task in tasks:
|
|
|
- if task["task_id"] not in existing_ids:
|
|
|
- task_info = {
|
|
|
- "task_id": task["task_id"],
|
|
|
- "task_name": task["task_name"],
|
|
|
- "code_path": task.get("code_path", ""),
|
|
|
- "code_name": task.get("code_name", ""),
|
|
|
- "status": "processing",
|
|
|
- "notified_at": datetime.now().isoformat(),
|
|
|
- "code_file": task.get("code_file", ""),
|
|
|
- }
|
|
|
- existing_tasks.append(task_info)
|
|
|
-
|
|
|
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
|
|
|
- json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
|
|
|
-
|
|
|
- logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
|
|
|
-
|
|
|
-
|
|
|
-def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
|
|
|
- """生成任务执行指令文件 tasks/task_execute_instructions.md"""
|
|
|
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
|
|
|
- f.write("# Cursor 自动任务执行指令\n\n")
|
|
|
- f.write("**重要:请立即执行以下任务!**\n\n")
|
|
|
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
- f.write(f"**生成时间**: {gen_time}\n\n")
|
|
|
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
|
|
|
-
|
|
|
- f.write("## 任务完成后的操作\n\n")
|
|
|
- f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
|
|
|
- f.write("对应任务的 `status` 为 `completed`,\n")
|
|
|
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
|
|
|
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
|
|
|
-
|
|
|
- f.write("## 任务约束要求\n\n")
|
|
|
- f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
|
|
|
- f.write("- 不要创建任何 summary、report、总结类的文档文件\n")
|
|
|
- f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n")
|
|
|
- f.write("- 只需创建任务要求的功能脚本文件\n")
|
|
|
- f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
|
|
|
-
|
|
|
- f.write("---\n\n")
|
|
|
-
|
|
|
- for idx, task in enumerate(tasks, 1):
|
|
|
- task_id = task["task_id"]
|
|
|
- task_name = task["task_name"]
|
|
|
- task_desc = task["task_description"]
|
|
|
-
|
|
|
- create_time = task.get("create_time", "")
|
|
|
- if hasattr(create_time, "strftime"):
|
|
|
- create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
-
|
|
|
- f.write(f"## 任务 {idx}: {task_name}\n\n")
|
|
|
- f.write(f"- **任务ID**: `{task_id}`\n")
|
|
|
- f.write(f"- **创建时间**: {create_time}\n")
|
|
|
- f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
|
|
|
- f.write(f"### 任务描述\n\n{task_desc}\n\n")
|
|
|
- f.write("---\n\n")
|
|
|
-
|
|
|
- logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# Neo4j 独立连接(不依赖 Flask 应用上下文)
|
|
|
-# ============================================================================
|
|
|
-def get_neo4j_driver():
|
|
|
- """获取 Neo4j 驱动(独立于 Flask 应用上下文)"""
|
|
|
- try:
|
|
|
- from neo4j import GraphDatabase
|
|
|
-
|
|
|
- sys.path.insert(0, str(WORKSPACE_ROOT))
|
|
|
- from app.config.config import config
|
|
|
-
|
|
|
- # 强制使用 production 环境的配置
|
|
|
- app_config = config["production"]
|
|
|
- uri = app_config.NEO4J_URI
|
|
|
- user = app_config.NEO4J_USER
|
|
|
- password = app_config.NEO4J_PASSWORD
|
|
|
-
|
|
|
- driver = GraphDatabase.driver(uri, auth=(user, password))
|
|
|
- return driver
|
|
|
-
|
|
|
- except ImportError as e:
|
|
|
- logger.error(f"导入 Neo4j 驱动失败: {e}")
|
|
|
- return None
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"连接 Neo4j 失败: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 状态同步
|
|
|
-# ============================================================================
|
|
|
-def extract_dataflow_name_from_task(task_id: int) -> str | None:
|
|
|
- """从任务描述中提取 DataFlow 名称"""
|
|
|
- import re
|
|
|
-
|
|
|
- try:
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- return None
|
|
|
-
|
|
|
- cursor = conn.cursor()
|
|
|
- cursor.execute(
|
|
|
- "SELECT task_description FROM task_list WHERE task_id = %s",
|
|
|
- (task_id,),
|
|
|
- )
|
|
|
- result = cursor.fetchone()
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
-
|
|
|
- if not result:
|
|
|
- return None
|
|
|
-
|
|
|
- task_desc = result[0]
|
|
|
-
|
|
|
- # 从任务描述中提取 DataFlow Name
|
|
|
- match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", task_desc)
|
|
|
- if match:
|
|
|
- dataflow_name = match.group(1).strip()
|
|
|
- logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
|
|
|
- return dataflow_name
|
|
|
-
|
|
|
- return None
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"提取 DataFlow 名称失败: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-def update_dataflow_script_path(
|
|
|
- task_name: str, script_path: str, task_id: int | None = None
|
|
|
-) -> bool:
|
|
|
- """更新 DataFlow 节点的 script_path 字段"""
|
|
|
- try:
|
|
|
- driver = get_neo4j_driver()
|
|
|
- if not driver:
|
|
|
- logger.error("无法获取 Neo4j 驱动")
|
|
|
- return False
|
|
|
-
|
|
|
- # 如果提供了 task_id,尝试从任务描述中提取真正的 DataFlow 名称
|
|
|
- dataflow_name = task_name
|
|
|
- if task_id:
|
|
|
- extracted_name = extract_dataflow_name_from_task(task_id)
|
|
|
- if extracted_name:
|
|
|
- dataflow_name = extracted_name
|
|
|
- logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}")
|
|
|
-
|
|
|
- query = """
|
|
|
- MATCH (n:DataFlow {name_zh: $name_zh})
|
|
|
- SET n.script_path = $script_path, n.updated_at = $updated_at
|
|
|
- RETURN n
|
|
|
- """
|
|
|
-
|
|
|
- updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
-
|
|
|
- with driver.session() as session:
|
|
|
- result = session.run(
|
|
|
- query,
|
|
|
- name_zh=dataflow_name,
|
|
|
- script_path=script_path,
|
|
|
- updated_at=updated_at,
|
|
|
- ).single()
|
|
|
-
|
|
|
- driver.close()
|
|
|
-
|
|
|
- if result:
|
|
|
- logger.info(
|
|
|
- f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}"
|
|
|
- )
|
|
|
- return True
|
|
|
- else:
|
|
|
- logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"更新 DataFlow script_path 失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def sync_completed_tasks_to_db() -> int:
|
|
|
- """将 pending_tasks.json 中 completed 的任务同步到数据库"""
|
|
|
- if not PENDING_TASKS_FILE.exists():
|
|
|
- return 0
|
|
|
-
|
|
|
- try:
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- tasks = json.load(f)
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"读取 pending_tasks.json 失败: {e}")
|
|
|
- return 0
|
|
|
-
|
|
|
- if not isinstance(tasks, list):
|
|
|
- return 0
|
|
|
-
|
|
|
- updated = 0
|
|
|
- remaining_tasks = []
|
|
|
-
|
|
|
- for t in tasks:
|
|
|
- if t.get("status") == "completed":
|
|
|
- task_id = t.get("task_id")
|
|
|
- if not task_id:
|
|
|
- continue
|
|
|
-
|
|
|
- task_name = t.get("task_name")
|
|
|
- code_path = t.get("code_path")
|
|
|
- # 使用 code_file 字段获取实际的脚本文件名
|
|
|
- code_file = t.get("code_file", "")
|
|
|
-
|
|
|
- # 统一处理:code_path 始终为 "datafactory/scripts"
|
|
|
- code_path = "datafactory/scripts"
|
|
|
-
|
|
|
- # 使用 code_file 判断是否为 Python 脚本
|
|
|
- is_python_script = code_file and code_file.endswith(".py")
|
|
|
-
|
|
|
- # 修复路径重复问题:统一处理脚本路径
|
|
|
- if is_python_script:
|
|
|
- if code_file.startswith(code_path):
|
|
|
- # code_file 已经是完整路径
|
|
|
- full_script_path = code_file
|
|
|
- # 提取纯文件名用于数据库存储
|
|
|
- code_file_name = Path(code_file).name
|
|
|
- elif "/" in code_file or "\\" in code_file:
|
|
|
- # code_file 包含其他路径,提取文件名
|
|
|
- code_file_name = Path(code_file).name
|
|
|
- full_script_path = f"{code_path}/{code_file_name}"
|
|
|
- else:
|
|
|
- # code_file 只是文件名
|
|
|
- code_file_name = code_file
|
|
|
- full_script_path = f"{code_path}/{code_file}"
|
|
|
- logger.info(f"任务 {task_id} 使用 Python 脚本: {full_script_path}")
|
|
|
- else:
|
|
|
- logger.info(
|
|
|
- f"任务 {task_id} 的 code_file ({code_file}) 不是 Python 脚本,跳过 DataFlow 更新"
|
|
|
- )
|
|
|
- code_file_name = code_file
|
|
|
- full_script_path = ""
|
|
|
-
|
|
|
- if update_task_status(task_id, "completed", code_file_name, code_path):
|
|
|
- updated += 1
|
|
|
- logger.info(f"已同步任务 {task_id} 为 completed")
|
|
|
-
|
|
|
- # 只有 Python 脚本才更新 DataFlow 节点的 script_path
|
|
|
- if task_name and is_python_script:
|
|
|
- if update_dataflow_script_path(
|
|
|
- task_name, full_script_path, task_id=task_id
|
|
|
- ):
|
|
|
- logger.info(
|
|
|
- f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
|
|
|
- )
|
|
|
- else:
|
|
|
- logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")
|
|
|
-
|
|
|
- # 自动部署到生产服务器(如果启用)
|
|
|
- if ENABLE_AUTO_DEPLOY:
|
|
|
- logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
|
|
|
- if auto_deploy_completed_task(t):
|
|
|
- logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
|
|
|
- else:
|
|
|
- logger.warning(f"任务 {task_id} 部署到生产服务器失败")
|
|
|
- else:
|
|
|
- logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")
|
|
|
- else:
|
|
|
- remaining_tasks.append(t)
|
|
|
- else:
|
|
|
- remaining_tasks.append(t)
|
|
|
-
|
|
|
- if updated > 0:
|
|
|
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
|
|
|
- json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
|
|
|
- logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
|
|
|
-
|
|
|
- return updated
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 生产服务器部署功能
|
|
|
-# ============================================================================
|
|
|
-def get_ssh_connection():
|
|
|
- """获取 SSH 连接到生产服务器"""
|
|
|
- try:
|
|
|
- import paramiko # type: ignore
|
|
|
-
|
|
|
- ssh = paramiko.SSHClient()
|
|
|
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
|
-
|
|
|
- logger.info(
|
|
|
- f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
|
|
|
- f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
|
|
|
- )
|
|
|
-
|
|
|
- ssh.connect(
|
|
|
- hostname=PRODUCTION_SERVER["host"],
|
|
|
- port=PRODUCTION_SERVER["port"],
|
|
|
- username=PRODUCTION_SERVER["username"],
|
|
|
- password=PRODUCTION_SERVER["password"],
|
|
|
- timeout=10,
|
|
|
- )
|
|
|
-
|
|
|
- logger.info("✅ SSH 连接成功")
|
|
|
- return ssh
|
|
|
-
|
|
|
- except ImportError:
|
|
|
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
|
|
|
- return None
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"SSH 连接失败: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-def test_ssh_connection() -> bool:
|
|
|
- """测试 SSH 连接到生产服务器"""
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("测试生产服务器连接")
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- ssh = get_ssh_connection()
|
|
|
- if not ssh:
|
|
|
- logger.error("❌ SSH 连接测试失败")
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 测试执行命令
|
|
|
- _, stdout, _ = ssh.exec_command("echo 'Connection test successful'")
|
|
|
- output = stdout.read().decode().strip()
|
|
|
- logger.info(f"✅ 命令执行成功: {output}")
|
|
|
-
|
|
|
- # 检查目标目录是否存在
|
|
|
- _, stdout, _ = ssh.exec_command(
|
|
|
- f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'"
|
|
|
- )
|
|
|
- result = stdout.read().decode().strip()
|
|
|
-
|
|
|
- if result == "exists":
|
|
|
- logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}")
|
|
|
- else:
|
|
|
- logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}")
|
|
|
- logger.info("将在首次部署时自动创建")
|
|
|
-
|
|
|
- ssh.close()
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("✅ 连接测试完成")
|
|
|
- logger.info("=" * 60)
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 测试执行命令失败: {e}")
|
|
|
- ssh.close()
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def deploy_script_to_production(
|
|
|
- local_script_path: str, remote_filename: str | None = None
|
|
|
-) -> bool:
|
|
|
- """部署脚本文件到生产服务器"""
|
|
|
- try:
|
|
|
- import importlib.util
|
|
|
-
|
|
|
- if importlib.util.find_spec("paramiko") is None:
|
|
|
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
|
|
|
- return False
|
|
|
-
|
|
|
- # 转换为绝对路径
|
|
|
- local_path = Path(local_script_path)
|
|
|
- if not local_path.is_absolute():
|
|
|
- local_path = WORKSPACE_ROOT / local_path
|
|
|
-
|
|
|
- if not local_path.exists():
|
|
|
- logger.error(f"本地文件不存在: {local_path}")
|
|
|
- return False
|
|
|
-
|
|
|
- # 确定远程文件名
|
|
|
- if not remote_filename:
|
|
|
- remote_filename = local_path.name
|
|
|
-
|
|
|
- remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}"
|
|
|
-
|
|
|
- # 建立 SSH 连接
|
|
|
- ssh = get_ssh_connection()
|
|
|
- if not ssh:
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 创建 SFTP 客户端
|
|
|
- sftp = ssh.open_sftp()
|
|
|
-
|
|
|
- # 确保远程目录存在
|
|
|
- try:
|
|
|
- sftp.stat(PRODUCTION_SERVER["script_path"])
|
|
|
- except FileNotFoundError:
|
|
|
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}")
|
|
|
- _, stdout, _ = ssh.exec_command(
|
|
|
- f"mkdir -p {PRODUCTION_SERVER['script_path']}"
|
|
|
- )
|
|
|
- stdout.channel.recv_exit_status()
|
|
|
-
|
|
|
- # 上传文件
|
|
|
- logger.info(f"正在上传: {local_path} -> {remote_path}")
|
|
|
- sftp.put(str(local_path), remote_path)
|
|
|
-
|
|
|
- # 设置文件权限为可执行
|
|
|
- sftp.chmod(remote_path, 0o755)
|
|
|
-
|
|
|
- logger.info(f"✅ 脚本部署成功: {remote_path}")
|
|
|
-
|
|
|
- sftp.close()
|
|
|
- ssh.close()
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"文件传输失败: {e}")
|
|
|
- ssh.close()
|
|
|
- return False
|
|
|
-
|
|
|
- except ImportError:
|
|
|
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
|
|
|
- return False
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"部署脚本失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
|
|
|
- """
|
|
|
- 部署 n8n 工作流到 n8n 服务器
|
|
|
-
|
|
|
- 此函数执行两个步骤:
|
|
|
- 1. 通过 n8n API 创建工作流(主要步骤)
|
|
|
- 2. 通过 SFTP 备份工作流文件到生产服务器(可选)
|
|
|
- """
|
|
|
- try:
|
|
|
- import json
|
|
|
-
|
|
|
- import requests
|
|
|
-
|
|
|
- # 转换为绝对路径
|
|
|
- local_path = Path(workflow_file)
|
|
|
- if not local_path.is_absolute():
|
|
|
- local_path = WORKSPACE_ROOT / local_path
|
|
|
-
|
|
|
- if not local_path.exists():
|
|
|
- logger.error(f"工作流文件不存在: {local_path}")
|
|
|
- return False
|
|
|
-
|
|
|
- # 加载工作流 JSON
|
|
|
- with open(local_path, encoding="utf-8") as f:
|
|
|
- workflow_data = json.load(f)
|
|
|
-
|
|
|
- workflow_name = workflow_data.get("name", local_path.stem)
|
|
|
- logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
|
|
|
-
|
|
|
- # 获取 n8n API 配置
|
|
|
- try:
|
|
|
- sys.path.insert(0, str(WORKSPACE_ROOT))
|
|
|
- from app.config.config import BaseConfig
|
|
|
-
|
|
|
- api_url = BaseConfig.N8N_API_URL
|
|
|
- api_key = BaseConfig.N8N_API_KEY
|
|
|
- timeout = BaseConfig.N8N_API_TIMEOUT
|
|
|
- except (ImportError, AttributeError):
|
|
|
- import os
|
|
|
-
|
|
|
- api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
|
|
|
- api_key = os.environ.get("N8N_API_KEY", "")
|
|
|
- timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
|
|
|
-
|
|
|
- if not api_key:
|
|
|
- logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
|
|
|
- return False
|
|
|
-
|
|
|
- # 准备 API 请求
|
|
|
- headers = {
|
|
|
- "X-N8N-API-KEY": api_key,
|
|
|
- "Content-Type": "application/json",
|
|
|
- "Accept": "application/json",
|
|
|
- }
|
|
|
-
|
|
|
- # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
|
|
|
- workflow_payload = {
|
|
|
- "name": workflow_name,
|
|
|
- "nodes": workflow_data.get("nodes", []),
|
|
|
- "connections": workflow_data.get("connections", {}),
|
|
|
- "settings": workflow_data.get("settings", {}),
|
|
|
- }
|
|
|
-
|
|
|
- # 调用 n8n API 创建工作流
|
|
|
- create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
|
|
|
- logger.info(f"调用 n8n API: {create_url}")
|
|
|
-
|
|
|
- try:
|
|
|
- response = requests.post(
|
|
|
- create_url,
|
|
|
- headers=headers,
|
|
|
- json=workflow_payload,
|
|
|
- timeout=timeout,
|
|
|
- )
|
|
|
-
|
|
|
- if response.status_code == 401:
|
|
|
- logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
|
|
|
- return False
|
|
|
- elif response.status_code == 403:
|
|
|
- logger.error("n8n API 权限不足")
|
|
|
- return False
|
|
|
-
|
|
|
- response.raise_for_status()
|
|
|
- created_workflow = response.json()
|
|
|
- workflow_id = created_workflow.get("id")
|
|
|
-
|
|
|
- logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
|
|
|
-
|
|
|
- # 可选:将工作流文件备份到生产服务器
|
|
|
- try:
|
|
|
- _backup_workflow_to_server(local_path)
|
|
|
- except Exception as backup_error:
|
|
|
- logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}")
|
|
|
-
|
|
|
- return True
|
|
|
-
|
|
|
- except requests.exceptions.Timeout:
|
|
|
- logger.error("n8n API 请求超时,请检查网络连接")
|
|
|
- return False
|
|
|
- except requests.exceptions.ConnectionError:
|
|
|
- logger.error(f"无法连接到 n8n 服务器: {api_url}")
|
|
|
- return False
|
|
|
- except requests.exceptions.HTTPError as e:
|
|
|
- error_detail = ""
|
|
|
- try:
|
|
|
- error_detail = e.response.json()
|
|
|
- except Exception:
|
|
|
- error_detail = e.response.text
|
|
|
- logger.error(
|
|
|
- f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
|
|
|
- )
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"部署工作流失败: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def _backup_workflow_to_server(local_path: Path) -> bool:
|
|
|
- """备份工作流文件到生产服务器(通过 SFTP)"""
|
|
|
- try:
|
|
|
- import importlib.util
|
|
|
-
|
|
|
- if importlib.util.find_spec("paramiko") is None:
|
|
|
- logger.debug("未安装 paramiko 库,跳过文件备份")
|
|
|
- return False
|
|
|
-
|
|
|
- remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}"
|
|
|
-
|
|
|
- # 建立 SSH 连接
|
|
|
- ssh = get_ssh_connection()
|
|
|
- if not ssh:
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 创建 SFTP 客户端
|
|
|
- sftp = ssh.open_sftp()
|
|
|
-
|
|
|
- # 确保远程目录存在
|
|
|
- try:
|
|
|
- sftp.stat(PRODUCTION_SERVER["workflow_path"])
|
|
|
- except FileNotFoundError:
|
|
|
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}")
|
|
|
- _, stdout, _ = ssh.exec_command(
|
|
|
- f"mkdir -p {PRODUCTION_SERVER['workflow_path']}"
|
|
|
- )
|
|
|
- stdout.channel.recv_exit_status()
|
|
|
-
|
|
|
- # 上传工作流文件
|
|
|
- logger.debug(f"备份工作流文件: {local_path} -> {remote_path}")
|
|
|
- sftp.put(str(local_path), remote_path)
|
|
|
-
|
|
|
- sftp.close()
|
|
|
- ssh.close()
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"工作流文件备份失败: {e}")
|
|
|
- ssh.close()
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"备份工作流失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def find_remote_workflow_files(task_info: dict[str, Any]) -> list[str]:
|
|
|
- """
|
|
|
- 从生产服务器查找与任务相关的 n8n 工作流文件
|
|
|
-
|
|
|
- 查找策略:
|
|
|
- 1. 列出远程 workflow_path 目录下的所有 .json 文件
|
|
|
- 2. 根据任务名称或脚本名称匹配相关工作流
|
|
|
-
|
|
|
- Args:
|
|
|
- task_info: 任务信息字典
|
|
|
-
|
|
|
- Returns:
|
|
|
- 远程工作流文件路径列表
|
|
|
- """
|
|
|
- remote_files: list[str] = []
|
|
|
-
|
|
|
- code_file = task_info.get("code_file", "")
|
|
|
- task_name = task_info.get("task_name", "")
|
|
|
-
|
|
|
- ssh = get_ssh_connection()
|
|
|
- if not ssh:
|
|
|
- logger.warning("无法连接到生产服务器,跳过远程工作流文件查找")
|
|
|
- return remote_files
|
|
|
-
|
|
|
- try:
|
|
|
- workflow_path = PRODUCTION_SERVER["workflow_path"]
|
|
|
-
|
|
|
- # 检查目录是否存在
|
|
|
- _, stdout, _ = ssh.exec_command(f"test -d {workflow_path} && echo 'exists'")
|
|
|
- if stdout.read().decode().strip() != "exists":
|
|
|
- logger.info(f"远程工作流目录不存在: {workflow_path}")
|
|
|
- ssh.close()
|
|
|
- return remote_files
|
|
|
-
|
|
|
- # 列出目录下所有 .json 文件
|
|
|
- _, stdout, _ = ssh.exec_command(f"ls -1 {workflow_path}/*.json 2>/dev/null")
|
|
|
- file_list = stdout.read().decode().strip().split("\n")
|
|
|
-
|
|
|
- # 过滤有效文件路径
|
|
|
- all_json_files = [
|
|
|
- f.strip() for f in file_list if f.strip() and f.endswith(".json")
|
|
|
- ]
|
|
|
-
|
|
|
- if not all_json_files:
|
|
|
- logger.info(f"远程工作流目录 {workflow_path} 中没有 JSON 文件")
|
|
|
- ssh.close()
|
|
|
- return remote_files
|
|
|
-
|
|
|
- logger.info(f"远程服务器发现 {len(all_json_files)} 个工作流文件")
|
|
|
-
|
|
|
- # 根据任务信息匹配相关工作流
|
|
|
- # 构建匹配模式
|
|
|
- match_patterns: list[str] = []
|
|
|
-
|
|
|
- # 基于脚本文件名匹配
|
|
|
- if code_file and code_file.endswith(".py"):
|
|
|
- script_base = code_file[:-3] # 去掉 .py
|
|
|
- match_patterns.append(script_base.lower())
|
|
|
-
|
|
|
- # 基于任务名称匹配(针对 DF_DO 格式的任务名)
|
|
|
- if task_name:
|
|
|
- if task_name.startswith("DF_DO"):
|
|
|
- match_patterns.append(task_name.lower())
|
|
|
- # 对于中文任务名,尝试提取英文/数字部分
|
|
|
- import re
|
|
|
-
|
|
|
- alphanumeric = re.sub(r"[^a-zA-Z0-9_-]", "", task_name)
|
|
|
- if alphanumeric and len(alphanumeric) >= 3:
|
|
|
- match_patterns.append(alphanumeric.lower())
|
|
|
-
|
|
|
- # 匹配文件
|
|
|
- for remote_file in all_json_files:
|
|
|
- file_name_lower = Path(remote_file).stem.lower()
|
|
|
-
|
|
|
- # 检查是否与任何模式匹配
|
|
|
- matched = False
|
|
|
- for pattern in match_patterns:
|
|
|
- if pattern in file_name_lower or file_name_lower in pattern:
|
|
|
- matched = True
|
|
|
- break
|
|
|
-
|
|
|
- if matched and remote_file not in remote_files:
|
|
|
- remote_files.append(remote_file)
|
|
|
- logger.info(f" 匹配到工作流: {Path(remote_file).name}")
|
|
|
-
|
|
|
- # 如果没有匹配到任何文件,不再自动部署所有文件
|
|
|
- # 这样可以避免误部署其他任务的工作流
|
|
|
- if not remote_files and all_json_files:
|
|
|
- logger.info("没有精确匹配的工作流文件,跳过远程工作流部署")
|
|
|
- # 不再自动部署所有文件,避免重复部署问题
|
|
|
-
|
|
|
- ssh.close()
|
|
|
- return remote_files
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"查找远程工作流文件失败: {e}")
|
|
|
- if ssh:
|
|
|
- ssh.close()
|
|
|
- return remote_files
|
|
|
-
|
|
|
-
|
|
|
-def deploy_remote_workflow_to_n8n(remote_file_path: str) -> bool:
|
|
|
- """
|
|
|
- 从生产服务器读取工作流 JSON 文件并部署到 n8n 系统
|
|
|
-
|
|
|
- Args:
|
|
|
- remote_file_path: 远程服务器上的工作流文件完整路径
|
|
|
-
|
|
|
- Returns:
|
|
|
- 是否部署成功
|
|
|
- """
|
|
|
- try:
|
|
|
- import requests
|
|
|
-
|
|
|
- ssh = get_ssh_connection()
|
|
|
- if not ssh:
|
|
|
- logger.error("无法连接到生产服务器")
|
|
|
- return False
|
|
|
-
|
|
|
- # 读取远程工作流文件内容
|
|
|
- logger.info(f"从远程服务器读取工作流: {remote_file_path}")
|
|
|
- _, stdout, stderr = ssh.exec_command(f"cat {remote_file_path}")
|
|
|
- file_content = stdout.read().decode("utf-8")
|
|
|
- error_output = stderr.read().decode()
|
|
|
-
|
|
|
- if error_output:
|
|
|
- logger.error(f"读取远程文件失败: {error_output}")
|
|
|
- ssh.close()
|
|
|
- return False
|
|
|
-
|
|
|
- ssh.close()
|
|
|
-
|
|
|
- # 解析工作流 JSON
|
|
|
- try:
|
|
|
- workflow_data = json.loads(file_content)
|
|
|
- except json.JSONDecodeError as e:
|
|
|
- logger.error(f"解析工作流 JSON 失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
- workflow_name = workflow_data.get("name", Path(remote_file_path).stem)
|
|
|
- logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
|
|
|
-
|
|
|
- # 获取 n8n API 配置
|
|
|
- try:
|
|
|
- sys.path.insert(0, str(WORKSPACE_ROOT))
|
|
|
- from app.config.config import BaseConfig
|
|
|
-
|
|
|
- api_url = BaseConfig.N8N_API_URL
|
|
|
- api_key = BaseConfig.N8N_API_KEY
|
|
|
- timeout = BaseConfig.N8N_API_TIMEOUT
|
|
|
- except (ImportError, AttributeError):
|
|
|
- import os
|
|
|
-
|
|
|
- api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
|
|
|
- api_key = os.environ.get("N8N_API_KEY", "")
|
|
|
- timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
|
|
|
-
|
|
|
- if not api_key:
|
|
|
- logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
|
|
|
- return False
|
|
|
-
|
|
|
- # 准备 API 请求
|
|
|
- headers = {
|
|
|
- "X-N8N-API-KEY": api_key,
|
|
|
- "Content-Type": "application/json",
|
|
|
- "Accept": "application/json",
|
|
|
- }
|
|
|
-
|
|
|
- # 准备工作流数据
|
|
|
- workflow_payload = {
|
|
|
- "name": workflow_name,
|
|
|
- "nodes": workflow_data.get("nodes", []),
|
|
|
- "connections": workflow_data.get("connections", {}),
|
|
|
- "settings": workflow_data.get("settings", {}),
|
|
|
- }
|
|
|
-
|
|
|
- # 先检查是否已存在同名工作流
|
|
|
- list_url = f"{api_url.rstrip('/')}/api/v1/workflows"
|
|
|
- try:
|
|
|
- list_response = requests.get(
|
|
|
- list_url,
|
|
|
- headers=headers,
|
|
|
- timeout=timeout,
|
|
|
- )
|
|
|
- if list_response.status_code == 200:
|
|
|
- existing_workflows = list_response.json().get("data", [])
|
|
|
- existing_wf = None
|
|
|
- for wf in existing_workflows:
|
|
|
- if wf.get("name") == workflow_name:
|
|
|
- existing_wf = wf
|
|
|
- break
|
|
|
-
|
|
|
- if existing_wf:
|
|
|
- # 已存在同名工作流,跳过创建避免重复
|
|
|
- workflow_id = existing_wf.get("id")
|
|
|
- logger.info(
|
|
|
- f"发现已存在的工作流 (ID: {workflow_id}),跳过部署避免重复"
|
|
|
- )
|
|
|
- logger.info(
|
|
|
- "如需更新工作流,请手动在 n8n 控制台操作或删除后重新部署"
|
|
|
- )
|
|
|
- return True # 返回成功,因为工作流已存在
|
|
|
-
|
|
|
- except requests.exceptions.RequestException as e:
|
|
|
- logger.warning(f"检查已存在工作流时出错: {e}")
|
|
|
-
|
|
|
- # 调用 n8n API 创建工作流
|
|
|
- create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
|
|
|
- logger.info(f"调用 n8n API 创建工作流: {create_url}")
|
|
|
-
|
|
|
- try:
|
|
|
- response = requests.post(
|
|
|
- create_url,
|
|
|
- headers=headers,
|
|
|
- json=workflow_payload,
|
|
|
- timeout=timeout,
|
|
|
- )
|
|
|
-
|
|
|
- if response.status_code == 401:
|
|
|
- logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
|
|
|
- return False
|
|
|
- elif response.status_code == 403:
|
|
|
- logger.error("n8n API 权限不足")
|
|
|
- return False
|
|
|
-
|
|
|
- response.raise_for_status()
|
|
|
- created_workflow = response.json()
|
|
|
- workflow_id = created_workflow.get("id")
|
|
|
-
|
|
|
- logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
|
|
|
- return True
|
|
|
-
|
|
|
- except requests.exceptions.Timeout:
|
|
|
- logger.error("n8n API 请求超时,请检查网络连接")
|
|
|
- return False
|
|
|
- except requests.exceptions.ConnectionError:
|
|
|
- logger.error(f"无法连接到 n8n 服务器: {api_url}")
|
|
|
- return False
|
|
|
- except requests.exceptions.HTTPError as e:
|
|
|
- error_detail = ""
|
|
|
- try:
|
|
|
- error_detail = e.response.json()
|
|
|
- except Exception:
|
|
|
- error_detail = e.response.text
|
|
|
- logger.error(
|
|
|
- f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
|
|
|
- )
|
|
|
- return False
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"从远程服务器部署工作流失败: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def find_related_workflow_files(
|
|
|
- task_info: dict[str, Any],
|
|
|
-) -> list[Path]:
|
|
|
- """
|
|
|
- 查找与任务相关的所有 n8n 工作流文件
|
|
|
-
|
|
|
- 查找策略:
|
|
|
- 1. 与脚本同目录的工作流文件 (n8n_workflow_*.json)
|
|
|
- 2. datafactory/n8n_workflows 目录下的工作流文件
|
|
|
- 3. 根据任务名称模式匹配
|
|
|
- 4. 根据脚本名称匹配 (去掉 .py 后缀)
|
|
|
- 5. 根据任务 ID 匹配
|
|
|
- 6. 最近修改的工作流文件 (在任务创建后修改的)
|
|
|
- """
|
|
|
- workflow_files: list[Path] = []
|
|
|
- code_name = task_info.get("code_name", "")
|
|
|
- code_path = task_info.get("code_path", "datafactory/scripts")
|
|
|
- task_name = task_info.get("task_name", "")
|
|
|
- task_id = task_info.get("task_id")
|
|
|
-
|
|
|
- # 获取任务通知时间用于判断文件是否是新创建的
|
|
|
- notified_at_str = task_info.get("notified_at", "")
|
|
|
- notified_at = None
|
|
|
- if notified_at_str:
|
|
|
- with contextlib.suppress(ValueError, TypeError):
|
|
|
- notified_at = datetime.fromisoformat(notified_at_str.replace("Z", "+00:00"))
|
|
|
-
|
|
|
- # 查找模式1: 与脚本同目录的工作流文件
|
|
|
- script_dir = WORKSPACE_ROOT / code_path
|
|
|
- if script_dir.exists() and script_dir.is_dir():
|
|
|
- for wf_file in script_dir.glob("n8n_workflow_*.json"):
|
|
|
- if wf_file.is_file() and wf_file not in workflow_files:
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 也查找以 workflow_ 开头的文件
|
|
|
- for wf_file in script_dir.glob("workflow_*.json"):
|
|
|
- if wf_file.is_file() and wf_file not in workflow_files:
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 查找模式2: datafactory/n8n_workflows 目录
|
|
|
- n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows"
|
|
|
- if n8n_workflows_dir.exists():
|
|
|
- for wf_file in n8n_workflows_dir.glob("*.json"):
|
|
|
- if wf_file.is_file() and wf_file not in workflow_files:
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 查找模式3: 根据任务名称匹配
|
|
|
- if task_name and task_name != "未知任务":
|
|
|
- # 尝试多种名称变体
|
|
|
- name_patterns = [
|
|
|
- task_name.replace(" ", "_").lower(),
|
|
|
- task_name.replace(" ", "-").lower(),
|
|
|
- task_name.lower(),
|
|
|
- ]
|
|
|
-
|
|
|
- for pattern in name_patterns:
|
|
|
- if len(pattern) < 3: # 跳过过短的模式
|
|
|
- continue
|
|
|
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*{pattern}*.json"):
|
|
|
- # 验证是文件、未添加过、且是有效的 n8n 工作流文件
|
|
|
- if (
|
|
|
- wf_file.is_file()
|
|
|
- and wf_file not in workflow_files
|
|
|
- and _is_n8n_workflow_file(wf_file)
|
|
|
- ):
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 查找模式4: 根据脚本名称匹配
|
|
|
- if code_name and code_name.endswith(".py"):
|
|
|
- script_base_name = code_name[:-3] # 去掉 .py
|
|
|
-
|
|
|
- # 在 datafactory 目录下查找
|
|
|
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(
|
|
|
- f"*{script_base_name}*.json"
|
|
|
- ):
|
|
|
- if (
|
|
|
- wf_file.is_file()
|
|
|
- and wf_file not in workflow_files
|
|
|
- and _is_n8n_workflow_file(wf_file)
|
|
|
- ):
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 查找模式5: 根据任务 ID 匹配
|
|
|
- if task_id:
|
|
|
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*task_{task_id}*.json"):
|
|
|
- if (
|
|
|
- wf_file.is_file()
|
|
|
- and wf_file not in workflow_files
|
|
|
- and _is_n8n_workflow_file(wf_file)
|
|
|
- ):
|
|
|
- workflow_files.append(wf_file)
|
|
|
-
|
|
|
- # 查找模式6: 最近修改的工作流文件(在任务创建后修改的)
|
|
|
- if notified_at:
|
|
|
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob("*.json"):
|
|
|
- if wf_file.is_file() and wf_file not in workflow_files:
|
|
|
- try:
|
|
|
- mtime = datetime.fromtimestamp(wf_file.stat().st_mtime)
|
|
|
- # 如果文件在任务通知后被修改,可能是相关的工作流
|
|
|
- if mtime > notified_at.replace(
|
|
|
- tzinfo=None
|
|
|
- ) and _is_n8n_workflow_file(wf_file):
|
|
|
- workflow_files.append(wf_file)
|
|
|
- logger.debug(f"发现最近修改的工作流: {wf_file.name}")
|
|
|
- except (OSError, ValueError):
|
|
|
- pass
|
|
|
-
|
|
|
- return workflow_files
|
|
|
-
|
|
|
-
|
|
|
-def _is_n8n_workflow_file(file_path: Path) -> bool:
|
|
|
- """
|
|
|
- 检查文件是否是有效的 n8n 工作流文件
|
|
|
-
|
|
|
- 通过检查 JSON 结构来验证
|
|
|
- """
|
|
|
- try:
|
|
|
- with open(file_path, encoding="utf-8") as f:
|
|
|
- data = json.load(f)
|
|
|
-
|
|
|
- # n8n 工作流文件通常包含 nodes 和 connections 字段
|
|
|
- if isinstance(data, dict):
|
|
|
- has_nodes = "nodes" in data
|
|
|
- has_connections = "connections" in data
|
|
|
- has_name = "name" in data
|
|
|
-
|
|
|
- # 至少需要有 nodes 或符合 n8n 工作流特征
|
|
|
- return has_nodes or (has_name and has_connections)
|
|
|
-
|
|
|
- return False
|
|
|
- except (json.JSONDecodeError, OSError):
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
|
|
|
- """
|
|
|
- 自动部署已完成任务的脚本和工作流到生产服务器
|
|
|
-
|
|
|
- 部署流程:
|
|
|
- 1. 部署 Python 脚本到生产服务器 (通过 SFTP)
|
|
|
- 2. 查找并部署相关的 n8n 工作流 (通过 n8n API)
|
|
|
- 3. 记录部署结果
|
|
|
- """
|
|
|
- # 优先使用 code_file 字段,其次使用 code_name
|
|
|
- code_file = task_info.get("code_file", "")
|
|
|
- code_name = task_info.get("code_name", "")
|
|
|
- code_path = task_info.get("code_path", "datafactory/scripts")
|
|
|
- task_name = task_info.get("task_name", "未知任务")
|
|
|
- task_id = task_info.get("task_id", "N/A")
|
|
|
-
|
|
|
- # 确定实际的脚本文件名:优先使用 code_file,如果为空则尝试 code_name
|
|
|
- actual_script_file = code_file if code_file else code_name
|
|
|
-
|
|
|
- if not actual_script_file or not code_path:
|
|
|
- logger.warning(f"任务 {task_name} (ID: {task_id}) 缺少代码文件信息,跳过部署")
|
|
|
- return False
|
|
|
-
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info(f"🚀 开始自动部署任务: {task_name} (ID: {task_id})")
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- deploy_results = {
|
|
|
- "script_deployed": False,
|
|
|
- "workflows_found": 0,
|
|
|
- "workflows_deployed": 0,
|
|
|
- "workflows_failed": 0,
|
|
|
- }
|
|
|
-
|
|
|
- # 1. 部署 Python 脚本
|
|
|
- if actual_script_file.endswith(".py"):
|
|
|
- # 修复路径重复问题:如果 actual_script_file 已经包含 code_path,则只使用 actual_script_file
|
|
|
- # 否则拼接 code_path 和 actual_script_file
|
|
|
- if actual_script_file.startswith(code_path):
|
|
|
- # actual_script_file 已经是完整路径,如 "datafactory/scripts/task_41_xxx.py"
|
|
|
- script_path = actual_script_file
|
|
|
- elif "/" in actual_script_file or "\\" in actual_script_file:
|
|
|
- # actual_script_file 包含路径分隔符但不以 code_path 开头
|
|
|
- # 可能是其他格式的路径,提取文件名后拼接
|
|
|
- script_filename = Path(actual_script_file).name
|
|
|
- script_path = f"{code_path}/{script_filename}"
|
|
|
- else:
|
|
|
- # actual_script_file 只是文件名,正常拼接
|
|
|
- script_path = f"{code_path}/{actual_script_file}"
|
|
|
- logger.info(f"📦 部署 Python 脚本: {script_path}")
|
|
|
-
|
|
|
- if deploy_script_to_production(script_path):
|
|
|
- logger.info(f"✅ 脚本 {actual_script_file} 部署成功")
|
|
|
- deploy_results["script_deployed"] = True
|
|
|
- else:
|
|
|
- logger.error(f"❌ 脚本 {actual_script_file} 部署失败")
|
|
|
-
|
|
|
- # 2. 查找并部署相关的 n8n 工作流文件
|
|
|
- # 2.1 首先从本地查找工作流文件
|
|
|
- logger.info("🔍 查找本地 n8n 工作流文件...")
|
|
|
- workflow_files = find_related_workflow_files(task_info)
|
|
|
-
|
|
|
- if workflow_files:
|
|
|
- logger.info(f"📋 本地发现 {len(workflow_files)} 个相关工作流文件:")
|
|
|
- for wf_file in workflow_files:
|
|
|
- logger.info(f" - {wf_file.relative_to(WORKSPACE_ROOT)}")
|
|
|
-
|
|
|
- for wf_file in workflow_files:
|
|
|
- logger.info(f"🔄 部署本地工作流: {wf_file.name}")
|
|
|
- if deploy_n8n_workflow_to_production(str(wf_file)):
|
|
|
- logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
|
|
|
- deploy_results["workflows_deployed"] += 1
|
|
|
- else:
|
|
|
- logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
|
|
|
- deploy_results["workflows_failed"] += 1
|
|
|
- else:
|
|
|
- logger.info("ℹ️ 本地未发现相关工作流文件")
|
|
|
-
|
|
|
- # 2.2 然后从生产服务器查找并部署工作流文件
|
|
|
- logger.info("🔍 查找生产服务器上的 n8n 工作流文件...")
|
|
|
- remote_workflow_files = find_remote_workflow_files(task_info)
|
|
|
-
|
|
|
- if remote_workflow_files:
|
|
|
- logger.info(f"📋 远程服务器发现 {len(remote_workflow_files)} 个相关工作流文件:")
|
|
|
- for remote_file in remote_workflow_files:
|
|
|
- logger.info(f" - {Path(remote_file).name}")
|
|
|
-
|
|
|
- for remote_file in remote_workflow_files:
|
|
|
- logger.info(f"🔄 部署远程工作流: {Path(remote_file).name}")
|
|
|
- if deploy_remote_workflow_to_n8n(remote_file):
|
|
|
- logger.info(f"✅ 远程工作流 {Path(remote_file).name} 部署成功")
|
|
|
- deploy_results["workflows_deployed"] += 1
|
|
|
- else:
|
|
|
- logger.error(f"❌ 远程工作流 {Path(remote_file).name} 部署失败")
|
|
|
- deploy_results["workflows_failed"] += 1
|
|
|
- else:
|
|
|
- logger.info("ℹ️ 远程服务器未发现相关工作流文件")
|
|
|
-
|
|
|
- # 更新发现的工作流总数
|
|
|
- deploy_results["workflows_found"] = len(workflow_files) + len(remote_workflow_files)
|
|
|
-
|
|
|
- # 3. 汇总部署结果
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info(f"📊 部署结果汇总 - 任务: {task_name} (ID: {task_id})")
|
|
|
- logger.info("-" * 40)
|
|
|
- logger.info(
|
|
|
- f" 脚本部署: {'✅ 成功' if deploy_results['script_deployed'] else '❌ 失败或跳过'}"
|
|
|
- )
|
|
|
- logger.info(f" 发现工作流: {deploy_results['workflows_found']} 个")
|
|
|
- logger.info(f" 工作流部署成功: {deploy_results['workflows_deployed']} 个")
|
|
|
- logger.info(f" 工作流部署失败: {deploy_results['workflows_failed']} 个")
|
|
|
-
|
|
|
- # 判断整体部署是否成功
|
|
|
- deploy_success = (
|
|
|
- deploy_results["script_deployed"] and deploy_results["workflows_failed"] == 0
|
|
|
- )
|
|
|
-
|
|
|
- if deploy_success:
|
|
|
- logger.info(f"✅ 任务 {task_name} 部署完成!")
|
|
|
- elif deploy_results["script_deployed"]:
|
|
|
- if deploy_results["workflows_failed"] > 0:
|
|
|
- logger.warning(f"⚠️ 任务 {task_name} 脚本部署成功,但部分工作流部署失败")
|
|
|
- else:
|
|
|
- logger.info(f"✅ 任务 {task_name} 脚本部署成功")
|
|
|
- deploy_success = True # 脚本部署成功就认为整体成功
|
|
|
- else:
|
|
|
- logger.error(f"❌ 任务 {task_name} 部署失败")
|
|
|
-
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- return deploy_success
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# Cursor Agent 自动化
|
|
|
-# ============================================================================
|
|
|
-
|
|
|
-# Agent 会话状态
|
|
|
-AGENT_SESSION_ACTIVE: bool = False
|
|
|
-AGENT_START_TIME: float = 0
|
|
|
-
|
|
|
-
|
|
|
-def get_all_cursor_windows() -> list[dict[str, Any]]:
|
|
|
- """获取所有 Cursor 窗口信息"""
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- return []
|
|
|
-
|
|
|
- cursor_windows: list[dict[str, Any]] = []
|
|
|
-
|
|
|
- def enum_windows_callback(hwnd, _extra):
|
|
|
- if win32gui.IsWindowVisible(hwnd):
|
|
|
- title = win32gui.GetWindowText(hwnd) or ""
|
|
|
- class_name = win32gui.GetClassName(hwnd) or ""
|
|
|
-
|
|
|
- is_cursor = "cursor" in title.lower()
|
|
|
- if class_name and "chrome_widgetwin" in class_name.lower():
|
|
|
- is_cursor = True
|
|
|
-
|
|
|
- if is_cursor:
|
|
|
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
|
|
|
- area = (right - left) * (bottom - top)
|
|
|
- cursor_windows.append(
|
|
|
- {
|
|
|
- "hwnd": hwnd,
|
|
|
- "title": title,
|
|
|
- "class_name": class_name,
|
|
|
- "area": area,
|
|
|
- }
|
|
|
- )
|
|
|
- return True
|
|
|
-
|
|
|
- win32gui.EnumWindows(enum_windows_callback, None)
|
|
|
- return cursor_windows
|
|
|
-
|
|
|
-
|
|
|
-def find_cursor_window() -> int | None:
|
|
|
- """查找 Cursor 主窗口句柄"""
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- return None
|
|
|
-
|
|
|
- cursor_windows = get_all_cursor_windows()
|
|
|
-
|
|
|
- if not cursor_windows:
|
|
|
- logger.warning("未找到 Cursor 窗口")
|
|
|
- return None
|
|
|
-
|
|
|
- # 按面积排序,返回最大的窗口(主窗口)
|
|
|
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
|
|
|
- return cursor_windows[0]["hwnd"]
|
|
|
-
|
|
|
-
|
|
|
-def activate_window(hwnd: int) -> bool:
|
|
|
- """
|
|
|
- 激活指定窗口
|
|
|
-
|
|
|
- Windows 对 SetForegroundWindow 有限制,只有满足以下条件之一才能成功:
|
|
|
- 1. 调用进程是前台进程
|
|
|
- 2. 调用进程由前台进程启动
|
|
|
- 3. 目标窗口属于前台进程
|
|
|
- 4. 没有其他窗口在前台
|
|
|
-
|
|
|
- 此函数使用多种技巧绕过这些限制。
|
|
|
- """
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 方法1: 使用 AttachThreadInput 技巧绕过 SetForegroundWindow 限制
|
|
|
- # 这是最可靠的方法,通过将当前线程附加到前台窗口的线程来获取激活权限
|
|
|
- import ctypes
|
|
|
-
|
|
|
- user32 = ctypes.windll.user32
|
|
|
-
|
|
|
- # 获取当前前台窗口的线程ID
|
|
|
- foreground_hwnd = user32.GetForegroundWindow()
|
|
|
- foreground_thread_id = user32.GetWindowThreadProcessId(foreground_hwnd, None)
|
|
|
-
|
|
|
- # 获取当前线程ID
|
|
|
- current_thread_id = ctypes.windll.kernel32.GetCurrentThreadId()
|
|
|
-
|
|
|
- attached = False
|
|
|
-
|
|
|
- # 如果当前线程不是前台线程,则附加到前台线程
|
|
|
- if current_thread_id != foreground_thread_id:
|
|
|
- attached = user32.AttachThreadInput(
|
|
|
- current_thread_id, foreground_thread_id, True
|
|
|
- )
|
|
|
-
|
|
|
- try:
|
|
|
- # 先确保窗口不是最小化状态
|
|
|
- if win32gui.IsIconic(hwnd):
|
|
|
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
|
|
|
- time.sleep(0.2)
|
|
|
-
|
|
|
- # 使用 BringWindowToTop 将窗口置顶
|
|
|
- user32.BringWindowToTop(hwnd)
|
|
|
-
|
|
|
- # 显示窗口
|
|
|
- win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
|
|
|
-
|
|
|
- # 尝试 SetForegroundWindow
|
|
|
- result = user32.SetForegroundWindow(hwnd)
|
|
|
-
|
|
|
- if not result:
|
|
|
- # 方法2: 使用 Alt 键模拟技巧
|
|
|
- # 发送一个 Alt 键可以让系统认为用户有交互意图
|
|
|
- # 定义必要的常量
|
|
|
- KEYEVENTF_EXTENDEDKEY = 0x0001
|
|
|
- KEYEVENTF_KEYUP = 0x0002
|
|
|
- VK_MENU = 0x12 # Alt 键
|
|
|
-
|
|
|
- # 模拟按下和释放 Alt 键
|
|
|
- user32.keybd_event(VK_MENU, 0, KEYEVENTF_EXTENDEDKEY, 0)
|
|
|
- user32.keybd_event(
|
|
|
- VK_MENU, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0
|
|
|
- )
|
|
|
- time.sleep(0.1)
|
|
|
-
|
|
|
- # 再次尝试
|
|
|
- result = user32.SetForegroundWindow(hwnd)
|
|
|
-
|
|
|
- if not result:
|
|
|
- # 方法3: 使用 ShowWindow 配合 SW_SHOWDEFAULT
|
|
|
- win32gui.ShowWindow(hwnd, win32con.SW_SHOWDEFAULT)
|
|
|
- time.sleep(0.1)
|
|
|
- result = user32.SetForegroundWindow(hwnd)
|
|
|
-
|
|
|
- if not result:
|
|
|
- # 方法4: 使用 SetWindowPos 将窗口置于最顶层
|
|
|
- SWP_NOMOVE = 0x0002
|
|
|
- SWP_NOSIZE = 0x0001
|
|
|
- SWP_SHOWWINDOW = 0x0040
|
|
|
- HWND_TOPMOST = -1
|
|
|
- HWND_NOTOPMOST = -2
|
|
|
-
|
|
|
- # 先设为最顶层
|
|
|
- user32.SetWindowPos(
|
|
|
- hwnd,
|
|
|
- HWND_TOPMOST,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
|
|
|
- )
|
|
|
- time.sleep(0.1)
|
|
|
- # 再取消最顶层(但窗口仍在前台)
|
|
|
- user32.SetWindowPos(
|
|
|
- hwnd,
|
|
|
- HWND_NOTOPMOST,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- 0,
|
|
|
- SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
|
|
|
- )
|
|
|
- result = user32.SetForegroundWindow(hwnd)
|
|
|
-
|
|
|
- time.sleep(0.3)
|
|
|
-
|
|
|
- # 验证是否成功
|
|
|
- current_foreground = user32.GetForegroundWindow()
|
|
|
- if current_foreground == hwnd:
|
|
|
- logger.debug("窗口激活成功")
|
|
|
- return True
|
|
|
- else:
|
|
|
- # 即使 SetForegroundWindow 返回失败,窗口可能已经被置顶并可见
|
|
|
- # 检查窗口是否可见且不是最小化
|
|
|
- if win32gui.IsWindowVisible(hwnd) and not win32gui.IsIconic(hwnd):
|
|
|
- logger.warning("窗口可能未完全激活到前台,但窗口可见,继续执行...")
|
|
|
- return True
|
|
|
- else:
|
|
|
- logger.error("激活窗口失败: 窗口不在前台")
|
|
|
- return False
|
|
|
-
|
|
|
- finally:
|
|
|
- # 分离线程
|
|
|
- if attached:
|
|
|
- user32.AttachThreadInput(current_thread_id, foreground_thread_id, False)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"激活窗口失败: {e}")
|
|
|
- # 最后的备用方案:直接尝试基本操作
|
|
|
- try:
|
|
|
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
|
|
|
- win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
|
|
|
- time.sleep(0.3)
|
|
|
- # 即使失败也返回 True,让调用者继续尝试
|
|
|
- if win32gui.IsWindowVisible(hwnd):
|
|
|
- logger.warning("使用备用方案激活窗口,继续执行...")
|
|
|
- return True
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def open_new_agent() -> bool:
|
|
|
- """在 Cursor 中打开新的 Agent 窗口"""
|
|
|
- global AGENT_SESSION_ACTIVE, AGENT_START_TIME
|
|
|
-
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- logger.warning("当前环境不支持 Cursor GUI 自动化")
|
|
|
- return False
|
|
|
-
|
|
|
- hwnd = find_cursor_window()
|
|
|
- if not hwnd:
|
|
|
- return False
|
|
|
-
|
|
|
- if not activate_window(hwnd):
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 使用 Ctrl+Shift+I 打开新的 Agent/Composer
|
|
|
- logger.info("正在打开新的 Agent...")
|
|
|
- pyautogui.hotkey("ctrl", "shift", "i")
|
|
|
- time.sleep(2.0) # 等待 Agent 窗口打开
|
|
|
-
|
|
|
- AGENT_SESSION_ACTIVE = True
|
|
|
- AGENT_START_TIME = time.time()
|
|
|
- logger.info("✅ 新的 Agent 已打开")
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"打开 Agent 失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def close_current_agent(force: bool = False, max_retries: int = 3) -> bool:
|
|
|
- """
|
|
|
- 关闭当前的 Agent 会话
|
|
|
-
|
|
|
- Args:
|
|
|
- force: 是否强制关闭(使用多种方法)
|
|
|
- max_retries: 最大重试次数
|
|
|
-
|
|
|
- 关闭策略:
|
|
|
- 1. 使用 Escape 键关闭 Agent 面板
|
|
|
- 2. 如果失败,尝试 Ctrl+Shift+I 切换 Agent 面板
|
|
|
- 3. 如果仍失败,尝试点击空白区域并按 Escape
|
|
|
- """
|
|
|
- global AGENT_SESSION_ACTIVE
|
|
|
-
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- return False
|
|
|
-
|
|
|
- if not AGENT_SESSION_ACTIVE and not force:
|
|
|
- logger.info("没有活动的 Agent 会话")
|
|
|
- return True
|
|
|
-
|
|
|
- logger.info("🔄 正在关闭 Agent...")
|
|
|
-
|
|
|
- for attempt in range(max_retries):
|
|
|
- try:
|
|
|
- hwnd = find_cursor_window()
|
|
|
- if not hwnd:
|
|
|
- logger.warning("未找到 Cursor 窗口")
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- return False
|
|
|
-
|
|
|
- if not activate_window(hwnd):
|
|
|
- logger.warning(f"激活窗口失败 (尝试 {attempt + 1}/{max_retries})")
|
|
|
- time.sleep(0.5)
|
|
|
- continue
|
|
|
-
|
|
|
- # 方法1: 按 Escape 键关闭 Agent
|
|
|
- logger.debug(f"尝试方法1: Escape 键 (尝试 {attempt + 1}/{max_retries})")
|
|
|
- pyautogui.press("escape")
|
|
|
- time.sleep(0.3)
|
|
|
- pyautogui.press("escape")
|
|
|
- time.sleep(0.3)
|
|
|
-
|
|
|
- # 方法2: 使用 Ctrl+Shift+I 切换 Agent 面板(关闭)
|
|
|
- if force or attempt > 0:
|
|
|
- logger.debug("尝试方法2: Ctrl+Shift+I 切换")
|
|
|
- pyautogui.hotkey("ctrl", "shift", "i")
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- # 方法3: 点击编辑器区域并按 Escape
|
|
|
- if force or attempt > 1:
|
|
|
- logger.debug("尝试方法3: 点击编辑器区域")
|
|
|
- # 获取窗口位置,点击中心偏左位置(编辑器区域)
|
|
|
- try:
|
|
|
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
|
|
|
- center_x = left + (right - left) // 3 # 偏左1/3位置
|
|
|
- center_y = top + (bottom - top) // 2
|
|
|
- pyautogui.click(center_x, center_y)
|
|
|
- time.sleep(0.2)
|
|
|
- pyautogui.press("escape")
|
|
|
- time.sleep(0.3)
|
|
|
- except Exception as click_err:
|
|
|
- logger.debug(f"点击方法失败: {click_err}")
|
|
|
-
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- logger.info("✅ Agent 已关闭")
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"关闭 Agent 尝试 {attempt + 1} 失败: {e}")
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- # 即使关闭失败,也标记为非活动状态,避免状态不一致
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- logger.warning("⚠️ Agent 关闭可能未完全成功,但已重置状态")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def force_close_all_agents() -> bool:
|
|
|
- """
|
|
|
- 强制关闭所有可能的 Agent 会话
|
|
|
-
|
|
|
- 用于清理可能遗留的多个 Agent 窗口
|
|
|
- """
|
|
|
- global AGENT_SESSION_ACTIVE
|
|
|
-
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- return False
|
|
|
-
|
|
|
- logger.info("🔄 强制关闭所有 Agent 会话...")
|
|
|
-
|
|
|
- try:
|
|
|
- hwnd = find_cursor_window()
|
|
|
- if not hwnd:
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- return True
|
|
|
-
|
|
|
- if not activate_window(hwnd):
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- return False
|
|
|
-
|
|
|
- # 连续按多次 Escape 确保关闭所有面板
|
|
|
- for _ in range(5):
|
|
|
- pyautogui.press("escape")
|
|
|
- time.sleep(0.2)
|
|
|
-
|
|
|
- # 使用快捷键关闭可能的 Agent 面板
|
|
|
- pyautogui.hotkey("ctrl", "shift", "i")
|
|
|
- time.sleep(0.3)
|
|
|
- pyautogui.hotkey("ctrl", "shift", "i")
|
|
|
- time.sleep(0.3)
|
|
|
-
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- logger.info("✅ 所有 Agent 会话已关闭")
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"强制关闭 Agent 失败: {e}")
|
|
|
- AGENT_SESSION_ACTIVE = False
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def type_message_to_agent(message: str) -> bool:
|
|
|
- """向 Agent 输入消息"""
|
|
|
- if not HAS_CURSOR_GUI:
|
|
|
- return False
|
|
|
-
|
|
|
- try:
|
|
|
- # 等待 Agent 输入框获得焦点
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- # 使用剪贴板粘贴(更可靠地处理中文和特殊字符)
|
|
|
- if HAS_PYPERCLIP:
|
|
|
- try:
|
|
|
- pyperclip.copy(message)
|
|
|
- pyautogui.hotkey("ctrl", "v")
|
|
|
- time.sleep(0.5)
|
|
|
- except Exception:
|
|
|
- # 回退到逐字符输入
|
|
|
- pyautogui.write(message, interval=0.03)
|
|
|
- else:
|
|
|
- pyautogui.write(message, interval=0.03)
|
|
|
-
|
|
|
- time.sleep(0.3)
|
|
|
-
|
|
|
- # 按 Enter 发送消息
|
|
|
- pyautogui.press("enter")
|
|
|
- logger.info("✅ 消息已发送到 Agent")
|
|
|
- return True
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"发送消息到 Agent 失败: {e}")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def wait_for_agent_completion(
|
|
|
- timeout: int = 3600,
|
|
|
- check_interval: int = 30,
|
|
|
-) -> bool:
|
|
|
- """
|
|
|
- 等待 Agent 完成任务
|
|
|
-
|
|
|
- 通过检查 pending_tasks.json 中的任务状态来判断是否完成
|
|
|
- """
|
|
|
- start_time = time.time()
|
|
|
- logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")
|
|
|
-
|
|
|
- while time.time() - start_time < timeout:
|
|
|
- processing_ids = get_processing_task_ids()
|
|
|
-
|
|
|
- if not processing_ids:
|
|
|
- elapsed = int(time.time() - start_time)
|
|
|
- logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
|
|
|
- return True
|
|
|
-
|
|
|
- remaining = len(processing_ids)
|
|
|
- elapsed = int(time.time() - start_time)
|
|
|
- logger.info(
|
|
|
- f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
|
|
|
- )
|
|
|
-
|
|
|
- time.sleep(check_interval)
|
|
|
-
|
|
|
- logger.warning("等待超时,仍有未完成的任务")
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-def run_agent_once(
|
|
|
- timeout: int = 3600,
|
|
|
- auto_close: bool = True,
|
|
|
-) -> bool:
|
|
|
- """
|
|
|
- 执行一次 Agent 任务
|
|
|
-
|
|
|
- 流程:
|
|
|
- 1. 同步已完成任务到数据库
|
|
|
- 2. 从数据库读取 pending 任务
|
|
|
- 3. 更新任务状态为 processing
|
|
|
- 4. 生成执行指令文件(包含所有 processing 任务)
|
|
|
- 5. 打开 Agent 并发送消息
|
|
|
- 6. 等待任务完成
|
|
|
- 7. 同步完成任务 + 自动部署
|
|
|
- 8. 关闭 Agent
|
|
|
- """
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("Agent 单次执行模式")
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- # 1. 先同步已完成任务
|
|
|
- sync_completed_tasks_to_db()
|
|
|
-
|
|
|
- # 2. 从数据库获取 pending 任务
|
|
|
- logger.info("正在从数据库查询 pending 任务...")
|
|
|
- pending_tasks = get_pending_tasks()
|
|
|
-
|
|
|
- # 3. 获取当前 processing 任务
|
|
|
- processing_ids = get_processing_task_ids()
|
|
|
-
|
|
|
- # 4. 检查是否有任务需要执行
|
|
|
- if not pending_tasks and not processing_ids:
|
|
|
- logger.info("✅ 没有待执行的任务")
|
|
|
- return True
|
|
|
-
|
|
|
- if pending_tasks:
|
|
|
- logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")
|
|
|
-
|
|
|
- # 5. 更新新任务状态为 processing
|
|
|
- for task in pending_tasks:
|
|
|
- update_task_status(task["task_id"], "processing")
|
|
|
-
|
|
|
- # 6. 写入 pending_tasks.json
|
|
|
- write_pending_tasks_json(pending_tasks)
|
|
|
-
|
|
|
- if processing_ids:
|
|
|
- logger.info(f"发现 {len(processing_ids)} 个已有的 processing 任务")
|
|
|
-
|
|
|
- # 7. 获取所有需要执行的任务(包含完整信息)并生成执行指令
|
|
|
- all_tasks_to_execute = get_all_tasks_to_execute()
|
|
|
-
|
|
|
- if all_tasks_to_execute:
|
|
|
- logger.info(f"共 {len(all_tasks_to_execute)} 个任务需要执行")
|
|
|
- # 生成包含所有任务的执行指令文件
|
|
|
- create_execute_instructions(all_tasks_to_execute)
|
|
|
- else:
|
|
|
- logger.warning("无法获取任务详细信息,跳过生成执行指令")
|
|
|
-
|
|
|
- # 7. 更新触发器文件
|
|
|
- all_processing_ids = get_processing_task_ids()
|
|
|
- if all_processing_ids:
|
|
|
- update_trigger_file(
|
|
|
- task_count=len(all_processing_ids),
|
|
|
- status="有待执行任务",
|
|
|
- task_ids=all_processing_ids,
|
|
|
- )
|
|
|
-
|
|
|
- # 8. 打开 Agent 并发送消息
|
|
|
- if not open_new_agent():
|
|
|
- logger.error("❌ 无法打开 Agent")
|
|
|
- return False
|
|
|
-
|
|
|
- if not type_message_to_agent(AGENT_MESSAGE):
|
|
|
- logger.error("❌ 无法发送消息到 Agent")
|
|
|
- close_current_agent()
|
|
|
- return False
|
|
|
-
|
|
|
- logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")
|
|
|
-
|
|
|
- # 9. 等待任务完成
|
|
|
- completed = wait_for_agent_completion(timeout=timeout)
|
|
|
-
|
|
|
- # 10. 立即关闭 Agent(在同步之前)
|
|
|
- logger.info("🔄 任务执行完毕,立即关闭 Agent...")
|
|
|
- if auto_close:
|
|
|
- close_current_agent(force=True)
|
|
|
- time.sleep(1.0) # 等待关闭完成
|
|
|
-
|
|
|
- # 11. 同步已完成的任务到数据库(触发自动部署)
|
|
|
- logger.info("🔄 开始同步和部署...")
|
|
|
- sync_completed_tasks_to_db()
|
|
|
-
|
|
|
- if completed:
|
|
|
- logger.info("✅ Agent 已完成所有任务")
|
|
|
- else:
|
|
|
- logger.warning("⚠️ Agent 未能在超时时间内完成所有任务")
|
|
|
- # 强制关闭可能遗留的 Agent
|
|
|
- force_close_all_agents()
|
|
|
-
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("Agent 会话结束")
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- return completed
|
|
|
-
|
|
|
-
|
|
|
-def run_agent_loop(
|
|
|
- interval: int = 300,
|
|
|
- timeout: int = 3600,
|
|
|
- auto_close: bool = True,
|
|
|
-) -> None:
|
|
|
- """
|
|
|
- Agent 循环模式
|
|
|
-
|
|
|
- 循环执行 Agent 单次任务,直到用户按 Ctrl+C 停止
|
|
|
-
|
|
|
- 完整流程:
|
|
|
- 1. 同步已完成任务到数据库(触发自动部署)
|
|
|
- 2. 检查是否有新的 pending 任务
|
|
|
- 3. 生成执行指令文件
|
|
|
- 4. 启动 Agent 执行任务
|
|
|
- 5. 等待任务完成
|
|
|
- 6. 同步完成任务并触发自动部署
|
|
|
- 7. 循环...
|
|
|
- """
|
|
|
- global AGENT_SESSION_ACTIVE
|
|
|
-
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("🔄 Agent 循环模式已启动")
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info(f" 检查间隔: {interval} 秒")
|
|
|
- logger.info(f" 任务超时: {timeout} 秒")
|
|
|
- logger.info(f" 自动部署: {'✅ 已启用' if ENABLE_AUTO_DEPLOY else '❌ 已禁用'}")
|
|
|
- logger.info(f" 自动关闭 Agent: {'✅ 是' if auto_close else '❌ 否'}")
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("按 Ctrl+C 停止服务")
|
|
|
- logger.info("=" * 60)
|
|
|
-
|
|
|
- loop_count = 0
|
|
|
- total_tasks_completed = 0
|
|
|
- total_deployments = 0
|
|
|
-
|
|
|
- try:
|
|
|
- while True:
|
|
|
- try:
|
|
|
- loop_count += 1
|
|
|
- logger.info(f"\n{'=' * 60}")
|
|
|
- logger.info(f"📍 开始第 {loop_count} 轮任务检查...")
|
|
|
- logger.info(f"{'=' * 60}")
|
|
|
-
|
|
|
- # 1. 同步已完成任务(这会触发自动部署)
|
|
|
- logger.info("🔄 检查并同步已完成的任务...")
|
|
|
- synced_count = sync_completed_tasks_to_db()
|
|
|
- if synced_count > 0:
|
|
|
- total_tasks_completed += synced_count
|
|
|
- total_deployments += synced_count
|
|
|
- logger.info(
|
|
|
- f"✅ 已同步 {synced_count} 个完成的任务(累计: {total_tasks_completed})"
|
|
|
- )
|
|
|
-
|
|
|
- # 2. 从数据库获取 pending 任务
|
|
|
- logger.info("📡 检查数据库中的 pending 任务...")
|
|
|
- pending_tasks = get_pending_tasks()
|
|
|
-
|
|
|
- if pending_tasks:
|
|
|
- logger.info(f"📋 发现 {len(pending_tasks)} 个新的 pending 任务:")
|
|
|
- for task in pending_tasks:
|
|
|
- logger.info(f" - [{task['task_id']}] {task['task_name']}")
|
|
|
-
|
|
|
- # 更新任务状态为 processing
|
|
|
- for task in pending_tasks:
|
|
|
- update_task_status(task["task_id"], "processing")
|
|
|
-
|
|
|
- # 写入 pending_tasks.json
|
|
|
- write_pending_tasks_json(pending_tasks)
|
|
|
-
|
|
|
- # 3. 检查是否有 processing 任务
|
|
|
- processing_ids = get_processing_task_ids()
|
|
|
-
|
|
|
- # 4. 如果有新任务或有 processing 任务,生成包含所有任务的执行指令
|
|
|
- if pending_tasks or processing_ids:
|
|
|
- all_tasks_to_execute = get_all_tasks_to_execute()
|
|
|
- if all_tasks_to_execute:
|
|
|
- logger.info(
|
|
|
- f"📝 生成执行指令文件,共 {len(all_tasks_to_execute)} 个任务"
|
|
|
- )
|
|
|
- create_execute_instructions(all_tasks_to_execute)
|
|
|
-
|
|
|
- if processing_ids:
|
|
|
- # 如果有活动的 Agent 会话,不需要重新启动
|
|
|
- if AGENT_SESSION_ACTIVE:
|
|
|
- logger.info(
|
|
|
- f"⏳ Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
|
|
|
- )
|
|
|
- else:
|
|
|
- logger.info(
|
|
|
- f"🎯 发现 {len(processing_ids)} 个待处理任务,准备启动 Agent"
|
|
|
- )
|
|
|
-
|
|
|
- # 更新触发器文件
|
|
|
- update_trigger_file(
|
|
|
- task_count=len(processing_ids),
|
|
|
- status="有待执行任务",
|
|
|
- task_ids=processing_ids,
|
|
|
- )
|
|
|
-
|
|
|
- # 启动 Agent
|
|
|
- if open_new_agent():
|
|
|
- if type_message_to_agent(AGENT_MESSAGE):
|
|
|
- logger.info("✅ 已启动 Agent 并发送执行提醒")
|
|
|
-
|
|
|
- # 等待任务完成
|
|
|
- task_completed = wait_for_agent_completion(
|
|
|
- timeout=timeout
|
|
|
- )
|
|
|
-
|
|
|
- # ===== 关键:任务完成后立即关闭 Agent =====
|
|
|
- logger.info("🔄 任务执行完毕,立即关闭 Agent...")
|
|
|
- if auto_close:
|
|
|
- # 使用强制关闭,确保 Agent 被正确关闭
|
|
|
- close_current_agent(force=True)
|
|
|
- # 等待一小段时间确保关闭完成
|
|
|
- time.sleep(1.0)
|
|
|
-
|
|
|
- # 同步完成的任务(这会触发自动部署)
|
|
|
- logger.info("🔄 开始同步和部署...")
|
|
|
- synced = sync_completed_tasks_to_db()
|
|
|
- if synced > 0:
|
|
|
- total_tasks_completed += synced
|
|
|
- total_deployments += synced
|
|
|
- logger.info(
|
|
|
- f"✅ 本轮完成 {synced} 个任务的同步和部署"
|
|
|
- )
|
|
|
-
|
|
|
- # 显示本轮统计
|
|
|
- logger.info(f"📊 本轮统计: 完成任务 {synced} 个")
|
|
|
- if ENABLE_AUTO_DEPLOY:
|
|
|
- logger.info(f" 已触发自动部署: {synced} 个")
|
|
|
-
|
|
|
- # 如果任务未完成(超时),也确保关闭 Agent
|
|
|
- if not task_completed:
|
|
|
- logger.warning("⚠️ 任务超时,强制关闭 Agent")
|
|
|
- force_close_all_agents()
|
|
|
- else:
|
|
|
- logger.warning("❌ 发送消息失败")
|
|
|
- close_current_agent(force=True)
|
|
|
- else:
|
|
|
- logger.warning("❌ 启动 Agent 失败")
|
|
|
- else:
|
|
|
- logger.info("✅ 当前没有待处理任务")
|
|
|
-
|
|
|
- # 显示累计统计
|
|
|
- logger.info(
|
|
|
- f"\n📈 累计统计: 已完成 {total_tasks_completed} 个任务, "
|
|
|
- f"已部署 {total_deployments} 个"
|
|
|
- )
|
|
|
- logger.info(f"⏰ {interval} 秒后将进行第 {loop_count + 1} 轮检查...")
|
|
|
- time.sleep(interval)
|
|
|
-
|
|
|
- except KeyboardInterrupt:
|
|
|
- raise
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"❌ 执行出错: {e}")
|
|
|
- import traceback
|
|
|
-
|
|
|
- logger.error(traceback.format_exc())
|
|
|
- logger.info(f"⏰ {interval} 秒后重试...")
|
|
|
- time.sleep(interval)
|
|
|
-
|
|
|
- except KeyboardInterrupt:
|
|
|
- # 退出时关闭 Agent
|
|
|
- logger.info("\n" + "=" * 60)
|
|
|
- logger.info("⛔ 收到停止信号,正在退出...")
|
|
|
-
|
|
|
- if AGENT_SESSION_ACTIVE:
|
|
|
- logger.info("🔄 正在关闭 Agent...")
|
|
|
- close_current_agent()
|
|
|
-
|
|
|
- # 最后一次同步
|
|
|
- logger.info("🔄 执行最终同步...")
|
|
|
- final_synced = sync_completed_tasks_to_db()
|
|
|
- if final_synced > 0:
|
|
|
- total_tasks_completed += final_synced
|
|
|
- logger.info(f"✅ 最终同步了 {final_synced} 个任务")
|
|
|
-
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("📊 会话统计:")
|
|
|
- logger.info(f" 总循环次数: {loop_count}")
|
|
|
- logger.info(f" 总完成任务: {total_tasks_completed}")
|
|
|
- logger.info(f" 总部署次数: {total_deployments}")
|
|
|
- logger.info("=" * 60)
|
|
|
- logger.info("✅ Agent 循环模式已停止")
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 交互式菜单
|
|
|
-# ============================================================================
|
|
|
-def show_interactive_menu() -> None:
|
|
|
- """显示交互式菜单并执行用户选择的操作"""
|
|
|
- global ENABLE_AUTO_DEPLOY
|
|
|
-
|
|
|
- while True:
|
|
|
- print("\n" + "=" * 60)
|
|
|
- print("自动任务执行调度脚本 - Agent 模式")
|
|
|
- print("=" * 60)
|
|
|
- print("\n请选择操作模式:\n")
|
|
|
- print(" 1. Agent 单次执行")
|
|
|
- print(" 2. Agent 循环模式(含自动部署脚本和n8n工作流)")
|
|
|
- print(" 3. Agent 循环模式(禁用部署)")
|
|
|
- print(" 4. 测试生产服务器连接")
|
|
|
- print(" 5. 查看当前任务状态")
|
|
|
- print(" 6. 手动触发任务部署")
|
|
|
- print(" 7. 强制关闭所有 Agent")
|
|
|
- print(" 0. 退出")
|
|
|
- print("\n" + "-" * 60)
|
|
|
-
|
|
|
- try:
|
|
|
- choice = input("请输入选项 [0-5]: ").strip()
|
|
|
- except (KeyboardInterrupt, EOFError):
|
|
|
- print("\n再见!")
|
|
|
- break
|
|
|
-
|
|
|
- if choice == "0":
|
|
|
- print("再见!")
|
|
|
- break
|
|
|
-
|
|
|
- elif choice == "1":
|
|
|
- print("\n启动 Agent 单次执行模式...")
|
|
|
- run_agent_once(timeout=3600, auto_close=True)
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
-
|
|
|
- elif choice == "2":
|
|
|
- try:
|
|
|
- interval_str = input("请输入检查间隔(秒,默认300): ").strip()
|
|
|
- interval = int(interval_str) if interval_str else 300
|
|
|
- except ValueError:
|
|
|
- interval = 300
|
|
|
-
|
|
|
- print("\n🚀 启动 Agent 循环模式(含自动部署)")
|
|
|
- print(f" 检查间隔: {interval} 秒")
|
|
|
- print(" 自动部署: ✅ 已启用")
|
|
|
- print("\n 任务完成后将自动:")
|
|
|
- print(" - 部署 Python 脚本到生产服务器")
|
|
|
- print(" - 查找并部署相关 n8n 工作流")
|
|
|
- print("\n按 Ctrl+C 停止服务并返回菜单\n")
|
|
|
- ENABLE_AUTO_DEPLOY = True
|
|
|
- try:
|
|
|
- run_agent_loop(interval=interval)
|
|
|
- except KeyboardInterrupt:
|
|
|
- print("\n循环已停止")
|
|
|
-
|
|
|
- elif choice == "3":
|
|
|
- try:
|
|
|
- interval_str = input("请输入检查间隔(秒,默认300): ").strip()
|
|
|
- interval = int(interval_str) if interval_str else 300
|
|
|
- except ValueError:
|
|
|
- interval = 300
|
|
|
- print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
|
|
|
- print("按 Ctrl+C 停止服务并返回菜单\n")
|
|
|
- ENABLE_AUTO_DEPLOY = False
|
|
|
- try:
|
|
|
- run_agent_loop(interval=interval)
|
|
|
- except KeyboardInterrupt:
|
|
|
- print("\n循环已停止")
|
|
|
-
|
|
|
- elif choice == "4":
|
|
|
- print("\n测试生产服务器连接...")
|
|
|
- if test_ssh_connection():
|
|
|
- print("✅ 连接测试成功")
|
|
|
- else:
|
|
|
- print("❌ 连接测试失败")
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
-
|
|
|
- elif choice == "5":
|
|
|
- print("\n当前任务状态:")
|
|
|
- print("-" * 40)
|
|
|
-
|
|
|
- # 从数据库获取 pending 任务
|
|
|
- pending_tasks = get_pending_tasks()
|
|
|
- print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
|
|
|
- for task in pending_tasks:
|
|
|
- print(f" - [{task['task_id']}] {task['task_name']}")
|
|
|
-
|
|
|
- # 从本地文件获取 processing 任务
|
|
|
- processing_ids = get_processing_task_ids()
|
|
|
- print(f" 本地 processing 任务: {len(processing_ids)} 个")
|
|
|
- if processing_ids:
|
|
|
- print(f" 任务 ID: {processing_ids}")
|
|
|
-
|
|
|
- # 显示已完成的任务
|
|
|
- if PENDING_TASKS_FILE.exists():
|
|
|
- try:
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- all_local_tasks = json.load(f)
|
|
|
- completed_tasks = [
|
|
|
- t for t in all_local_tasks if t.get("status") == "completed"
|
|
|
- ]
|
|
|
- print(f" 本地 completed 任务: {len(completed_tasks)} 个")
|
|
|
- for task in completed_tasks:
|
|
|
- print(
|
|
|
- f" - [{task.get('task_id')}] {task.get('task_name')} -> {task.get('code_file', 'N/A')}"
|
|
|
- )
|
|
|
- except Exception:
|
|
|
- pass
|
|
|
-
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
-
|
|
|
- elif choice == "6":
|
|
|
- print("\n手动触发任务部署")
|
|
|
- print("-" * 40)
|
|
|
-
|
|
|
- # 显示已完成的任务列表
|
|
|
- if PENDING_TASKS_FILE.exists():
|
|
|
- try:
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- all_tasks = json.load(f)
|
|
|
-
|
|
|
- completed_tasks = [
|
|
|
- t for t in all_tasks if t.get("status") == "completed"
|
|
|
- ]
|
|
|
-
|
|
|
- if not completed_tasks:
|
|
|
- print("没有已完成的任务可供部署")
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
- continue
|
|
|
-
|
|
|
- print("已完成的任务:")
|
|
|
- for idx, task in enumerate(completed_tasks, 1):
|
|
|
- task_id = task.get("task_id", "N/A")
|
|
|
- task_name = task.get("task_name", "未知")
|
|
|
- code_file = task.get("code_file", "N/A")
|
|
|
- print(f" {idx}. [{task_id}] {task_name}")
|
|
|
- print(f" 代码文件: {code_file}")
|
|
|
-
|
|
|
- print("\n 0. 部署全部")
|
|
|
- print(" q. 返回菜单")
|
|
|
-
|
|
|
- try:
|
|
|
- selection = input("\n请选择要部署的任务编号: ").strip().lower()
|
|
|
-
|
|
|
- if selection == "q":
|
|
|
- continue
|
|
|
-
|
|
|
- tasks_to_deploy = []
|
|
|
-
|
|
|
- if selection == "0":
|
|
|
- tasks_to_deploy = completed_tasks
|
|
|
- else:
|
|
|
- try:
|
|
|
- idx = int(selection) - 1
|
|
|
- if 0 <= idx < len(completed_tasks):
|
|
|
- tasks_to_deploy = [completed_tasks[idx]]
|
|
|
- else:
|
|
|
- print("❌ 无效的编号")
|
|
|
- continue
|
|
|
- except ValueError:
|
|
|
- print("❌ 请输入有效的数字")
|
|
|
- continue
|
|
|
-
|
|
|
- if tasks_to_deploy:
|
|
|
- print(f"\n🚀 开始部署 {len(tasks_to_deploy)} 个任务...")
|
|
|
- ENABLE_AUTO_DEPLOY = True
|
|
|
-
|
|
|
- success_count = 0
|
|
|
- for task in tasks_to_deploy:
|
|
|
- if auto_deploy_completed_task(task):
|
|
|
- success_count += 1
|
|
|
-
|
|
|
- print(
|
|
|
- f"\n📊 部署完成: {success_count}/{len(tasks_to_deploy)} 成功"
|
|
|
- )
|
|
|
-
|
|
|
- except (KeyboardInterrupt, EOFError):
|
|
|
- pass
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"❌ 读取任务列表失败: {e}")
|
|
|
- else:
|
|
|
- print("没有本地任务记录")
|
|
|
-
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
-
|
|
|
- elif choice == "7":
|
|
|
- print("\n🔄 强制关闭所有 Agent 会话...")
|
|
|
- if HAS_CURSOR_GUI:
|
|
|
- if force_close_all_agents():
|
|
|
- print("✅ 所有 Agent 会话已关闭")
|
|
|
- else:
|
|
|
- print("⚠️ 关闭过程中可能出现问题,请检查 Cursor 窗口")
|
|
|
- else:
|
|
|
- print("❌ 当前环境不支持 GUI 自动化")
|
|
|
- input("\n按 Enter 键返回菜单...")
|
|
|
-
|
|
|
- else:
|
|
|
- print("❌ 无效的选项,请重新选择")
|
|
|
-
|
|
|
-
|
|
|
-# ============================================================================
|
|
|
-# 主函数
|
|
|
-# ============================================================================
|
|
|
-def main() -> None:
|
|
|
- """主函数"""
|
|
|
- parser = argparse.ArgumentParser(
|
|
|
- description="自动任务执行调度脚本 (Agent 模式)",
|
|
|
- formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
|
- epilog="""
|
|
|
-示例:
|
|
|
- # Agent 单次执行
|
|
|
- python scripts/auto_execute_tasks.py --agent-run
|
|
|
-
|
|
|
- # Agent 循环模式
|
|
|
- python scripts/auto_execute_tasks.py --agent-loop
|
|
|
-
|
|
|
- # Agent 循环模式 + 禁用自动部署
|
|
|
- python scripts/auto_execute_tasks.py --agent-loop --no-deploy
|
|
|
-
|
|
|
- # 设置 Agent 超时时间
|
|
|
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
|
|
|
-
|
|
|
- # 立即部署指定任务到生产服务器
|
|
|
- python scripts/auto_execute_tasks.py --deploy-now 123
|
|
|
-
|
|
|
- # 测试生产服务器连接
|
|
|
- python scripts/auto_execute_tasks.py --test-connection
|
|
|
- """,
|
|
|
- )
|
|
|
-
|
|
|
- # Agent 模式参数
|
|
|
- parser.add_argument(
|
|
|
- "--agent-run",
|
|
|
- action="store_true",
|
|
|
- help="Agent 单次执行模式",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--agent-loop",
|
|
|
- action="store_true",
|
|
|
- help="Agent 循环模式",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--agent-timeout",
|
|
|
- type=int,
|
|
|
- default=3600,
|
|
|
- help="Agent 等待任务完成的超时时间(秒),默认 3600",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--interval",
|
|
|
- type=int,
|
|
|
- default=300,
|
|
|
- help="循环模式检查间隔(秒),默认 300",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--no-auto-close",
|
|
|
- action="store_true",
|
|
|
- help="任务完成后不自动关闭 Agent",
|
|
|
- )
|
|
|
-
|
|
|
- # 部署相关参数
|
|
|
- parser.add_argument(
|
|
|
- "--no-deploy",
|
|
|
- action="store_true",
|
|
|
- help="禁用自动部署功能",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--deploy-now",
|
|
|
- type=str,
|
|
|
- metavar="TASK_ID",
|
|
|
- help="立即部署指定任务ID的脚本到生产服务器",
|
|
|
- )
|
|
|
- parser.add_argument(
|
|
|
- "--test-connection",
|
|
|
- action="store_true",
|
|
|
- help="测试到生产服务器的 SSH 连接",
|
|
|
- )
|
|
|
-
|
|
|
- args = parser.parse_args()
|
|
|
-
|
|
|
- global ENABLE_AUTO_DEPLOY
|
|
|
- ENABLE_AUTO_DEPLOY = not args.no_deploy
|
|
|
- auto_close = not args.no_auto_close
|
|
|
-
|
|
|
- # 测试 SSH 连接
|
|
|
- if args.test_connection:
|
|
|
- if test_ssh_connection():
|
|
|
- logger.info("✅ 连接测试成功")
|
|
|
- else:
|
|
|
- logger.error("❌ 连接测试失败")
|
|
|
- return
|
|
|
-
|
|
|
- # 立即部署指定任务
|
|
|
- if args.deploy_now:
|
|
|
- try:
|
|
|
- task_id = int(args.deploy_now)
|
|
|
- logger.info(f"开始部署任务 {task_id}...")
|
|
|
-
|
|
|
- # 从 pending_tasks.json 查找任务信息
|
|
|
- if PENDING_TASKS_FILE.exists():
|
|
|
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
|
|
|
- tasks = json.load(f)
|
|
|
-
|
|
|
- task_found = None
|
|
|
- for t in tasks:
|
|
|
- if t.get("task_id") == task_id:
|
|
|
- task_found = t
|
|
|
- break
|
|
|
-
|
|
|
- if task_found:
|
|
|
- if auto_deploy_completed_task(task_found):
|
|
|
- logger.info(f"✅ 任务 {task_id} 部署成功")
|
|
|
- else:
|
|
|
- logger.error(f"❌ 任务 {task_id} 部署失败")
|
|
|
- else:
|
|
|
- logger.error(f"未找到任务 {task_id}")
|
|
|
- else:
|
|
|
- logger.error("pending_tasks.json 文件不存在")
|
|
|
-
|
|
|
- except ValueError:
|
|
|
- logger.error(f"无效的任务ID: {args.deploy_now}")
|
|
|
- return
|
|
|
-
|
|
|
- # Agent 单次执行
|
|
|
- if args.agent_run:
|
|
|
- success = run_agent_once(
|
|
|
- timeout=args.agent_timeout,
|
|
|
- auto_close=auto_close,
|
|
|
- )
|
|
|
- if success:
|
|
|
- logger.info("✅ Agent 单次执行完成")
|
|
|
- else:
|
|
|
- logger.error("❌ Agent 单次执行失败")
|
|
|
- return
|
|
|
-
|
|
|
- # Agent 循环模式
|
|
|
- if args.agent_loop:
|
|
|
- run_agent_loop(
|
|
|
- interval=args.interval,
|
|
|
- timeout=args.agent_timeout,
|
|
|
- auto_close=auto_close,
|
|
|
- )
|
|
|
- return
|
|
|
-
|
|
|
- # 没有指定任何模式参数时,显示交互式菜单
|
|
|
- if len(sys.argv) == 1:
|
|
|
- show_interactive_menu()
|
|
|
- else:
|
|
|
- # 显示帮助信息
|
|
|
- parser.print_help()
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- main()
|