| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626 |
- #!/usr/bin/env python3
- """
- 自动任务执行核心调度脚本 (Agent 模式)
- 工作流程:
- 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
- 2. 生成 tasks/task_execute_instructions.md 执行指令文件
- 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
- 4. 更新 tasks/task_trigger.txt 触发器文件
- 5. 启动新的 Cursor Agent 并发送执行指令
- 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
- 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
- 使用方式:
- # Agent 单次执行(执行一次任务后退出)
- python scripts/auto_execute_tasks.py --agent-run
- # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务)
- python scripts/auto_execute_tasks.py --agent-loop
- # Agent 循环模式 + 禁用自动部署
- python scripts/auto_execute_tasks.py --agent-loop --no-deploy
- # 设置 Agent 超时时间(默认 3600 秒)
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
- # 任务完成后不自动关闭 Agent
- python scripts/auto_execute_tasks.py --agent-run --no-auto-close
- # 立即部署指定任务ID的脚本到生产服务器
- python scripts/auto_execute_tasks.py --deploy-now 123
- # 测试到生产服务器的 SSH 连接
- python scripts/auto_execute_tasks.py --test-connection
- """
- from __future__ import annotations
- import argparse
- import json
- import logging
- import sys
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any
- # ============================================================================
- # 日志配置
- # ============================================================================
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(levelname)s - %(message)s",
- )
- logger = logging.getLogger("AutoExecuteTasks")
- # ============================================================================
- # Windows GUI 自动化依赖(可选)
- # ============================================================================
- HAS_CURSOR_GUI = False
- HAS_PYPERCLIP = False
- try:
- import pyautogui
- import win32con
- import win32gui
- pyautogui.FAILSAFE = True
- pyautogui.PAUSE = 0.5
- HAS_CURSOR_GUI = True
- try:
- import pyperclip
- HAS_PYPERCLIP = True
- except ImportError:
- pass
- except ImportError:
- logger.info(
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
- "将禁用自动 Cursor Agent 功能。"
- )
- # ============================================================================
- # 全局配置
- # ============================================================================
- WORKSPACE_ROOT = Path(__file__).parent.parent
- TASKS_DIR = WORKSPACE_ROOT / "tasks"
- PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
- INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
- TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"
- # 生产服务器配置
- PRODUCTION_SERVER = {
- "host": "192.168.3.143",
- "port": 22,
- "username": "ubuntu",
- "password": "citumxl2357",
- "script_path": "/opt/dataops-platform/datafactory/scripts",
- "workflow_path": "/opt/dataops-platform/n8n/workflows",
- }
- # Agent 消息模板
- AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"
- # 命令行参数控制的全局变量
- ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署
- # ============================================================================
- # 数据库操作
- # ============================================================================
- def get_db_connection():
- """获取数据库连接(使用 production 环境配置)"""
- try:
- from urllib.parse import urlparse
- import psycopg2
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config
- # 强制使用 production 环境的数据库配置
- app_config = config["production"]
- db_uri = app_config.SQLALCHEMY_DATABASE_URI
- # 解析 SQLAlchemy URI 格式为 psycopg2 可用的格式
- parsed = urlparse(db_uri)
- conn = psycopg2.connect(
- host=parsed.hostname,
- port=parsed.port or 5432,
- database=parsed.path.lstrip("/"),
- user=parsed.username,
- password=parsed.password,
- )
- logger.debug(
- f"数据库连接成功: {parsed.hostname}:{parsed.port}/{parsed.path.lstrip('/')}"
- )
- return conn
- except ImportError as e:
- logger.error(f"导入依赖失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接数据库失败: {e}")
- import traceback
- logger.error(traceback.format_exc())
- return None
- def get_pending_tasks() -> list[dict[str, Any]]:
- """
- 从 PostgreSQL task_list 表获取所有 pending 状态的任务
- 重要:此函数直接查询数据库,确保获取最新的任务列表
- """
- try:
- from psycopg2.extras import RealDictCursor
- logger.info("📡 正在连接数据库...")
- conn = get_db_connection()
- if not conn:
- logger.error("❌ 无法获取数据库连接")
- return []
- logger.info("✅ 数据库连接成功,正在查询 pending 任务...")
- cursor = conn.cursor(cursor_factory=RealDictCursor)
- cursor.execute(
- """
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """
- )
- tasks = cursor.fetchall()
- cursor.close()
- conn.close()
- task_list = [dict(task) for task in tasks]
- logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
- if task_list:
- for task in task_list:
- logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")
- return task_list
- except Exception as e:
- logger.error(f"获取 pending 任务失败: {e}")
- import traceback
- logger.error(traceback.format_exc())
- return []
- def update_task_status(
- task_id: int,
- status: str,
- code_name: str | None = None,
- code_path: str | None = None,
- ) -> bool:
- """更新任务状态"""
- try:
- conn = get_db_connection()
- if not conn:
- return False
- cursor = conn.cursor()
- if code_name and code_path:
- cursor.execute(
- """
- UPDATE task_list
- SET status = %s, code_name = %s, code_path = %s,
- update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """,
- (status, code_name, code_path, task_id),
- )
- else:
- cursor.execute(
- """
- UPDATE task_list
- SET status = %s, update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """,
- (status, task_id),
- )
- conn.commit()
- updated = cursor.rowcount > 0
- cursor.close()
- conn.close()
- if updated:
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
- return updated
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- # ============================================================================
- # 触发器文件管理
- # ============================================================================
- def update_trigger_file(
- task_count: int = 0,
- status: str = "有待执行任务",
- task_ids: list[int] | None = None,
- ) -> None:
- """更新触发器文件 tasks/task_trigger.txt"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
- content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
- 生成时间: {gen_time}
- 状态: {status}
- 待处理任务数: {task_count}
- 任务ID列表: [{task_ids_str}]
- 此文件用于触发Cursor自动执行任务。
- 【重要】Cursor AI 请立即执行以下操作:
- 1. 读取 tasks/task_execute_instructions.md 文件
- 2. 按照文件中的任务描述,依次完成每个任务
- 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
- """
- with TRIGGER_FILE.open("w", encoding="utf-8") as f:
- f.write(content)
- logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
- def get_processing_task_ids() -> list[int]:
- """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
- if not PENDING_TASKS_FILE.exists():
- return []
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- return [
- t.get("task_id")
- for t in tasks
- if t.get("status") == "processing" and t.get("task_id")
- ]
- except Exception:
- return []
- # ============================================================================
- # 任务文件生成
- # ============================================================================
- def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
- """将任务列表写入 tasks/pending_tasks.json"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- # 读取现有任务
- existing_tasks = []
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- existing_tasks = json.load(f)
- except Exception:
- existing_tasks = []
- existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
- # 添加新任务
- for task in tasks:
- if task["task_id"] not in existing_ids:
- task_info = {
- "task_id": task["task_id"],
- "task_name": task["task_name"],
- "code_path": task.get("code_path", ""),
- "code_name": task.get("code_name", ""),
- "status": "processing",
- "notified_at": datetime.now().isoformat(),
- "code_file": task.get("code_file", ""),
- }
- existing_tasks.append(task_info)
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
- def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
- """生成任务执行指令文件 tasks/task_execute_instructions.md"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# Cursor 自动任务执行指令\n\n")
- f.write("**重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("## 任务约束要求\n\n")
- f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
- f.write("- 不要创建任何 summary、report、总结类的文档文件\n")
- f.write("- 不要生成 task_summary.md、execution_report.md 等总结文件\n")
- f.write("- 只需创建任务要求的功能脚本文件\n")
- f.write("- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task["task_id"]
- task_name = task["task_name"]
- task_desc = task["task_description"]
- create_time = task.get("create_time", "")
- if hasattr(create_time, "strftime"):
- create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"## 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **创建时间**: {create_time}\n")
- f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
- f.write(f"### 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
- # ============================================================================
- # Neo4j 独立连接(不依赖 Flask 应用上下文)
- # ============================================================================
- def get_neo4j_driver():
- """获取 Neo4j 驱动(独立于 Flask 应用上下文)"""
- try:
- from neo4j import GraphDatabase
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config
- # 强制使用 production 环境的配置
- app_config = config["production"]
- uri = app_config.NEO4J_URI
- user = app_config.NEO4J_USER
- password = app_config.NEO4J_PASSWORD
- driver = GraphDatabase.driver(uri, auth=(user, password))
- return driver
- except ImportError as e:
- logger.error(f"导入 Neo4j 驱动失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接 Neo4j 失败: {e}")
- return None
- # ============================================================================
- # 状态同步
- # ============================================================================
- def extract_dataflow_name_from_task(task_id: int) -> str | None:
- """从任务描述中提取 DataFlow 名称"""
- import re
- try:
- conn = get_db_connection()
- if not conn:
- return None
- cursor = conn.cursor()
- cursor.execute(
- "SELECT task_description FROM task_list WHERE task_id = %s",
- (task_id,),
- )
- result = cursor.fetchone()
- cursor.close()
- conn.close()
- if not result:
- return None
- task_desc = result[0]
- # 从任务描述中提取 DataFlow Name
- match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", task_desc)
- if match:
- dataflow_name = match.group(1).strip()
- logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
- return dataflow_name
- return None
- except Exception as e:
- logger.error(f"提取 DataFlow 名称失败: {e}")
- return None
- def update_dataflow_script_path(
- task_name: str, script_path: str, task_id: int | None = None
- ) -> bool:
- """更新 DataFlow 节点的 script_path 字段"""
- try:
- driver = get_neo4j_driver()
- if not driver:
- logger.error("无法获取 Neo4j 驱动")
- return False
- # 如果提供了 task_id,尝试从任务描述中提取真正的 DataFlow 名称
- dataflow_name = task_name
- if task_id:
- extracted_name = extract_dataflow_name_from_task(task_id)
- if extracted_name:
- dataflow_name = extracted_name
- logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}")
- query = """
- MATCH (n:DataFlow {name_zh: $name_zh})
- SET n.script_path = $script_path, n.updated_at = $updated_at
- RETURN n
- """
- updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- with driver.session() as session:
- result = session.run(
- query,
- name_zh=dataflow_name,
- script_path=script_path,
- updated_at=updated_at,
- ).single()
- driver.close()
- if result:
- logger.info(
- f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}"
- )
- return True
- else:
- logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
- return False
- except Exception as e:
- logger.error(f"更新 DataFlow script_path 失败: {e}")
- return False
- def sync_completed_tasks_to_db() -> int:
- """将 pending_tasks.json 中 completed 的任务同步到数据库"""
- if not PENDING_TASKS_FILE.exists():
- return 0
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- except Exception as e:
- logger.error(f"读取 pending_tasks.json 失败: {e}")
- return 0
- if not isinstance(tasks, list):
- return 0
- updated = 0
- remaining_tasks = []
- for t in tasks:
- if t.get("status") == "completed":
- task_id = t.get("task_id")
- if not task_id:
- continue
- task_name = t.get("task_name")
- code_name = t.get("code_name")
- code_path = t.get("code_path")
- # 统一处理:code_path 始终为 "datafactory/scripts"
- code_path = "datafactory/scripts"
- # 只处理 Python 脚本文件
- is_python_script = code_name and code_name.endswith(".py")
- if is_python_script:
- logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_name}")
- else:
- logger.info(
- f"任务 {task_id} 的 code_name ({code_name}) 不是 Python 脚本,跳过 DataFlow 更新"
- )
- if update_task_status(task_id, "completed", code_name, code_path):
- updated += 1
- logger.info(f"已同步任务 {task_id} 为 completed")
- # 只有 Python 脚本才更新 DataFlow 节点的 script_path
- if task_name and is_python_script:
- full_script_path = f"{code_path}/{code_name}"
- if update_dataflow_script_path(
- task_name, full_script_path, task_id=task_id
- ):
- logger.info(
- f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
- )
- else:
- logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")
- # 自动部署到生产服务器(如果启用)
- if ENABLE_AUTO_DEPLOY:
- logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
- if auto_deploy_completed_task(t):
- logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
- else:
- logger.warning(f"任务 {task_id} 部署到生产服务器失败")
- else:
- logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")
- else:
- remaining_tasks.append(t)
- else:
- remaining_tasks.append(t)
- if updated > 0:
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
- return updated
- # ============================================================================
- # 生产服务器部署功能
- # ============================================================================
- def get_ssh_connection():
- """获取 SSH 连接到生产服务器"""
- try:
- import paramiko # type: ignore
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- logger.info(
- f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
- f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
- )
- ssh.connect(
- hostname=PRODUCTION_SERVER["host"],
- port=PRODUCTION_SERVER["port"],
- username=PRODUCTION_SERVER["username"],
- password=PRODUCTION_SERVER["password"],
- timeout=10,
- )
- logger.info("✅ SSH 连接成功")
- return ssh
- except ImportError:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return None
- except Exception as e:
- logger.error(f"SSH 连接失败: {e}")
- return None
- def test_ssh_connection() -> bool:
- """测试 SSH 连接到生产服务器"""
- logger.info("=" * 60)
- logger.info("测试生产服务器连接")
- logger.info("=" * 60)
- ssh = get_ssh_connection()
- if not ssh:
- logger.error("❌ SSH 连接测试失败")
- return False
- try:
- # 测试执行命令
- _, stdout, _ = ssh.exec_command("echo 'Connection test successful'")
- output = stdout.read().decode().strip()
- logger.info(f"✅ 命令执行成功: {output}")
- # 检查目标目录是否存在
- _, stdout, _ = ssh.exec_command(
- f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'"
- )
- result = stdout.read().decode().strip()
- if result == "exists":
- logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}")
- else:
- logger.warning(f"脚本目录不存在: {PRODUCTION_SERVER['script_path']}")
- logger.info("将在首次部署时自动创建")
- ssh.close()
- logger.info("=" * 60)
- logger.info("✅ 连接测试完成")
- logger.info("=" * 60)
- return True
- except Exception as e:
- logger.error(f"❌ 测试执行命令失败: {e}")
- ssh.close()
- return False
- def deploy_script_to_production(
- local_script_path: str, remote_filename: str | None = None
- ) -> bool:
- """部署脚本文件到生产服务器"""
- try:
- import importlib.util
- if importlib.util.find_spec("paramiko") is None:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- # 转换为绝对路径
- local_path = Path(local_script_path)
- if not local_path.is_absolute():
- local_path = WORKSPACE_ROOT / local_path
- if not local_path.exists():
- logger.error(f"本地文件不存在: {local_path}")
- return False
- # 确定远程文件名
- if not remote_filename:
- remote_filename = local_path.name
- remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}"
- # 建立 SSH 连接
- ssh = get_ssh_connection()
- if not ssh:
- return False
- try:
- # 创建 SFTP 客户端
- sftp = ssh.open_sftp()
- # 确保远程目录存在
- try:
- sftp.stat(PRODUCTION_SERVER["script_path"])
- except FileNotFoundError:
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}")
- _, stdout, _ = ssh.exec_command(
- f"mkdir -p {PRODUCTION_SERVER['script_path']}"
- )
- stdout.channel.recv_exit_status()
- # 上传文件
- logger.info(f"正在上传: {local_path} -> {remote_path}")
- sftp.put(str(local_path), remote_path)
- # 设置文件权限为可执行
- sftp.chmod(remote_path, 0o755)
- logger.info(f"✅ 脚本部署成功: {remote_path}")
- sftp.close()
- ssh.close()
- return True
- except Exception as e:
- logger.error(f"文件传输失败: {e}")
- ssh.close()
- return False
- except ImportError:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- except Exception as e:
- logger.error(f"部署脚本失败: {e}")
- return False
- def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
- """
- 部署 n8n 工作流到 n8n 服务器
- 此函数执行两个步骤:
- 1. 通过 n8n API 创建工作流(主要步骤)
- 2. 通过 SFTP 备份工作流文件到生产服务器(可选)
- """
- try:
- import json
- import requests
- # 转换为绝对路径
- local_path = Path(workflow_file)
- if not local_path.is_absolute():
- local_path = WORKSPACE_ROOT / local_path
- if not local_path.exists():
- logger.error(f"工作流文件不存在: {local_path}")
- return False
- # 加载工作流 JSON
- with open(local_path, encoding="utf-8") as f:
- workflow_data = json.load(f)
- workflow_name = workflow_data.get("name", local_path.stem)
- logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")
- # 获取 n8n API 配置
- try:
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import BaseConfig
- api_url = BaseConfig.N8N_API_URL
- api_key = BaseConfig.N8N_API_KEY
- timeout = BaseConfig.N8N_API_TIMEOUT
- except (ImportError, AttributeError):
- import os
- api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
- api_key = os.environ.get("N8N_API_KEY", "")
- timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))
- if not api_key:
- logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
- return False
- # 准备 API 请求
- headers = {
- "X-N8N-API-KEY": api_key,
- "Content-Type": "application/json",
- "Accept": "application/json",
- }
- # 准备工作流数据(移除 tags,n8n API 不支持直接创建带 tags)
- workflow_payload = {
- "name": workflow_name,
- "nodes": workflow_data.get("nodes", []),
- "connections": workflow_data.get("connections", {}),
- "settings": workflow_data.get("settings", {}),
- }
- # 调用 n8n API 创建工作流
- create_url = f"{api_url.rstrip('/')}/api/v1/workflows"
- logger.info(f"调用 n8n API: {create_url}")
- try:
- response = requests.post(
- create_url,
- headers=headers,
- json=workflow_payload,
- timeout=timeout,
- )
- if response.status_code == 401:
- logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
- return False
- elif response.status_code == 403:
- logger.error("n8n API 权限不足")
- return False
- response.raise_for_status()
- created_workflow = response.json()
- workflow_id = created_workflow.get("id")
- logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
- # 可选:将工作流文件备份到生产服务器
- try:
- _backup_workflow_to_server(local_path)
- except Exception as backup_error:
- logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}")
- return True
- except requests.exceptions.Timeout:
- logger.error("n8n API 请求超时,请检查网络连接")
- return False
- except requests.exceptions.ConnectionError:
- logger.error(f"无法连接到 n8n 服务器: {api_url}")
- return False
- except requests.exceptions.HTTPError as e:
- error_detail = ""
- try:
- error_detail = e.response.json()
- except Exception:
- error_detail = e.response.text
- logger.error(
- f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
- )
- return False
- except Exception as e:
- logger.error(f"部署工作流失败: {e}")
- import traceback
- logger.error(traceback.format_exc())
- return False
- def _backup_workflow_to_server(local_path: Path) -> bool:
- """备份工作流文件到生产服务器(通过 SFTP)"""
- try:
- import importlib.util
- if importlib.util.find_spec("paramiko") is None:
- logger.debug("未安装 paramiko 库,跳过文件备份")
- return False
- remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}"
- # 建立 SSH 连接
- ssh = get_ssh_connection()
- if not ssh:
- return False
- try:
- # 创建 SFTP 客户端
- sftp = ssh.open_sftp()
- # 确保远程目录存在
- try:
- sftp.stat(PRODUCTION_SERVER["workflow_path"])
- except FileNotFoundError:
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}")
- _, stdout, _ = ssh.exec_command(
- f"mkdir -p {PRODUCTION_SERVER['workflow_path']}"
- )
- stdout.channel.recv_exit_status()
- # 上传工作流文件
- logger.debug(f"备份工作流文件: {local_path} -> {remote_path}")
- sftp.put(str(local_path), remote_path)
- sftp.close()
- ssh.close()
- return True
- except Exception as e:
- logger.warning(f"工作流文件备份失败: {e}")
- ssh.close()
- return False
- except Exception as e:
- logger.warning(f"备份工作流失败: {e}")
- return False
- def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
- """自动部署已完成任务的脚本和工作流到生产服务器"""
- code_name = task_info.get("code_name")
- code_path = task_info.get("code_path")
- task_name = task_info.get("task_name", "未知任务")
- if not code_name or not code_path:
- logger.warning(f"任务 {task_name} 缺少代码文件信息,跳过部署")
- return False
- logger.info("=" * 60)
- logger.info(f"开始自动部署任务: {task_name}")
- logger.info("=" * 60)
- deploy_success = True
- # 1. 部署 Python 脚本
- if code_name.endswith(".py"):
- script_path = f"{code_path}/{code_name}"
- logger.info(f"部署 Python 脚本: {script_path}")
- if deploy_script_to_production(script_path):
- logger.info(f"✅ 脚本 {code_name} 部署成功")
- else:
- logger.error(f"❌ 脚本 {code_name} 部署失败")
- deploy_success = False
- # 2. 查找并部署相关的 n8n 工作流文件
- workflow_files = []
- # 查找模式1: 与脚本同目录的工作流文件
- script_dir = WORKSPACE_ROOT / code_path
- if script_dir.exists() and script_dir.is_dir():
- for wf_file in script_dir.glob("n8n_workflow_*.json"):
- if wf_file.is_file():
- workflow_files.append(wf_file)
- # 查找模式2: datafactory/n8n_workflows 目录
- n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows"
- if n8n_workflows_dir.exists():
- for wf_file in n8n_workflows_dir.glob("*.json"):
- if wf_file.is_file() and wf_file not in workflow_files:
- workflow_files.append(wf_file)
- # 查找模式3: 根据任务名称匹配工作流文件
- if task_name and task_name != "未知任务":
- task_name_pattern = task_name.replace(" ", "_").lower()
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(
- f"*{task_name_pattern}*.json"
- ):
- if (
- wf_file.is_file()
- and "n8n" in wf_file.name.lower()
- and wf_file not in workflow_files
- ):
- workflow_files.append(wf_file)
- if workflow_files:
- logger.info(f"发现 {len(workflow_files)} 个工作流文件")
- for wf_file in workflow_files:
- logger.info(f"部署工作流: {wf_file.name}")
- if deploy_n8n_workflow_to_production(str(wf_file)):
- logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
- else:
- logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
- deploy_success = False
- else:
- logger.info("未发现相关工作流文件")
- logger.info("=" * 60)
- if deploy_success:
- logger.info(f"✅ 任务 {task_name} 部署完成")
- else:
- logger.warning(f"任务 {task_name} 部署过程中出现错误")
- logger.info("=" * 60)
- return deploy_success
- # ============================================================================
- # Cursor Agent 自动化
- # ============================================================================
- # Agent 会话状态
- AGENT_SESSION_ACTIVE: bool = False
- AGENT_START_TIME: float = 0
- def get_all_cursor_windows() -> list[dict[str, Any]]:
- """获取所有 Cursor 窗口信息"""
- if not HAS_CURSOR_GUI:
- return []
- cursor_windows: list[dict[str, Any]] = []
- def enum_windows_callback(hwnd, _extra):
- if win32gui.IsWindowVisible(hwnd):
- title = win32gui.GetWindowText(hwnd) or ""
- class_name = win32gui.GetClassName(hwnd) or ""
- is_cursor = "cursor" in title.lower()
- if class_name and "chrome_widgetwin" in class_name.lower():
- is_cursor = True
- if is_cursor:
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
- area = (right - left) * (bottom - top)
- cursor_windows.append(
- {
- "hwnd": hwnd,
- "title": title,
- "class_name": class_name,
- "area": area,
- }
- )
- return True
- win32gui.EnumWindows(enum_windows_callback, None)
- return cursor_windows
- def find_cursor_window() -> int | None:
- """查找 Cursor 主窗口句柄"""
- if not HAS_CURSOR_GUI:
- return None
- cursor_windows = get_all_cursor_windows()
- if not cursor_windows:
- logger.warning("未找到 Cursor 窗口")
- return None
- # 按面积排序,返回最大的窗口(主窗口)
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
- return cursor_windows[0]["hwnd"]
- def activate_window(hwnd: int) -> bool:
- """激活指定窗口"""
- if not HAS_CURSOR_GUI:
- return False
- try:
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
- time.sleep(0.3)
- win32gui.SetForegroundWindow(hwnd)
- time.sleep(0.5)
- return True
- except Exception as e:
- logger.error(f"激活窗口失败: {e}")
- return False
- def open_new_agent() -> bool:
- """在 Cursor 中打开新的 Agent 窗口"""
- global AGENT_SESSION_ACTIVE, AGENT_START_TIME
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化")
- return False
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- if not activate_window(hwnd):
- return False
- try:
- # 使用 Ctrl+Shift+I 打开新的 Agent/Composer
- logger.info("正在打开新的 Agent...")
- pyautogui.hotkey("ctrl", "shift", "i")
- time.sleep(2.0) # 等待 Agent 窗口打开
- AGENT_SESSION_ACTIVE = True
- AGENT_START_TIME = time.time()
- logger.info("✅ 新的 Agent 已打开")
- return True
- except Exception as e:
- logger.error(f"打开 Agent 失败: {e}")
- return False
- def close_current_agent() -> bool:
- """关闭当前的 Agent 会话"""
- global AGENT_SESSION_ACTIVE
- if not HAS_CURSOR_GUI:
- return False
- if not AGENT_SESSION_ACTIVE:
- logger.info("没有活动的 Agent 会话")
- return True
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- if not activate_window(hwnd):
- return False
- try:
- logger.info("正在关闭 Agent...")
- # 按 Escape 键关闭 Agent
- pyautogui.press("escape")
- time.sleep(0.5)
- # 再按一次确保关闭
- pyautogui.press("escape")
- time.sleep(0.3)
- AGENT_SESSION_ACTIVE = False
- logger.info("✅ Agent 已关闭")
- return True
- except Exception as e:
- logger.error(f"关闭 Agent 失败: {e}")
- return False
- def type_message_to_agent(message: str) -> bool:
- """向 Agent 输入消息"""
- if not HAS_CURSOR_GUI:
- return False
- try:
- # 等待 Agent 输入框获得焦点
- time.sleep(0.5)
- # 使用剪贴板粘贴(更可靠地处理中文和特殊字符)
- if HAS_PYPERCLIP:
- try:
- pyperclip.copy(message)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(0.5)
- except Exception:
- # 回退到逐字符输入
- pyautogui.write(message, interval=0.03)
- else:
- pyautogui.write(message, interval=0.03)
- time.sleep(0.3)
- # 按 Enter 发送消息
- pyautogui.press("enter")
- logger.info("✅ 消息已发送到 Agent")
- return True
- except Exception as e:
- logger.error(f"发送消息到 Agent 失败: {e}")
- return False
- def wait_for_agent_completion(
- timeout: int = 3600,
- check_interval: int = 30,
- ) -> bool:
- """
- 等待 Agent 完成任务
- 通过检查 pending_tasks.json 中的任务状态来判断是否完成
- """
- start_time = time.time()
- logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")
- while time.time() - start_time < timeout:
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- elapsed = int(time.time() - start_time)
- logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
- return True
- remaining = len(processing_ids)
- elapsed = int(time.time() - start_time)
- logger.info(
- f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
- )
- time.sleep(check_interval)
- logger.warning("等待超时,仍有未完成的任务")
- return False
- def run_agent_once(
- timeout: int = 3600,
- auto_close: bool = True,
- ) -> bool:
- """
- 执行一次 Agent 任务
- 流程:
- 1. 同步已完成任务到数据库
- 2. 从数据库读取 pending 任务
- 3. 更新任务状态为 processing
- 4. 生成执行指令文件
- 5. 打开 Agent 并发送消息
- 6. 等待任务完成
- 7. 同步完成任务 + 自动部署
- 8. 关闭 Agent
- """
- logger.info("=" * 60)
- logger.info("Agent 单次执行模式")
- logger.info("=" * 60)
- # 1. 先同步已完成任务
- sync_completed_tasks_to_db()
- # 2. 从数据库获取 pending 任务
- logger.info("正在从数据库查询 pending 任务...")
- pending_tasks = get_pending_tasks()
- # 3. 检查是否有任务需要执行
- if not pending_tasks:
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("✅ 没有待执行的任务")
- return True
- logger.info(f"发现 {len(processing_ids)} 个 processing 任务,继续执行")
- else:
- logger.info(f"发现 {len(pending_tasks)} 个 pending 任务")
- # 4. 更新任务状态为 processing
- for task in pending_tasks:
- update_task_status(task["task_id"], "processing")
- # 5. 写入 pending_tasks.json
- write_pending_tasks_json(pending_tasks)
- # 6. 生成执行指令文件
- create_execute_instructions(pending_tasks)
- # 7. 更新触发器文件
- all_processing_ids = get_processing_task_ids()
- if all_processing_ids:
- update_trigger_file(
- task_count=len(all_processing_ids),
- status="有待执行任务",
- task_ids=all_processing_ids,
- )
- # 8. 打开 Agent 并发送消息
- if not open_new_agent():
- logger.error("❌ 无法打开 Agent")
- return False
- if not type_message_to_agent(AGENT_MESSAGE):
- logger.error("❌ 无法发送消息到 Agent")
- close_current_agent()
- return False
- logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")
- # 9. 等待任务完成
- completed = wait_for_agent_completion(timeout=timeout)
- # 10. 同步已完成的任务到数据库
- sync_completed_tasks_to_db()
- if completed:
- logger.info("✅ Agent 已完成所有任务")
- else:
- logger.warning("Agent 未能在超时时间内完成所有任务")
- # 11. 关闭 Agent
- if auto_close:
- close_current_agent()
- logger.info("=" * 60)
- logger.info("Agent 会话结束")
- logger.info("=" * 60)
- return completed
- def run_agent_loop(
- interval: int = 300,
- timeout: int = 3600,
- auto_close: bool = True,
- ) -> None:
- """
- Agent 循环模式
- 循环执行 Agent 单次任务,直到用户按 Ctrl+C 停止
- """
- global AGENT_SESSION_ACTIVE
- logger.info("=" * 60)
- logger.info("Agent 循环模式已启动")
- logger.info(f"检查间隔: {interval} 秒")
- logger.info(f"任务超时: {timeout} 秒")
- logger.info(f"自动部署: {'已启用' if ENABLE_AUTO_DEPLOY else '已禁用'}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- try:
- while True:
- try:
- logger.info("开始新一轮任务检查...")
- # 1. 同步已完成任务
- sync_completed_tasks_to_db()
- # 2. 从数据库获取 pending 任务
- pending_tasks = get_pending_tasks()
- if pending_tasks:
- logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")
- # 更新任务状态为 processing
- for task in pending_tasks:
- update_task_status(task["task_id"], "processing")
- # 写入 pending_tasks.json
- write_pending_tasks_json(pending_tasks)
- # 生成执行指令文件
- create_execute_instructions(pending_tasks)
- # 3. 检查是否有 processing 任务
- processing_ids = get_processing_task_ids()
- if processing_ids:
- # 如果有活动的 Agent 会话,不需要重新启动
- if AGENT_SESSION_ACTIVE:
- logger.info(
- f"Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
- )
- else:
- logger.info(f"发现 {len(processing_ids)} 个待处理任务")
- # 更新触发器文件
- update_trigger_file(
- task_count=len(processing_ids),
- status="有待执行任务",
- task_ids=processing_ids,
- )
- # 启动 Agent
- if open_new_agent():
- if type_message_to_agent(AGENT_MESSAGE):
- logger.info("✅ 已启动 Agent 并发送执行提醒")
- # 等待任务完成
- wait_for_agent_completion(timeout=timeout)
- # 同步完成的任务
- sync_completed_tasks_to_db()
- # 关闭 Agent
- if auto_close:
- close_current_agent()
- else:
- logger.warning("发送消息失败")
- close_current_agent()
- else:
- logger.warning("启动 Agent 失败")
- else:
- logger.info("✅ 没有待处理任务")
- logger.info(f"{interval} 秒后将重新检查任务列表...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- import traceback
- logger.error(traceback.format_exc())
- time.sleep(interval)
- except KeyboardInterrupt:
- # 退出时关闭 Agent
- if AGENT_SESSION_ACTIVE:
- logger.info("正在关闭 Agent...")
- close_current_agent()
- logger.info("\n⛔ 服务已停止")
- # ============================================================================
- # 交互式菜单
- # ============================================================================
- def show_interactive_menu() -> None:
- """显示交互式菜单并执行用户选择的操作"""
- global ENABLE_AUTO_DEPLOY
- while True:
- print("\n" + "=" * 60)
- print("自动任务执行调度脚本 - Agent 模式")
- print("=" * 60)
- print("\n请选择操作模式:\n")
- print(" 1. Agent 单次执行")
- print(" 2. Agent 循环模式")
- print(" 3. Agent 循环模式(禁用部署)")
- print(" 4. 测试生产服务器连接")
- print(" 5. 查看当前任务状态")
- print(" 0. 退出")
- print("\n" + "-" * 60)
- try:
- choice = input("请输入选项 [0-5]: ").strip()
- except (KeyboardInterrupt, EOFError):
- print("\n再见!")
- break
- if choice == "0":
- print("再见!")
- break
- elif choice == "1":
- print("\n启动 Agent 单次执行模式...")
- run_agent_once(timeout=3600, auto_close=True)
- input("\n按 Enter 键返回菜单...")
- elif choice == "2":
- try:
- interval_str = input("请输入检查间隔(秒,默认300): ").strip()
- interval = int(interval_str) if interval_str else 300
- except ValueError:
- interval = 300
- print(f"\n启动 Agent 循环模式,检查间隔: {interval} 秒")
- print("按 Ctrl+C 停止服务并返回菜单\n")
- ENABLE_AUTO_DEPLOY = True
- try:
- run_agent_loop(interval=interval)
- except KeyboardInterrupt:
- print("\n循环已停止")
- elif choice == "3":
- try:
- interval_str = input("请输入检查间隔(秒,默认300): ").strip()
- interval = int(interval_str) if interval_str else 300
- except ValueError:
- interval = 300
- print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
- print("按 Ctrl+C 停止服务并返回菜单\n")
- ENABLE_AUTO_DEPLOY = False
- try:
- run_agent_loop(interval=interval)
- except KeyboardInterrupt:
- print("\n循环已停止")
- elif choice == "4":
- print("\n测试生产服务器连接...")
- if test_ssh_connection():
- print("✅ 连接测试成功")
- else:
- print("❌ 连接测试失败")
- input("\n按 Enter 键返回菜单...")
- elif choice == "5":
- print("\n当前任务状态:")
- print("-" * 40)
- # 从数据库获取 pending 任务
- pending_tasks = get_pending_tasks()
- print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
- for task in pending_tasks:
- print(f" - [{task['task_id']}] {task['task_name']}")
- # 从本地文件获取 processing 任务
- processing_ids = get_processing_task_ids()
- print(f" 本地 processing 任务: {len(processing_ids)} 个")
- if processing_ids:
- print(f" 任务 ID: {processing_ids}")
- input("\n按 Enter 键返回菜单...")
- else:
- print("❌ 无效的选项,请重新选择")
- # ============================================================================
- # 主函数
- # ============================================================================
- def main() -> None:
- """主函数"""
- parser = argparse.ArgumentParser(
- description="自动任务执行调度脚本 (Agent 模式)",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- # Agent 单次执行
- python scripts/auto_execute_tasks.py --agent-run
- # Agent 循环模式
- python scripts/auto_execute_tasks.py --agent-loop
- # Agent 循环模式 + 禁用自动部署
- python scripts/auto_execute_tasks.py --agent-loop --no-deploy
- # 设置 Agent 超时时间
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
- # 立即部署指定任务到生产服务器
- python scripts/auto_execute_tasks.py --deploy-now 123
- # 测试生产服务器连接
- python scripts/auto_execute_tasks.py --test-connection
- """,
- )
- # Agent 模式参数
- parser.add_argument(
- "--agent-run",
- action="store_true",
- help="Agent 单次执行模式",
- )
- parser.add_argument(
- "--agent-loop",
- action="store_true",
- help="Agent 循环模式",
- )
- parser.add_argument(
- "--agent-timeout",
- type=int,
- default=3600,
- help="Agent 等待任务完成的超时时间(秒),默认 3600",
- )
- parser.add_argument(
- "--interval",
- type=int,
- default=300,
- help="循环模式检查间隔(秒),默认 300",
- )
- parser.add_argument(
- "--no-auto-close",
- action="store_true",
- help="任务完成后不自动关闭 Agent",
- )
- # 部署相关参数
- parser.add_argument(
- "--no-deploy",
- action="store_true",
- help="禁用自动部署功能",
- )
- parser.add_argument(
- "--deploy-now",
- type=str,
- metavar="TASK_ID",
- help="立即部署指定任务ID的脚本到生产服务器",
- )
- parser.add_argument(
- "--test-connection",
- action="store_true",
- help="测试到生产服务器的 SSH 连接",
- )
- args = parser.parse_args()
- global ENABLE_AUTO_DEPLOY
- ENABLE_AUTO_DEPLOY = not args.no_deploy
- auto_close = not args.no_auto_close
- # 测试 SSH 连接
- if args.test_connection:
- if test_ssh_connection():
- logger.info("✅ 连接测试成功")
- else:
- logger.error("❌ 连接测试失败")
- return
- # 立即部署指定任务
- if args.deploy_now:
- try:
- task_id = int(args.deploy_now)
- logger.info(f"开始部署任务 {task_id}...")
- # 从 pending_tasks.json 查找任务信息
- if PENDING_TASKS_FILE.exists():
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- task_found = None
- for t in tasks:
- if t.get("task_id") == task_id:
- task_found = t
- break
- if task_found:
- if auto_deploy_completed_task(task_found):
- logger.info(f"✅ 任务 {task_id} 部署成功")
- else:
- logger.error(f"❌ 任务 {task_id} 部署失败")
- else:
- logger.error(f"未找到任务 {task_id}")
- else:
- logger.error("pending_tasks.json 文件不存在")
- except ValueError:
- logger.error(f"无效的任务ID: {args.deploy_now}")
- return
- # Agent 单次执行
- if args.agent_run:
- success = run_agent_once(
- timeout=args.agent_timeout,
- auto_close=auto_close,
- )
- if success:
- logger.info("✅ Agent 单次执行完成")
- else:
- logger.error("❌ Agent 单次执行失败")
- return
- # Agent 循环模式
- if args.agent_loop:
- run_agent_loop(
- interval=args.interval,
- timeout=args.agent_timeout,
- auto_close=auto_close,
- )
- return
- # 没有指定任何模式参数时,显示交互式菜单
- if len(sys.argv) == 1:
- show_interactive_menu()
- else:
- # 显示帮助信息
- parser.print_help()
- if __name__ == "__main__":
- main()
|