auto_execute_tasks.py 90 KB


  1. #!/usr/bin/env python3
  2. """
  3. 自动任务执行核心调度脚本 (Agent 模式)
  4. 工作流程:
  5. 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
  6. 2. 生成 tasks/task_execute_instructions.md 执行指令文件
  7. 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
  8. 4. 更新 tasks/task_trigger.txt 触发器文件
  9. 5. 启动新的 Cursor Agent 并发送执行指令
  10. 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
  11. 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
  12. 使用方式:
  13. # Agent 单次执行(执行一次任务后退出)
  14. python scripts/auto_execute_tasks.py --agent-run
  15. # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务)
  16. python scripts/auto_execute_tasks.py --agent-loop
  17. # Agent 循环模式 + 禁用自动部署
  18. python scripts/auto_execute_tasks.py --agent-loop --no-deploy
  19. # 设置 Agent 超时时间(默认 3600 秒)
  20. python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
  21. # 任务完成后不自动关闭 Agent
  22. python scripts/auto_execute_tasks.py --agent-run --no-auto-close
  23. # 立即部署指定任务ID的脚本到生产服务器
  24. python scripts/auto_execute_tasks.py --deploy-now 123
  25. # 测试到生产服务器的 SSH 连接
  26. python scripts/auto_execute_tasks.py --test-connection
  27. """
  28. from __future__ import annotations
  29. import argparse
  30. import contextlib
  31. import json
  32. import logging
  33. import sys
  34. import time
  35. from datetime import datetime
  36. from pathlib import Path
  37. from typing import Any
  38. # ============================================================================
  39. # 日志配置
  40. # ============================================================================
  41. logging.basicConfig(
  42. level=logging.INFO,
  43. format="%(asctime)s - %(levelname)s - %(message)s",
  44. )
  45. logger = logging.getLogger("AutoExecuteTasks")
  46. # ============================================================================
  47. # Windows GUI 自动化依赖(可选)
  48. # ============================================================================
  49. HAS_CURSOR_GUI = False
  50. HAS_PYPERCLIP = False
  51. try:
  52. import pyautogui
  53. import win32con
  54. import win32gui
  55. pyautogui.FAILSAFE = True
  56. pyautogui.PAUSE = 0.5
  57. HAS_CURSOR_GUI = True
  58. try:
  59. import pyperclip
  60. HAS_PYPERCLIP = True
  61. except ImportError:
  62. pass
  63. except ImportError:
  64. logger.info(
  65. "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
  66. "将禁用自动 Cursor Agent 功能。"
  67. )
  68. # ============================================================================
  69. # 全局配置
  70. # ============================================================================
  71. WORKSPACE_ROOT = Path(__file__).parent.parent
  72. TASKS_DIR = WORKSPACE_ROOT / "tasks"
  73. PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
  74. INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
  75. TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"
  76. # 生产服务器配置
  77. PRODUCTION_SERVER = {
  78. "host": "192.168.3.143",
  79. "port": 22,
  80. "username": "ubuntu",
  81. "password": "citumxl2357",
  82. "script_path": "/opt/dataops-platform/datafactory/scripts",
  83. "workflow_path": "/opt/dataops-platform/datafactory/workflows", # 工作流 JSON 文件目录
  84. }
  85. # Agent 消息模板
  86. AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"
  87. # 命令行参数控制的全局变量
  88. ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署
  89. # ============================================================================
  90. # 数据库操作
  91. # ============================================================================
  92. def get_db_connection():
  93. """获取数据库连接(使用 production 环境配置)"""
  94. try:
  95. from urllib.parse import urlparse
  96. import psycopg2
  97. sys.path.insert(0, str(WORKSPACE_ROOT))
  98. from app.config.config import config
  99. # 强制使用 production 环境的数据库配置
  100. app_config = config["production"]
  101. db_uri = app_config.SQLALCHEMY_DATABASE_URI
  102. # 解析 SQLAlchemy URI 格式为 psycopg2 可用的格式
  103. parsed = urlparse(db_uri)
  104. conn = psycopg2.connect(
  105. host=parsed.hostname,
  106. port=parsed.port or 5432,
  107. database=parsed.path.lstrip("/"),
  108. user=parsed.username,
  109. password=parsed.password,
  110. )
  111. logger.debug(
  112. f"数据库连接成功: {parsed.hostname}:{parsed.port}/{parsed.path.lstrip('/')}"
  113. )
  114. return conn
  115. except ImportError as e:
  116. logger.error(f"导入依赖失败: {e}")
  117. return None
  118. except Exception as e:
  119. logger.error(f"连接数据库失败: {e}")
  120. import traceback
  121. logger.error(traceback.format_exc())
  122. return None
  123. def get_pending_tasks() -> list[dict[str, Any]]:
  124. """
  125. 从 PostgreSQL task_list 表获取所有 pending 状态的任务
  126. 重要:此函数直接查询数据库,确保获取最新的任务列表
  127. """
  128. try:
  129. from psycopg2.extras import RealDictCursor
  130. logger.info("📡 正在连接数据库...")
  131. conn = get_db_connection()
  132. if not conn:
  133. logger.error("❌ 无法获取数据库连接")
  134. return []
  135. logger.info("✅ 数据库连接成功,正在查询 pending 任务...")
  136. cursor = conn.cursor(cursor_factory=RealDictCursor)
  137. cursor.execute(
  138. """
  139. SELECT task_id, task_name, task_description, status,
  140. code_name, code_path, create_time, create_by
  141. FROM task_list
  142. WHERE status = 'pending'
  143. ORDER BY create_time ASC
  144. """
  145. )
  146. tasks = cursor.fetchall()
  147. cursor.close()
  148. conn.close()
  149. task_list = [dict(task) for task in tasks]
  150. logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
  151. if task_list:
  152. for task in task_list:
  153. logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")
  154. return task_list
  155. except Exception as e:
  156. logger.error(f"获取 pending 任务失败: {e}")
  157. import traceback
  158. logger.error(traceback.format_exc())
  159. return []
  160. def update_task_status(
  161. task_id: int,
  162. status: str,
  163. code_name: str | None = None,
  164. code_path: str | None = None,
  165. ) -> bool:
  166. """更新任务状态"""
  167. try:
  168. conn = get_db_connection()
  169. if not conn:
  170. return False
  171. cursor = conn.cursor()
  172. if code_name and code_path:
  173. cursor.execute(
  174. """
  175. UPDATE task_list
  176. SET status = %s, code_name = %s, code_path = %s,
  177. update_time = CURRENT_TIMESTAMP
  178. WHERE task_id = %s
  179. """,
  180. (status, code_name, code_path, task_id),
  181. )
  182. else:
  183. cursor.execute(
  184. """
  185. UPDATE task_list
  186. SET status = %s, update_time = CURRENT_TIMESTAMP
  187. WHERE task_id = %s
  188. """,
  189. (status, task_id),
  190. )
  191. conn.commit()
  192. updated = cursor.rowcount > 0
  193. cursor.close()
  194. conn.close()
  195. if updated:
  196. logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
  197. return updated
  198. except Exception as e:
  199. logger.error(f"更新任务状态失败: {e}")
  200. return False
  201. # ============================================================================
  202. # 触发器文件管理
  203. # ============================================================================
  204. def update_trigger_file(
  205. task_count: int = 0,
  206. status: str = "有待执行任务",
  207. task_ids: list[int] | None = None,
  208. ) -> None:
  209. """更新触发器文件 tasks/task_trigger.txt"""
  210. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  211. gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  212. task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
  213. content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
  214. 生成时间: {gen_time}
  215. 状态: {status}
  216. 待处理任务数: {task_count}
  217. 任务ID列表: [{task_ids_str}]
  218. 此文件用于触发Cursor自动执行任务。
  219. 【重要】Cursor AI 请立即执行以下操作:
  220. 1. 读取 tasks/task_execute_instructions.md 文件
  221. 2. 按照文件中的任务描述,依次完成每个任务
  222. 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
  223. """
  224. with TRIGGER_FILE.open("w", encoding="utf-8") as f:
  225. f.write(content)
  226. logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
  227. def get_processing_task_ids() -> list[int]:
  228. """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
  229. if not PENDING_TASKS_FILE.exists():
  230. return []
  231. try:
  232. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  233. tasks = json.load(f)
  234. return [
  235. t.get("task_id")
  236. for t in tasks
  237. if t.get("status") == "processing" and t.get("task_id")
  238. ]
  239. except Exception:
  240. return []
  241. def get_tasks_by_ids(task_ids: list[int]) -> list[dict[str, Any]]:
  242. """
  243. 根据任务 ID 列表从数据库获取任务详细信息
  244. Args:
  245. task_ids: 任务 ID 列表
  246. Returns:
  247. 包含任务详细信息的列表(包括 task_description)
  248. """
  249. if not task_ids:
  250. return []
  251. try:
  252. from psycopg2.extras import RealDictCursor
  253. conn = get_db_connection()
  254. if not conn:
  255. logger.error("无法获取数据库连接")
  256. return []
  257. cursor = conn.cursor(cursor_factory=RealDictCursor)
  258. # 构建 IN 查询
  259. placeholders = ", ".join(["%s"] * len(task_ids))
  260. query = f"""
  261. SELECT task_id, task_name, task_description, status,
  262. code_name, code_path, create_time, create_by
  263. FROM task_list
  264. WHERE task_id IN ({placeholders})
  265. ORDER BY create_time ASC
  266. """
  267. cursor.execute(query, tuple(task_ids))
  268. tasks = cursor.fetchall()
  269. cursor.close()
  270. conn.close()
  271. task_list = [dict(task) for task in tasks]
  272. logger.info(f"从数据库获取了 {len(task_list)} 个任务的详细信息")
  273. return task_list
  274. except Exception as e:
  275. logger.error(f"根据 ID 获取任务失败: {e}")
  276. import traceback
  277. logger.error(traceback.format_exc())
  278. return []
  279. def get_all_tasks_to_execute() -> list[dict[str, Any]]:
  280. """
  281. 获取所有需要执行的任务(包括新的 pending 任务和已有的 processing 任务)
  282. 此函数确保返回的任务列表包含完整信息(特别是 task_description),
  283. 用于生成执行指令文件。
  284. Returns:
  285. 包含所有需要执行任务的完整信息列表
  286. """
  287. # 1. 获取本地 pending_tasks.json 中 processing 状态的任务 ID
  288. processing_ids = get_processing_task_ids()
  289. # 2. 从数据库获取所有 pending 任务
  290. pending_tasks = get_pending_tasks()
  291. pending_ids = [t["task_id"] for t in pending_tasks]
  292. # 3. 合并所有需要查询的任务 ID(去重)
  293. all_task_ids = list(set(processing_ids + pending_ids))
  294. if not all_task_ids:
  295. return []
  296. # 4. 从数据库获取这些任务的完整信息
  297. all_tasks = get_tasks_by_ids(all_task_ids)
  298. logger.info(
  299. f"需要执行的任务: {len(all_tasks)} 个 "
  300. f"(processing: {len(processing_ids)}, pending: {len(pending_ids)})"
  301. )
  302. return all_tasks
  303. # ============================================================================
  304. # 任务文件生成
  305. # ============================================================================
  306. def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
  307. """将任务列表写入 tasks/pending_tasks.json"""
  308. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  309. # 读取现有任务
  310. existing_tasks = []
  311. if PENDING_TASKS_FILE.exists():
  312. try:
  313. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  314. existing_tasks = json.load(f)
  315. except Exception:
  316. existing_tasks = []
  317. existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
  318. # 添加新任务
  319. for task in tasks:
  320. if task["task_id"] not in existing_ids:
  321. task_info = {
  322. "task_id": task["task_id"],
  323. "task_name": task["task_name"],
  324. "code_path": task.get("code_path", ""),
  325. "code_name": task.get("code_name", ""),
  326. "status": "processing",
  327. "notified_at": datetime.now().isoformat(),
  328. "code_file": task.get("code_file", ""),
  329. }
  330. existing_tasks.append(task_info)
  331. with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
  332. json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
  333. logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
  334. def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
  335. """生成任务执行指令文件 tasks/task_execute_instructions.md"""
  336. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  337. with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
  338. f.write("# Cursor 自动任务执行指令\n\n")
  339. f.write("**重要:请立即执行以下任务!**\n\n")
  340. gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  341. f.write(f"**生成时间**: {gen_time}\n\n")
  342. f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
  343. f.write("## 任务完成后的操作\n\n")
  344. f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
  345. f.write("对应任务的 `status` 为 `completed`,\n")
  346. f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
  347. f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
  348. f.write("## 任务约束要求\n\n")
  349. f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
  350. f.write("- 不要创建任何 summary、report、总结类的文档文件\n")
  351. f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n")
  352. f.write("- 只需创建任务要求的功能脚本文件\n")
  353. f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
  354. f.write("---\n\n")
  355. for idx, task in enumerate(tasks, 1):
  356. task_id = task["task_id"]
  357. task_name = task["task_name"]
  358. task_desc = task["task_description"]
  359. create_time = task.get("create_time", "")
  360. if hasattr(create_time, "strftime"):
  361. create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
  362. f.write(f"## 任务 {idx}: {task_name}\n\n")
  363. f.write(f"- **任务ID**: `{task_id}`\n")
  364. f.write(f"- **创建时间**: {create_time}\n")
  365. f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
  366. f.write(f"### 任务描述\n\n{task_desc}\n\n")
  367. f.write("---\n\n")
  368. logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
  369. # ============================================================================
  370. # Neo4j 独立连接(不依赖 Flask 应用上下文)
  371. # ============================================================================
  372. def get_neo4j_driver():
  373. """获取 Neo4j 驱动(独立于 Flask 应用上下文)"""
  374. try:
  375. from neo4j import GraphDatabase
  376. sys.path.insert(0, str(WORKSPACE_ROOT))
  377. from app.config.config import config
  378. # 强制使用 production 环境的配置
  379. app_config = config["production"]
  380. uri = app_config.NEO4J_URI
  381. user = app_config.NEO4J_USER
  382. password = app_config.NEO4J_PASSWORD
  383. driver = GraphDatabase.driver(uri, auth=(user, password))
  384. return driver
  385. except ImportError as e:
  386. logger.error(f"导入 Neo4j 驱动失败: {e}")
  387. return None
  388. except Exception as e:
  389. logger.error(f"连接 Neo4j 失败: {e}")
  390. return None
  391. # ============================================================================
  392. # 状态同步
  393. # ============================================================================
  394. def extract_dataflow_name_from_task(task_id: int) -> str | None:
  395. """从任务描述中提取 DataFlow 名称"""
  396. import re
  397. try:
  398. conn = get_db_connection()
  399. if not conn:
  400. return None
  401. cursor = conn.cursor()
  402. cursor.execute(
  403. "SELECT task_description FROM task_list WHERE task_id = %s",
  404. (task_id,),
  405. )
  406. result = cursor.fetchone()
  407. cursor.close()
  408. conn.close()
  409. if not result:
  410. return None
  411. task_desc = result[0]
  412. # 从任务描述中提取 DataFlow Name
  413. match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", task_desc)
  414. if match:
  415. dataflow_name = match.group(1).strip()
  416. logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
  417. return dataflow_name
  418. return None
  419. except Exception as e:
  420. logger.error(f"提取 DataFlow 名称失败: {e}")
  421. return None
  422. def update_dataflow_script_path(
  423. task_name: str, script_path: str, task_id: int | None = None
  424. ) -> bool:
  425. """更新 DataFlow 节点的 script_path 字段"""
  426. try:
  427. driver = get_neo4j_driver()
  428. if not driver:
  429. logger.error("无法获取 Neo4j 驱动")
  430. return False
  431. # 如果提供了 task_id,尝试从任务描述中提取真正的 DataFlow 名称
  432. dataflow_name = task_name
  433. if task_id:
  434. extracted_name = extract_dataflow_name_from_task(task_id)
  435. if extracted_name:
  436. dataflow_name = extracted_name
  437. logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}")
  438. query = """
  439. MATCH (n:DataFlow {name_zh: $name_zh})
  440. SET n.script_path = $script_path, n.updated_at = $updated_at
  441. RETURN n
  442. """
  443. updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  444. with driver.session() as session:
  445. result = session.run(
  446. query,
  447. name_zh=dataflow_name,
  448. script_path=script_path,
  449. updated_at=updated_at,
  450. ).single()
  451. driver.close()
  452. if result:
  453. logger.info(
  454. f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}"
  455. )
  456. return True
  457. else:
  458. logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
  459. return False
  460. except Exception as e:
  461. logger.error(f"更新 DataFlow script_path 失败: {e}")
  462. return False
  463. def sync_completed_tasks_to_db() -> int:
  464. """将 pending_tasks.json 中 completed 的任务同步到数据库"""
  465. if not PENDING_TASKS_FILE.exists():
  466. return 0
  467. try:
  468. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  469. tasks = json.load(f)
  470. except Exception as e:
  471. logger.error(f"读取 pending_tasks.json 失败: {e}")
  472. return 0
  473. if not isinstance(tasks, list):
  474. return 0
  475. updated = 0
  476. remaining_tasks = []
  477. for t in tasks:
  478. if t.get("status") == "completed":
  479. task_id = t.get("task_id")
  480. if not task_id:
  481. continue
  482. task_name = t.get("task_name")
  483. code_path = t.get("code_path")
  484. # 使用 code_file 字段获取实际的脚本文件名
  485. code_file = t.get("code_file", "")
  486. # 统一处理:code_path 始终为 "datafactory/scripts"
  487. code_path = "datafactory/scripts"
  488. # 使用 code_file 判断是否为 Python 脚本
  489. is_python_script = code_file and code_file.endswith(".py")
  490. # 修复路径重复问题:统一处理脚本路径
  491. if is_python_script:
  492. if code_file.startswith(code_path):
  493. # code_file 已经是完整路径
  494. full_script_path = code_file
  495. # 提取纯文件名用于数据库存储
  496. code_file_name = Path(code_file).name
  497. elif "/" in code_file or "\\" in code_file:
  498. # code_file 包含其他路径,提取文件名
  499. code_file_name = Path(code_file).name
  500. full_script_path = f"{code_path}/{code_file_name}"
  501. else:
  502. # code_file 只是文件名
  503. code_file_name = code_file
  504. full_script_path = f"{code_path}/{code_file}"
  505. logger.info(f"任务 {task_id} 使用 Python 脚本: {full_script_path}")
  506. else:
  507. logger.info(
  508. f"任务 {task_id} 的 code_file ({code_file}) 不是 Python 脚本,跳过 DataFlow 更新"
  509. )
  510. code_file_name = code_file
  511. full_script_path = ""
  512. if update_task_status(task_id, "completed", code_file_name, code_path):
  513. updated += 1
  514. logger.info(f"已同步任务 {task_id} 为 completed")
  515. # 只有 Python 脚本才更新 DataFlow 节点的 script_path
  516. if task_name and is_python_script:
  517. if update_dataflow_script_path(
  518. task_name, full_script_path, task_id=task_id
  519. ):
  520. logger.info(
  521. f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
  522. )
  523. else:
  524. logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")
  525. # 自动部署到生产服务器(如果启用)
  526. if ENABLE_AUTO_DEPLOY:
  527. logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
  528. if auto_deploy_completed_task(t):
  529. logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
  530. else:
  531. logger.warning(f"任务 {task_id} 部署到生产服务器失败")
  532. else:
  533. logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")
  534. else:
  535. remaining_tasks.append(t)
  536. else:
  537. remaining_tasks.append(t)
  538. if updated > 0:
  539. with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
  540. json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
  541. logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
  542. return updated
  543. # ============================================================================
  544. # 生产服务器部署功能
  545. # ============================================================================
  546. def get_ssh_connection():
  547. """获取 SSH 连接到生产服务器"""
  548. try:
  549. import paramiko # type: ignore
  550. ssh = paramiko.SSHClient()
  551. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  552. logger.info(
  553. f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
  554. f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
  555. )
  556. ssh.connect(
  557. hostname=PRODUCTION_SERVER["host"],
  558. port=PRODUCTION_SERVER["port"],
  559. username=PRODUCTION_SERVER["username"],
  560. password=PRODUCTION_SERVER["password"],
  561. timeout=10,
  562. )
  563. logger.info("✅ SSH 连接成功")
  564. return ssh
  565. except ImportError:
  566. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  567. return None
  568. except Exception as e:
  569. logger.error(f"SSH 连接失败: {e}")
  570. return None
  571. def test_ssh_connection() -> bool:
  572. """测试 SSH 连接到生产服务器"""
  573. logger.info("=" * 60)
  574. logger.info("测试生产服务器连接")
  575. logger.info("=" * 60)
  576. ssh = get_ssh_connection()
  577. if not ssh:
  578. logger.error("❌ SSH 连接测试失败")
  579. return False
  580. try:
  581. # 测试执行命令
  582. _, stdout, _ = ssh.exec_command("echo 'Connection test successful'")
  583. output = stdout.read().decode().strip()
  584. logger.info(f"✅ 命令执行成功: {output}")
  585. # 检查目标目录是否存在
  586. _, stdout, _ = ssh.exec_command(
  587. f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'"
  588. )
  589. result = stdout.read().decode().strip()
  590. if result == "exists":
  591. logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}")
  592. else:
  593. logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}")
  594. logger.info("将在首次部署时自动创建")
  595. ssh.close()
  596. logger.info("=" * 60)
  597. logger.info("✅ 连接测试完成")
  598. logger.info("=" * 60)
  599. return True
  600. except Exception as e:
  601. logger.error(f"❌ 测试执行命令失败: {e}")
  602. ssh.close()
  603. return False
  604. def deploy_script_to_production(
  605. local_script_path: str, remote_filename: str | None = None
  606. ) -> bool:
  607. """部署脚本文件到生产服务器"""
  608. try:
  609. import importlib.util
  610. if importlib.util.find_spec("paramiko") is None:
  611. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  612. return False
  613. # 转换为绝对路径
  614. local_path = Path(local_script_path)
  615. if not local_path.is_absolute():
  616. local_path = WORKSPACE_ROOT / local_path
  617. if not local_path.exists():
  618. logger.error(f"本地文件不存在: {local_path}")
  619. return False
  620. # 确定远程文件名
  621. if not remote_filename:
  622. remote_filename = local_path.name
  623. remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}"
  624. # 建立 SSH 连接
  625. ssh = get_ssh_connection()
  626. if not ssh:
  627. return False
  628. try:
  629. # 创建 SFTP 客户端
  630. sftp = ssh.open_sftp()
  631. # 确保远程目录存在
  632. try:
  633. sftp.stat(PRODUCTION_SERVER["script_path"])
  634. except FileNotFoundError:
  635. logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}")
  636. _, stdout, _ = ssh.exec_command(
  637. f"mkdir -p {PRODUCTION_SERVER['script_path']}"
  638. )
  639. stdout.channel.recv_exit_status()
  640. # 上传文件
  641. logger.info(f"正在上传: {local_path} -> {remote_path}")
  642. sftp.put(str(local_path), remote_path)
  643. # 设置文件权限为可执行
  644. sftp.chmod(remote_path, 0o755)
  645. logger.info(f"✅ 脚本部署成功: {remote_path}")
  646. sftp.close()
  647. ssh.close()
  648. return True
  649. except Exception as e:
  650. logger.error(f"文件传输失败: {e}")
  651. ssh.close()
  652. return False
  653. except ImportError:
  654. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  655. return False
  656. except Exception as e:
  657. logger.error(f"部署脚本失败: {e}")
  658. return False
  659. def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
  660. """
  661. 部署 n8n 工作流到 n8n 服务器
  662. 此函数执行两个步骤:
  663. 1. 通过 n8n API 创建工作流(主要步骤)
  664. 2. 通过 SFTP 备份工作流文件到生产服务器(可选)
  665. """
  666. try:
  667. import json
  668. import requests
  669. # 转换为绝对路径
  670. local_path = Path(workflow_file)
  671. if not local_path.is_absolute():
  672. local_path = WORKSPACE_ROOT / local_path
  673. if not local_path.exists():
  674. logger.error(f"工作流文件不存在: {local_path}")
  675. return False
  676. # 加载工作流 JSON
  677. with open(local_path, encoding="utf-8") as f:
  678. workflow_data = json.load(f)
  679. workflow_name = workflow_data.get("name", local_path.stem)
  680. logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
  681. # 获取 n8n API 配置
  682. try:
  683. sys.path.insert(0, str(WORKSPACE_ROOT))
  684. from app.config.config import BaseConfig
  685. api_url = BaseConfig.N8N_API_URL
  686. api_key = BaseConfig.N8N_API_KEY
  687. timeout = BaseConfig.N8N_API_TIMEOUT
  688. except (ImportError, AttributeError):
  689. import os
  690. api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
  691. api_key = os.environ.get("N8N_API_KEY", "")
  692. timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
  693. if not api_key:
  694. logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
  695. return False
  696. # 准备 API 请求
  697. headers = {
  698. "X-N8N-API-KEY": api_key,
  699. "Content-Type": "application/json",
  700. "Accept": "application/json",
  701. }
  702. # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
  703. workflow_payload = {
  704. "name": workflow_name,
  705. "nodes": workflow_data.get("nodes", []),
  706. "connections": workflow_data.get("connections", {}),
  707. "settings": workflow_data.get("settings", {}),
  708. }
  709. # 调用 n8n API 创建工作流
  710. create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
  711. logger.info(f"调用 n8n API: {create_url}")
  712. try:
  713. response = requests.post(
  714. create_url,
  715. headers=headers,
  716. json=workflow_payload,
  717. timeout=timeout,
  718. )
  719. if response.status_code == 401:
  720. logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
  721. return False
  722. elif response.status_code == 403:
  723. logger.error("n8n API 权限不足")
  724. return False
  725. response.raise_for_status()
  726. created_workflow = response.json()
  727. workflow_id = created_workflow.get("id")
  728. logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
  729. # 可选:将工作流文件备份到生产服务器
  730. try:
  731. _backup_workflow_to_server(local_path)
  732. except Exception as backup_error:
  733. logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}")
  734. return True
  735. except requests.exceptions.Timeout:
  736. logger.error("n8n API 请求超时,请检查网络连接")
  737. return False
  738. except requests.exceptions.ConnectionError:
  739. logger.error(f"无法连接到 n8n 服务器: {api_url}")
  740. return False
  741. except requests.exceptions.HTTPError as e:
  742. error_detail = ""
  743. try:
  744. error_detail = e.response.json()
  745. except Exception:
  746. error_detail = e.response.text
  747. logger.error(
  748. f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
  749. )
  750. return False
  751. except Exception as e:
  752. logger.error(f"部署工作流失败: {e}")
  753. import traceback
  754. logger.error(traceback.format_exc())
  755. return False
  756. def _backup_workflow_to_server(local_path: Path) -> bool:
  757. """备份工作流文件到生产服务器(通过 SFTP)"""
  758. try:
  759. import importlib.util
  760. if importlib.util.find_spec("paramiko") is None:
  761. logger.debug("未安装 paramiko 库,跳过文件备份")
  762. return False
  763. remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}"
  764. # 建立 SSH 连接
  765. ssh = get_ssh_connection()
  766. if not ssh:
  767. return False
  768. try:
  769. # 创建 SFTP 客户端
  770. sftp = ssh.open_sftp()
  771. # 确保远程目录存在
  772. try:
  773. sftp.stat(PRODUCTION_SERVER["workflow_path"])
  774. except FileNotFoundError:
  775. logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}")
  776. _, stdout, _ = ssh.exec_command(
  777. f"mkdir -p {PRODUCTION_SERVER['workflow_path']}"
  778. )
  779. stdout.channel.recv_exit_status()
  780. # 上传工作流文件
  781. logger.debug(f"备份工作流文件: {local_path} -> {remote_path}")
  782. sftp.put(str(local_path), remote_path)
  783. sftp.close()
  784. ssh.close()
  785. return True
  786. except Exception as e:
  787. logger.warning(f"工作流文件备份失败: {e}")
  788. ssh.close()
  789. return False
  790. except Exception as e:
  791. logger.warning(f"备份工作流失败: {e}")
  792. return False
  793. def find_remote_workflow_files(task_info: dict[str, Any]) -> list[str]:
  794. """
  795. 从生产服务器查找与任务相关的 n8n 工作流文件
  796. 查找策略:
  797. 1. 列出远程 workflow_path 目录下的所有 .json 文件
  798. 2. 根据任务名称或脚本名称匹配相关工作流
  799. Args:
  800. task_info: 任务信息字典
  801. Returns:
  802. 远程工作流文件路径列表
  803. """
  804. remote_files: list[str] = []
  805. code_file = task_info.get("code_file", "")
  806. task_name = task_info.get("task_name", "")
  807. ssh = get_ssh_connection()
  808. if not ssh:
  809. logger.warning("无法连接到生产服务器,跳过远程工作流文件查找")
  810. return remote_files
  811. try:
  812. workflow_path = PRODUCTION_SERVER["workflow_path"]
  813. # 检查目录是否存在
  814. _, stdout, _ = ssh.exec_command(f"test -d {workflow_path} && echo 'exists'")
  815. if stdout.read().decode().strip() != "exists":
  816. logger.info(f"远程工作流目录不存在: {workflow_path}")
  817. ssh.close()
  818. return remote_files
  819. # 列出目录下所有 .json 文件
  820. _, stdout, _ = ssh.exec_command(f"ls -1 {workflow_path}/*.json 2>/dev/null")
  821. file_list = stdout.read().decode().strip().split("\n")
  822. # 过滤有效文件路径
  823. all_json_files = [
  824. f.strip() for f in file_list if f.strip() and f.endswith(".json")
  825. ]
  826. if not all_json_files:
  827. logger.info(f"远程工作流目录 {workflow_path} 中没有 JSON 文件")
  828. ssh.close()
  829. return remote_files
  830. logger.info(f"远程服务器发现 {len(all_json_files)} 个工作流文件")
  831. # 根据任务信息匹配相关工作流
  832. # 构建匹配模式
  833. match_patterns: list[str] = []
  834. # 基于脚本文件名匹配
  835. if code_file and code_file.endswith(".py"):
  836. script_base = code_file[:-3] # 去掉 .py
  837. match_patterns.append(script_base.lower())
  838. # 基于任务名称匹配(针对 DF_DO 格式的任务名)
  839. if task_name:
  840. if task_name.startswith("DF_DO"):
  841. match_patterns.append(task_name.lower())
  842. # 对于中文任务名,尝试提取英文/数字部分
  843. import re
  844. alphanumeric = re.sub(r"[^a-zA-Z0-9_-]", "", task_name)
  845. if alphanumeric and len(alphanumeric) >= 3:
  846. match_patterns.append(alphanumeric.lower())
  847. # 匹配文件
  848. for remote_file in all_json_files:
  849. file_name_lower = Path(remote_file).stem.lower()
  850. # 检查是否与任何模式匹配
  851. matched = False
  852. for pattern in match_patterns:
  853. if pattern in file_name_lower or file_name_lower in pattern:
  854. matched = True
  855. break
  856. if matched and remote_file not in remote_files:
  857. remote_files.append(remote_file)
  858. logger.info(f" 匹配到工作流: {Path(remote_file).name}")
  859. # 如果没有匹配到任何文件,不再自动部署所有文件
  860. # 这样可以避免误部署其他任务的工作流
  861. if not remote_files and all_json_files:
  862. logger.info("没有精确匹配的工作流文件,跳过远程工作流部署")
  863. # 不再自动部署所有文件,避免重复部署问题
  864. ssh.close()
  865. return remote_files
  866. except Exception as e:
  867. logger.error(f"查找远程工作流文件失败: {e}")
  868. if ssh:
  869. ssh.close()
  870. return remote_files
  871. def deploy_remote_workflow_to_n8n(remote_file_path: str) -> bool:
  872. """
  873. 从生产服务器读取工作流 JSON 文件并部署到 n8n 系统
  874. Args:
  875. remote_file_path: 远程服务器上的工作流文件完整路径
  876. Returns:
  877. 是否部署成功
  878. """
  879. try:
  880. import requests
  881. ssh = get_ssh_connection()
  882. if not ssh:
  883. logger.error("无法连接到生产服务器")
  884. return False
  885. # 读取远程工作流文件内容
  886. logger.info(f"从远程服务器读取工作流: {remote_file_path}")
  887. _, stdout, stderr = ssh.exec_command(f"cat {remote_file_path}")
  888. file_content = stdout.read().decode("utf-8")
  889. error_output = stderr.read().decode()
  890. if error_output:
  891. logger.error(f"读取远程文件失败: {error_output}")
  892. ssh.close()
  893. return False
  894. ssh.close()
  895. # 解析工作流 JSON
  896. try:
  897. workflow_data = json.loads(file_content)
  898. except json.JSONDecodeError as e:
  899. logger.error(f"解析工作流 JSON 失败: {e}")
  900. return False
  901. workflow_name = workflow_data.get("name", Path(remote_file_path).stem)
  902. logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
  903. # 获取 n8n API 配置
  904. try:
  905. sys.path.insert(0, str(WORKSPACE_ROOT))
  906. from app.config.config import BaseConfig
  907. api_url = BaseConfig.N8N_API_URL
  908. api_key = BaseConfig.N8N_API_KEY
  909. timeout = BaseConfig.N8N_API_TIMEOUT
  910. except (ImportError, AttributeError):
  911. import os
  912. api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
  913. api_key = os.environ.get("N8N_API_KEY", "")
  914. timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
  915. if not api_key:
  916. logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
  917. return False
  918. # 准备 API 请求
  919. headers = {
  920. "X-N8N-API-KEY": api_key,
  921. "Content-Type": "application/json",
  922. "Accept": "application/json",
  923. }
  924. # 准备工作流数据
  925. workflow_payload = {
  926. "name": workflow_name,
  927. "nodes": workflow_data.get("nodes", []),
  928. "connections": workflow_data.get("connections", {}),
  929. "settings": workflow_data.get("settings", {}),
  930. }
  931. # 先检查是否已存在同名工作流
  932. list_url = f"{api_url.rstrip('/')}/api/v1/workflows"
  933. try:
  934. list_response = requests.get(
  935. list_url,
  936. headers=headers,
  937. timeout=timeout,
  938. )
  939. if list_response.status_code == 200:
  940. existing_workflows = list_response.json().get("data", [])
  941. existing_wf = None
  942. for wf in existing_workflows:
  943. if wf.get("name") == workflow_name:
  944. existing_wf = wf
  945. break
  946. if existing_wf:
  947. # 已存在同名工作流,跳过创建避免重复
  948. workflow_id = existing_wf.get("id")
  949. logger.info(
  950. f"发现已存在的工作流 (ID: {workflow_id}),跳过部署避免重复"
  951. )
  952. logger.info(
  953. "如需更新工作流,请手动在 n8n 控制台操作或删除后重新部署"
  954. )
  955. return True # 返回成功,因为工作流已存在
  956. except requests.exceptions.RequestException as e:
  957. logger.warning(f"检查已存在工作流时出错: {e}")
  958. # 调用 n8n API 创建工作流
  959. create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
  960. logger.info(f"调用 n8n API 创建工作流: {create_url}")
  961. try:
  962. response = requests.post(
  963. create_url,
  964. headers=headers,
  965. json=workflow_payload,
  966. timeout=timeout,
  967. )
  968. if response.status_code == 401:
  969. logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
  970. return False
  971. elif response.status_code == 403:
  972. logger.error("n8n API 权限不足")
  973. return False
  974. response.raise_for_status()
  975. created_workflow = response.json()
  976. workflow_id = created_workflow.get("id")
  977. logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
  978. return True
  979. except requests.exceptions.Timeout:
  980. logger.error("n8n API 请求超时,请检查网络连接")
  981. return False
  982. except requests.exceptions.ConnectionError:
  983. logger.error(f"无法连接到 n8n 服务器: {api_url}")
  984. return False
  985. except requests.exceptions.HTTPError as e:
  986. error_detail = ""
  987. try:
  988. error_detail = e.response.json()
  989. except Exception:
  990. error_detail = e.response.text
  991. logger.error(
  992. f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
  993. )
  994. return False
  995. except Exception as e:
  996. logger.error(f"从远程服务器部署工作流失败: {e}")
  997. import traceback
  998. logger.error(traceback.format_exc())
  999. return False
  1000. def find_related_workflow_files(
  1001. task_info: dict[str, Any],
  1002. ) -> list[Path]:
  1003. """
  1004. 查找与任务相关的所有 n8n 工作流文件
  1005. 查找策略:
  1006. 1. 与脚本同目录的工作流文件 (n8n_workflow_*.json)
  1007. 2. datafactory/n8n_workflows 目录下的工作流文件
  1008. 3. 根据任务名称模式匹配
  1009. 4. 根据脚本名称匹配 (去掉 .py 后缀)
  1010. 5. 根据任务 ID 匹配
  1011. 6. 最近修改的工作流文件 (在任务创建后修改的)
  1012. """
  1013. workflow_files: list[Path] = []
  1014. code_name = task_info.get("code_name", "")
  1015. code_path = task_info.get("code_path", "datafactory/scripts")
  1016. task_name = task_info.get("task_name", "")
  1017. task_id = task_info.get("task_id")
  1018. # 获取任务通知时间用于判断文件是否是新创建的
  1019. notified_at_str = task_info.get("notified_at", "")
  1020. notified_at = None
  1021. if notified_at_str:
  1022. with contextlib.suppress(ValueError, TypeError):
  1023. notified_at = datetime.fromisoformat(notified_at_str.replace("Z", "+00:00"))
  1024. # 查找模式1: 与脚本同目录的工作流文件
  1025. script_dir = WORKSPACE_ROOT / code_path
  1026. if script_dir.exists() and script_dir.is_dir():
  1027. for wf_file in script_dir.glob("n8n_workflow_*.json"):
  1028. if wf_file.is_file() and wf_file not in workflow_files:
  1029. workflow_files.append(wf_file)
  1030. # 也查找以 workflow_ 开头的文件
  1031. for wf_file in script_dir.glob("workflow_*.json"):
  1032. if wf_file.is_file() and wf_file not in workflow_files:
  1033. workflow_files.append(wf_file)
  1034. # 查找模式2: datafactory/n8n_workflows 目录
  1035. n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows"
  1036. if n8n_workflows_dir.exists():
  1037. for wf_file in n8n_workflows_dir.glob("*.json"):
  1038. if wf_file.is_file() and wf_file not in workflow_files:
  1039. workflow_files.append(wf_file)
  1040. # 查找模式3: 根据任务名称匹配
  1041. if task_name and task_name != "未知任务":
  1042. # 尝试多种名称变体
  1043. name_patterns = [
  1044. task_name.replace(" ", "_").lower(),
  1045. task_name.replace(" ", "-").lower(),
  1046. task_name.lower(),
  1047. ]
  1048. for pattern in name_patterns:
  1049. if len(pattern) < 3: # 跳过过短的模式
  1050. continue
  1051. for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*{pattern}*.json"):
  1052. # 验证是文件、未添加过、且是有效的 n8n 工作流文件
  1053. if (
  1054. wf_file.is_file()
  1055. and wf_file not in workflow_files
  1056. and _is_n8n_workflow_file(wf_file)
  1057. ):
  1058. workflow_files.append(wf_file)
  1059. # 查找模式4: 根据脚本名称匹配
  1060. if code_name and code_name.endswith(".py"):
  1061. script_base_name = code_name[:-3] # 去掉 .py
  1062. # 在 datafactory 目录下查找
  1063. for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(
  1064. f"*{script_base_name}*.json"
  1065. ):
  1066. if (
  1067. wf_file.is_file()
  1068. and wf_file not in workflow_files
  1069. and _is_n8n_workflow_file(wf_file)
  1070. ):
  1071. workflow_files.append(wf_file)
  1072. # 查找模式5: 根据任务 ID 匹配
  1073. if task_id:
  1074. for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(f"*task_{task_id}*.json"):
  1075. if (
  1076. wf_file.is_file()
  1077. and wf_file not in workflow_files
  1078. and _is_n8n_workflow_file(wf_file)
  1079. ):
  1080. workflow_files.append(wf_file)
  1081. # 查找模式6: 最近修改的工作流文件(在任务创建后修改的)
  1082. if notified_at:
  1083. for wf_file in (WORKSPACE_ROOT / "datafactory").rglob("*.json"):
  1084. if wf_file.is_file() and wf_file not in workflow_files:
  1085. try:
  1086. mtime = datetime.fromtimestamp(wf_file.stat().st_mtime)
  1087. # 如果文件在任务通知后被修改,可能是相关的工作流
  1088. if mtime > notified_at.replace(
  1089. tzinfo=None
  1090. ) and _is_n8n_workflow_file(wf_file):
  1091. workflow_files.append(wf_file)
  1092. logger.debug(f"发现最近修改的工作流: {wf_file.name}")
  1093. except (OSError, ValueError):
  1094. pass
  1095. return workflow_files
  1096. def _is_n8n_workflow_file(file_path: Path) -> bool:
  1097. """
  1098. 检查文件是否是有效的 n8n 工作流文件
  1099. 通过检查 JSON 结构来验证
  1100. """
  1101. try:
  1102. with open(file_path, encoding="utf-8") as f:
  1103. data = json.load(f)
  1104. # n8n 工作流文件通常包含 nodes 和 connections 字段
  1105. if isinstance(data, dict):
  1106. has_nodes = "nodes" in data
  1107. has_connections = "connections" in data
  1108. has_name = "name" in data
  1109. # 至少需要有 nodes 或符合 n8n 工作流特征
  1110. return has_nodes or (has_name and has_connections)
  1111. return False
  1112. except (json.JSONDecodeError, OSError):
  1113. return False
  1114. def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
  1115. """
  1116. 自动部署已完成任务的脚本和工作流到生产服务器
  1117. 部署流程:
  1118. 1. 部署 Python 脚本到生产服务器 (通过 SFTP)
  1119. 2. 查找并部署相关的 n8n 工作流 (通过 n8n API)
  1120. 3. 记录部署结果
  1121. """
  1122. # 优先使用 code_file 字段,其次使用 code_name
  1123. code_file = task_info.get("code_file", "")
  1124. code_name = task_info.get("code_name", "")
  1125. code_path = task_info.get("code_path", "datafactory/scripts")
  1126. task_name = task_info.get("task_name", "未知任务")
  1127. task_id = task_info.get("task_id", "N/A")
  1128. # 确定实际的脚本文件名:优先使用 code_file,如果为空则尝试 code_name
  1129. actual_script_file = code_file if code_file else code_name
  1130. if not actual_script_file or not code_path:
  1131. logger.warning(f"任务 {task_name} (ID: {task_id}) 缺少代码文件信息,跳过部署")
  1132. return False
  1133. logger.info("=" * 60)
  1134. logger.info(f"🚀 开始自动部署任务: {task_name} (ID: {task_id})")
  1135. logger.info("=" * 60)
  1136. deploy_results = {
  1137. "script_deployed": False,
  1138. "workflows_found": 0,
  1139. "workflows_deployed": 0,
  1140. "workflows_failed": 0,
  1141. }
  1142. # 1. 部署 Python 脚本
  1143. if actual_script_file.endswith(".py"):
  1144. # 修复路径重复问题:如果 actual_script_file 已经包含 code_path,则只使用 actual_script_file
  1145. # 否则拼接 code_path 和 actual_script_file
  1146. if actual_script_file.startswith(code_path):
  1147. # actual_script_file 已经是完整路径,如 "datafactory/scripts/task_41_xxx.py"
  1148. script_path = actual_script_file
  1149. elif "/" in actual_script_file or "\\" in actual_script_file:
  1150. # actual_script_file 包含路径分隔符但不以 code_path 开头
  1151. # 可能是其他格式的路径,提取文件名后拼接
  1152. script_filename = Path(actual_script_file).name
  1153. script_path = f"{code_path}/{script_filename}"
  1154. else:
  1155. # actual_script_file 只是文件名,正常拼接
  1156. script_path = f"{code_path}/{actual_script_file}"
  1157. logger.info(f"📦 部署 Python 脚本: {script_path}")
  1158. if deploy_script_to_production(script_path):
  1159. logger.info(f"✅ 脚本 {actual_script_file} 部署成功")
  1160. deploy_results["script_deployed"] = True
  1161. else:
  1162. logger.error(f"❌ 脚本 {actual_script_file} 部署失败")
  1163. # 2. 查找并部署相关的 n8n 工作流文件
  1164. # 2.1 首先从本地查找工作流文件
  1165. logger.info("🔍 查找本地 n8n 工作流文件...")
  1166. workflow_files = find_related_workflow_files(task_info)
  1167. if workflow_files:
  1168. logger.info(f"📋 本地发现 {len(workflow_files)} 个相关工作流文件:")
  1169. for wf_file in workflow_files:
  1170. logger.info(f" - {wf_file.relative_to(WORKSPACE_ROOT)}")
  1171. for wf_file in workflow_files:
  1172. logger.info(f"🔄 部署本地工作流: {wf_file.name}")
  1173. if deploy_n8n_workflow_to_production(str(wf_file)):
  1174. logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
  1175. deploy_results["workflows_deployed"] += 1
  1176. else:
  1177. logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
  1178. deploy_results["workflows_failed"] += 1
  1179. else:
  1180. logger.info("ℹ️ 本地未发现相关工作流文件")
  1181. # 2.2 然后从生产服务器查找并部署工作流文件
  1182. logger.info("🔍 查找生产服务器上的 n8n 工作流文件...")
  1183. remote_workflow_files = find_remote_workflow_files(task_info)
  1184. if remote_workflow_files:
  1185. logger.info(f"📋 远程服务器发现 {len(remote_workflow_files)} 个相关工作流文件:")
  1186. for remote_file in remote_workflow_files:
  1187. logger.info(f" - {Path(remote_file).name}")
  1188. for remote_file in remote_workflow_files:
  1189. logger.info(f"🔄 部署远程工作流: {Path(remote_file).name}")
  1190. if deploy_remote_workflow_to_n8n(remote_file):
  1191. logger.info(f"✅ 远程工作流 {Path(remote_file).name} 部署成功")
  1192. deploy_results["workflows_deployed"] += 1
  1193. else:
  1194. logger.error(f"❌ 远程工作流 {Path(remote_file).name} 部署失败")
  1195. deploy_results["workflows_failed"] += 1
  1196. else:
  1197. logger.info("ℹ️ 远程服务器未发现相关工作流文件")
  1198. # 更新发现的工作流总数
  1199. deploy_results["workflows_found"] = len(workflow_files) + len(remote_workflow_files)
  1200. # 3. 汇总部署结果
  1201. logger.info("=" * 60)
  1202. logger.info(f"📊 部署结果汇总 - 任务: {task_name} (ID: {task_id})")
  1203. logger.info("-" * 40)
  1204. logger.info(
  1205. f" 脚本部署: {'✅ 成功' if deploy_results['script_deployed'] else '❌ 失败或跳过'}"
  1206. )
  1207. logger.info(f" 发现工作流: {deploy_results['workflows_found']} 个")
  1208. logger.info(f" 工作流部署成功: {deploy_results['workflows_deployed']} 个")
  1209. logger.info(f" 工作流部署失败: {deploy_results['workflows_failed']} 个")
  1210. # 判断整体部署是否成功
  1211. deploy_success = (
  1212. deploy_results["script_deployed"] and deploy_results["workflows_failed"] == 0
  1213. )
  1214. if deploy_success:
  1215. logger.info(f"✅ 任务 {task_name} 部署完成!")
  1216. elif deploy_results["script_deployed"]:
  1217. if deploy_results["workflows_failed"] > 0:
  1218. logger.warning(f"⚠️ 任务 {task_name} 脚本部署成功,但部分工作流部署失败")
  1219. else:
  1220. logger.info(f"✅ 任务 {task_name} 脚本部署成功")
  1221. deploy_success = True # 脚本部署成功就认为整体成功
  1222. else:
  1223. logger.error(f"❌ 任务 {task_name} 部署失败")
  1224. logger.info("=" * 60)
  1225. return deploy_success
  1226. # ============================================================================
  1227. # Cursor Agent 自动化
  1228. # ============================================================================
  1229. # Agent 会话状态
  1230. AGENT_SESSION_ACTIVE: bool = False
  1231. AGENT_START_TIME: float = 0
  1232. def get_all_cursor_windows() -> list[dict[str, Any]]:
  1233. """获取所有 Cursor 窗口信息"""
  1234. if not HAS_CURSOR_GUI:
  1235. return []
  1236. cursor_windows: list[dict[str, Any]] = []
  1237. def enum_windows_callback(hwnd, _extra):
  1238. if win32gui.IsWindowVisible(hwnd):
  1239. title = win32gui.GetWindowText(hwnd) or ""
  1240. class_name = win32gui.GetClassName(hwnd) or ""
  1241. is_cursor = "cursor" in title.lower()
  1242. if class_name and "chrome_widgetwin" in class_name.lower():
  1243. is_cursor = True
  1244. if is_cursor:
  1245. left, top, right, bottom = win32gui.GetWindowRect(hwnd)
  1246. area = (right - left) * (bottom - top)
  1247. cursor_windows.append(
  1248. {
  1249. "hwnd": hwnd,
  1250. "title": title,
  1251. "class_name": class_name,
  1252. "area": area,
  1253. }
  1254. )
  1255. return True
  1256. win32gui.EnumWindows(enum_windows_callback, None)
  1257. return cursor_windows
  1258. def find_cursor_window() -> int | None:
  1259. """查找 Cursor 主窗口句柄"""
  1260. if not HAS_CURSOR_GUI:
  1261. return None
  1262. cursor_windows = get_all_cursor_windows()
  1263. if not cursor_windows:
  1264. logger.warning("未找到 Cursor 窗口")
  1265. return None
  1266. # 按面积排序,返回最大的窗口(主窗口)
  1267. cursor_windows.sort(key=lambda x: x["area"], reverse=True)
  1268. return cursor_windows[0]["hwnd"]
  1269. def activate_window(hwnd: int) -> bool:
  1270. """
  1271. 激活指定窗口
  1272. Windows 对 SetForegroundWindow 有限制,只有满足以下条件之一才能成功:
  1273. 1. 调用进程是前台进程
  1274. 2. 调用进程由前台进程启动
  1275. 3. 目标窗口属于前台进程
  1276. 4. 没有其他窗口在前台
  1277. 此函数使用多种技巧绕过这些限制。
  1278. """
  1279. if not HAS_CURSOR_GUI:
  1280. return False
  1281. try:
  1282. # 方法1: 使用 AttachThreadInput 技巧绕过 SetForegroundWindow 限制
  1283. # 这是最可靠的方法,通过将当前线程附加到前台窗口的线程来获取激活权限
  1284. import ctypes
  1285. user32 = ctypes.windll.user32
  1286. # 获取当前前台窗口的线程ID
  1287. foreground_hwnd = user32.GetForegroundWindow()
  1288. foreground_thread_id = user32.GetWindowThreadProcessId(foreground_hwnd, None)
  1289. # 获取当前线程ID
  1290. current_thread_id = ctypes.windll.kernel32.GetCurrentThreadId()
  1291. attached = False
  1292. # 如果当前线程不是前台线程,则附加到前台线程
  1293. if current_thread_id != foreground_thread_id:
  1294. attached = user32.AttachThreadInput(
  1295. current_thread_id, foreground_thread_id, True
  1296. )
  1297. try:
  1298. # 先确保窗口不是最小化状态
  1299. if win32gui.IsIconic(hwnd):
  1300. win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
  1301. time.sleep(0.2)
  1302. # 使用 BringWindowToTop 将窗口置顶
  1303. user32.BringWindowToTop(hwnd)
  1304. # 显示窗口
  1305. win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
  1306. # 尝试 SetForegroundWindow
  1307. result = user32.SetForegroundWindow(hwnd)
  1308. if not result:
  1309. # 方法2: 使用 Alt 键模拟技巧
  1310. # 发送一个 Alt 键可以让系统认为用户有交互意图
  1311. # 定义必要的常量
  1312. KEYEVENTF_EXTENDEDKEY = 0x0001
  1313. KEYEVENTF_KEYUP = 0x0002
  1314. VK_MENU = 0x12 # Alt 键
  1315. # 模拟按下和释放 Alt 键
  1316. user32.keybd_event(VK_MENU, 0, KEYEVENTF_EXTENDEDKEY, 0)
  1317. user32.keybd_event(
  1318. VK_MENU, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0
  1319. )
  1320. time.sleep(0.1)
  1321. # 再次尝试
  1322. result = user32.SetForegroundWindow(hwnd)
  1323. if not result:
  1324. # 方法3: 使用 ShowWindow 配合 SW_SHOWDEFAULT
  1325. win32gui.ShowWindow(hwnd, win32con.SW_SHOWDEFAULT)
  1326. time.sleep(0.1)
  1327. result = user32.SetForegroundWindow(hwnd)
  1328. if not result:
  1329. # 方法4: 使用 SetWindowPos 将窗口置于最顶层
  1330. SWP_NOMOVE = 0x0002
  1331. SWP_NOSIZE = 0x0001
  1332. SWP_SHOWWINDOW = 0x0040
  1333. HWND_TOPMOST = -1
  1334. HWND_NOTOPMOST = -2
  1335. # 先设为最顶层
  1336. user32.SetWindowPos(
  1337. hwnd,
  1338. HWND_TOPMOST,
  1339. 0,
  1340. 0,
  1341. 0,
  1342. 0,
  1343. SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
  1344. )
  1345. time.sleep(0.1)
  1346. # 再取消最顶层(但窗口仍在前台)
  1347. user32.SetWindowPos(
  1348. hwnd,
  1349. HWND_NOTOPMOST,
  1350. 0,
  1351. 0,
  1352. 0,
  1353. 0,
  1354. SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
  1355. )
  1356. result = user32.SetForegroundWindow(hwnd)
  1357. time.sleep(0.3)
  1358. # 验证是否成功
  1359. current_foreground = user32.GetForegroundWindow()
  1360. if current_foreground == hwnd:
  1361. logger.debug("窗口激活成功")
  1362. return True
  1363. else:
  1364. # 即使 SetForegroundWindow 返回失败,窗口可能已经被置顶并可见
  1365. # 检查窗口是否可见且不是最小化
  1366. if win32gui.IsWindowVisible(hwnd) and not win32gui.IsIconic(hwnd):
  1367. logger.warning("窗口可能未完全激活到前台,但窗口可见,继续执行...")
  1368. return True
  1369. else:
  1370. logger.error("激活窗口失败: 窗口不在前台")
  1371. return False
  1372. finally:
  1373. # 分离线程
  1374. if attached:
  1375. user32.AttachThreadInput(current_thread_id, foreground_thread_id, False)
  1376. except Exception as e:
  1377. logger.error(f"激活窗口失败: {e}")
  1378. # 最后的备用方案:直接尝试基本操作
  1379. try:
  1380. win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
  1381. win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
  1382. time.sleep(0.3)
  1383. # 即使失败也返回 True,让调用者继续尝试
  1384. if win32gui.IsWindowVisible(hwnd):
  1385. logger.warning("使用备用方案激活窗口,继续执行...")
  1386. return True
  1387. except Exception:
  1388. pass
  1389. return False
  1390. def open_new_agent() -> bool:
  1391. """在 Cursor 中打开新的 Agent 窗口"""
  1392. global AGENT_SESSION_ACTIVE, AGENT_START_TIME
  1393. if not HAS_CURSOR_GUI:
  1394. logger.warning("当前环境不支持 Cursor GUI 自动化")
  1395. return False
  1396. hwnd = find_cursor_window()
  1397. if not hwnd:
  1398. return False
  1399. if not activate_window(hwnd):
  1400. return False
  1401. try:
  1402. # 使用 Ctrl+Shift+I 打开新的 Agent/Composer
  1403. logger.info("正在打开新的 Agent...")
  1404. pyautogui.hotkey("ctrl", "shift", "i")
  1405. time.sleep(2.0) # 等待 Agent 窗口打开
  1406. AGENT_SESSION_ACTIVE = True
  1407. AGENT_START_TIME = time.time()
  1408. logger.info("✅ 新的 Agent 已打开")
  1409. return True
  1410. except Exception as e:
  1411. logger.error(f"打开 Agent 失败: {e}")
  1412. return False
  1413. def close_current_agent(force: bool = False, max_retries: int = 3) -> bool:
  1414. """
  1415. 关闭当前的 Agent 会话
  1416. Args:
  1417. force: 是否强制关闭(使用多种方法)
  1418. max_retries: 最大重试次数
  1419. 关闭策略:
  1420. 1. 使用 Escape 键关闭 Agent 面板
  1421. 2. 如果失败,尝试 Ctrl+Shift+I 切换 Agent 面板
  1422. 3. 如果仍失败,尝试点击空白区域并按 Escape
  1423. """
  1424. global AGENT_SESSION_ACTIVE
  1425. if not HAS_CURSOR_GUI:
  1426. AGENT_SESSION_ACTIVE = False
  1427. return False
  1428. if not AGENT_SESSION_ACTIVE and not force:
  1429. logger.info("没有活动的 Agent 会话")
  1430. return True
  1431. logger.info("🔄 正在关闭 Agent...")
  1432. for attempt in range(max_retries):
  1433. try:
  1434. hwnd = find_cursor_window()
  1435. if not hwnd:
  1436. logger.warning("未找到 Cursor 窗口")
  1437. AGENT_SESSION_ACTIVE = False
  1438. return False
  1439. if not activate_window(hwnd):
  1440. logger.warning(f"激活窗口失败 (尝试 {attempt + 1}/{max_retries})")
  1441. time.sleep(0.5)
  1442. continue
  1443. # 方法1: 按 Escape 键关闭 Agent
  1444. logger.debug(f"尝试方法1: Escape 键 (尝试 {attempt + 1}/{max_retries})")
  1445. pyautogui.press("escape")
  1446. time.sleep(0.3)
  1447. pyautogui.press("escape")
  1448. time.sleep(0.3)
  1449. # 方法2: 使用 Ctrl+Shift+I 切换 Agent 面板(关闭)
  1450. if force or attempt > 0:
  1451. logger.debug("尝试方法2: Ctrl+Shift+I 切换")
  1452. pyautogui.hotkey("ctrl", "shift", "i")
  1453. time.sleep(0.5)
  1454. # 方法3: 点击编辑器区域并按 Escape
  1455. if force or attempt > 1:
  1456. logger.debug("尝试方法3: 点击编辑器区域")
  1457. # 获取窗口位置,点击中心偏左位置(编辑器区域)
  1458. try:
  1459. left, top, right, bottom = win32gui.GetWindowRect(hwnd)
  1460. center_x = left + (right - left) // 3 # 偏左1/3位置
  1461. center_y = top + (bottom - top) // 2
  1462. pyautogui.click(center_x, center_y)
  1463. time.sleep(0.2)
  1464. pyautogui.press("escape")
  1465. time.sleep(0.3)
  1466. except Exception as click_err:
  1467. logger.debug(f"点击方法失败: {click_err}")
  1468. AGENT_SESSION_ACTIVE = False
  1469. logger.info("✅ Agent 已关闭")
  1470. return True
  1471. except Exception as e:
  1472. logger.warning(f"关闭 Agent 尝试 {attempt + 1} 失败: {e}")
  1473. time.sleep(0.5)
  1474. # 即使关闭失败,也标记为非活动状态,避免状态不一致
  1475. AGENT_SESSION_ACTIVE = False
  1476. logger.warning("⚠️ Agent 关闭可能未完全成功,但已重置状态")
  1477. return False
  1478. def force_close_all_agents() -> bool:
  1479. """
  1480. 强制关闭所有可能的 Agent 会话
  1481. 用于清理可能遗留的多个 Agent 窗口
  1482. """
  1483. global AGENT_SESSION_ACTIVE
  1484. if not HAS_CURSOR_GUI:
  1485. return False
  1486. logger.info("🔄 强制关闭所有 Agent 会话...")
  1487. try:
  1488. hwnd = find_cursor_window()
  1489. if not hwnd:
  1490. AGENT_SESSION_ACTIVE = False
  1491. return True
  1492. if not activate_window(hwnd):
  1493. AGENT_SESSION_ACTIVE = False
  1494. return False
  1495. # 连续按多次 Escape 确保关闭所有面板
  1496. for _ in range(5):
  1497. pyautogui.press("escape")
  1498. time.sleep(0.2)
  1499. # 使用快捷键关闭可能的 Agent 面板
  1500. pyautogui.hotkey("ctrl", "shift", "i")
  1501. time.sleep(0.3)
  1502. pyautogui.hotkey("ctrl", "shift", "i")
  1503. time.sleep(0.3)
  1504. AGENT_SESSION_ACTIVE = False
  1505. logger.info("✅ 所有 Agent 会话已关闭")
  1506. return True
  1507. except Exception as e:
  1508. logger.error(f"强制关闭 Agent 失败: {e}")
  1509. AGENT_SESSION_ACTIVE = False
  1510. return False
  1511. def type_message_to_agent(message: str) -> bool:
  1512. """向 Agent 输入消息"""
  1513. if not HAS_CURSOR_GUI:
  1514. return False
  1515. try:
  1516. # 等待 Agent 输入框获得焦点
  1517. time.sleep(0.5)
  1518. # 使用剪贴板粘贴(更可靠地处理中文和特殊字符)
  1519. if HAS_PYPERCLIP:
  1520. try:
  1521. pyperclip.copy(message)
  1522. pyautogui.hotkey("ctrl", "v")
  1523. time.sleep(0.5)
  1524. except Exception:
  1525. # 回退到逐字符输入
  1526. pyautogui.write(message, interval=0.03)
  1527. else:
  1528. pyautogui.write(message, interval=0.03)
  1529. time.sleep(0.3)
  1530. # 按 Enter 发送消息
  1531. pyautogui.press("enter")
  1532. logger.info("✅ 消息已发送到 Agent")
  1533. return True
  1534. except Exception as e:
  1535. logger.error(f"发送消息到 Agent 失败: {e}")
  1536. return False
  1537. def wait_for_agent_completion(
  1538. timeout: int = 3600,
  1539. check_interval: int = 30,
  1540. ) -> bool:
  1541. """
  1542. 等待 Agent 完成任务
  1543. 通过检查 pending_tasks.json 中的任务状态来判断是否完成
  1544. """
  1545. start_time = time.time()
  1546. logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")
  1547. while time.time() - start_time < timeout:
  1548. processing_ids = get_processing_task_ids()
  1549. if not processing_ids:
  1550. elapsed = int(time.time() - start_time)
  1551. logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
  1552. return True
  1553. remaining = len(processing_ids)
  1554. elapsed = int(time.time() - start_time)
  1555. logger.info(
  1556. f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
  1557. )
  1558. time.sleep(check_interval)
  1559. logger.warning("等待超时,仍有未完成的任务")
  1560. return False
  1561. def run_agent_once(
  1562. timeout: int = 3600,
  1563. auto_close: bool = True,
  1564. ) -> bool:
  1565. """
  1566. 执行一次 Agent 任务
  1567. 流程:
  1568. 1. 同步已完成任务到数据库
  1569. 2. 从数据库读取 pending 任务
  1570. 3. 更新任务状态为 processing
  1571. 4. 生成执行指令文件(包含所有 processing 任务)
  1572. 5. 打开 Agent 并发送消息
  1573. 6. 等待任务完成
  1574. 7. 同步完成任务 + 自动部署
  1575. 8. 关闭 Agent
  1576. """
  1577. logger.info("=" * 60)
  1578. logger.info("Agent 单次执行模式")
  1579. logger.info("=" * 60)
  1580. # 1. 先同步已完成任务
  1581. sync_completed_tasks_to_db()
  1582. # 2. 从数据库获取 pending 任务
  1583. logger.info("正在从数据库查询 pending 任务...")
  1584. pending_tasks = get_pending_tasks()
  1585. # 3. 获取当前 processing 任务
  1586. processing_ids = get_processing_task_ids()
  1587. # 4. 检查是否有任务需要执行
  1588. if not pending_tasks and not processing_ids:
  1589. logger.info("✅ 没有待执行的任务")
  1590. return True
  1591. if pending_tasks:
  1592. logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")
  1593. # 5. 更新新任务状态为 processing
  1594. for task in pending_tasks:
  1595. update_task_status(task["task_id"], "processing")
  1596. # 6. 写入 pending_tasks.json
  1597. write_pending_tasks_json(pending_tasks)
  1598. if processing_ids:
  1599. logger.info(f"发现 {len(processing_ids)} 个已有的 processing 任务")
  1600. # 7. 获取所有需要执行的任务(包含完整信息)并生成执行指令
  1601. all_tasks_to_execute = get_all_tasks_to_execute()
  1602. if all_tasks_to_execute:
  1603. logger.info(f"共 {len(all_tasks_to_execute)} 个任务需要执行")
  1604. # 生成包含所有任务的执行指令文件
  1605. create_execute_instructions(all_tasks_to_execute)
  1606. else:
  1607. logger.warning("无法获取任务详细信息,跳过生成执行指令")
  1608. # 7. 更新触发器文件
  1609. all_processing_ids = get_processing_task_ids()
  1610. if all_processing_ids:
  1611. update_trigger_file(
  1612. task_count=len(all_processing_ids),
  1613. status="有待执行任务",
  1614. task_ids=all_processing_ids,
  1615. )
  1616. # 8. 打开 Agent 并发送消息
  1617. if not open_new_agent():
  1618. logger.error("❌ 无法打开 Agent")
  1619. return False
  1620. if not type_message_to_agent(AGENT_MESSAGE):
  1621. logger.error("❌ 无法发送消息到 Agent")
  1622. close_current_agent()
  1623. return False
  1624. logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")
  1625. # 9. 等待任务完成
  1626. completed = wait_for_agent_completion(timeout=timeout)
  1627. # 10. 立即关闭 Agent(在同步之前)
  1628. logger.info("🔄 任务执行完毕,立即关闭 Agent...")
  1629. if auto_close:
  1630. close_current_agent(force=True)
  1631. time.sleep(1.0) # 等待关闭完成
  1632. # 11. 同步已完成的任务到数据库(触发自动部署)
  1633. logger.info("🔄 开始同步和部署...")
  1634. sync_completed_tasks_to_db()
  1635. if completed:
  1636. logger.info("✅ Agent 已完成所有任务")
  1637. else:
  1638. logger.warning("⚠️ Agent 未能在超时时间内完成所有任务")
  1639. # 强制关闭可能遗留的 Agent
  1640. force_close_all_agents()
  1641. logger.info("=" * 60)
  1642. logger.info("Agent 会话结束")
  1643. logger.info("=" * 60)
  1644. return completed
  1645. def run_agent_loop(
  1646. interval: int = 300,
  1647. timeout: int = 3600,
  1648. auto_close: bool = True,
  1649. ) -> None:
  1650. """
  1651. Agent 循环模式
  1652. 循环执行 Agent 单次任务,直到用户按 Ctrl+C 停止
  1653. 完整流程:
  1654. 1. 同步已完成任务到数据库(触发自动部署)
  1655. 2. 检查是否有新的 pending 任务
  1656. 3. 生成执行指令文件
  1657. 4. 启动 Agent 执行任务
  1658. 5. 等待任务完成
  1659. 6. 同步完成任务并触发自动部署
  1660. 7. 循环...
  1661. """
  1662. global AGENT_SESSION_ACTIVE
  1663. logger.info("=" * 60)
  1664. logger.info("🔄 Agent 循环模式已启动")
  1665. logger.info("=" * 60)
  1666. logger.info(f" 检查间隔: {interval} 秒")
  1667. logger.info(f" 任务超时: {timeout} 秒")
  1668. logger.info(f" 自动部署: {'✅ 已启用' if ENABLE_AUTO_DEPLOY else '❌ 已禁用'}")
  1669. logger.info(f" 自动关闭 Agent: {'✅ 是' if auto_close else '❌ 否'}")
  1670. logger.info("=" * 60)
  1671. logger.info("按 Ctrl+C 停止服务")
  1672. logger.info("=" * 60)
  1673. loop_count = 0
  1674. total_tasks_completed = 0
  1675. total_deployments = 0
  1676. try:
  1677. while True:
  1678. try:
  1679. loop_count += 1
  1680. logger.info(f"\n{'=' * 60}")
  1681. logger.info(f"📍 开始第 {loop_count} 轮任务检查...")
  1682. logger.info(f"{'=' * 60}")
  1683. # 1. 同步已完成任务(这会触发自动部署)
  1684. logger.info("🔄 检查并同步已完成的任务...")
  1685. synced_count = sync_completed_tasks_to_db()
  1686. if synced_count > 0:
  1687. total_tasks_completed += synced_count
  1688. total_deployments += synced_count
  1689. logger.info(
  1690. f"✅ 已同步 {synced_count} 个完成的任务(累计: {total_tasks_completed})"
  1691. )
  1692. # 2. 从数据库获取 pending 任务
  1693. logger.info("📡 检查数据库中的 pending 任务...")
  1694. pending_tasks = get_pending_tasks()
  1695. if pending_tasks:
  1696. logger.info(f"📋 发现 {len(pending_tasks)} 个新的 pending 任务:")
  1697. for task in pending_tasks:
  1698. logger.info(f" - [{task['task_id']}] {task['task_name']}")
  1699. # 更新任务状态为 processing
  1700. for task in pending_tasks:
  1701. update_task_status(task["task_id"], "processing")
  1702. # 写入 pending_tasks.json
  1703. write_pending_tasks_json(pending_tasks)
  1704. # 3. 检查是否有 processing 任务
  1705. processing_ids = get_processing_task_ids()
  1706. # 4. 如果有新任务或有 processing 任务,生成包含所有任务的执行指令
  1707. if pending_tasks or processing_ids:
  1708. all_tasks_to_execute = get_all_tasks_to_execute()
  1709. if all_tasks_to_execute:
  1710. logger.info(
  1711. f"📝 生成执行指令文件,共 {len(all_tasks_to_execute)} 个任务"
  1712. )
  1713. create_execute_instructions(all_tasks_to_execute)
  1714. if processing_ids:
  1715. # 如果有活动的 Agent 会话,不需要重新启动
  1716. if AGENT_SESSION_ACTIVE:
  1717. logger.info(
  1718. f"⏳ Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
  1719. )
  1720. else:
  1721. logger.info(
  1722. f"🎯 发现 {len(processing_ids)} 个待处理任务,准备启动 Agent"
  1723. )
  1724. # 更新触发器文件
  1725. update_trigger_file(
  1726. task_count=len(processing_ids),
  1727. status="有待执行任务",
  1728. task_ids=processing_ids,
  1729. )
  1730. # 启动 Agent
  1731. if open_new_agent():
  1732. if type_message_to_agent(AGENT_MESSAGE):
  1733. logger.info("✅ 已启动 Agent 并发送执行提醒")
  1734. # 等待任务完成
  1735. task_completed = wait_for_agent_completion(
  1736. timeout=timeout
  1737. )
  1738. # ===== 关键:任务完成后立即关闭 Agent =====
  1739. logger.info("🔄 任务执行完毕,立即关闭 Agent...")
  1740. if auto_close:
  1741. # 使用强制关闭,确保 Agent 被正确关闭
  1742. close_current_agent(force=True)
  1743. # 等待一小段时间确保关闭完成
  1744. time.sleep(1.0)
  1745. # 同步完成的任务(这会触发自动部署)
  1746. logger.info("🔄 开始同步和部署...")
  1747. synced = sync_completed_tasks_to_db()
  1748. if synced > 0:
  1749. total_tasks_completed += synced
  1750. total_deployments += synced
  1751. logger.info(
  1752. f"✅ 本轮完成 {synced} 个任务的同步和部署"
  1753. )
  1754. # 显示本轮统计
  1755. logger.info(f"📊 本轮统计: 完成任务 {synced} 个")
  1756. if ENABLE_AUTO_DEPLOY:
  1757. logger.info(f" 已触发自动部署: {synced} 个")
  1758. # 如果任务未完成(超时),也确保关闭 Agent
  1759. if not task_completed:
  1760. logger.warning("⚠️ 任务超时,强制关闭 Agent")
  1761. force_close_all_agents()
  1762. else:
  1763. logger.warning("❌ 发送消息失败")
  1764. close_current_agent(force=True)
  1765. else:
  1766. logger.warning("❌ 启动 Agent 失败")
  1767. else:
  1768. logger.info("✅ 当前没有待处理任务")
  1769. # 显示累计统计
  1770. logger.info(
  1771. f"\n📈 累计统计: 已完成 {total_tasks_completed} 个任务, "
  1772. f"已部署 {total_deployments} 个"
  1773. )
  1774. logger.info(f"⏰ {interval} 秒后将进行第 {loop_count + 1} 轮检查...")
  1775. time.sleep(interval)
  1776. except KeyboardInterrupt:
  1777. raise
  1778. except Exception as e:
  1779. logger.error(f"❌ 执行出错: {e}")
  1780. import traceback
  1781. logger.error(traceback.format_exc())
  1782. logger.info(f"⏰ {interval} 秒后重试...")
  1783. time.sleep(interval)
  1784. except KeyboardInterrupt:
  1785. # 退出时关闭 Agent
  1786. logger.info("\n" + "=" * 60)
  1787. logger.info("⛔ 收到停止信号,正在退出...")
  1788. if AGENT_SESSION_ACTIVE:
  1789. logger.info("🔄 正在关闭 Agent...")
  1790. close_current_agent()
  1791. # 最后一次同步
  1792. logger.info("🔄 执行最终同步...")
  1793. final_synced = sync_completed_tasks_to_db()
  1794. if final_synced > 0:
  1795. total_tasks_completed += final_synced
  1796. logger.info(f"✅ 最终同步了 {final_synced} 个任务")
  1797. logger.info("=" * 60)
  1798. logger.info("📊 会话统计:")
  1799. logger.info(f" 总循环次数: {loop_count}")
  1800. logger.info(f" 总完成任务: {total_tasks_completed}")
  1801. logger.info(f" 总部署次数: {total_deployments}")
  1802. logger.info("=" * 60)
  1803. logger.info("✅ Agent 循环模式已停止")
  1804. # ============================================================================
  1805. # 交互式菜单
  1806. # ============================================================================
  1807. def show_interactive_menu() -> None:
  1808. """显示交互式菜单并执行用户选择的操作"""
  1809. global ENABLE_AUTO_DEPLOY
  1810. while True:
  1811. print("\n" + "=" * 60)
  1812. print("自动任务执行调度脚本 - Agent 模式")
  1813. print("=" * 60)
  1814. print("\n请选择操作模式:\n")
  1815. print(" 1. Agent 单次执行")
  1816. print(" 2. Agent 循环模式(含自动部署脚本和n8n工作流)")
  1817. print(" 3. Agent 循环模式(禁用部署)")
  1818. print(" 4. 测试生产服务器连接")
  1819. print(" 5. 查看当前任务状态")
  1820. print(" 6. 手动触发任务部署")
  1821. print(" 7. 强制关闭所有 Agent")
  1822. print(" 0. 退出")
  1823. print("\n" + "-" * 60)
  1824. try:
  1825. choice = input("请输入选项 [0-5]: ").strip()
  1826. except (KeyboardInterrupt, EOFError):
  1827. print("\n再见!")
  1828. break
  1829. if choice == "0":
  1830. print("再见!")
  1831. break
  1832. elif choice == "1":
  1833. print("\n启动 Agent 单次执行模式...")
  1834. run_agent_once(timeout=3600, auto_close=True)
  1835. input("\n按 Enter 键返回菜单...")
  1836. elif choice == "2":
  1837. try:
  1838. interval_str = input("请输入检查间隔(秒,默认300): ").strip()
  1839. interval = int(interval_str) if interval_str else 300
  1840. except ValueError:
  1841. interval = 300
  1842. print("\n🚀 启动 Agent 循环模式(含自动部署)")
  1843. print(f" 检查间隔: {interval} 秒")
  1844. print(" 自动部署: ✅ 已启用")
  1845. print("\n 任务完成后将自动:")
  1846. print(" - 部署 Python 脚本到生产服务器")
  1847. print(" - 查找并部署相关 n8n 工作流")
  1848. print("\n按 Ctrl+C 停止服务并返回菜单\n")
  1849. ENABLE_AUTO_DEPLOY = True
  1850. try:
  1851. run_agent_loop(interval=interval)
  1852. except KeyboardInterrupt:
  1853. print("\n循环已停止")
  1854. elif choice == "3":
  1855. try:
  1856. interval_str = input("请输入检查间隔(秒,默认300): ").strip()
  1857. interval = int(interval_str) if interval_str else 300
  1858. except ValueError:
  1859. interval = 300
  1860. print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
  1861. print("按 Ctrl+C 停止服务并返回菜单\n")
  1862. ENABLE_AUTO_DEPLOY = False
  1863. try:
  1864. run_agent_loop(interval=interval)
  1865. except KeyboardInterrupt:
  1866. print("\n循环已停止")
  1867. elif choice == "4":
  1868. print("\n测试生产服务器连接...")
  1869. if test_ssh_connection():
  1870. print("✅ 连接测试成功")
  1871. else:
  1872. print("❌ 连接测试失败")
  1873. input("\n按 Enter 键返回菜单...")
  1874. elif choice == "5":
  1875. print("\n当前任务状态:")
  1876. print("-" * 40)
  1877. # 从数据库获取 pending 任务
  1878. pending_tasks = get_pending_tasks()
  1879. print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
  1880. for task in pending_tasks:
  1881. print(f" - [{task['task_id']}] {task['task_name']}")
  1882. # 从本地文件获取 processing 任务
  1883. processing_ids = get_processing_task_ids()
  1884. print(f" 本地 processing 任务: {len(processing_ids)} 个")
  1885. if processing_ids:
  1886. print(f" 任务 ID: {processing_ids}")
  1887. # 显示已完成的任务
  1888. if PENDING_TASKS_FILE.exists():
  1889. try:
  1890. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  1891. all_local_tasks = json.load(f)
  1892. completed_tasks = [
  1893. t for t in all_local_tasks if t.get("status") == "completed"
  1894. ]
  1895. print(f" 本地 completed 任务: {len(completed_tasks)} 个")
  1896. for task in completed_tasks:
  1897. print(
  1898. f" - [{task.get('task_id')}] {task.get('task_name')} -> {task.get('code_file', 'N/A')}"
  1899. )
  1900. except Exception:
  1901. pass
  1902. input("\n按 Enter 键返回菜单...")
  1903. elif choice == "6":
  1904. print("\n手动触发任务部署")
  1905. print("-" * 40)
  1906. # 显示已完成的任务列表
  1907. if PENDING_TASKS_FILE.exists():
  1908. try:
  1909. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  1910. all_tasks = json.load(f)
  1911. completed_tasks = [
  1912. t for t in all_tasks if t.get("status") == "completed"
  1913. ]
  1914. if not completed_tasks:
  1915. print("没有已完成的任务可供部署")
  1916. input("\n按 Enter 键返回菜单...")
  1917. continue
  1918. print("已完成的任务:")
  1919. for idx, task in enumerate(completed_tasks, 1):
  1920. task_id = task.get("task_id", "N/A")
  1921. task_name = task.get("task_name", "未知")
  1922. code_file = task.get("code_file", "N/A")
  1923. print(f" {idx}. [{task_id}] {task_name}")
  1924. print(f" 代码文件: {code_file}")
  1925. print("\n 0. 部署全部")
  1926. print(" q. 返回菜单")
  1927. try:
  1928. selection = input("\n请选择要部署的任务编号: ").strip().lower()
  1929. if selection == "q":
  1930. continue
  1931. tasks_to_deploy = []
  1932. if selection == "0":
  1933. tasks_to_deploy = completed_tasks
  1934. else:
  1935. try:
  1936. idx = int(selection) - 1
  1937. if 0 <= idx < len(completed_tasks):
  1938. tasks_to_deploy = [completed_tasks[idx]]
  1939. else:
  1940. print("❌ 无效的编号")
  1941. continue
  1942. except ValueError:
  1943. print("❌ 请输入有效的数字")
  1944. continue
  1945. if tasks_to_deploy:
  1946. print(f"\n🚀 开始部署 {len(tasks_to_deploy)} 个任务...")
  1947. ENABLE_AUTO_DEPLOY = True
  1948. success_count = 0
  1949. for task in tasks_to_deploy:
  1950. if auto_deploy_completed_task(task):
  1951. success_count += 1
  1952. print(
  1953. f"\n📊 部署完成: {success_count}/{len(tasks_to_deploy)} 成功"
  1954. )
  1955. except (KeyboardInterrupt, EOFError):
  1956. pass
  1957. except Exception as e:
  1958. print(f"❌ 读取任务列表失败: {e}")
  1959. else:
  1960. print("没有本地任务记录")
  1961. input("\n按 Enter 键返回菜单...")
  1962. elif choice == "7":
  1963. print("\n🔄 强制关闭所有 Agent 会话...")
  1964. if HAS_CURSOR_GUI:
  1965. if force_close_all_agents():
  1966. print("✅ 所有 Agent 会话已关闭")
  1967. else:
  1968. print("⚠️ 关闭过程中可能出现问题,请检查 Cursor 窗口")
  1969. else:
  1970. print("❌ 当前环境不支持 GUI 自动化")
  1971. input("\n按 Enter 键返回菜单...")
  1972. else:
  1973. print("❌ 无效的选项,请重新选择")
  1974. # ============================================================================
  1975. # 主函数
  1976. # ============================================================================
  1977. def main() -> None:
  1978. """主函数"""
  1979. parser = argparse.ArgumentParser(
  1980. description="自动任务执行调度脚本 (Agent 模式)",
  1981. formatter_class=argparse.RawDescriptionHelpFormatter,
  1982. epilog="""
  1983. 示例:
  1984. # Agent 单次执行
  1985. python scripts/auto_execute_tasks.py --agent-run
  1986. # Agent 循环模式
  1987. python scripts/auto_execute_tasks.py --agent-loop
  1988. # Agent 循环模式 + 禁用自动部署
  1989. python scripts/auto_execute_tasks.py --agent-loop --no-deploy
  1990. # 设置 Agent 超时时间
  1991. python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
  1992. # 立即部署指定任务到生产服务器
  1993. python scripts/auto_execute_tasks.py --deploy-now 123
  1994. # 测试生产服务器连接
  1995. python scripts/auto_execute_tasks.py --test-connection
  1996. """,
  1997. )
  1998. # Agent 模式参数
  1999. parser.add_argument(
  2000. "--agent-run",
  2001. action="store_true",
  2002. help="Agent 单次执行模式",
  2003. )
  2004. parser.add_argument(
  2005. "--agent-loop",
  2006. action="store_true",
  2007. help="Agent 循环模式",
  2008. )
  2009. parser.add_argument(
  2010. "--agent-timeout",
  2011. type=int,
  2012. default=3600,
  2013. help="Agent 等待任务完成的超时时间(秒),默认 3600",
  2014. )
  2015. parser.add_argument(
  2016. "--interval",
  2017. type=int,
  2018. default=300,
  2019. help="循环模式检查间隔(秒),默认 300",
  2020. )
  2021. parser.add_argument(
  2022. "--no-auto-close",
  2023. action="store_true",
  2024. help="任务完成后不自动关闭 Agent",
  2025. )
  2026. # 部署相关参数
  2027. parser.add_argument(
  2028. "--no-deploy",
  2029. action="store_true",
  2030. help="禁用自动部署功能",
  2031. )
  2032. parser.add_argument(
  2033. "--deploy-now",
  2034. type=str,
  2035. metavar="TASK_ID",
  2036. help="立即部署指定任务ID的脚本到生产服务器",
  2037. )
  2038. parser.add_argument(
  2039. "--test-connection",
  2040. action="store_true",
  2041. help="测试到生产服务器的 SSH 连接",
  2042. )
  2043. args = parser.parse_args()
  2044. global ENABLE_AUTO_DEPLOY
  2045. ENABLE_AUTO_DEPLOY = not args.no_deploy
  2046. auto_close = not args.no_auto_close
  2047. # 测试 SSH 连接
  2048. if args.test_connection:
  2049. if test_ssh_connection():
  2050. logger.info("✅ 连接测试成功")
  2051. else:
  2052. logger.error("❌ 连接测试失败")
  2053. return
  2054. # 立即部署指定任务
  2055. if args.deploy_now:
  2056. try:
  2057. task_id = int(args.deploy_now)
  2058. logger.info(f"开始部署任务 {task_id}...")
  2059. # 从 pending_tasks.json 查找任务信息
  2060. if PENDING_TASKS_FILE.exists():
  2061. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  2062. tasks = json.load(f)
  2063. task_found = None
  2064. for t in tasks:
  2065. if t.get("task_id") == task_id:
  2066. task_found = t
  2067. break
  2068. if task_found:
  2069. if auto_deploy_completed_task(task_found):
  2070. logger.info(f"✅ 任务 {task_id} 部署成功")
  2071. else:
  2072. logger.error(f"❌ 任务 {task_id} 部署失败")
  2073. else:
  2074. logger.error(f"未找到任务 {task_id}")
  2075. else:
  2076. logger.error("pending_tasks.json 文件不存在")
  2077. except ValueError:
  2078. logger.error(f"无效的任务ID: {args.deploy_now}")
  2079. return
  2080. # Agent 单次执行
  2081. if args.agent_run:
  2082. success = run_agent_once(
  2083. timeout=args.agent_timeout,
  2084. auto_close=auto_close,
  2085. )
  2086. if success:
  2087. logger.info("✅ Agent 单次执行完成")
  2088. else:
  2089. logger.error("❌ Agent 单次执行失败")
  2090. return
  2091. # Agent 循环模式
  2092. if args.agent_loop:
  2093. run_agent_loop(
  2094. interval=args.interval,
  2095. timeout=args.agent_timeout,
  2096. auto_close=auto_close,
  2097. )
  2098. return
  2099. # 没有指定任何模式参数时,显示交互式菜单
  2100. if len(sys.argv) == 1:
  2101. show_interactive_menu()
  2102. else:
  2103. # 显示帮助信息
  2104. parser.print_help()
  2105. if __name__ == "__main__":
  2106. main()