auto_execute_tasks.py 52 KB


  1. #!/usr/bin/env python3
  2. """
  3. 自动任务执行核心调度脚本 (Agent 模式)
  4. 工作流程:
  5. 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
  6. 2. 生成 tasks/task_execute_instructions.md 执行指令文件
  7. 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
  8. 4. 更新 tasks/task_trigger.txt 触发器文件
  9. 5. 启动新的 Cursor Agent 并发送执行指令
  10. 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
  11. 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
  12. 使用方式:
  13. # Agent 单次执行(执行一次任务后退出)
  14. python scripts/auto_execute_tasks.py --agent-run
  15. # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务)
  16. python scripts/auto_execute_tasks.py --agent-loop
  17. # Agent 循环模式 + 禁用自动部署
  18. python scripts/auto_execute_tasks.py --agent-loop --no-deploy
  19. # 设置 Agent 超时时间(默认 3600 秒)
  20. python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
  21. # 任务完成后不自动关闭 Agent
  22. python scripts/auto_execute_tasks.py --agent-run --no-auto-close
  23. # 立即部署指定任务ID的脚本到生产服务器
  24. python scripts/auto_execute_tasks.py --deploy-now 123
  25. # 测试到生产服务器的 SSH 连接
  26. python scripts/auto_execute_tasks.py --test-connection
  27. """
  28. from __future__ import annotations
  29. import argparse
  30. import json
  31. import logging
  32. import sys
  33. import time
  34. from datetime import datetime
  35. from pathlib import Path
  36. from typing import Any
  37. # ============================================================================
  38. # 日志配置
  39. # ============================================================================
  40. logging.basicConfig(
  41. level=logging.INFO,
  42. format="%(asctime)s - %(levelname)s - %(message)s",
  43. )
  44. logger = logging.getLogger("AutoExecuteTasks")
  45. # ============================================================================
  46. # Windows GUI 自动化依赖(可选)
  47. # ============================================================================
  48. HAS_CURSOR_GUI = False
  49. HAS_PYPERCLIP = False
  50. try:
  51. import pyautogui
  52. import win32con
  53. import win32gui
  54. pyautogui.FAILSAFE = True
  55. pyautogui.PAUSE = 0.5
  56. HAS_CURSOR_GUI = True
  57. try:
  58. import pyperclip
  59. HAS_PYPERCLIP = True
  60. except ImportError:
  61. pass
  62. except ImportError:
  63. logger.info(
  64. "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
  65. "将禁用自动 Cursor Agent 功能。"
  66. )
  67. # ============================================================================
  68. # 全局配置
  69. # ============================================================================
  70. WORKSPACE_ROOT = Path(__file__).parent.parent
  71. TASKS_DIR = WORKSPACE_ROOT / "tasks"
  72. PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
  73. INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
  74. TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"
  75. # 生产服务器配置
  76. PRODUCTION_SERVER = {
  77. "host": "192.168.3.143",
  78. "port": 22,
  79. "username": "ubuntu",
  80. "password": "citumxl2357",
  81. "script_path": "/opt/dataops-platform/datafactory/scripts",
  82. "workflow_path": "/opt/dataops-platform/n8n/workflows",
  83. }
  84. # Agent 消息模板
  85. AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"
  86. # 命令行参数控制的全局变量
  87. ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署
  88. # ============================================================================
  89. # 数据库操作
  90. # ============================================================================
  91. def get_db_connection():
  92. """获取数据库连接(使用 production 环境配置)"""
  93. try:
  94. from urllib.parse import urlparse
  95. import psycopg2
  96. sys.path.insert(0, str(WORKSPACE_ROOT))
  97. from app.config.config import config
  98. # 强制使用 production 环境的数据库配置
  99. app_config = config["production"]
  100. db_uri = app_config.SQLALCHEMY_DATABASE_URI
  101. # 解析 SQLAlchemy URI 格式为 psycopg2 可用的格式
  102. parsed = urlparse(db_uri)
  103. conn = psycopg2.connect(
  104. host=parsed.hostname,
  105. port=parsed.port or 5432,
  106. database=parsed.path.lstrip("/"),
  107. user=parsed.username,
  108. password=parsed.password,
  109. )
  110. logger.debug(
  111. f"数据库连接成功: {parsed.hostname}:{parsed.port}/{parsed.path.lstrip('/')}"
  112. )
  113. return conn
  114. except ImportError as e:
  115. logger.error(f"导入依赖失败: {e}")
  116. return None
  117. except Exception as e:
  118. logger.error(f"连接数据库失败: {e}")
  119. import traceback
  120. logger.error(traceback.format_exc())
  121. return None
  122. def get_pending_tasks() -> list[dict[str, Any]]:
  123. """
  124. 从 PostgreSQL task_list 表获取所有 pending 状态的任务
  125. 重要:此函数直接查询数据库,确保获取最新的任务列表
  126. """
  127. try:
  128. from psycopg2.extras import RealDictCursor
  129. logger.info("📡 正在连接数据库...")
  130. conn = get_db_connection()
  131. if not conn:
  132. logger.error("❌ 无法获取数据库连接")
  133. return []
  134. logger.info("✅ 数据库连接成功,正在查询 pending 任务...")
  135. cursor = conn.cursor(cursor_factory=RealDictCursor)
  136. cursor.execute(
  137. """
  138. SELECT task_id, task_name, task_description, status,
  139. code_name, code_path, create_time, create_by
  140. FROM task_list
  141. WHERE status = 'pending'
  142. ORDER BY create_time ASC
  143. """
  144. )
  145. tasks = cursor.fetchall()
  146. cursor.close()
  147. conn.close()
  148. task_list = [dict(task) for task in tasks]
  149. logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
  150. if task_list:
  151. for task in task_list:
  152. logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")
  153. return task_list
  154. except Exception as e:
  155. logger.error(f"获取 pending 任务失败: {e}")
  156. import traceback
  157. logger.error(traceback.format_exc())
  158. return []
  159. def update_task_status(
  160. task_id: int,
  161. status: str,
  162. code_name: str | None = None,
  163. code_path: str | None = None,
  164. ) -> bool:
  165. """更新任务状态"""
  166. try:
  167. conn = get_db_connection()
  168. if not conn:
  169. return False
  170. cursor = conn.cursor()
  171. if code_name and code_path:
  172. cursor.execute(
  173. """
  174. UPDATE task_list
  175. SET status = %s, code_name = %s, code_path = %s,
  176. update_time = CURRENT_TIMESTAMP
  177. WHERE task_id = %s
  178. """,
  179. (status, code_name, code_path, task_id),
  180. )
  181. else:
  182. cursor.execute(
  183. """
  184. UPDATE task_list
  185. SET status = %s, update_time = CURRENT_TIMESTAMP
  186. WHERE task_id = %s
  187. """,
  188. (status, task_id),
  189. )
  190. conn.commit()
  191. updated = cursor.rowcount > 0
  192. cursor.close()
  193. conn.close()
  194. if updated:
  195. logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
  196. return updated
  197. except Exception as e:
  198. logger.error(f"更新任务状态失败: {e}")
  199. return False
  200. # ============================================================================
  201. # 触发器文件管理
  202. # ============================================================================
  203. def update_trigger_file(
  204. task_count: int = 0,
  205. status: str = "有待执行任务",
  206. task_ids: list[int] | None = None,
  207. ) -> None:
  208. """更新触发器文件 tasks/task_trigger.txt"""
  209. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  210. gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  211. task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
  212. content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
  213. 生成时间: {gen_time}
  214. 状态: {status}
  215. 待处理任务数: {task_count}
  216. 任务ID列表: [{task_ids_str}]
  217. 此文件用于触发Cursor自动执行任务。
  218. 【重要】Cursor AI 请立即执行以下操作:
  219. 1. 读取 tasks/task_execute_instructions.md 文件
  220. 2. 按照文件中的任务描述,依次完成每个任务
  221. 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
  222. """
  223. with TRIGGER_FILE.open("w", encoding="utf-8") as f:
  224. f.write(content)
  225. logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
  226. def get_processing_task_ids() -> list[int]:
  227. """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
  228. if not PENDING_TASKS_FILE.exists():
  229. return []
  230. try:
  231. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  232. tasks = json.load(f)
  233. return [
  234. t.get("task_id")
  235. for t in tasks
  236. if t.get("status") == "processing" and t.get("task_id")
  237. ]
  238. except Exception:
  239. return []
  240. # ============================================================================
  241. # 任务文件生成
  242. # ============================================================================
  243. def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
  244. """将任务列表写入 tasks/pending_tasks.json"""
  245. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  246. # 读取现有任务
  247. existing_tasks = []
  248. if PENDING_TASKS_FILE.exists():
  249. try:
  250. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  251. existing_tasks = json.load(f)
  252. except Exception:
  253. existing_tasks = []
  254. existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
  255. # 添加新任务
  256. for task in tasks:
  257. if task["task_id"] not in existing_ids:
  258. task_info = {
  259. "task_id": task["task_id"],
  260. "task_name": task["task_name"],
  261. "code_path": task.get("code_path", ""),
  262. "code_name": task.get("code_name", ""),
  263. "status": "processing",
  264. "notified_at": datetime.now().isoformat(),
  265. "code_file": task.get("code_file", ""),
  266. }
  267. existing_tasks.append(task_info)
  268. with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
  269. json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
  270. logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
  271. def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
  272. """生成任务执行指令文件 tasks/task_execute_instructions.md"""
  273. TASKS_DIR.mkdir(parents=True, exist_ok=True)
  274. with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
  275. f.write("# Cursor 自动任务执行指令\n\n")
  276. f.write("**重要:请立即执行以下任务!**\n\n")
  277. gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  278. f.write(f"**生成时间**: {gen_time}\n\n")
  279. f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
  280. f.write("## 任务完成后的操作\n\n")
  281. f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
  282. f.write("对应任务的 `status` 为 `completed`,\n")
  283. f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
  284. f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
  285. f.write("## 任务约束要求\n\n")
  286. f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
  287. f.write("- 不要创建任何 summary、report、总结类的文档文件\n")
  288. f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n")
  289. f.write("- 只需创建任务要求的功能脚本文件\n")
  290. f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
  291. f.write("---\n\n")
  292. for idx, task in enumerate(tasks, 1):
  293. task_id = task["task_id"]
  294. task_name = task["task_name"]
  295. task_desc = task["task_description"]
  296. create_time = task.get("create_time", "")
  297. if hasattr(create_time, "strftime"):
  298. create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
  299. f.write(f"## 任务 {idx}: {task_name}\n\n")
  300. f.write(f"- **任务ID**: `{task_id}`\n")
  301. f.write(f"- **创建时间**: {create_time}\n")
  302. f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
  303. f.write(f"### 任务描述\n\n{task_desc}\n\n")
  304. f.write("---\n\n")
  305. logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
  306. # ============================================================================
  307. # Neo4j 独立连接(不依赖 Flask 应用上下文)
  308. # ============================================================================
  309. def get_neo4j_driver():
  310. """获取 Neo4j 驱动(独立于 Flask 应用上下文)"""
  311. try:
  312. from neo4j import GraphDatabase
  313. sys.path.insert(0, str(WORKSPACE_ROOT))
  314. from app.config.config import config
  315. # 强制使用 production 环境的配置
  316. app_config = config["production"]
  317. uri = app_config.NEO4J_URI
  318. user = app_config.NEO4J_USER
  319. password = app_config.NEO4J_PASSWORD
  320. driver = GraphDatabase.driver(uri, auth=(user, password))
  321. return driver
  322. except ImportError as e:
  323. logger.error(f"导入 Neo4j 驱动失败: {e}")
  324. return None
  325. except Exception as e:
  326. logger.error(f"连接 Neo4j 失败: {e}")
  327. return None
  328. # ============================================================================
  329. # 状态同步
  330. # ============================================================================
  331. def extract_dataflow_name_from_task(task_id: int) -> str | None:
  332. """从任务描述中提取 DataFlow 名称"""
  333. import re
  334. try:
  335. conn = get_db_connection()
  336. if not conn:
  337. return None
  338. cursor = conn.cursor()
  339. cursor.execute(
  340. "SELECT task_description FROM task_list WHERE task_id = %s",
  341. (task_id,),
  342. )
  343. result = cursor.fetchone()
  344. cursor.close()
  345. conn.close()
  346. if not result:
  347. return None
  348. task_desc = result[0]
  349. # 从任务描述中提取 DataFlow Name
  350. match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", task_desc)
  351. if match:
  352. dataflow_name = match.group(1).strip()
  353. logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
  354. return dataflow_name
  355. return None
  356. except Exception as e:
  357. logger.error(f"提取 DataFlow 名称失败: {e}")
  358. return None
  359. def update_dataflow_script_path(
  360. task_name: str, script_path: str, task_id: int | None = None
  361. ) -> bool:
  362. """更新 DataFlow 节点的 script_path 字段"""
  363. try:
  364. driver = get_neo4j_driver()
  365. if not driver:
  366. logger.error("无法获取 Neo4j 驱动")
  367. return False
  368. # 如果提供了 task_id,尝试从任务描述中提取真正的 DataFlow 名称
  369. dataflow_name = task_name
  370. if task_id:
  371. extracted_name = extract_dataflow_name_from_task(task_id)
  372. if extracted_name:
  373. dataflow_name = extracted_name
  374. logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}")
  375. query = """
  376. MATCH (n:DataFlow {name_zh: $name_zh})
  377. SET n.script_path = $script_path, n.updated_at = $updated_at
  378. RETURN n
  379. """
  380. updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  381. with driver.session() as session:
  382. result = session.run(
  383. query,
  384. name_zh=dataflow_name,
  385. script_path=script_path,
  386. updated_at=updated_at,
  387. ).single()
  388. driver.close()
  389. if result:
  390. logger.info(
  391. f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}"
  392. )
  393. return True
  394. else:
  395. logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
  396. return False
  397. except Exception as e:
  398. logger.error(f"更新 DataFlow script_path 失败: {e}")
  399. return False
  400. def sync_completed_tasks_to_db() -> int:
  401. """将 pending_tasks.json 中 completed 的任务同步到数据库"""
  402. if not PENDING_TASKS_FILE.exists():
  403. return 0
  404. try:
  405. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  406. tasks = json.load(f)
  407. except Exception as e:
  408. logger.error(f"读取 pending_tasks.json 失败: {e}")
  409. return 0
  410. if not isinstance(tasks, list):
  411. return 0
  412. updated = 0
  413. remaining_tasks = []
  414. for t in tasks:
  415. if t.get("status") == "completed":
  416. task_id = t.get("task_id")
  417. if not task_id:
  418. continue
  419. task_name = t.get("task_name")
  420. code_name = t.get("code_name")
  421. code_path = t.get("code_path")
  422. # 统一处理:code_path 始终为 "datafactory/scripts"
  423. code_path = "datafactory/scripts"
  424. # 只处理 Python 脚本文件
  425. is_python_script = code_name and code_name.endswith(".py")
  426. if is_python_script:
  427. logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_name}")
  428. else:
  429. logger.info(
  430. f"任务 {task_id} 的 code_name ({code_name}) 不是 Python 脚本,跳过 DataFlow 更新"
  431. )
  432. if update_task_status(task_id, "completed", code_name, code_path):
  433. updated += 1
  434. logger.info(f"已同步任务 {task_id} 为 completed")
  435. # 只有 Python 脚本才更新 DataFlow 节点的 script_path
  436. if task_name and is_python_script:
  437. full_script_path = f"{code_path}/{code_name}"
  438. if update_dataflow_script_path(
  439. task_name, full_script_path, task_id=task_id
  440. ):
  441. logger.info(
  442. f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
  443. )
  444. else:
  445. logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")
  446. # 自动部署到生产服务器(如果启用)
  447. if ENABLE_AUTO_DEPLOY:
  448. logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
  449. if auto_deploy_completed_task(t):
  450. logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
  451. else:
  452. logger.warning(f"任务 {task_id} 部署到生产服务器失败")
  453. else:
  454. logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")
  455. else:
  456. remaining_tasks.append(t)
  457. else:
  458. remaining_tasks.append(t)
  459. if updated > 0:
  460. with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
  461. json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
  462. logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
  463. return updated
  464. # ============================================================================
  465. # 生产服务器部署功能
  466. # ============================================================================
  467. def get_ssh_connection():
  468. """获取 SSH 连接到生产服务器"""
  469. try:
  470. import paramiko # type: ignore
  471. ssh = paramiko.SSHClient()
  472. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  473. logger.info(
  474. f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
  475. f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
  476. )
  477. ssh.connect(
  478. hostname=PRODUCTION_SERVER["host"],
  479. port=PRODUCTION_SERVER["port"],
  480. username=PRODUCTION_SERVER["username"],
  481. password=PRODUCTION_SERVER["password"],
  482. timeout=10,
  483. )
  484. logger.info("✅ SSH 连接成功")
  485. return ssh
  486. except ImportError:
  487. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  488. return None
  489. except Exception as e:
  490. logger.error(f"SSH 连接失败: {e}")
  491. return None
  492. def test_ssh_connection() -> bool:
  493. """测试 SSH 连接到生产服务器"""
  494. logger.info("=" * 60)
  495. logger.info("测试生产服务器连接")
  496. logger.info("=" * 60)
  497. ssh = get_ssh_connection()
  498. if not ssh:
  499. logger.error("❌ SSH 连接测试失败")
  500. return False
  501. try:
  502. # 测试执行命令
  503. _, stdout, _ = ssh.exec_command("echo 'Connection test successful'")
  504. output = stdout.read().decode().strip()
  505. logger.info(f"✅ 命令执行成功: {output}")
  506. # 检查目标目录是否存在
  507. _, stdout, _ = ssh.exec_command(
  508. f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'"
  509. )
  510. result = stdout.read().decode().strip()
  511. if result == "exists":
  512. logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}")
  513. else:
  514. logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}")
  515. logger.info("将在首次部署时自动创建")
  516. ssh.close()
  517. logger.info("=" * 60)
  518. logger.info("✅ 连接测试完成")
  519. logger.info("=" * 60)
  520. return True
  521. except Exception as e:
  522. logger.error(f"❌ 测试执行命令失败: {e}")
  523. ssh.close()
  524. return False
  525. def deploy_script_to_production(
  526. local_script_path: str, remote_filename: str | None = None
  527. ) -> bool:
  528. """部署脚本文件到生产服务器"""
  529. try:
  530. import importlib.util
  531. if importlib.util.find_spec("paramiko") is None:
  532. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  533. return False
  534. # 转换为绝对路径
  535. local_path = Path(local_script_path)
  536. if not local_path.is_absolute():
  537. local_path = WORKSPACE_ROOT / local_path
  538. if not local_path.exists():
  539. logger.error(f"本地文件不存在: {local_path}")
  540. return False
  541. # 确定远程文件名
  542. if not remote_filename:
  543. remote_filename = local_path.name
  544. remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}"
  545. # 建立 SSH 连接
  546. ssh = get_ssh_connection()
  547. if not ssh:
  548. return False
  549. try:
  550. # 创建 SFTP 客户端
  551. sftp = ssh.open_sftp()
  552. # 确保远程目录存在
  553. try:
  554. sftp.stat(PRODUCTION_SERVER["script_path"])
  555. except FileNotFoundError:
  556. logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}")
  557. _, stdout, _ = ssh.exec_command(
  558. f"mkdir -p {PRODUCTION_SERVER['script_path']}"
  559. )
  560. stdout.channel.recv_exit_status()
  561. # 上传文件
  562. logger.info(f"正在上传: {local_path} -> {remote_path}")
  563. sftp.put(str(local_path), remote_path)
  564. # 设置文件权限为可执行
  565. sftp.chmod(remote_path, 0o755)
  566. logger.info(f"✅ 脚本部署成功: {remote_path}")
  567. sftp.close()
  568. ssh.close()
  569. return True
  570. except Exception as e:
  571. logger.error(f"文件传输失败: {e}")
  572. ssh.close()
  573. return False
  574. except ImportError:
  575. logger.error("未安装 paramiko 库,请运行: pip install paramiko")
  576. return False
  577. except Exception as e:
  578. logger.error(f"部署脚本失败: {e}")
  579. return False
  580. def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
  581. """
  582. 部署 n8n 工作流到 n8n 服务器
  583. 此函数执行两个步骤:
  584. 1. 通过 n8n API 创建工作流(主要步骤)
  585. 2. 通过 SFTP 备份工作流文件到生产服务器(可选)
  586. """
  587. try:
  588. import json
  589. import requests
  590. # 转换为绝对路径
  591. local_path = Path(workflow_file)
  592. if not local_path.is_absolute():
  593. local_path = WORKSPACE_ROOT / local_path
  594. if not local_path.exists():
  595. logger.error(f"工作流文件不存在: {local_path}")
  596. return False
  597. # 加载工作流 JSON
  598. with open(local_path, encoding="utf-8") as f:
  599. workflow_data = json.load(f)
  600. workflow_name = workflow_data.get("name", local_path.stem)
  601. logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
  602. # 获取 n8n API 配置
  603. try:
  604. sys.path.insert(0, str(WORKSPACE_ROOT))
  605. from app.config.config import BaseConfig
  606. api_url = BaseConfig.N8N_API_URL
  607. api_key = BaseConfig.N8N_API_KEY
  608. timeout = BaseConfig.N8N_API_TIMEOUT
  609. except (ImportError, AttributeError):
  610. import os
  611. api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
  612. api_key = os.environ.get("N8N_API_KEY", "")
  613. timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
  614. if not api_key:
  615. logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
  616. return False
  617. # 准备 API 请求
  618. headers = {
  619. "X-N8N-API-KEY": api_key,
  620. "Content-Type": "application/json",
  621. "Accept": "application/json",
  622. }
  623. # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
  624. workflow_payload = {
  625. "name": workflow_name,
  626. "nodes": workflow_data.get("nodes", []),
  627. "connections": workflow_data.get("connections", {}),
  628. "settings": workflow_data.get("settings", {}),
  629. }
  630. # 调用 n8n API 创建工作流
  631. create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
  632. logger.info(f"调用 n8n API: {create_url}")
  633. try:
  634. response = requests.post(
  635. create_url,
  636. headers=headers,
  637. json=workflow_payload,
  638. timeout=timeout,
  639. )
  640. if response.status_code == 401:
  641. logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
  642. return False
  643. elif response.status_code == 403:
  644. logger.error("n8n API 权限不足")
  645. return False
  646. response.raise_for_status()
  647. created_workflow = response.json()
  648. workflow_id = created_workflow.get("id")
  649. logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
  650. # 可选:将工作流文件备份到生产服务器
  651. try:
  652. _backup_workflow_to_server(local_path)
  653. except Exception as backup_error:
  654. logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}")
  655. return True
  656. except requests.exceptions.Timeout:
  657. logger.error("n8n API 请求超时,请检查网络连接")
  658. return False
  659. except requests.exceptions.ConnectionError:
  660. logger.error(f"无法连接到 n8n 服务器: {api_url}")
  661. return False
  662. except requests.exceptions.HTTPError as e:
  663. error_detail = ""
  664. try:
  665. error_detail = e.response.json()
  666. except Exception:
  667. error_detail = e.response.text
  668. logger.error(
  669. f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
  670. )
  671. return False
  672. except Exception as e:
  673. logger.error(f"部署工作流失败: {e}")
  674. import traceback
  675. logger.error(traceback.format_exc())
  676. return False
  677. def _backup_workflow_to_server(local_path: Path) -> bool:
  678. """备份工作流文件到生产服务器(通过 SFTP)"""
  679. try:
  680. import importlib.util
  681. if importlib.util.find_spec("paramiko") is None:
  682. logger.debug("未安装 paramiko 库,跳过文件备份")
  683. return False
  684. remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}"
  685. # 建立 SSH 连接
  686. ssh = get_ssh_connection()
  687. if not ssh:
  688. return False
  689. try:
  690. # 创建 SFTP 客户端
  691. sftp = ssh.open_sftp()
  692. # 确保远程目录存在
  693. try:
  694. sftp.stat(PRODUCTION_SERVER["workflow_path"])
  695. except FileNotFoundError:
  696. logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}")
  697. _, stdout, _ = ssh.exec_command(
  698. f"mkdir -p {PRODUCTION_SERVER['workflow_path']}"
  699. )
  700. stdout.channel.recv_exit_status()
  701. # 上传工作流文件
  702. logger.debug(f"备份工作流文件: {local_path} -> {remote_path}")
  703. sftp.put(str(local_path), remote_path)
  704. sftp.close()
  705. ssh.close()
  706. return True
  707. except Exception as e:
  708. logger.warning(f"工作流文件备份失败: {e}")
  709. ssh.close()
  710. return False
  711. except Exception as e:
  712. logger.warning(f"备份工作流失败: {e}")
  713. return False
  714. def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
  715. """自动部署已完成任务的脚本和工作流到生产服务器"""
  716. code_name = task_info.get("code_name")
  717. code_path = task_info.get("code_path")
  718. task_name = task_info.get("task_name", "未知任务")
  719. if not code_name or not code_path:
  720. logger.warning(f"任务 {task_name} 缺少代码文件信息,跳过部署")
  721. return False
  722. logger.info("=" * 60)
  723. logger.info(f"开始自动部署任务: {task_name}")
  724. logger.info("=" * 60)
  725. deploy_success = True
  726. # 1. 部署 Python 脚本
  727. if code_name.endswith(".py"):
  728. script_path = f"{code_path}/{code_name}"
  729. logger.info(f"部署 Python 脚本: {script_path}")
  730. if deploy_script_to_production(script_path):
  731. logger.info(f"✅ 脚本 {code_name} 部署成功")
  732. else:
  733. logger.error(f"❌ 脚本 {code_name} 部署失败")
  734. deploy_success = False
  735. # 2. 查找并部署相关的 n8n 工作流文件
  736. workflow_files = []
  737. # 查找模式1: 与脚本同目录的工作流文件
  738. script_dir = WORKSPACE_ROOT / code_path
  739. if script_dir.exists() and script_dir.is_dir():
  740. for wf_file in script_dir.glob("n8n_workflow_*.json"):
  741. if wf_file.is_file():
  742. workflow_files.append(wf_file)
  743. # 查找模式2: datafactory/n8n_workflows 目录
  744. n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows"
  745. if n8n_workflows_dir.exists():
  746. for wf_file in n8n_workflows_dir.glob("*.json"):
  747. if wf_file.is_file() and wf_file not in workflow_files:
  748. workflow_files.append(wf_file)
  749. # 查找模式3: 根据任务名称匹配工作流文件
  750. if task_name and task_name != "未知任务":
  751. task_name_pattern = task_name.replace(" ", "_").lower()
  752. for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(
  753. f"*{task_name_pattern}*.json"
  754. ):
  755. if (
  756. wf_file.is_file()
  757. and "n8n" in wf_file.name.lower()
  758. and wf_file not in workflow_files
  759. ):
  760. workflow_files.append(wf_file)
  761. if workflow_files:
  762. logger.info(f"发现 {len(workflow_files)} 个工作流文件")
  763. for wf_file in workflow_files:
  764. logger.info(f"部署工作流: {wf_file.name}")
  765. if deploy_n8n_workflow_to_production(str(wf_file)):
  766. logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
  767. else:
  768. logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
  769. deploy_success = False
  770. else:
  771. logger.info("未发现相关工作流文件")
  772. logger.info("=" * 60)
  773. if deploy_success:
  774. logger.info(f"✅ 任务 {task_name} 部署完成")
  775. else:
  776. logger.warning(f"任务 {task_name} 部署过程中出现错误")
  777. logger.info("=" * 60)
  778. return deploy_success
  779. # ============================================================================
  780. # Cursor Agent 自动化
  781. # ============================================================================
  782. # Agent 会话状态
  783. AGENT_SESSION_ACTIVE: bool = False
  784. AGENT_START_TIME: float = 0
  785. def get_all_cursor_windows() -> list[dict[str, Any]]:
  786. """获取所有 Cursor 窗口信息"""
  787. if not HAS_CURSOR_GUI:
  788. return []
  789. cursor_windows: list[dict[str, Any]] = []
  790. def enum_windows_callback(hwnd, _extra):
  791. if win32gui.IsWindowVisible(hwnd):
  792. title = win32gui.GetWindowText(hwnd) or ""
  793. class_name = win32gui.GetClassName(hwnd) or ""
  794. is_cursor = "cursor" in title.lower()
  795. if class_name and "chrome_widgetwin" in class_name.lower():
  796. is_cursor = True
  797. if is_cursor:
  798. left, top, right, bottom = win32gui.GetWindowRect(hwnd)
  799. area = (right - left) * (bottom - top)
  800. cursor_windows.append(
  801. {
  802. "hwnd": hwnd,
  803. "title": title,
  804. "class_name": class_name,
  805. "area": area,
  806. }
  807. )
  808. return True
  809. win32gui.EnumWindows(enum_windows_callback, None)
  810. return cursor_windows
  811. def find_cursor_window() -> int | None:
  812. """查找 Cursor 主窗口句柄"""
  813. if not HAS_CURSOR_GUI:
  814. return None
  815. cursor_windows = get_all_cursor_windows()
  816. if not cursor_windows:
  817. logger.warning("未找到 Cursor 窗口")
  818. return None
  819. # 按面积排序,返回最大的窗口(主窗口)
  820. cursor_windows.sort(key=lambda x: x["area"], reverse=True)
  821. return cursor_windows[0]["hwnd"]
  822. def activate_window(hwnd: int) -> bool:
  823. """激活指定窗口"""
  824. if not HAS_CURSOR_GUI:
  825. return False
  826. try:
  827. win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
  828. time.sleep(0.3)
  829. win32gui.SetForegroundWindow(hwnd)
  830. time.sleep(0.5)
  831. return True
  832. except Exception as e:
  833. logger.error(f"激活窗口失败: {e}")
  834. return False
  835. def open_new_agent() -> bool:
  836. """在 Cursor 中打开新的 Agent 窗口"""
  837. global AGENT_SESSION_ACTIVE, AGENT_START_TIME
  838. if not HAS_CURSOR_GUI:
  839. logger.warning("当前环境不支持 Cursor GUI 自动化")
  840. return False
  841. hwnd = find_cursor_window()
  842. if not hwnd:
  843. return False
  844. if not activate_window(hwnd):
  845. return False
  846. try:
  847. # 使用 Ctrl+Shift+I 打开新的 Agent/Composer
  848. logger.info("正在打开新的 Agent...")
  849. pyautogui.hotkey("ctrl", "shift", "i")
  850. time.sleep(2.0) # 等待 Agent 窗口打开
  851. AGENT_SESSION_ACTIVE = True
  852. AGENT_START_TIME = time.time()
  853. logger.info("✅ 新的 Agent 已打开")
  854. return True
  855. except Exception as e:
  856. logger.error(f"打开 Agent 失败: {e}")
  857. return False
  858. def close_current_agent() -> bool:
  859. """关闭当前的 Agent 会话"""
  860. global AGENT_SESSION_ACTIVE
  861. if not HAS_CURSOR_GUI:
  862. return False
  863. if not AGENT_SESSION_ACTIVE:
  864. logger.info("没有活动的 Agent 会话")
  865. return True
  866. hwnd = find_cursor_window()
  867. if not hwnd:
  868. return False
  869. if not activate_window(hwnd):
  870. return False
  871. try:
  872. logger.info("正在关闭 Agent...")
  873. # 按 Escape 键关闭 Agent
  874. pyautogui.press("escape")
  875. time.sleep(0.5)
  876. # 再按一次确保关闭
  877. pyautogui.press("escape")
  878. time.sleep(0.3)
  879. AGENT_SESSION_ACTIVE = False
  880. logger.info("✅ Agent 已关闭")
  881. return True
  882. except Exception as e:
  883. logger.error(f"关闭 Agent 失败: {e}")
  884. return False
  885. def type_message_to_agent(message: str) -> bool:
  886. """向 Agent 输入消息"""
  887. if not HAS_CURSOR_GUI:
  888. return False
  889. try:
  890. # 等待 Agent 输入框获得焦点
  891. time.sleep(0.5)
  892. # 使用剪贴板粘贴(更可靠地处理中文和特殊字符)
  893. if HAS_PYPERCLIP:
  894. try:
  895. pyperclip.copy(message)
  896. pyautogui.hotkey("ctrl", "v")
  897. time.sleep(0.5)
  898. except Exception:
  899. # 回退到逐字符输入
  900. pyautogui.write(message, interval=0.03)
  901. else:
  902. pyautogui.write(message, interval=0.03)
  903. time.sleep(0.3)
  904. # 按 Enter 发送消息
  905. pyautogui.press("enter")
  906. logger.info("✅ 消息已发送到 Agent")
  907. return True
  908. except Exception as e:
  909. logger.error(f"发送消息到 Agent 失败: {e}")
  910. return False
  911. def wait_for_agent_completion(
  912. timeout: int = 3600,
  913. check_interval: int = 30,
  914. ) -> bool:
  915. """
  916. 等待 Agent 完成任务
  917. 通过检查 pending_tasks.json 中的任务状态来判断是否完成
  918. """
  919. start_time = time.time()
  920. logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")
  921. while time.time() - start_time < timeout:
  922. processing_ids = get_processing_task_ids()
  923. if not processing_ids:
  924. elapsed = int(time.time() - start_time)
  925. logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
  926. return True
  927. remaining = len(processing_ids)
  928. elapsed = int(time.time() - start_time)
  929. logger.info(
  930. f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
  931. )
  932. time.sleep(check_interval)
  933. logger.warning("等待超时,仍有未完成的任务")
  934. return False
  935. def run_agent_once(
  936. timeout: int = 3600,
  937. auto_close: bool = True,
  938. ) -> bool:
  939. """
  940. 执行一次 Agent 任务
  941. 流程:
  942. 1. 同步已完成任务到数据库
  943. 2. 从数据库读取 pending 任务
  944. 3. 更新任务状态为 processing
  945. 4. 生成执行指令文件
  946. 5. 打开 Agent 并发送消息
  947. 6. 等待任务完成
  948. 7. 同步完成任务 + 自动部署
  949. 8. 关闭 Agent
  950. """
  951. logger.info("=" * 60)
  952. logger.info("Agent 单次执行模式")
  953. logger.info("=" * 60)
  954. # 1. 先同步已完成任务
  955. sync_completed_tasks_to_db()
  956. # 2. 从数据库获取 pending 任务
  957. logger.info("正在从数据库查询 pending 任务...")
  958. pending_tasks = get_pending_tasks()
  959. # 3. 检查是否有任务需要执行
  960. if not pending_tasks:
  961. processing_ids = get_processing_task_ids()
  962. if not processing_ids:
  963. logger.info("✅ 没有待执行的任务")
  964. return True
  965. logger.info(f"发现 {len(processing_ids)} 个 processing 任务,继续执行")
  966. else:
  967. logger.info(f"发现 {len(pending_tasks)} 个 pending 任务")
  968. # 4. 更新任务状态为 processing
  969. for task in pending_tasks:
  970. update_task_status(task["task_id"], "processing")
  971. # 5. 写入 pending_tasks.json
  972. write_pending_tasks_json(pending_tasks)
  973. # 6. 生成执行指令文件
  974. create_execute_instructions(pending_tasks)
  975. # 7. 更新触发器文件
  976. all_processing_ids = get_processing_task_ids()
  977. if all_processing_ids:
  978. update_trigger_file(
  979. task_count=len(all_processing_ids),
  980. status="有待执行任务",
  981. task_ids=all_processing_ids,
  982. )
  983. # 8. 打开 Agent 并发送消息
  984. if not open_new_agent():
  985. logger.error("❌ 无法打开 Agent")
  986. return False
  987. if not type_message_to_agent(AGENT_MESSAGE):
  988. logger.error("❌ 无法发送消息到 Agent")
  989. close_current_agent()
  990. return False
  991. logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")
  992. # 9. 等待任务完成
  993. completed = wait_for_agent_completion(timeout=timeout)
  994. # 10. 同步已完成的任务到数据库
  995. sync_completed_tasks_to_db()
  996. if completed:
  997. logger.info("✅ Agent 已完成所有任务")
  998. else:
  999. logger.warning("Agent 未能在超时时间内完成所有任务")
  1000. # 11. 关闭 Agent
  1001. if auto_close:
  1002. close_current_agent()
  1003. logger.info("=" * 60)
  1004. logger.info("Agent 会话结束")
  1005. logger.info("=" * 60)
  1006. return completed
  1007. def run_agent_loop(
  1008. interval: int = 300,
  1009. timeout: int = 3600,
  1010. auto_close: bool = True,
  1011. ) -> None:
  1012. """
  1013. Agent 循环模式
  1014. 循环执行 Agent 单次任务,直到用户按 Ctrl+C 停止
  1015. """
  1016. global AGENT_SESSION_ACTIVE
  1017. logger.info("=" * 60)
  1018. logger.info("Agent 循环模式已启动")
  1019. logger.info(f"检查间隔: {interval} 秒")
  1020. logger.info(f"任务超时: {timeout} 秒")
  1021. logger.info(f"自动部署: {'已启用' if ENABLE_AUTO_DEPLOY else '已禁用'}")
  1022. logger.info("按 Ctrl+C 停止服务")
  1023. logger.info("=" * 60)
  1024. try:
  1025. while True:
  1026. try:
  1027. logger.info("开始新一轮任务检查...")
  1028. # 1. 同步已完成任务
  1029. sync_completed_tasks_to_db()
  1030. # 2. 从数据库获取 pending 任务
  1031. pending_tasks = get_pending_tasks()
  1032. if pending_tasks:
  1033. logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")
  1034. # 更新任务状态为 processing
  1035. for task in pending_tasks:
  1036. update_task_status(task["task_id"], "processing")
  1037. # 写入 pending_tasks.json
  1038. write_pending_tasks_json(pending_tasks)
  1039. # 生成执行指令文件
  1040. create_execute_instructions(pending_tasks)
  1041. # 3. 检查是否有 processing 任务
  1042. processing_ids = get_processing_task_ids()
  1043. if processing_ids:
  1044. # 如果有活动的 Agent 会话,不需要重新启动
  1045. if AGENT_SESSION_ACTIVE:
  1046. logger.info(
  1047. f"Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
  1048. )
  1049. else:
  1050. logger.info(f"发现 {len(processing_ids)} 个待处理任务")
  1051. # 更新触发器文件
  1052. update_trigger_file(
  1053. task_count=len(processing_ids),
  1054. status="有待执行任务",
  1055. task_ids=processing_ids,
  1056. )
  1057. # 启动 Agent
  1058. if open_new_agent():
  1059. if type_message_to_agent(AGENT_MESSAGE):
  1060. logger.info("✅ 已启动 Agent 并发送执行提醒")
  1061. # 等待任务完成
  1062. wait_for_agent_completion(timeout=timeout)
  1063. # 同步完成的任务
  1064. sync_completed_tasks_to_db()
  1065. # 关闭 Agent
  1066. if auto_close:
  1067. close_current_agent()
  1068. else:
  1069. logger.warning("发送消息失败")
  1070. close_current_agent()
  1071. else:
  1072. logger.warning("启动 Agent 失败")
  1073. else:
  1074. logger.info("✅ 没有待处理任务")
  1075. logger.info(f"{interval} 秒后将重新检查任务列表...")
  1076. time.sleep(interval)
  1077. except KeyboardInterrupt:
  1078. raise
  1079. except Exception as e:
  1080. logger.error(f"❌ 执行出错: {e}")
  1081. import traceback
  1082. logger.error(traceback.format_exc())
  1083. time.sleep(interval)
  1084. except KeyboardInterrupt:
  1085. # 退出时关闭 Agent
  1086. if AGENT_SESSION_ACTIVE:
  1087. logger.info("正在关闭 Agent...")
  1088. close_current_agent()
  1089. logger.info("\n⛔ 服务已停止")
  1090. # ============================================================================
  1091. # 交互式菜单
  1092. # ============================================================================
  1093. def show_interactive_menu() -> None:
  1094. """显示交互式菜单并执行用户选择的操作"""
  1095. global ENABLE_AUTO_DEPLOY
  1096. while True:
  1097. print("\n" + "=" * 60)
  1098. print("自动任务执行调度脚本 - Agent 模式")
  1099. print("=" * 60)
  1100. print("\n请选择操作模式:\n")
  1101. print(" 1. Agent 单次执行")
  1102. print(" 2. Agent 循环模式")
  1103. print(" 3. Agent 循环模式(禁用部署)")
  1104. print(" 4. 测试生产服务器连接")
  1105. print(" 5. 查看当前任务状态")
  1106. print(" 0. 退出")
  1107. print("\n" + "-" * 60)
  1108. try:
  1109. choice = input("请输入选项 [0-5]: ").strip()
  1110. except (KeyboardInterrupt, EOFError):
  1111. print("\n再见!")
  1112. break
  1113. if choice == "0":
  1114. print("再见!")
  1115. break
  1116. elif choice == "1":
  1117. print("\n启动 Agent 单次执行模式...")
  1118. run_agent_once(timeout=3600, auto_close=True)
  1119. input("\n按 Enter 键返回菜单...")
  1120. elif choice == "2":
  1121. try:
  1122. interval_str = input("请输入检查间隔(秒,默认300): ").strip()
  1123. interval = int(interval_str) if interval_str else 300
  1124. except ValueError:
  1125. interval = 300
  1126. print(f"\n启动 Agent 循环模式,检查间隔: {interval} 秒")
  1127. print("按 Ctrl+C 停止服务并返回菜单\n")
  1128. ENABLE_AUTO_DEPLOY = True
  1129. try:
  1130. run_agent_loop(interval=interval)
  1131. except KeyboardInterrupt:
  1132. print("\n循环已停止")
  1133. elif choice == "3":
  1134. try:
  1135. interval_str = input("请输入检查间隔(秒,默认300): ").strip()
  1136. interval = int(interval_str) if interval_str else 300
  1137. except ValueError:
  1138. interval = 300
  1139. print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
  1140. print("按 Ctrl+C 停止服务并返回菜单\n")
  1141. ENABLE_AUTO_DEPLOY = False
  1142. try:
  1143. run_agent_loop(interval=interval)
  1144. except KeyboardInterrupt:
  1145. print("\n循环已停止")
  1146. elif choice == "4":
  1147. print("\n测试生产服务器连接...")
  1148. if test_ssh_connection():
  1149. print("✅ 连接测试成功")
  1150. else:
  1151. print("❌ 连接测试失败")
  1152. input("\n按 Enter 键返回菜单...")
  1153. elif choice == "5":
  1154. print("\n当前任务状态:")
  1155. print("-" * 40)
  1156. # 从数据库获取 pending 任务
  1157. pending_tasks = get_pending_tasks()
  1158. print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
  1159. for task in pending_tasks:
  1160. print(f" - [{task['task_id']}] {task['task_name']}")
  1161. # 从本地文件获取 processing 任务
  1162. processing_ids = get_processing_task_ids()
  1163. print(f" 本地 processing 任务: {len(processing_ids)} 个")
  1164. if processing_ids:
  1165. print(f" 任务 ID: {processing_ids}")
  1166. input("\n按 Enter 键返回菜单...")
  1167. else:
  1168. print("❌ 无效的选项,请重新选择")
  1169. # ============================================================================
  1170. # 主函数
  1171. # ============================================================================
  1172. def main() -> None:
  1173. """主函数"""
  1174. parser = argparse.ArgumentParser(
  1175. description="自动任务执行调度脚本 (Agent 模式)",
  1176. formatter_class=argparse.RawDescriptionHelpFormatter,
  1177. epilog="""
  1178. 示例:
  1179. # Agent 单次执行
  1180. python scripts/auto_execute_tasks.py --agent-run
  1181. # Agent 循环模式
  1182. python scripts/auto_execute_tasks.py --agent-loop
  1183. # Agent 循环模式 + 禁用自动部署
  1184. python scripts/auto_execute_tasks.py --agent-loop --no-deploy
  1185. # 设置 Agent 超时时间
  1186. python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
  1187. # 立即部署指定任务到生产服务器
  1188. python scripts/auto_execute_tasks.py --deploy-now 123
  1189. # 测试生产服务器连接
  1190. python scripts/auto_execute_tasks.py --test-connection
  1191. """,
  1192. )
  1193. # Agent 模式参数
  1194. parser.add_argument(
  1195. "--agent-run",
  1196. action="store_true",
  1197. help="Agent 单次执行模式",
  1198. )
  1199. parser.add_argument(
  1200. "--agent-loop",
  1201. action="store_true",
  1202. help="Agent 循环模式",
  1203. )
  1204. parser.add_argument(
  1205. "--agent-timeout",
  1206. type=int,
  1207. default=3600,
  1208. help="Agent 等待任务完成的超时时间(秒),默认 3600",
  1209. )
  1210. parser.add_argument(
  1211. "--interval",
  1212. type=int,
  1213. default=300,
  1214. help="循环模式检查间隔(秒),默认 300",
  1215. )
  1216. parser.add_argument(
  1217. "--no-auto-close",
  1218. action="store_true",
  1219. help="任务完成后不自动关闭 Agent",
  1220. )
  1221. # 部署相关参数
  1222. parser.add_argument(
  1223. "--no-deploy",
  1224. action="store_true",
  1225. help="禁用自动部署功能",
  1226. )
  1227. parser.add_argument(
  1228. "--deploy-now",
  1229. type=str,
  1230. metavar="TASK_ID",
  1231. help="立即部署指定任务ID的脚本到生产服务器",
  1232. )
  1233. parser.add_argument(
  1234. "--test-connection",
  1235. action="store_true",
  1236. help="测试到生产服务器的 SSH 连接",
  1237. )
  1238. args = parser.parse_args()
  1239. global ENABLE_AUTO_DEPLOY
  1240. ENABLE_AUTO_DEPLOY = not args.no_deploy
  1241. auto_close = not args.no_auto_close
  1242. # 测试 SSH 连接
  1243. if args.test_connection:
  1244. if test_ssh_connection():
  1245. logger.info("✅ 连接测试成功")
  1246. else:
  1247. logger.error("❌ 连接测试失败")
  1248. return
  1249. # 立即部署指定任务
  1250. if args.deploy_now:
  1251. try:
  1252. task_id = int(args.deploy_now)
  1253. logger.info(f"开始部署任务 {task_id}...")
  1254. # 从 pending_tasks.json 查找任务信息
  1255. if PENDING_TASKS_FILE.exists():
  1256. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  1257. tasks = json.load(f)
  1258. task_found = None
  1259. for t in tasks:
  1260. if t.get("task_id") == task_id:
  1261. task_found = t
  1262. break
  1263. if task_found:
  1264. if auto_deploy_completed_task(task_found):
  1265. logger.info(f"✅ 任务 {task_id} 部署成功")
  1266. else:
  1267. logger.error(f"❌ 任务 {task_id} 部署失败")
  1268. else:
  1269. logger.error(f"未找到任务 {task_id}")
  1270. else:
  1271. logger.error("pending_tasks.json 文件不存在")
  1272. except ValueError:
  1273. logger.error(f"无效的任务ID: {args.deploy_now}")
  1274. return
  1275. # Agent 单次执行
  1276. if args.agent_run:
  1277. success = run_agent_once(
  1278. timeout=args.agent_timeout,
  1279. auto_close=auto_close,
  1280. )
  1281. if success:
  1282. logger.info("✅ Agent 单次执行完成")
  1283. else:
  1284. logger.error("❌ Agent 单次执行失败")
  1285. return
  1286. # Agent 循环模式
  1287. if args.agent_loop:
  1288. run_agent_loop(
  1289. interval=args.interval,
  1290. timeout=args.agent_timeout,
  1291. auto_close=auto_close,
  1292. )
  1293. return
  1294. # 没有指定任何模式参数时,显示交互式菜单
  1295. if len(sys.argv) == 1:
  1296. show_interactive_menu()
  1297. else:
  1298. # 显示帮助信息
  1299. parser.print_help()
  1300. if __name__ == "__main__":
  1301. main()