| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743 |
- #!/usr/bin/env python3
- """
- 自动任务执行核心调度脚本
- 工作流程:
- 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
- 2. 生成 tasks/task_execute_instructions.md 执行指令文件
- 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
- 4. 更新 tasks/task_trigger.txt 触发器文件
- 5. 启动新的 Cursor Agent 并发送执行指令
- 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
- 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
- 使用方式:
- # 执行一次任务检查
- python scripts/auto_execute_tasks.py --once
- # 循环模式(每 5 分钟检查)
- python scripts/auto_execute_tasks.py --interval 300
- # 刷新触发器文件(用于现有 processing 任务)
- python scripts/auto_execute_tasks.py --refresh-trigger
- # 【推荐】使用 Agent 模式执行任务(自动打开/关闭 Agent)
- python scripts/auto_execute_tasks.py --agent-run
- # 【推荐】启动 Agent 循环模式(有任务时自动启动 Agent,完成后自动关闭)
- python scripts/auto_execute_tasks.py --chat-loop --use-agent
- # 启动传统 Chat 模式(不使用 Agent)
- python scripts/auto_execute_tasks.py --chat-loop --no-agent
- # 立即发送 Chat 消息触发 Cursor 执行
- python scripts/auto_execute_tasks.py --send-chat-now
- # 设置 Agent 超时时间(默认 3600 秒)
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
- # 任务完成后不自动关闭 Agent
- python scripts/auto_execute_tasks.py --agent-run --no-auto-close
- # 禁用自动部署功能
- python scripts/auto_execute_tasks.py --chat-loop --use-agent --no-deploy
- # 立即部署指定任务ID的脚本到生产服务器
- python scripts/auto_execute_tasks.py --deploy-now 123
- # 测试到生产服务器的 SSH 连接
- python scripts/auto_execute_tasks.py --test-connection
- """
- from __future__ import annotations
- import argparse
- import json
- import logging
- import time
- from datetime import datetime
- from pathlib import Path
- from typing import Any
- # ============================================================================
- # 日志配置
- # ============================================================================
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(levelname)s - %(message)s",
- )
- logger = logging.getLogger("AutoExecuteTasks")
- # ============================================================================
- # Windows GUI 自动化依赖(可选)
- # ============================================================================
- HAS_CURSOR_GUI = False
- HAS_PYPERCLIP = False
- try:
- import pyautogui
- import win32con
- import win32gui
- pyautogui.FAILSAFE = True
- pyautogui.PAUSE = 0.5
- HAS_CURSOR_GUI = True
- try:
- import pyperclip
- HAS_PYPERCLIP = True
- except ImportError:
- pass
- except ImportError:
- logger.info(
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
- "将禁用自动 Cursor Chat 功能。"
- )
- # ============================================================================
- # 全局配置
- # ============================================================================
- WORKSPACE_ROOT = Path(__file__).parent.parent
- TASKS_DIR = WORKSPACE_ROOT / "tasks"
- PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
- INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
- TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"
- # 生产服务器配置
- PRODUCTION_SERVER = {
- "host": "192.168.3.143",
- "port": 22,
- "username": "ubuntu",
- "password": "citumxl2357",
- "script_path": "/opt/dataops-platform/datafactory/scripts",
- "workflow_path": "/opt/dataops-platform/n8n/workflows",
- }
- # 命令行参数控制的全局变量
- ENABLE_CHAT: bool = False
- CHAT_MESSAGE: str = "请阅读 tasks/task_execute_instructions.md 并执行任务。"
- CHAT_INPUT_POS: tuple[int, int] | None = None
- ENABLE_AUTO_DEPLOY: bool = True # 默认启用自动部署
- # ============================================================================
- # 数据库操作
- # ============================================================================
- def get_db_connection():
- """获取数据库连接(使用 production 环境配置)"""
- try:
- import sys
- import psycopg2
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config
- # 强制使用 production 环境的数据库配置
- app_config = config["production"]
- db_uri = app_config.SQLALCHEMY_DATABASE_URI
- return psycopg2.connect(db_uri)
- except ImportError as e:
- logger.error(f"导入依赖失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接数据库失败: {e}")
- return None
- def get_pending_tasks() -> list[dict[str, Any]]:
- """从数据库获取所有 pending 任务"""
- try:
- from psycopg2.extras import RealDictCursor
- conn = get_db_connection()
- if not conn:
- return []
- cursor = conn.cursor(cursor_factory=RealDictCursor)
- cursor.execute("""
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """)
- tasks = cursor.fetchall()
- cursor.close()
- conn.close()
- return [dict(task) for task in tasks]
- except Exception as e:
- logger.error(f"获取 pending 任务失败: {e}")
- return []
- def update_task_status(
- task_id: int,
- status: str,
- code_name: str | None = None,
- code_path: str | None = None,
- ) -> bool:
- """更新任务状态"""
- try:
- conn = get_db_connection()
- if not conn:
- return False
- cursor = conn.cursor()
- if code_name and code_path:
- cursor.execute(
- """
- UPDATE task_list
- SET status = %s, code_name = %s, code_path = %s,
- update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """,
- (status, code_name, code_path, task_id),
- )
- else:
- cursor.execute(
- """
- UPDATE task_list
- SET status = %s, update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """,
- (status, task_id),
- )
- conn.commit()
- updated = cursor.rowcount > 0
- cursor.close()
- conn.close()
- if updated:
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
- return updated
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- # ============================================================================
- # 触发器文件管理
- # ============================================================================
- def update_trigger_file(
- task_count: int = 0,
- status: str = "有待执行任务",
- task_ids: list[int] | None = None,
- ) -> None:
- """
- 更新触发器文件 tasks/task_trigger.txt
- Cursor Rules 会检测此文件的更新时间,如果在 5 分钟内更新,
- 则触发自动任务执行。
- """
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
- content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
- 生成时间: {gen_time}
- 状态: {status}
- 待处理任务数: {task_count}
- 任务ID列表: [{task_ids_str}]
- 此文件用于触发Cursor自动执行任务。
- 当此文件更新时间在5分钟内时,Cursor应该检查 tasks/task_execute_instructions.md 并执行其中的任务。
- 【重要】Cursor AI 请立即执行以下操作:
- 1. 读取 tasks/task_execute_instructions.md 文件
- 2. 按照文件中的任务描述,依次完成每个任务
- 3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
- """
- with TRIGGER_FILE.open("w", encoding="utf-8") as f:
- f.write(content)
- logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
- def get_processing_task_ids() -> list[int]:
- """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
- if not PENDING_TASKS_FILE.exists():
- return []
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- return [
- t.get("task_id")
- for t in tasks
- if t.get("status") == "processing" and t.get("task_id")
- ]
- except Exception:
- return []
- # ============================================================================
- # 任务文件生成
- # ============================================================================
- def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
- """将任务列表写入 tasks/pending_tasks.json"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- # 读取现有任务
- existing_tasks = []
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- existing_tasks = json.load(f)
- except Exception:
- existing_tasks = []
- existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
- # 添加新任务
- for task in tasks:
- if task["task_id"] not in existing_ids:
- task_info = {
- "task_id": task["task_id"],
- "task_name": task["task_name"],
- "code_path": task.get("code_path", ""),
- "code_name": task.get("code_name", ""),
- "status": "processing",
- "notified_at": datetime.now().isoformat(),
- "code_file": task.get("code_file", ""),
- }
- existing_tasks.append(task_info)
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
- def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
- """生成任务执行指令文件 tasks/task_execute_instructions.md"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# 🤖 Cursor 自动任务执行指令\n\n")
- f.write("**⚠️ 重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 📋 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("## ⚠️ 任务约束要求\n\n")
- f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
- f.write("- ❌ 不要创建任何 summary、report、总结类的文档文件\n")
- f.write("- ❌ 不要生成 task_summary.md、execution_report.md 等总结文件\n")
- f.write("- ✅ 只需创建任务要求的功能脚本文件\n")
- f.write("- ✅ 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task["task_id"]
- task_name = task["task_name"]
- task_desc = task["task_description"]
- create_time = task.get("create_time", "")
- if hasattr(create_time, "strftime"):
- create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"## 🔴 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **创建时间**: {create_time}\n")
- f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
- f.write(f"### 📝 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
- # ============================================================================
- # Neo4j 独立连接(不依赖 Flask 应用上下文)
- # ============================================================================
- def get_neo4j_driver():
- """获取 Neo4j 驱动(独立于 Flask 应用上下文)"""
- try:
- import sys
- from neo4j import GraphDatabase
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config
- # 强制使用 production 环境的配置
- app_config = config["production"]
- uri = app_config.NEO4J_URI
- user = app_config.NEO4J_USER
- password = app_config.NEO4J_PASSWORD
- driver = GraphDatabase.driver(uri, auth=(user, password))
- return driver
- except ImportError as e:
- logger.error(f"导入 Neo4j 驱动失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接 Neo4j 失败: {e}")
- return None
- # ============================================================================
- # 状态同步
- # ============================================================================
- def update_dataflow_script_path(task_name: str, script_path: str) -> bool:
- """
- 更新 DataFlow 节点的 script_path 字段(独立于 Flask 应用上下文)
- Args:
- task_name: 任务名称(对应 DataFlow 的 name_zh)
- script_path: Python 脚本的完整路径
- Returns:
- 是否更新成功
- """
- try:
- from datetime import datetime
- driver = get_neo4j_driver()
- if not driver:
- logger.error("无法获取 Neo4j 驱动")
- return False
- query = """
- MATCH (n:DataFlow {name_zh: $name_zh})
- SET n.script_path = $script_path, n.updated_at = $updated_at
- RETURN n
- """
- updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- with driver.session() as session:
- result = session.run(
- query,
- name_zh=task_name,
- script_path=script_path,
- updated_at=updated_at,
- ).single()
- driver.close()
- if result:
- logger.info(f"成功更新 DataFlow 脚本路径: {task_name} -> {script_path}")
- return True
- else:
- logger.warning(f"未找到 DataFlow 节点: {task_name}")
- return False
- except Exception as e:
- logger.error(f"更新 DataFlow script_path 失败: {e}")
- return False
- def sync_completed_tasks_to_db() -> int:
- """将 pending_tasks.json 中 completed 的任务同步到数据库"""
- if not PENDING_TASKS_FILE.exists():
- return 0
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- except Exception as e:
- logger.error(f"读取 pending_tasks.json 失败: {e}")
- return 0
- if not isinstance(tasks, list):
- return 0
- updated = 0
- remaining_tasks = []
- for t in tasks:
- if t.get("status") == "completed":
- task_id = t.get("task_id")
- if not task_id:
- continue
- task_name = t.get("task_name")
- code_name = t.get("code_name")
- code_path = t.get("code_path")
- # 统一处理:code_path 始终为 "datafactory/scripts"
- # code_name 保持原样(可能是新建的脚本名或 import_resource_data.py)
- code_path = "datafactory/scripts"
- # 只处理 Python 脚本文件,跳过 JSON 等其他文件
- is_python_script = code_name and code_name.endswith(".py")
- if is_python_script:
- logger.info(f"任务 {task_id} 使用 Python 脚本: {code_path}/{code_name}")
- else:
- logger.info(
- f"任务 {task_id} 的 code_name ({code_name}) 不是 Python 脚本,跳过 DataFlow 更新"
- )
- if update_task_status(task_id, "completed", code_name, code_path):
- updated += 1
- logger.info(f"已同步任务 {task_id} 为 completed")
- # 只有 Python 脚本才更新 DataFlow 节点的 script_path
- if task_name and is_python_script:
- full_script_path = f"{code_path}/{code_name}"
- if update_dataflow_script_path(task_name, full_script_path):
- logger.info(
- f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}"
- )
- else:
- logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")
- # 自动部署到生产服务器(如果启用)
- if ENABLE_AUTO_DEPLOY:
- logger.info(f"🚀 开始自动部署任务 {task_id} 到生产服务器...")
- if auto_deploy_completed_task(t):
- logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
- else:
- logger.warning(f"⚠️ 任务 {task_id} 部署到生产服务器失败")
- else:
- logger.info(f"ℹ️ 自动部署已禁用,跳过任务 {task_id} 的部署")
- else:
- remaining_tasks.append(t)
- else:
- remaining_tasks.append(t)
- if updated > 0:
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
- return updated
- # ============================================================================
- # 生产服务器部署功能
- # ============================================================================
- def get_ssh_connection():
- """获取 SSH 连接到生产服务器"""
- try:
- import paramiko # type: ignore
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- logger.info(
- f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
- f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
- )
- ssh.connect(
- hostname=PRODUCTION_SERVER["host"],
- port=PRODUCTION_SERVER["port"],
- username=PRODUCTION_SERVER["username"],
- password=PRODUCTION_SERVER["password"],
- timeout=10,
- )
- logger.info("✅ SSH 连接成功")
- return ssh
- except ImportError:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return None
- except Exception as e:
- logger.error(f"SSH 连接失败: {e}")
- return None
- def test_ssh_connection() -> bool:
- """
- 测试 SSH 连接到生产服务器
- Returns:
- 连接是否成功
- """
- logger.info("=" * 60)
- logger.info("🔍 测试生产服务器连接")
- logger.info("=" * 60)
- ssh = get_ssh_connection()
- if not ssh:
- logger.error("❌ SSH 连接测试失败")
- return False
- try:
- # 测试执行命令
- _, stdout, _ = ssh.exec_command("echo 'Connection test successful'")
- output = stdout.read().decode().strip()
- logger.info(f"✅ 命令执行成功: {output}")
- # 检查目标目录是否存在
- _, stdout, _ = ssh.exec_command(
- f"test -d {PRODUCTION_SERVER['script_path']} && echo 'exists' || echo 'not exists'"
- )
- result = stdout.read().decode().strip()
- if result == "exists":
- logger.info(f"✅ 脚本目录存在: {PRODUCTION_SERVER['script_path']}")
- else:
- logger.warning(f"⚠️ 脚本目录不存在: {PRODUCTION_SERVER['script_path']}")
- logger.info("将在首次部署时自动创建")
- ssh.close()
- logger.info("=" * 60)
- logger.info("✅ 连接测试完成")
- logger.info("=" * 60)
- return True
- except Exception as e:
- logger.error(f"❌ 测试执行命令失败: {e}")
- ssh.close()
- return False
- def deploy_script_to_production(
- local_script_path: str, remote_filename: str | None = None
- ) -> bool:
- """
- 部署脚本文件到生产服务器
- Args:
- local_script_path: 本地脚本文件路径(相对或绝对)
- remote_filename: 远程文件名(可选,默认使用本地文件名)
- Returns:
- 是否部署成功
- """
- try:
- import importlib.util
- if importlib.util.find_spec("paramiko") is None:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- # 转换为绝对路径
- local_path = Path(local_script_path)
- if not local_path.is_absolute():
- local_path = WORKSPACE_ROOT / local_path
- if not local_path.exists():
- logger.error(f"本地文件不存在: {local_path}")
- return False
- # 确定远程文件名
- if not remote_filename:
- remote_filename = local_path.name
- remote_path = f"{PRODUCTION_SERVER['script_path']}/{remote_filename}"
- # 建立 SSH 连接
- ssh = get_ssh_connection()
- if not ssh:
- return False
- try:
- # 创建 SFTP 客户端
- sftp = ssh.open_sftp()
- # 确保远程目录存在
- try:
- sftp.stat(PRODUCTION_SERVER["script_path"])
- except FileNotFoundError:
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['script_path']}")
- _, stdout, _ = ssh.exec_command(
- f"mkdir -p {PRODUCTION_SERVER['script_path']}"
- )
- stdout.channel.recv_exit_status()
- # 上传文件
- logger.info(f"正在上传: {local_path} -> {remote_path}")
- sftp.put(str(local_path), remote_path)
- # 设置文件权限为可执行
- sftp.chmod(remote_path, 0o755)
- logger.info(f"✅ 脚本部署成功: {remote_path}")
- sftp.close()
- ssh.close()
- return True
- except Exception as e:
- logger.error(f"文件传输失败: {e}")
- ssh.close()
- return False
- except ImportError:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- except Exception as e:
- logger.error(f"部署脚本失败: {e}")
- return False
- def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
- """
- 部署 n8n 工作流文件到生产服务器
- Args:
- workflow_file: 本地工作流 JSON 文件路径
- Returns:
- 是否部署成功
- """
- try:
- import importlib.util
- if importlib.util.find_spec("paramiko") is None:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- # 转换为绝对路径
- local_path = Path(workflow_file)
- if not local_path.is_absolute():
- local_path = WORKSPACE_ROOT / local_path
- if not local_path.exists():
- logger.error(f"工作流文件不存在: {local_path}")
- return False
- remote_path = f"{PRODUCTION_SERVER['workflow_path']}/{local_path.name}"
- # 建立 SSH 连接
- ssh = get_ssh_connection()
- if not ssh:
- return False
- try:
- # 创建 SFTP 客户端
- sftp = ssh.open_sftp()
- # 确保远程目录存在
- try:
- sftp.stat(PRODUCTION_SERVER["workflow_path"])
- except FileNotFoundError:
- logger.info(f"创建远程目录: {PRODUCTION_SERVER['workflow_path']}")
- _, stdout, _ = ssh.exec_command(
- f"mkdir -p {PRODUCTION_SERVER['workflow_path']}"
- )
- stdout.channel.recv_exit_status()
- # 上传工作流文件
- logger.info(f"正在上传工作流: {local_path} -> {remote_path}")
- sftp.put(str(local_path), remote_path)
- logger.info(f"✅ 工作流部署成功: {remote_path}")
- sftp.close()
- ssh.close()
- return True
- except Exception as e:
- logger.error(f"工作流传输失败: {e}")
- ssh.close()
- return False
- except ImportError:
- logger.error("未安装 paramiko 库,请运行: pip install paramiko")
- return False
- except Exception as e:
- logger.error(f"部署工作流失败: {e}")
- return False
- def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
- """
- 自动部署已完成任务的脚本和工作流到生产服务器
- Args:
- task_info: 任务信息字典,包含 code_name, code_path 等
- Returns:
- 是否部署成功
- """
- code_name = task_info.get("code_name")
- code_path = task_info.get("code_path")
- task_name = task_info.get("task_name", "未知任务")
- if not code_name or not code_path:
- logger.warning(f"任务 {task_name} 缺少代码文件信息,跳过部署")
- return False
- logger.info("=" * 60)
- logger.info(f"🚀 开始自动部署任务: {task_name}")
- logger.info("=" * 60)
- deploy_success = True
- # 1. 部署 Python 脚本
- if code_name.endswith(".py"):
- script_path = f"{code_path}/{code_name}"
- logger.info(f"📦 部署 Python 脚本: {script_path}")
- if deploy_script_to_production(script_path):
- logger.info(f"✅ 脚本 {code_name} 部署成功")
- else:
- logger.error(f"❌ 脚本 {code_name} 部署失败")
- deploy_success = False
- # 2. 查找并部署相关的 n8n 工作流文件
- # 工作流文件通常与脚本在同一目录或 n8n_workflows 目录
- workflow_files = []
- # 查找模式1: 与脚本同目录的工作流文件
- script_dir = WORKSPACE_ROOT / code_path
- if script_dir.exists() and script_dir.is_dir():
- for wf_file in script_dir.glob("n8n_workflow_*.json"):
- if wf_file.is_file():
- workflow_files.append(wf_file)
- # 查找模式2: datafactory/n8n_workflows 目录
- n8n_workflows_dir = WORKSPACE_ROOT / "datafactory" / "n8n_workflows"
- if n8n_workflows_dir.exists():
- for wf_file in n8n_workflows_dir.glob("*.json"):
- if wf_file.is_file() and wf_file not in workflow_files:
- workflow_files.append(wf_file)
- # 查找模式3: 根据任务名称匹配工作流文件
- if task_name and task_name != "未知任务":
- # 将任务名称转换为可能的文件名模式
- task_name_pattern = task_name.replace(" ", "_").lower()
- for wf_file in (WORKSPACE_ROOT / "datafactory").rglob(
- f"*{task_name_pattern}*.json"
- ):
- if (
- wf_file.is_file()
- and "n8n" in wf_file.name.lower()
- and wf_file not in workflow_files
- ):
- workflow_files.append(wf_file)
- if workflow_files:
- logger.info(f"📦 发现 {len(workflow_files)} 个工作流文件")
- for wf_file in workflow_files:
- logger.info(f"📦 部署工作流: {wf_file.name}")
- if deploy_n8n_workflow_to_production(str(wf_file)):
- logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
- else:
- logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
- deploy_success = False
- else:
- logger.info("ℹ️ 未发现相关工作流文件")
- logger.info("=" * 60)
- if deploy_success:
- logger.info(f"✅ 任务 {task_name} 部署完成")
- else:
- logger.warning(f"⚠️ 任务 {task_name} 部署过程中出现错误")
- logger.info("=" * 60)
- return deploy_success
- # ============================================================================
- # Cursor Chat 自动化(Agent 模式)
- # ============================================================================
- # Agent 会话状态
- AGENT_SESSION_ACTIVE: bool = False
- AGENT_START_TIME: float = 0
- def get_all_cursor_windows() -> list[dict[str, Any]]:
- """获取所有 Cursor 窗口信息"""
- if not HAS_CURSOR_GUI:
- return []
- cursor_windows: list[dict[str, Any]] = []
- def enum_windows_callback(hwnd, _extra):
- if win32gui.IsWindowVisible(hwnd):
- title = win32gui.GetWindowText(hwnd) or ""
- class_name = win32gui.GetClassName(hwnd) or ""
- is_cursor = "cursor" in title.lower()
- if class_name and "chrome_widgetwin" in class_name.lower():
- is_cursor = True
- if is_cursor:
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
- area = (right - left) * (bottom - top)
- cursor_windows.append(
- {
- "hwnd": hwnd,
- "title": title,
- "class_name": class_name,
- "area": area,
- }
- )
- return True
- win32gui.EnumWindows(enum_windows_callback, None)
- return cursor_windows
- def find_cursor_window() -> int | None:
- """查找 Cursor 主窗口句柄"""
- if not HAS_CURSOR_GUI:
- return None
- cursor_windows = get_all_cursor_windows()
- if not cursor_windows:
- logger.warning("未找到 Cursor 窗口")
- return None
- # 按面积排序,返回最大的窗口(主窗口)
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
- return cursor_windows[0]["hwnd"]
- def activate_window(hwnd: int) -> bool:
- """激活指定窗口"""
- if not HAS_CURSOR_GUI:
- return False
- try:
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
- time.sleep(0.3)
- win32gui.SetForegroundWindow(hwnd)
- time.sleep(0.5)
- return True
- except Exception as e:
- logger.error(f"激活窗口失败: {e}")
- return False
- def open_new_agent() -> bool:
- """
- 在 Cursor 中打开新的 Agent 窗口
- 使用快捷键 Ctrl+Shift+I 打开新的 Composer/Agent
- """
- global AGENT_SESSION_ACTIVE, AGENT_START_TIME
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化")
- return False
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- if not activate_window(hwnd):
- return False
- try:
- # 使用 Ctrl+Shift+I 打开新的 Agent/Composer
- # 这是 Cursor 默认的快捷键
- logger.info("🚀 正在打开新的 Agent...")
- pyautogui.hotkey("ctrl", "shift", "i")
- time.sleep(2.0) # 等待 Agent 窗口打开
- AGENT_SESSION_ACTIVE = True
- AGENT_START_TIME = time.time()
- logger.info("✅ 新的 Agent 已打开")
- return True
- except Exception as e:
- logger.error(f"打开 Agent 失败: {e}")
- return False
- def close_current_agent() -> bool:
- """
- 关闭当前的 Agent 会话
- 使用 Escape 键关闭 Agent 或使用快捷键
- """
- global AGENT_SESSION_ACTIVE
- if not HAS_CURSOR_GUI:
- return False
- if not AGENT_SESSION_ACTIVE:
- logger.info("没有活动的 Agent 会话")
- return True
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- if not activate_window(hwnd):
- return False
- try:
- logger.info("🔄 正在关闭 Agent...")
- # 方法1: 按 Escape 键关闭 Agent
- pyautogui.press("escape")
- time.sleep(0.5)
- # 再按一次确保关闭
- pyautogui.press("escape")
- time.sleep(0.3)
- AGENT_SESSION_ACTIVE = False
- logger.info("✅ Agent 已关闭")
- return True
- except Exception as e:
- logger.error(f"关闭 Agent 失败: {e}")
- return False
- def type_message_to_agent(message: str) -> bool:
- """
- 向 Agent 输入消息
- Args:
- message: 要发送的消息内容
- Returns:
- 是否成功发送
- """
- if not HAS_CURSOR_GUI:
- return False
- try:
- # 等待 Agent 输入框获得焦点
- time.sleep(0.5)
- # 使用剪贴板粘贴(更可靠地处理中文和特殊字符)
- if HAS_PYPERCLIP:
- try:
- pyperclip.copy(message)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(0.5)
- except Exception:
- # 回退到逐字符输入
- pyautogui.write(message, interval=0.03)
- else:
- pyautogui.write(message, interval=0.03)
- time.sleep(0.3)
- # 按 Enter 发送消息
- pyautogui.press("enter")
- logger.info("✅ 消息已发送到 Agent")
- return True
- except Exception as e:
- logger.error(f"发送消息到 Agent 失败: {e}")
- return False
- def wait_for_agent_completion(
- timeout: int = 3600,
- check_interval: int = 30,
- ) -> bool:
- """
- 等待 Agent 完成任务
- 通过检查 pending_tasks.json 中的任务状态来判断是否完成
- Args:
- timeout: 超时时间(秒),默认 1 小时
- check_interval: 检查间隔(秒),默认 30 秒
- Returns:
- 是否所有任务都已完成
- """
- start_time = time.time()
- logger.info(f"⏳ 等待 Agent 完成任务(超时: {timeout}s)...")
- while time.time() - start_time < timeout:
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- elapsed = int(time.time() - start_time)
- logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
- return True
- remaining = len(processing_ids)
- elapsed = int(time.time() - start_time)
- logger.info(
- f"⏳ 仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
- )
- time.sleep(check_interval)
- logger.warning("⚠️ 等待超时,仍有未完成的任务")
- return False
- def run_agent_session(
- message: str,
- wait_completion: bool = True,
- timeout: int = 3600,
- auto_close: bool = True,
- ) -> bool:
- """
- 运行完整的 Agent 会话
- 流程:
- 1. 打开新的 Agent
- 2. 发送任务消息
- 3. (可选)等待任务完成
- 4. (可选)关闭 Agent
- Args:
- message: 要发送给 Agent 的消息
- wait_completion: 是否等待任务完成
- timeout: 等待超时时间(秒)
- auto_close: 任务完成后是否自动关闭 Agent
- Returns:
- 是否成功完成会话
- """
- logger.info("=" * 50)
- logger.info("🤖 开始 Agent 会话")
- logger.info("=" * 50)
- # 1. 打开新的 Agent
- if not open_new_agent():
- logger.error("❌ 无法打开 Agent")
- return False
- # 2. 发送消息
- if not type_message_to_agent(message):
- logger.error("❌ 无法发送消息到 Agent")
- close_current_agent()
- return False
- logger.info(f"📤 已发送消息: {message[:50]}...")
- # 3. 等待任务完成
- if wait_completion:
- completed = wait_for_agent_completion(timeout=timeout)
- # 同步已完成的任务到数据库
- sync_completed_tasks_to_db()
- if completed:
- logger.info("✅ Agent 已完成所有任务")
- else:
- logger.warning("⚠️ Agent 未能在超时时间内完成所有任务")
- # 4. 关闭 Agent
- if auto_close:
- close_current_agent()
- logger.info("=" * 50)
- logger.info("🏁 Agent 会话结束")
- logger.info("=" * 50)
- return True
- def send_chat_message(message: str, input_pos: tuple[int, int] | None) -> bool:
- """
- 在 Cursor Chat 中发送消息(传统方式,保留向后兼容)
- 推荐使用 run_agent_session() 替代此函数
- """
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化")
- return False
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- if not activate_window(hwnd):
- return False
- # 点击输入框或使用快捷键
- if input_pos:
- x, y = input_pos
- pyautogui.click(x, y)
- time.sleep(0.4)
- else:
- pyautogui.hotkey("ctrl", "l")
- time.sleep(1.0)
- pyautogui.hotkey("ctrl", "a")
- time.sleep(0.2)
- # 输入消息
- if HAS_PYPERCLIP:
- try:
- pyperclip.copy(message)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(0.5)
- except Exception:
- pyautogui.write(message, interval=0.03)
- else:
- pyautogui.write(message, interval=0.03)
- time.sleep(0.3)
- pyautogui.press("enter")
- logger.info("✅ 消息已发送到 Cursor Chat")
- return True
- def send_chat_for_tasks(force: bool = False, use_agent: bool = True) -> bool:
- """
- 向 Cursor Chat 发送任务提醒
- Args:
- force: 强制发送,忽略 ENABLE_CHAT 设置
- use_agent: 是否使用新 Agent 模式(推荐)
- Returns:
- 是否成功发送
- """
- if not force and not ENABLE_CHAT:
- return False
- if not HAS_CURSOR_GUI:
- logger.warning("未安装 GUI 自动化依赖,无法发送 Chat 消息")
- return False
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("没有 processing 任务,跳过发送")
- return False
- logger.info(f"📤 发送任务提醒({len(processing_ids)} 个任务)...")
- if use_agent:
- # 使用新的 Agent 模式
- return run_agent_session(
- message=CHAT_MESSAGE,
- wait_completion=False, # 在循环模式中不阻塞等待
- auto_close=False, # 保持 Agent 打开,让其完成任务
- )
- else:
- # 传统模式
- return send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS)
- def chat_trigger_loop(
- interval: int = 60,
- use_agent: bool = True,
- auto_close_on_complete: bool = True,
- ) -> None:
- """
- 循环模式:定期检查 processing 任务并发送 Chat 消息
- Args:
- interval: 检查间隔(秒),默认 60 秒
- use_agent: 是否使用新 Agent 模式
- auto_close_on_complete: 任务完成后是否自动关闭 Agent
- """
- global AGENT_SESSION_ACTIVE
- logger.info("=" * 60)
- logger.info("🚀 Cursor Chat 自动触发服务已启动")
- logger.info(f"⏰ 检查间隔: {interval} 秒")
- logger.info(f"🤖 Agent 模式: {'已启用' if use_agent else '未启用'}")
- logger.info(f"📝 发送消息: {CHAT_MESSAGE}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- last_sent_time = 0
- min_send_interval = 120 # 最小发送间隔(秒),避免频繁打扰
- try:
- while True:
- try:
- # 1. 先同步已完成任务
- sync_completed_tasks_to_db()
- # 2. 从数据库拉取 pending 任务并转为 processing
- pending_tasks = get_pending_tasks()
- if pending_tasks:
- logger.info(f"📥 从数据库发现 {len(pending_tasks)} 个 pending 任务")
- # 更新任务状态为 processing
- for task in pending_tasks:
- update_task_status(task["task_id"], "processing")
- # 写入 pending_tasks.json
- write_pending_tasks_json(pending_tasks)
- # 生成执行指令文件
- create_execute_instructions(pending_tasks)
- logger.info("✅ 已将 pending 任务转为 processing")
- # 3. 检查是否有 processing 任务
- processing_ids = get_processing_task_ids()
- if processing_ids:
- current_time = time.time()
- time_since_last = current_time - last_sent_time
- # 如果有活动的 Agent 会话,不需要重新发送
- if AGENT_SESSION_ACTIVE:
- logger.info(
- f"🤖 Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
- )
- elif time_since_last >= min_send_interval:
- logger.info(f"📋 发现 {len(processing_ids)} 个待处理任务")
- # 更新触发器文件
- update_trigger_file(
- task_count=len(processing_ids),
- status="有待执行任务",
- task_ids=processing_ids,
- )
- # 发送 Chat 消息(启动 Agent)
- if use_agent:
- if open_new_agent():
- if type_message_to_agent(CHAT_MESSAGE):
- last_sent_time = current_time
- logger.info("✅ 已启动 Agent 并发送执行提醒")
- else:
- logger.warning("⚠️ 发送消息失败")
- close_current_agent()
- else:
- logger.warning("⚠️ 启动 Agent 失败")
- else:
- if send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS):
- last_sent_time = current_time
- logger.info("✅ 已发送执行提醒")
- else:
- logger.warning("⚠️ 发送失败,将在下次重试")
- else:
- remaining = int(min_send_interval - time_since_last)
- logger.info(
- f"⏳ 距离上次发送不足 {min_send_interval}s,"
- f"还需等待 {remaining}s"
- )
- else:
- # 没有待处理任务
- if AGENT_SESSION_ACTIVE and auto_close_on_complete:
- logger.info("✅ 所有任务已完成,正在关闭 Agent...")
- close_current_agent()
- logger.info("✅ Agent 已关闭")
- else:
- logger.info("✅ 没有待处理任务")
- logger.info(f"⏳ {interval} 秒后再次检查...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- time.sleep(interval)
- except KeyboardInterrupt:
- # 退出时关闭 Agent
- if AGENT_SESSION_ACTIVE:
- logger.info("正在关闭 Agent...")
- close_current_agent()
- logger.info("\n⛔ 服务已停止")
- # ============================================================================
- # 主执行流程
- # ============================================================================
- def auto_execute_tasks_once() -> int:
- """执行一次任务检查和处理"""
- # 1. 先同步已完成任务到数据库
- sync_completed_tasks_to_db()
- # 2. 获取 pending 任务
- logger.info("🔍 检查 pending 任务...")
- tasks = get_pending_tasks()
- # 3. 检查是否有现有的 processing 任务
- existing_processing_ids = get_processing_task_ids()
- if not tasks and not existing_processing_ids:
- logger.info("✅ 没有 pending 或 processing 任务")
- # 更新触发器文件为"已完成"状态
- update_trigger_file(0, "所有任务已完成", [])
- return 0
- if tasks:
- logger.info(f"📋 找到 {len(tasks)} 个 pending 任务")
- # 4. 更新任务状态为 processing
- for task in tasks:
- update_task_status(task["task_id"], "processing")
- # 5. 写入 pending_tasks.json
- write_pending_tasks_json(tasks)
- # 6. 生成执行指令文件
- create_execute_instructions(tasks)
- # 7. 更新触发器文件(关键:触发 Cursor 自动执行)
- all_processing_ids = get_processing_task_ids()
- if all_processing_ids:
- update_trigger_file(
- task_count=len(all_processing_ids),
- status="有待执行任务",
- task_ids=all_processing_ids,
- )
- logger.info(
- f"🔔 已更新触发器,等待 Cursor 执行 {len(all_processing_ids)} 个任务"
- )
- return len(tasks)
- def auto_execute_tasks_loop(interval: int = 300) -> None:
- """循环执行任务检查"""
- logger.info("=" * 60)
- logger.info("🚀 自动任务执行服务已启动")
- logger.info(f"⏰ 检查间隔: {interval} 秒")
- logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- try:
- while True:
- try:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks()
- logger.info(f"✅ 已处理 {count} 个任务")
- logger.info(f"⏳ {interval} 秒后再次检查...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- time.sleep(interval)
- except KeyboardInterrupt:
- logger.info("\n⛔ 服务已停止")
- def refresh_trigger_only() -> None:
- """仅刷新触发器文件(用于现有 processing 任务)"""
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("没有 processing 状态的任务")
- update_trigger_file(0, "所有任务已完成", [])
- return
- logger.info(f"📋 发现 {len(processing_ids)} 个 processing 任务")
- # 重新生成指令文件(从 pending_tasks.json 读取)
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- all_tasks = json.load(f)
- processing_tasks = [t for t in all_tasks if t.get("status") == "processing"]
- if processing_tasks:
- # 重新生成执行指令文件
- create_execute_instructions_from_json(processing_tasks)
- except Exception as e:
- logger.error(f"读取任务文件失败: {e}")
- # 更新触发器文件
- update_trigger_file(
- task_count=len(processing_ids),
- status="有待执行任务",
- task_ids=processing_ids,
- )
- logger.info("✅ 触发器已刷新,等待 Cursor 执行任务")
- def create_execute_instructions_from_json(tasks: list[dict[str, Any]]) -> None:
- """从 pending_tasks.json 格式的任务列表生成执行指令文件"""
- TASKS_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# 🤖 Cursor 自动任务执行指令\n\n")
- f.write("**⚠️ 重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 📋 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `tasks/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("## ⚠️ 任务约束要求\n\n")
- f.write("**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n")
- f.write("- ❌ 不要创建任何 summary、report、总结类的文档文件\n")
- f.write("- ❌ 不要生成 task_summary.md、execution_report.md 等总结文件\n")
- f.write("- ✅ 只需创建任务要求的功能脚本文件\n")
- f.write("- ✅ 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task.get("task_id", "N/A")
- task_name = task.get("task_name", "未命名任务")
- task_desc = task.get("task_description", "无描述")
- notified_at = task.get("notified_at", "")
- f.write(f"## 🔴 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **通知时间**: {notified_at}\n")
- f.write(f"- **代码路径**: {task.get('code_path', 'N/A')}\n\n")
- f.write(f"### 📝 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已更新: {INSTRUCTIONS_FILE}")
- def main() -> None:
- """主函数"""
- parser = argparse.ArgumentParser(
- description="自动任务执行调度脚本",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- # 执行一次任务检查
- python scripts/auto_execute_tasks.py --once
- # 使用 Agent 模式执行任务(自动部署)
- python scripts/auto_execute_tasks.py --agent-run
- # 启动 Agent 循环模式(推荐,自动部署)
- python scripts/auto_execute_tasks.py --chat-loop --use-agent
- # 禁用自动部署功能
- python scripts/auto_execute_tasks.py --chat-loop --use-agent --no-deploy
- # 立即部署指定任务到生产服务器
- python scripts/auto_execute_tasks.py --deploy-now 123
- # 测试生产服务器连接
- python scripts/auto_execute_tasks.py --test-connection
- """,
- )
- parser.add_argument("--once", action="store_true", help="只执行一次")
- parser.add_argument("--interval", type=int, default=300, help="检查间隔(秒)")
- parser.add_argument(
- "--enable-chat", action="store_true", help="启用自动 Cursor Chat"
- )
- parser.add_argument("--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"')
- parser.add_argument(
- "--chat-message",
- type=str,
- default="请阅读 tasks/task_execute_instructions.md 并执行任务。",
- help="发送到 Chat 的消息",
- )
- parser.add_argument(
- "--refresh-trigger",
- action="store_true",
- help="仅刷新触发器文件(用于现有 processing 任务)",
- )
- parser.add_argument(
- "--chat-loop",
- action="store_true",
- help="启动 Chat 自动触发循环(定期发送执行消息)",
- )
- parser.add_argument(
- "--chat-interval",
- type=int,
- default=60,
- help="Chat 触发循环的检查间隔(秒),默认 60",
- )
- parser.add_argument(
- "--send-chat-now",
- action="store_true",
- help="立即发送一次 Chat 消息(用于手动触发)",
- )
- # Agent 模式相关参数
- parser.add_argument(
- "--use-agent",
- action="store_true",
- default=True,
- help="使用新 Agent 模式(默认启用)",
- )
- parser.add_argument(
- "--no-agent",
- action="store_true",
- help="禁用 Agent 模式,使用传统 Chat 方式",
- )
- parser.add_argument(
- "--agent-run",
- action="store_true",
- help="立即启动 Agent 会话执行任务",
- )
- parser.add_argument(
- "--agent-timeout",
- type=int,
- default=3600,
- help="Agent 等待任务完成的超时时间(秒),默认 3600",
- )
- parser.add_argument(
- "--no-auto-close",
- action="store_true",
- help="任务完成后不自动关闭 Agent",
- )
- # 自动部署相关参数
- parser.add_argument(
- "--enable-deploy",
- action="store_true",
- default=True,
- help="启用自动部署到生产服务器(默认启用)",
- )
- parser.add_argument(
- "--no-deploy",
- action="store_true",
- help="禁用自动部署功能",
- )
- parser.add_argument(
- "--deploy-now",
- type=str,
- metavar="TASK_ID",
- help="立即部署指定任务ID的脚本到生产服务器",
- )
- parser.add_argument(
- "--test-connection",
- action="store_true",
- help="测试到生产服务器的 SSH 连接",
- )
- args = parser.parse_args()
- global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE, ENABLE_AUTO_DEPLOY
- ENABLE_CHAT = bool(args.enable_chat)
- CHAT_MESSAGE = args.chat_message
- ENABLE_AUTO_DEPLOY = args.enable_deploy and not args.no_deploy
- # 确定是否使用 Agent 模式
- use_agent = args.use_agent and not args.no_agent
- auto_close = not args.no_auto_close
- if args.chat_input_pos:
- try:
- x, y = args.chat_input_pos.split(",")
- CHAT_INPUT_POS = (int(x.strip()), int(y.strip()))
- except Exception:
- pass
- # 测试 SSH 连接
- if args.test_connection:
- if test_ssh_connection():
- logger.info("✅ 连接测试成功")
- else:
- logger.error("❌ 连接测试失败")
- return
- # 立即部署指定任务
- if args.deploy_now:
- try:
- task_id = int(args.deploy_now)
- logger.info(f"🚀 开始部署任务 {task_id}...")
- # 从 pending_tasks.json 查找任务信息
- if PENDING_TASKS_FILE.exists():
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- task_found = None
- for t in tasks:
- if t.get("task_id") == task_id:
- task_found = t
- break
- if task_found:
- if auto_deploy_completed_task(task_found):
- logger.info(f"✅ 任务 {task_id} 部署成功")
- else:
- logger.error(f"❌ 任务 {task_id} 部署失败")
- else:
- logger.error(f"未找到任务 {task_id}")
- else:
- logger.error("pending_tasks.json 文件不存在")
- except ValueError:
- logger.error(f"无效的任务ID: {args.deploy_now}")
- return
- # 仅刷新触发器
- if args.refresh_trigger:
- refresh_trigger_only()
- return
- # 立即启动 Agent 会话
- if args.agent_run:
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- # 先检查是否有 pending 任务
- auto_execute_tasks_once()
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("没有待执行的任务")
- return
- logger.info(f"🚀 启动 Agent 执行 {len(processing_ids)} 个任务...")
- success = run_agent_session(
- message=CHAT_MESSAGE,
- wait_completion=True,
- timeout=args.agent_timeout,
- auto_close=auto_close,
- )
- if success:
- logger.info("✅ Agent 会话完成")
- else:
- logger.error("❌ Agent 会话失败")
- return
- # 立即发送一次 Chat 消息
- if args.send_chat_now:
- if use_agent:
- # 使用 Agent 模式
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- auto_execute_tasks_once()
- processing_ids = get_processing_task_ids()
- if processing_ids:
- if open_new_agent():
- if type_message_to_agent(CHAT_MESSAGE):
- logger.info("✅ Agent 已启动并发送消息")
- else:
- logger.error("❌ 发送消息失败")
- else:
- logger.error("❌ 启动 Agent 失败")
- else:
- logger.info("没有待执行的任务")
- else:
- if send_chat_for_tasks(force=True, use_agent=False):
- logger.info("✅ 消息已发送")
- else:
- logger.error("❌ 发送失败")
- return
- # Chat 自动触发循环模式
- if args.chat_loop:
- chat_trigger_loop(
- interval=args.chat_interval,
- use_agent=use_agent,
- auto_close_on_complete=auto_close,
- )
- return
- if args.once:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks(use_agent=use_agent)
- logger.info(f"✅ 完成!处理了 {count} 个任务")
- else:
- auto_execute_tasks_loop(interval=args.interval)
- if __name__ == "__main__":
- main()
|