| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597 |
- #!/usr/bin/env python3
- """
- 自动任务执行核心调度脚本 (Agent 模式)
- 工作流程:
- 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
- 2. 生成 tasks/task_execute_instructions.md 执行指令文件
- 3. 更新任务状态为 processing,并维护 tasks/pending_tasks.json
- 4. 更新 tasks/task_trigger.txt 触发器文件
- 5. 启动新的 Cursor Agent 并发送执行指令
- 6. Cursor Agent 完成任务后,更新 pending_tasks.json 状态为 completed
- 7. 调度脚本检测到任务完成后,同步数据库并关闭 Agent
- 使用方式:
- # Agent 单次执行(执行一次任务后退出)
- python scripts/auto_execute_tasks.py --agent-run
- # Agent 循环模式(有任务时自动启动 Agent,完成后等待新任务)
- python scripts/auto_execute_tasks.py --agent-loop
- # Agent 循环模式 + 禁用自动部署
- python scripts/auto_execute_tasks.py --agent-loop --no-deploy
- # 设置 Agent 超时时间(默认 3600 秒)
- python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
- # 任务完成后不自动关闭 Agent
- python scripts/auto_execute_tasks.py --agent-run --no-auto-close
- # 立即部署指定任务ID的脚本到生产服务器
- python scripts/auto_execute_tasks.py --deploy-now 123
- # 测试到生产服务器的 SSH 连接
- python scripts/auto_execute_tasks.py --test-connection
- """
from __future__ import annotations

import argparse
import contextlib
import json
import logging
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any
- # ============================================================================
- # 日志配置
- # ============================================================================
# Root logging setup: INFO level with a "time - level - message" layout.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Logger shared by every function in this script.
logger = logging.getLogger("AutoExecuteTasks")
- # ============================================================================
- # Windows GUI 自动化依赖(可选)
- # ============================================================================
# Feature flags for the optional GUI automation stack; both stay False unless
# the corresponding imports below succeed.
HAS_CURSOR_GUI = False
HAS_PYPERCLIP = False

try:
    import pyautogui
    import win32con
    import win32gui
except ImportError:
    # Without these packages the automatic Cursor Agent feature is disabled.
    logger.info(
        "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
        "将禁用自动 Cursor Agent 功能。"
    )
else:
    pyautogui.FAILSAFE = True
    pyautogui.PAUSE = 0.5
    HAS_CURSOR_GUI = True

    # pyperclip is only probed once the GUI stack itself is available.
    try:
        import pyperclip
    except ImportError:
        pass
    else:
        HAS_PYPERCLIP = True
- # ============================================================================
- # 全局配置
- # ============================================================================
# Workspace root is two directory levels above this file.
WORKSPACE_ROOT = Path(__file__).parent.parent
TASKS_DIR = WORKSPACE_ROOT / "tasks"
PENDING_TASKS_FILE = TASKS_DIR / "pending_tasks.json"
INSTRUCTIONS_FILE = TASKS_DIR / "task_execute_instructions.md"
TRIGGER_FILE = TASKS_DIR / "task_trigger.txt"

# Production server settings.
# NOTE(security): the SSH credentials used to be hard-coded only. They can now
# be overridden through the PRODUCTION_SSH_* environment variables; the literal
# values remain as defaults purely for backward compatibility and should be
# removed from source control once the environment is configured.
PRODUCTION_SERVER = {
    "host": os.environ.get("PRODUCTION_SSH_HOST", "192.168.3.143"),
    "port": int(os.environ.get("PRODUCTION_SSH_PORT", "22")),
    "username": os.environ.get("PRODUCTION_SSH_USER", "ubuntu"),
    "password": os.environ.get("PRODUCTION_SSH_PASSWORD", "citumxl2357"),
    "script_path": "/opt/dataops-platform/datafactory/scripts",
    # Directory that receives the workflow JSON files.
    "workflow_path": "/opt/dataops-platform/datafactory/workflows",
}

# Message template sent to the Agent.
AGENT_MESSAGE = "请阅读 tasks/task_execute_instructions.md 并执行任务。"

# Global switch controlled by the command-line arguments; auto deploy is on by default.
ENABLE_AUTO_DEPLOY: bool = True
- # ============================================================================
- # 数据库操作
- # ============================================================================
def get_db_connection():
    """Open a psycopg2 connection using the production database settings.

    The connection parameters are parsed from ``SQLALCHEMY_DATABASE_URI`` of
    the ``production`` entry of ``app.config.config.config``.

    Returns:
        An open psycopg2 connection, or ``None`` when a dependency cannot be
        imported or the connection attempt fails (the error is logged).
    """
    try:
        from urllib.parse import unquote, urlparse

        import psycopg2

        # Make the project package importable when running as a standalone
        # script, without growing sys.path on every call.
        root = str(WORKSPACE_ROOT)
        if root not in sys.path:
            sys.path.insert(0, root)
        from app.config.config import config

        # Always use the production database configuration.
        app_config = config["production"]
        parsed = urlparse(app_config.SQLALCHEMY_DATABASE_URI)

        database = parsed.path.lstrip("/")
        port = parsed.port or 5432
        # User name and password are percent-encoded inside a URI; psycopg2
        # expects the decoded values.
        user = unquote(parsed.username) if parsed.username else parsed.username
        password = unquote(parsed.password) if parsed.password else parsed.password

        conn = psycopg2.connect(
            host=parsed.hostname,
            port=port,
            database=database,
            user=user,
            password=password,
        )
        logger.debug(f"数据库连接成功: {parsed.hostname}:{port}/{database}")
        return conn
    except ImportError as e:
        logger.error(f"导入依赖失败: {e}")
        return None
    except Exception as e:
        # logger.exception appends the traceback to the error record.
        logger.exception(f"连接数据库失败: {e}")
        return None
def get_pending_tasks() -> list[dict[str, Any]]:
    """Fetch every task whose status is ``pending`` from the ``task_list`` table.

    The database is queried on each call, so the result always reflects the
    current table contents.

    Returns:
        The pending tasks as plain dicts ordered by ``create_time`` ascending,
        or an empty list when no connection is available or the query fails.
    """
    try:
        from psycopg2.extras import RealDictCursor

        logger.info("📡 正在连接数据库...")
        conn = get_db_connection()
        if not conn:
            logger.error("❌ 无法获取数据库连接")
            return []
        logger.info("✅ 数据库连接成功,正在查询 pending 任务...")

        # The cursor context manager closes the cursor; the finally clause
        # guarantees the connection is released even when the query raises.
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(
                    """
                    SELECT task_id, task_name, task_description, status,
                           code_name, code_path, create_time, create_by
                    FROM task_list
                    WHERE status = 'pending'
                    ORDER BY create_time ASC
                    """
                )
                rows = cursor.fetchall()
        finally:
            conn.close()

        task_list = [dict(row) for row in rows]
        logger.info(f"📊 从数据库查询到 {len(task_list)} 个 pending 任务")
        for task in task_list:
            logger.info(f" - 任务 {task['task_id']}: {task['task_name']}")
        return task_list
    except Exception as e:
        logger.exception(f"获取 pending 任务失败: {e}")
        return []
def update_task_status(
    task_id: int,
    status: str,
    code_name: str | None = None,
    code_path: str | None = None,
) -> bool:
    """Update the status of one row in ``task_list``.

    Args:
        task_id: Primary key of the task to update.
        status: New status value.
        code_name: Script file name; stored only when ``code_path`` is also given.
        code_path: Script directory; stored only when ``code_name`` is also given.

    Returns:
        True when a row was updated; False when no row matched, no connection
        was available, or the update failed (the error is logged).
    """
    try:
        conn = get_db_connection()
        if not conn:
            return False

        # Release the connection even when execute/commit raises.
        try:
            with conn.cursor() as cursor:
                if code_name and code_path:
                    cursor.execute(
                        """
                        UPDATE task_list
                        SET status = %s, code_name = %s, code_path = %s,
                            update_time = CURRENT_TIMESTAMP
                        WHERE task_id = %s
                        """,
                        (status, code_name, code_path, task_id),
                    )
                else:
                    cursor.execute(
                        """
                        UPDATE task_list
                        SET status = %s, update_time = CURRENT_TIMESTAMP
                        WHERE task_id = %s
                        """,
                        (status, task_id),
                    )
                conn.commit()
                updated = cursor.rowcount > 0
        finally:
            conn.close()

        if updated:
            logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
        return updated
    except Exception as e:
        logger.error(f"更新任务状态失败: {e}")
        return False
- # ============================================================================
- # 触发器文件管理
- # ============================================================================
def update_trigger_file(
    task_count: int = 0,
    status: str = "有待执行任务",
    task_ids: list[int] | None = None,
) -> None:
    """Rewrite the trigger file ``tasks/task_trigger.txt``.

    Args:
        task_count: Number of tasks waiting to be processed.
        status: Status line written into the file.
        task_ids: IDs listed in the file; ``None`` is treated as an empty list.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    id_listing = ", ".join(map(str, task_ids or []))

    body = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
生成时间: {stamp}
状态: {status}
待处理任务数: {task_count}
任务ID列表: [{id_listing}]
此文件用于触发Cursor自动执行任务。
【重要】Cursor AI 请立即执行以下操作:
1. 读取 tasks/task_execute_instructions.md 文件
2. 按照文件中的任务描述,依次完成每个任务
3. 完成后更新 tasks/pending_tasks.json 中对应任务的状态为 completed
"""
    TRIGGER_FILE.write_text(body, encoding="utf-8")
    logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
def get_processing_task_ids() -> list[int]:
    """Return the IDs of all ``processing`` tasks listed in pending_tasks.json.

    Returns:
        The ``task_id`` of every entry whose ``status`` is ``processing`` and
        whose ``task_id`` is truthy. An empty list is returned when the file
        is missing, unreadable, not valid JSON, or does not hold a list.
    """
    if not PENDING_TASKS_FILE.exists():
        return []

    try:
        with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
            tasks = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON and text-decoding errors. Report the
        # failure instead of swallowing it silently.
        logger.warning(f"读取 pending_tasks.json 失败: {e}")
        return []

    if not isinstance(tasks, list):
        return []

    # Skip malformed (non-dict) entries rather than discarding every valid ID.
    return [
        t["task_id"]
        for t in tasks
        if isinstance(t, dict) and t.get("status") == "processing" and t.get("task_id")
    ]
def get_tasks_by_ids(task_ids: list[int]) -> list[dict[str, Any]]:
    """Load the full database rows for the given task IDs.

    Args:
        task_ids: IDs of the tasks to load.

    Returns:
        The matching rows as plain dicts (including ``task_description``),
        ordered by ``create_time`` ascending. An empty list is returned when
        ``task_ids`` is empty, no connection is available, or the query fails.
    """
    if not task_ids:
        return []

    try:
        from psycopg2.extras import RealDictCursor

        conn = get_db_connection()
        if not conn:
            logger.error("无法获取数据库连接")
            return []

        # One "%s" placeholder per ID keeps the IN clause fully parameterized.
        placeholders = ", ".join(["%s"] * len(task_ids))
        query = f"""
            SELECT task_id, task_name, task_description, status,
                   code_name, code_path, create_time, create_by
            FROM task_list
            WHERE task_id IN ({placeholders})
            ORDER BY create_time ASC
        """

        # Release the connection even when the query raises.
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query, tuple(task_ids))
                rows = cursor.fetchall()
        finally:
            conn.close()

        task_list = [dict(row) for row in rows]
        logger.info(f"从数据库获取了 {len(task_list)} 个任务的详细信息")
        return task_list
    except Exception as e:
        logger.exception(f"根据 ID 获取任务失败: {e}")
        return []
def get_all_tasks_to_execute() -> list[dict[str, Any]]:
    """Collect every task that has to be executed, with full details.

    Combines the ``processing`` task IDs recorded in pending_tasks.json with
    the ``pending`` tasks currently in the database, then reloads all of them
    from the database so that each entry carries its ``task_description``.

    Returns:
        Full task records for all tasks to execute; empty when there are none.
    """
    # IDs already handed out locally (status "processing" in the JSON file).
    local_ids = get_processing_task_ids()
    # IDs of tasks that are still pending in the database.
    db_ids = [row["task_id"] for row in get_pending_tasks()]

    # Merge both sources and drop duplicates.
    wanted_ids = list(set(local_ids + db_ids))
    if not wanted_ids:
        return []

    # Reload the complete rows for the merged ID set.
    full_rows = get_tasks_by_ids(wanted_ids)
    logger.info(
        f"需要执行的任务: {len(full_rows)} 个 "
        f"(processing: {len(local_ids)}, pending: {len(db_ids)})"
    )
    return full_rows
- # ============================================================================
- # 任务文件生成
- # ============================================================================
def write_pending_tasks_json(tasks: list[dict[str, Any]]) -> None:
    """Append tasks that are not yet tracked to ``tasks/pending_tasks.json``.

    Entries already present in the file are kept untouched. Every new task is
    added with status ``processing`` and the current time as ``notified_at``.

    Args:
        tasks: Task records; each must provide ``task_id`` and ``task_name``.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    # Load what is already tracked; an unreadable or malformed file is
    # replaced by a fresh list.
    existing_tasks: list[Any] = []
    if PENDING_TASKS_FILE.exists():
        try:
            with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            logger.warning(f"读取 pending_tasks.json 失败: {e}")
            loaded = []
        # Anything other than a JSON array cannot be appended to.
        if isinstance(loaded, list):
            existing_tasks = loaded

    existing_ids = {
        t["task_id"] for t in existing_tasks if isinstance(t, dict) and "task_id" in t
    }

    for task in tasks:
        task_id = task["task_id"]
        if task_id in existing_ids:
            continue
        existing_tasks.append(
            {
                "task_id": task_id,
                "task_name": task["task_name"],
                "code_path": task.get("code_path", ""),
                "code_name": task.get("code_name", ""),
                "status": "processing",
                "notified_at": datetime.now().isoformat(),
                "code_file": task.get("code_file", ""),
            }
        )
        # Remember the ID so a duplicate inside the same batch is not added twice.
        existing_ids.add(task_id)

    with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
        json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
    logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
def create_execute_instructions(tasks: list[dict[str, Any]]) -> None:
    """Write the instruction file ``tasks/task_execute_instructions.md``.

    The file starts with a fixed header (generation time, task count,
    completion rules and constraints) followed by one section per task.

    Args:
        tasks: Task records; each must provide ``task_id``, ``task_name`` and
            ``task_description``.
    """
    TASKS_DIR.mkdir(parents=True, exist_ok=True)

    with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as out:
        stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Fixed header: title, metadata, completion rules and constraints.
        out.writelines(
            [
                "# Cursor 自动任务执行指令\n\n",
                "**重要:请立即执行以下任务!**\n\n",
                f"**生成时间**: {stamp}\n\n",
                f"**待执行任务数量**: {len(tasks)}\n\n",
                "## 任务完成后的操作\n\n",
                "完成每个任务后,请更新 `tasks/pending_tasks.json` 中",
                "对应任务的 `status` 为 `completed`,\n",
                "并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n",
                "调度脚本会自动将完成的任务同步到数据库。\n\n",
                "## 任务约束要求\n\n",
                "**重要约束**:完成脚本创建后,**不需要生成任务总结文件**。\n\n",
                "- 不要创建任何 summary、report、总结类的文档文件\n",
                "- 不要生成 task_summary.md、execution_report.md 等总结文件\n",
                "- 只需创建任务要求的功能脚本文件\n",
                "- 只需更新 `tasks/pending_tasks.json` 中的任务状态\n\n",
                "---\n\n",
            ]
        )

        # One section per task, numbered from 1.
        for number, task in enumerate(tasks, 1):
            task_id = task["task_id"]
            task_name = task["task_name"]
            task_desc = task["task_description"]

            created = task.get("create_time", "")
            # datetime-like values are rendered with a fixed format.
            if hasattr(created, "strftime"):
                created = created.strftime("%Y-%m-%d %H:%M:%S")

            section = (
                f"## 任务 {number}: {task_name}\n\n"
                f"- **任务ID**: `{task_id}`\n"
                f"- **创建时间**: {created}\n"
                f"- **创建者**: {task.get('create_by', 'unknown')}\n\n"
                f"### 任务描述\n\n{task_desc}\n\n"
                "---\n\n"
            )
            out.write(section)

    logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
- # ============================================================================
- # Neo4j 独立连接(不依赖 Flask 应用上下文)
- # ============================================================================
def get_neo4j_driver():
    """Create a Neo4j driver without relying on a Flask application context.

    Returns:
        A driver built from the ``production`` configuration, or ``None`` when
        the import or the driver creation fails (the error is logged).
    """
    try:
        from neo4j import GraphDatabase

        sys.path.insert(0, str(WORKSPACE_ROOT))
        from app.config.config import config

        # Always use the production configuration.
        production = config["production"]
        uri = production.NEO4J_URI
        credentials = (production.NEO4J_USER, production.NEO4J_PASSWORD)
        return GraphDatabase.driver(uri, auth=credentials)
    except ImportError as e:
        logger.error(f"导入 Neo4j 驱动失败: {e}")
    except Exception as e:
        logger.error(f"连接 Neo4j 失败: {e}")
    return None
- # ============================================================================
- # 状态同步
- # ============================================================================
def extract_dataflow_name_from_task(task_id: int) -> str | None:
    """Extract the DataFlow name from a task's description.

    The description is read from ``task_list`` and searched for a line of the
    form ``**DataFlow Name**: <name>``.

    Args:
        task_id: ID of the task whose description is inspected.

    Returns:
        The stripped DataFlow name, or ``None`` when the task does not exist,
        has no description, contains no such line, or the lookup fails.
    """
    import re

    try:
        conn = get_db_connection()
        if not conn:
            return None

        # Release the connection even when the query raises.
        try:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT task_description FROM task_list WHERE task_id = %s",
                    (task_id,),
                )
                row = cursor.fetchone()
        finally:
            conn.close()

        # A missing row or a NULL description has nothing to search.
        if not row or not row[0]:
            return None

        match = re.search(r"\*\*DataFlow Name\*\*:\s*(.+?)(?:\n|$)", row[0])
        if not match:
            return None

        dataflow_name = match.group(1).strip()
        logger.info(f"从任务 {task_id} 提取到 DataFlow 名称: {dataflow_name}")
        return dataflow_name
    except Exception as e:
        logger.error(f"提取 DataFlow 名称失败: {e}")
        return None
def update_dataflow_script_path(
    task_name: str, script_path: str, task_id: int | None = None
) -> bool:
    """Set ``script_path`` on the DataFlow node that belongs to a task.

    Args:
        task_name: Fallback value matched against the node's ``name_zh``.
        script_path: Value stored in the node's ``script_path`` property.
        task_id: When given, the DataFlow name extracted from that task's
            description takes precedence over ``task_name``.

    Returns:
        True when a node was matched and updated; False when no node matched,
        no driver was available, or the update failed (the error is logged).
    """
    try:
        driver = get_neo4j_driver()
        if not driver:
            logger.error("无法获取 Neo4j 驱动")
            return False

        # Close the driver on every path, including a failing query.
        try:
            dataflow_name = task_name
            if task_id:
                extracted_name = extract_dataflow_name_from_task(task_id)
                if extracted_name:
                    dataflow_name = extracted_name
                    logger.info(f"使用从任务描述提取的 DataFlow 名称: {dataflow_name}")

            query = """
                MATCH (n:DataFlow {name_zh: $name_zh})
                SET n.script_path = $script_path, n.updated_at = $updated_at
                RETURN n
            """
            updated_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            with driver.session() as session:
                result = session.run(
                    query,
                    name_zh=dataflow_name,
                    script_path=script_path,
                    updated_at=updated_at,
                ).single()
        finally:
            driver.close()

        if result:
            logger.info(f"成功更新 DataFlow 脚本路径: {dataflow_name} -> {script_path}")
            return True

        logger.warning(f"未找到 DataFlow 节点: {dataflow_name}")
        return False
    except Exception as e:
        logger.error(f"更新 DataFlow script_path 失败: {e}")
        return False
def _resolve_script_paths(code_file: str, code_path: str) -> tuple[str, str]:
    """Return ``(file name stored in the DB, full script path)`` for a script."""
    if code_file.startswith(code_path):
        # code_file is already the full path; only the bare name is stored.
        return Path(code_file).name, code_file
    if "/" in code_file or "\\" in code_file:
        # code_file points somewhere else; keep only its file name.
        file_name = Path(code_file).name
        return file_name, f"{code_path}/{file_name}"
    # code_file is a bare file name.
    return code_file, f"{code_path}/{code_file}"


def sync_completed_tasks_to_db() -> int:
    """Sync ``completed`` entries of pending_tasks.json to the database.

    For every completed entry the task row is marked ``completed``. When the
    entry's ``code_file`` is a ``.py`` file the DataFlow node's script path is
    updated as well, and the task is deployed if ``ENABLE_AUTO_DEPLOY`` is set.
    Successfully synced entries are removed from the file.

    Returns:
        Number of tasks synced to the database; 0 when the file is missing,
        unreadable, or does not hold a list.
    """
    if not PENDING_TASKS_FILE.exists():
        return 0

    try:
        with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
            tasks = json.load(f)
    except Exception as e:
        logger.error(f"读取 pending_tasks.json 失败: {e}")
        return 0

    if not isinstance(tasks, list):
        return 0

    # Scripts are always recorded under this directory, whatever the entry says.
    code_path = "datafactory/scripts"
    updated = 0
    remaining_tasks = []

    for t in tasks:
        # Malformed entries and entries that are not completed stay in the file.
        if not isinstance(t, dict) or t.get("status") != "completed":
            remaining_tasks.append(t)
            continue

        task_id = t.get("task_id")
        if not task_id:
            # A completed entry without an ID cannot be synced; as before, it
            # is neither synced nor kept.
            continue

        task_name = t.get("task_name")
        # code_file carries the actual script file name.
        code_file = t.get("code_file", "")
        is_python_script = bool(code_file) and code_file.endswith(".py")

        if is_python_script:
            code_file_name, full_script_path = _resolve_script_paths(code_file, code_path)
            logger.info(f"任务 {task_id} 使用 Python 脚本: {full_script_path}")
        else:
            logger.info(
                f"任务 {task_id} 的 code_file ({code_file}) 不是 Python 脚本,跳过 DataFlow 更新"
            )
            code_file_name = code_file
            full_script_path = ""

        if not update_task_status(task_id, "completed", code_file_name, code_path):
            # Keep the entry so that the sync is retried on the next run.
            remaining_tasks.append(t)
            continue

        updated += 1
        logger.info(f"已同步任务 {task_id} 为 completed")

        # Only Python scripts update the DataFlow node's script_path.
        if task_name and is_python_script:
            if update_dataflow_script_path(task_name, full_script_path, task_id=task_id):
                logger.info(f"已更新 DataFlow 脚本路径: {task_name} -> {full_script_path}")
            else:
                logger.warning(f"更新 DataFlow 脚本路径失败: {task_name}")

        # Deploy to the production server when auto deploy is enabled.
        if ENABLE_AUTO_DEPLOY:
            logger.info(f"开始自动部署任务 {task_id} 到生产服务器...")
            if auto_deploy_completed_task(t):
                logger.info(f"✅ 任务 {task_id} 已成功部署到生产服务器")
            else:
                logger.warning(f"任务 {task_id} 部署到生产服务器失败")
        else:
            logger.info(f"自动部署已禁用,跳过任务 {task_id} 的部署")

    # Rewrite the file only when at least one task was synced.
    if updated > 0:
        with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
            json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
        logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")

    return updated
- # ============================================================================
- # 生产服务器部署功能
- # ============================================================================
def get_ssh_connection():
    """Open an SSH connection to the production server.

    Host, port, user name and password come from ``PRODUCTION_SERVER``; the
    connect timeout is 10 seconds.

    Returns:
        A connected ``paramiko.SSHClient``, or ``None`` when paramiko is not
        installed or the connection fails (the error is logged).
    """
    try:
        import paramiko  # type: ignore
    except ImportError:
        logger.error("未安装 paramiko 库,请运行: pip install paramiko")
        return None

    ssh = None
    try:
        ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification. Left unchanged to keep existing deployments working;
        # consider loading known host keys instead.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        logger.info(
            f"正在连接生产服务器 {PRODUCTION_SERVER['username']}@"
            f"{PRODUCTION_SERVER['host']}:{PRODUCTION_SERVER['port']}..."
        )
        ssh.connect(
            hostname=PRODUCTION_SERVER["host"],
            port=PRODUCTION_SERVER["port"],
            username=PRODUCTION_SERVER["username"],
            password=PRODUCTION_SERVER["password"],
            timeout=10,
        )
        logger.info("✅ SSH 连接成功")
        return ssh
    except Exception as e:
        logger.error(f"SSH 连接失败: {e}")
        # Release the client when the connection attempt fails.
        if ssh is not None:
            with contextlib.suppress(Exception):
                ssh.close()
        return None
def test_ssh_connection() -> bool:
    """Check that the production server is reachable over SSH.

    Runs a trivial ``echo`` command and reports whether the configured script
    directory exists on the server.

    Returns:
        True when the connection and both commands succeed, False otherwise.
    """
    rule = "=" * 60
    logger.info(rule)
    logger.info("测试生产服务器连接")
    logger.info(rule)

    client = get_ssh_connection()
    if not client:
        logger.error("❌ SSH 连接测试失败")
        return False

    try:
        # Step 1: make sure a command can be executed at all.
        _, echo_out, _ = client.exec_command("echo 'Connection test successful'")
        echoed = echo_out.read().decode().strip()
        logger.info(f"✅ 命令执行成功: {echoed}")

        # Step 2: check whether the target script directory exists.
        script_dir = PRODUCTION_SERVER["script_path"]
        _, probe_out, _ = client.exec_command(
            f"test -d {script_dir} && echo 'exists' || echo 'not exists'"
        )
        if probe_out.read().decode().strip() == "exists":
            logger.info(f"✅ 脚本目录存在: {script_dir}")
        else:
            logger.warning(f"脚本目录不存在: {script_dir}")
            logger.info("将在首次部署时自动创建")

        client.close()
        logger.info(rule)
        logger.info("✅ 连接测试完成")
        logger.info(rule)
        return True
    except Exception as e:
        logger.error(f"❌ 测试执行命令失败: {e}")
        client.close()
        return False
def deploy_script_to_production(
    local_script_path: str, remote_filename: str | None = None
) -> bool:
    """Upload a script file to the production server over SFTP.

    Args:
        local_script_path: Script to upload; a relative path is resolved
            against ``WORKSPACE_ROOT``.
        remote_filename: File name on the server; defaults to the local name.

    Returns:
        True when the file was uploaded and made executable (mode 0o755);
        False when paramiko or the local file is missing, the connection
        fails, or the transfer fails (the error is logged).
    """
    try:
        import importlib.util

        if importlib.util.find_spec("paramiko") is None:
            logger.error("未安装 paramiko 库,请运行: pip install paramiko")
            return False

        # Resolve the local file to an absolute path.
        local_path = Path(local_script_path)
        if not local_path.is_absolute():
            local_path = WORKSPACE_ROOT / local_path
        if not local_path.exists():
            logger.error(f"本地文件不存在: {local_path}")
            return False

        if not remote_filename:
            remote_filename = local_path.name
        remote_dir = PRODUCTION_SERVER["script_path"]
        remote_path = f"{remote_dir}/{remote_filename}"

        ssh = get_ssh_connection()
        if not ssh:
            return False

        sftp = None
        try:
            sftp = ssh.open_sftp()

            # Create the remote directory when it does not exist yet.
            try:
                sftp.stat(remote_dir)
            except FileNotFoundError:
                logger.info(f"创建远程目录: {remote_dir}")
                _, stdout, _ = ssh.exec_command(f"mkdir -p {remote_dir}")
                # Wait for mkdir to finish before uploading.
                stdout.channel.recv_exit_status()

            logger.info(f"正在上传: {local_path} -> {remote_path}")
            sftp.put(str(local_path), remote_path)
            # Make the uploaded script executable.
            sftp.chmod(remote_path, 0o755)
            logger.info(f"✅ 脚本部署成功: {remote_path}")
            return True
        except Exception as e:
            logger.error(f"文件传输失败: {e}")
            return False
        finally:
            # Release the SFTP channel and the SSH connection on every path.
            try:
                if sftp is not None:
                    sftp.close()
            finally:
                ssh.close()
    except ImportError:
        logger.error("未安装 paramiko 库,请运行: pip install paramiko")
        return False
    except Exception as e:
        logger.error(f"部署脚本失败: {e}")
        return False
def deploy_n8n_workflow_to_production(workflow_file: str) -> bool:
    """Deploy an n8n workflow definition to the n8n server.

    Two steps are performed:
      1. Create the workflow through the n8n REST API (the main step).
      2. Back up the workflow file to the production server over SFTP
         (best effort; a failure here does not fail the deployment).

    Args:
        workflow_file: Workflow JSON file; a relative path is resolved against
            ``WORKSPACE_ROOT``.

    Returns:
        True when the workflow was created through the API, False otherwise.
    """
    try:
        import json

        import requests

        # Resolve the workflow file to an absolute path.
        source = Path(workflow_file)
        if not source.is_absolute():
            source = WORKSPACE_ROOT / source
        if not source.exists():
            logger.error(f"工作流文件不存在: {source}")
            return False

        with open(source, encoding="utf-8") as fh:
            definition = json.load(fh)

        workflow_name = definition.get("name", source.stem)
        logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")

        # API settings come from the project configuration, falling back to
        # environment variables when that is unavailable.
        try:
            sys.path.insert(0, str(WORKSPACE_ROOT))
            from app.config.config import BaseConfig

            api_url = BaseConfig.N8N_API_URL
            api_key = BaseConfig.N8N_API_KEY
            timeout = BaseConfig.N8N_API_TIMEOUT
        except (ImportError, AttributeError):
            import os

            api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
            api_key = os.environ.get("N8N_API_KEY", "")
            timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))

        if not api_key:
            logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
            return False

        request_headers = {
            "X-N8N-API-KEY": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        # Tags are left out: the n8n API does not accept them on creation.
        payload = {
            "name": workflow_name,
            "nodes": definition.get("nodes", []),
            "connections": definition.get("connections", {}),
            "settings": definition.get("settings", {}),
        }

        create_url = api_url.rstrip("/") + "/api/v1/workflows"
        logger.info(f"调用 n8n API: {create_url}")

        # Status codes that get a dedicated message instead of the generic one.
        auth_failures = {
            401: "n8n API 认证失败,请检查 N8N_API_KEY 配置",
            403: "n8n API 权限不足",
        }

        try:
            response = requests.post(
                create_url,
                headers=request_headers,
                json=payload,
                timeout=timeout,
            )
            if response.status_code in auth_failures:
                logger.error(auth_failures[response.status_code])
                return False
            response.raise_for_status()

            workflow_id = response.json().get("id")
            logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")

            # Optional step: back up the workflow file to the production server.
            try:
                _backup_workflow_to_server(source)
            except Exception as backup_error:
                logger.warning(f"备份工作流文件到服务器失败(非关键): {backup_error}")
            return True
        except requests.exceptions.Timeout:
            logger.error("n8n API 请求超时,请检查网络连接")
            return False
        except requests.exceptions.ConnectionError:
            logger.error(f"无法连接到 n8n 服务器: {api_url}")
            return False
        except requests.exceptions.HTTPError as e:
            # Prefer the JSON error body, fall back to the raw text.
            try:
                error_detail = e.response.json()
            except Exception:
                error_detail = e.response.text
            logger.error(f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}")
            return False
    except Exception as e:
        logger.error(f"部署工作流失败: {e}")
        import traceback

        logger.error(traceback.format_exc())
        return False
def _backup_workflow_to_server(local_path: Path) -> bool:
    """Back up a workflow file to the production server over SFTP.

    This is a best-effort operation: every failure is logged and reported
    as ``False``; no exception is propagated to the caller.

    Args:
        local_path: Local workflow JSON file to upload. It is stored under
            ``PRODUCTION_SERVER["workflow_path"]`` using its own file name.

    Returns:
        True when the file was uploaded; False when paramiko is not
        installed, no SSH connection could be obtained, or the upload failed.
    """
    import importlib.util
    import shlex

    try:
        if importlib.util.find_spec("paramiko") is None:
            logger.debug("未安装 paramiko 库,跳过文件备份")
            return False

        remote_dir = PRODUCTION_SERVER["workflow_path"]
        remote_path = f"{remote_dir}/{local_path.name}"

        ssh = get_ssh_connection()
        if not ssh:
            return False

        sftp = None
        try:
            sftp = ssh.open_sftp()

            # Make sure the remote directory exists before uploading.
            try:
                sftp.stat(remote_dir)
            except FileNotFoundError:
                logger.info(f"创建远程目录: {remote_dir}")
                # Quote the path so spaces/metacharacters cannot alter the
                # shell command. NOTE(review): assumes workflow_path is a
                # literal path that needs no shell expansion (e.g. no "~").
                _, stdout, _ = ssh.exec_command(f"mkdir -p {shlex.quote(remote_dir)}")
                # Block until mkdir has finished before uploading.
                stdout.channel.recv_exit_status()

            logger.debug(f"备份工作流文件: {local_path} -> {remote_path}")
            sftp.put(str(local_path), remote_path)
            return True
        except Exception as e:
            logger.warning(f"工作流文件备份失败: {e}")
            return False
        finally:
            # Release both handles on every path, including failures.
            try:
                if sftp is not None:
                    sftp.close()
            finally:
                ssh.close()
    except Exception as e:
        logger.warning(f"备份工作流失败: {e}")
        return False
def find_remote_workflow_files(task_info: dict[str, Any]) -> list[str]:
    """Find n8n workflow files on the production server related to a task.

    Lists the ``*.json`` files in the remote ``workflow_path`` directory and
    keeps those whose file stem contains, or is contained in, one of the
    match patterns derived from the task:

    * the script file name without ``.py`` (taken from ``code_file``),
    * the task name itself when it starts with ``DF_DO``,
    * the ASCII letters/digits/``_``/``-`` of the task name, when at least
      three characters remain.

    When nothing matches, no file is returned (the function deliberately
    does not fall back to "all files", to avoid deploying workflows that
    belong to other tasks).

    Args:
        task_info: Task dictionary; ``code_file`` and ``task_name`` are read.

    Returns:
        Remote workflow file paths that matched. Empty when the server is
        unreachable, the directory is missing or empty, nothing matched, or
        an error occurred.
    """
    import re
    import shlex

    remote_files: list[str] = []
    code_file = task_info.get("code_file", "")
    task_name = task_info.get("task_name", "")

    ssh = get_ssh_connection()
    if not ssh:
        logger.warning("无法连接到生产服务器,跳过远程工作流文件查找")
        return remote_files

    try:
        workflow_path = PRODUCTION_SERVER["workflow_path"]
        # Quote the directory for the shell; the "/*.json" glob is appended
        # unquoted so it is still expanded remotely.
        quoted_dir = shlex.quote(workflow_path)

        # Check that the remote directory exists.
        _, stdout, _ = ssh.exec_command(f"test -d {quoted_dir} && echo 'exists'")
        if stdout.read().decode().strip() != "exists":
            logger.info(f"远程工作流目录不存在: {workflow_path}")
            return remote_files

        # List every .json file in the directory.
        _, stdout, _ = ssh.exec_command(f"ls -1 {quoted_dir}/*.json 2>/dev/null")
        listing = stdout.read().decode().splitlines()
        all_json_files = [
            entry.strip() for entry in listing if entry.strip().endswith(".json")
        ]

        if not all_json_files:
            logger.info(f"远程工作流目录 {workflow_path} 中没有 JSON 文件")
            return remote_files

        logger.info(f"远程服务器发现 {len(all_json_files)} 个工作流文件")

        # Build the lower-cased match patterns from the task information.
        match_patterns: list[str] = []

        if code_file and code_file.endswith(".py"):
            # code_file may carry a directory prefix; only the file name can
            # match a workflow file stem.
            script_base = code_file.replace("\\", "/").rsplit("/", 1)[-1][:-3]
            # An empty pattern would be a substring of every name.
            if script_base:
                match_patterns.append(script_base.lower())

        if task_name:
            if task_name.startswith("DF_DO"):
                match_patterns.append(task_name.lower())
            # For non-ASCII (e.g. Chinese) task names, keep only the
            # ASCII identifier-like characters.
            alphanumeric = re.sub(r"[^a-zA-Z0-9_-]", "", task_name)
            if len(alphanumeric) >= 3:
                match_patterns.append(alphanumeric.lower())

        for remote_file in all_json_files:
            stem = Path(remote_file).stem.lower()
            matched = any(
                pattern in stem or stem in pattern for pattern in match_patterns
            )
            if matched and remote_file not in remote_files:
                remote_files.append(remote_file)
                logger.info(f" 匹配到工作流: {Path(remote_file).name}")

        if not remote_files:
            # Intentionally no "deploy everything" fallback.
            logger.info("没有精确匹配的工作流文件,跳过远程工作流部署")

        return remote_files
    except Exception as e:
        logger.error(f"查找远程工作流文件失败: {e}")
        return remote_files
    finally:
        # Single place that releases the connection on every exit path.
        ssh.close()
def deploy_remote_workflow_to_n8n(remote_file_path: str) -> bool:
    """Read a workflow JSON file from the production server and create it in n8n.

    The file is read over SSH (``cat``), parsed, and posted to the n8n REST
    API (``/api/v1/workflows``). If a workflow with the same name is already
    listed by the API, nothing is created and the call counts as success.

    API settings come from ``app.config.config.BaseConfig`` when importable,
    otherwise from the ``N8N_API_URL`` / ``N8N_API_KEY`` / ``N8N_API_TIMEOUT``
    environment variables.

    Args:
        remote_file_path: Full path of the workflow file on the remote server.

    Returns:
        True when the workflow was created or already exists; False on any
        failure (all failures are logged, none are raised).
    """
    try:
        import shlex

        import requests

        ssh = get_ssh_connection()
        if not ssh:
            logger.error("无法连接到生产服务器")
            return False

        # Read the remote file; always release the connection, even when
        # the command or the reads raise.
        logger.info(f"从远程服务器读取工作流: {remote_file_path}")
        try:
            _, stdout, stderr = ssh.exec_command(
                f"cat {shlex.quote(remote_file_path)}"
            )
            file_content = stdout.read().decode("utf-8")
            error_output = stderr.read().decode()
        finally:
            ssh.close()

        if error_output:
            logger.error(f"读取远程文件失败: {error_output}")
            return False

        try:
            workflow_data = json.loads(file_content)
        except json.JSONDecodeError as e:
            logger.error(f"解析工作流 JSON 失败: {e}")
            return False

        workflow_name = workflow_data.get("name", Path(remote_file_path).stem)
        logger.info(f"正在部署工作流到 n8n 服务器: {workflow_name}")

        # Resolve the n8n API configuration.
        try:
            sys.path.insert(0, str(WORKSPACE_ROOT))
            from app.config.config import BaseConfig

            api_url = BaseConfig.N8N_API_URL
            api_key = BaseConfig.N8N_API_KEY
            timeout = BaseConfig.N8N_API_TIMEOUT
        except (ImportError, AttributeError):
            import os

            api_url = os.environ.get("N8N_API_URL", "https://n8n.citupro.com")
            api_key = os.environ.get("N8N_API_KEY", "")
            timeout = int(os.environ.get("N8N_API_TIMEOUT", "30"))

        if not api_key:
            logger.error("未配置 N8N_API_KEY,无法部署工作流到 n8n 服务器")
            return False

        headers = {
            "X-N8N-API-KEY": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }

        # Only these fields are sent; anything else in the file is dropped.
        workflow_payload = {
            "name": workflow_name,
            "nodes": workflow_data.get("nodes", []),
            "connections": workflow_data.get("connections", {}),
            "settings": workflow_data.get("settings", {}),
        }

        workflows_url = f"{api_url.rstrip('/')}/api/v1/workflows"

        # Skip creation when a workflow with the same name already exists.
        # NOTE(review): only the first response page is inspected; confirm
        # whether the API paginates and further pages must be followed.
        try:
            list_response = requests.get(
                workflows_url,
                headers=headers,
                timeout=timeout,
            )
            if list_response.status_code == 200:
                existing_workflows = list_response.json().get("data", [])
                existing_wf = next(
                    (
                        wf
                        for wf in existing_workflows
                        if wf.get("name") == workflow_name
                    ),
                    None,
                )
                if existing_wf:
                    workflow_id = existing_wf.get("id")
                    logger.info(
                        f"发现已存在的工作流 (ID: {workflow_id}),跳过部署避免重复"
                    )
                    logger.info(
                        "如需更新工作流,请手动在 n8n 控制台操作或删除后重新部署"
                    )
                    return True  # already present counts as success
        except requests.exceptions.RequestException as e:
            # The duplicate check is best-effort; fall through to creation.
            logger.warning(f"检查已存在工作流时出错: {e}")

        # Create the workflow.
        logger.info(f"调用 n8n API 创建工作流: {workflows_url}")
        try:
            response = requests.post(
                workflows_url,
                headers=headers,
                json=workflow_payload,
                timeout=timeout,
            )
            if response.status_code == 401:
                logger.error("n8n API 认证失败,请检查 N8N_API_KEY 配置")
                return False
            elif response.status_code == 403:
                logger.error("n8n API 权限不足")
                return False

            response.raise_for_status()
            created_workflow = response.json()
            workflow_id = created_workflow.get("id")
            logger.info(f"✅ 工作流创建成功! ID: {workflow_id}, 名称: {workflow_name}")
            return True
        except requests.exceptions.Timeout:
            logger.error("n8n API 请求超时,请检查网络连接")
            return False
        except requests.exceptions.ConnectionError:
            logger.error(f"无法连接到 n8n 服务器: {api_url}")
            return False
        except requests.exceptions.HTTPError as e:
            # Prefer the JSON error body; fall back to raw text.
            error_detail = ""
            try:
                error_detail = e.response.json()
            except Exception:
                error_detail = e.response.text
            logger.error(
                f"n8n API 错误: {e.response.status_code}, 详情: {error_detail}"
            )
            return False
    except Exception as e:
        logger.error(f"从远程服务器部署工作流失败: {e}")
        import traceback

        logger.error(traceback.format_exc())
        return False
def find_related_workflow_files(
    task_info: dict[str, Any],
) -> list[Path]:
    """Collect local n8n workflow files that may belong to a task.

    Search strategies, applied in order (a file is added at most once):

    1. ``n8n_workflow_*.json`` / ``workflow_*.json`` next to the script
       (``WORKSPACE_ROOT / code_path``).
    2. Every ``*.json`` directly inside ``datafactory/n8n_workflows``.
    3. Files under ``datafactory`` whose name contains a variant of the
       task name.
    4. Files under ``datafactory`` whose name contains the script base name.
    5. Files under ``datafactory`` whose name contains ``task_<task_id>``.
    6. Files under ``datafactory`` modified after the task's
       ``notified_at`` timestamp.

    Strategies 3-6 additionally require the file to look like an n8n
    workflow (``_is_n8n_workflow_file``); strategies 1-2 do not.

    Args:
        task_info: Task dictionary; reads ``code_name``, ``code_path``,
            ``task_name``, ``task_id`` and ``notified_at`` (ISO-8601 string).

    Returns:
        The workflow file paths found, in discovery order.
    """
    workflow_files: list[Path] = []
    code_name = task_info.get("code_name", "")
    code_path = task_info.get("code_path", "datafactory/scripts")
    task_name = task_info.get("task_name", "")
    task_id = task_info.get("task_id")

    # Notification time, used to detect files created/modified afterwards.
    # A malformed or non-string value simply disables strategy 6.
    notified_at = None
    notified_at_str = task_info.get("notified_at", "")
    if notified_at_str:
        with contextlib.suppress(ValueError, TypeError, AttributeError):
            notified_at = datetime.fromisoformat(notified_at_str.replace("Z", "+00:00"))

    def _collect(candidates, *, validate: bool) -> None:
        """Append new regular files, optionally requiring n8n structure."""
        for wf_file in candidates:
            if not wf_file.is_file() or wf_file in workflow_files:
                continue
            if validate and not _is_n8n_workflow_file(wf_file):
                continue
            workflow_files.append(wf_file)

    datafactory_dir = WORKSPACE_ROOT / "datafactory"

    # Strategy 1: workflow files in the script's own directory.
    script_dir = WORKSPACE_ROOT / code_path
    if script_dir.exists() and script_dir.is_dir():
        _collect(script_dir.glob("n8n_workflow_*.json"), validate=False)
        _collect(script_dir.glob("workflow_*.json"), validate=False)

    # Strategy 2: the dedicated n8n_workflows directory.
    n8n_workflows_dir = datafactory_dir / "n8n_workflows"
    if n8n_workflows_dir.exists():
        _collect(n8n_workflows_dir.glob("*.json"), validate=False)

    # Strategy 3: match on task-name variants.
    if task_name and task_name != "未知任务":
        name_patterns = [
            task_name.replace(" ", "_").lower(),
            task_name.replace(" ", "-").lower(),
            task_name.lower(),
        ]
        for pattern in name_patterns:
            if len(pattern) < 3:  # too short to be a meaningful match
                continue
            _collect(datafactory_dir.rglob(f"*{pattern}*.json"), validate=True)

    # Strategy 4: match on the script base name (without ".py").
    if code_name and code_name.endswith(".py"):
        script_base_name = code_name[:-3]
        _collect(
            datafactory_dir.rglob(f"*{script_base_name}*.json"), validate=True
        )

    # Strategy 5: match on the task id.
    if task_id:
        _collect(datafactory_dir.rglob(f"*task_{task_id}*.json"), validate=True)

    # Strategy 6: files modified after the task notification.
    if notified_at is not None:
        # Compare POSIX timestamps: this is correct for timezone-aware
        # values (e.g. a trailing "Z") and treats naive values as local
        # time, matching the file system's mtime semantics.
        try:
            threshold = notified_at.timestamp()
        except (OverflowError, OSError, ValueError):
            threshold = None

        if threshold is not None:
            for wf_file in datafactory_dir.rglob("*.json"):
                if not wf_file.is_file() or wf_file in workflow_files:
                    continue
                try:
                    is_recent = wf_file.stat().st_mtime > threshold
                    if is_recent and _is_n8n_workflow_file(wf_file):
                        workflow_files.append(wf_file)
                        logger.debug(f"发现最近修改的工作流: {wf_file.name}")
                except (OSError, ValueError):
                    # Best-effort scan: an unreadable file is skipped.
                    continue

    return workflow_files
- def _is_n8n_workflow_file(file_path: Path) -> bool:
- """
- 检查文件是否是有效的 n8n 工作流文件
- 通过检查 JSON 结构来验证
- """
- try:
- with open(file_path, encoding="utf-8") as f:
- data = json.load(f)
- # n8n 工作流文件通常包含 nodes 和 connections 字段
- if isinstance(data, dict):
- has_nodes = "nodes" in data
- has_connections = "connections" in data
- has_name = "name" in data
- # 至少需要有 nodes 或符合 n8n 工作流特征
- return has_nodes or (has_name and has_connections)
- return False
- except (json.JSONDecodeError, OSError):
- return False
def auto_deploy_completed_task(task_info: dict[str, Any]) -> bool:
    """Deploy a completed task's script and workflows to production.

    Steps:

    1. Deploy the Python script via ``deploy_script_to_production``.
    2. Find related n8n workflows locally and on the production server and
       deploy each of them through the n8n API helpers.
    3. Log a summary of the results.

    Args:
        task_info: Task dictionary; reads ``code_file`` (preferred) or
            ``code_name``, ``code_path``, ``task_name`` and ``task_id``.

    Returns:
        True when the script was deployed (workflow failures are logged as
        a warning but do not fail the task); False when the script was not
        deployed or the task lacks code file information.
    """
    code_file = task_info.get("code_file", "")
    code_name = task_info.get("code_name", "")
    code_path = task_info.get("code_path", "datafactory/scripts")
    task_name = task_info.get("task_name", "未知任务")
    task_id = task_info.get("task_id", "N/A")

    # Prefer code_file; fall back to code_name when it is empty.
    actual_script_file = code_file or code_name

    if not actual_script_file or not code_path:
        logger.warning(f"任务 {task_name} (ID: {task_id}) 缺少代码文件信息,跳过部署")
        return False

    logger.info("=" * 60)
    logger.info(f"🚀 开始自动部署任务: {task_name} (ID: {task_id})")
    logger.info("=" * 60)

    deploy_results = {
        "script_deployed": False,
        "workflows_found": 0,
        "workflows_deployed": 0,
        "workflows_failed": 0,
    }

    # 1. Deploy the Python script.
    if actual_script_file.endswith(".py"):
        if actual_script_file.startswith(code_path):
            # Already a full path such as "datafactory/scripts/task_41_xxx.py";
            # joining it with code_path again would duplicate the prefix.
            script_path = actual_script_file
        else:
            # Bare file name, or a path in some other layout: keep only the
            # file name. Both "/" and "\\" are treated as separators so the
            # result does not depend on the host operating system.
            script_filename = actual_script_file.replace("\\", "/").rsplit("/", 1)[-1]
            script_path = f"{code_path}/{script_filename}"

        logger.info(f"📦 部署 Python 脚本: {script_path}")
        if deploy_script_to_production(script_path):
            logger.info(f"✅ 脚本 {actual_script_file} 部署成功")
            deploy_results["script_deployed"] = True
        else:
            logger.error(f"❌ 脚本 {actual_script_file} 部署失败")

    # 2.1 Workflows found in the local workspace.
    logger.info("🔍 查找本地 n8n 工作流文件...")
    workflow_files = find_related_workflow_files(task_info)

    if workflow_files:
        logger.info(f"📋 本地发现 {len(workflow_files)} 个相关工作流文件:")
        for wf_file in workflow_files:
            logger.info(f" - {wf_file.relative_to(WORKSPACE_ROOT)}")

        for wf_file in workflow_files:
            logger.info(f"🔄 部署本地工作流: {wf_file.name}")
            if deploy_n8n_workflow_to_production(str(wf_file)):
                logger.info(f"✅ 工作流 {wf_file.name} 部署成功")
                deploy_results["workflows_deployed"] += 1
            else:
                logger.error(f"❌ 工作流 {wf_file.name} 部署失败")
                deploy_results["workflows_failed"] += 1
    else:
        logger.info("ℹ️ 本地未发现相关工作流文件")

    # 2.2 Workflows found on the production server.
    logger.info("🔍 查找生产服务器上的 n8n 工作流文件...")
    remote_workflow_files = find_remote_workflow_files(task_info)

    if remote_workflow_files:
        logger.info(f"📋 远程服务器发现 {len(remote_workflow_files)} 个相关工作流文件:")
        for remote_file in remote_workflow_files:
            logger.info(f" - {Path(remote_file).name}")

        for remote_file in remote_workflow_files:
            remote_name = Path(remote_file).name
            logger.info(f"🔄 部署远程工作流: {remote_name}")
            if deploy_remote_workflow_to_n8n(remote_file):
                logger.info(f"✅ 远程工作流 {remote_name} 部署成功")
                deploy_results["workflows_deployed"] += 1
            else:
                logger.error(f"❌ 远程工作流 {remote_name} 部署失败")
                deploy_results["workflows_failed"] += 1
    else:
        logger.info("ℹ️ 远程服务器未发现相关工作流文件")

    deploy_results["workflows_found"] = len(workflow_files) + len(remote_workflow_files)

    # 3. Summary.
    logger.info("=" * 60)
    logger.info(f"📊 部署结果汇总 - 任务: {task_name} (ID: {task_id})")
    logger.info("-" * 40)
    logger.info(
        f" 脚本部署: {'✅ 成功' if deploy_results['script_deployed'] else '❌ 失败或跳过'}"
    )
    logger.info(f" 发现工作流: {deploy_results['workflows_found']} 个")
    logger.info(f" 工作流部署成功: {deploy_results['workflows_deployed']} 个")
    logger.info(f" 工作流部署失败: {deploy_results['workflows_failed']} 个")

    # The overall result follows the script deployment: a deployed script
    # counts as success even when some workflows failed.
    # NOTE(review): confirm that workflow failures must not fail the task.
    deploy_success = deploy_results["script_deployed"]

    if deploy_success and deploy_results["workflows_failed"] == 0:
        logger.info(f"✅ 任务 {task_name} 部署完成!")
    elif deploy_success:
        logger.warning(f"⚠️ 任务 {task_name} 脚本部署成功,但部分工作流部署失败")
    else:
        logger.error(f"❌ 任务 {task_name} 部署失败")

    logger.info("=" * 60)

    return deploy_success
# ============================================================================
# Cursor Agent automation
# ============================================================================
# Agent session state (module-level, mutated by the functions below).
# AGENT_SESSION_ACTIVE: set to True by open_new_agent() and reset to False by
# the close helpers.
AGENT_SESSION_ACTIVE: bool = False
# AGENT_START_TIME: time.time() value recorded when the agent was opened.
AGENT_START_TIME: float = 0
def get_all_cursor_windows() -> list[dict[str, Any]]:
    """Enumerate visible top-level windows that look like Cursor windows.

    A window qualifies when its title contains "cursor" (case-insensitive)
    or its window class contains "chrome_widgetwin".

    NOTE(review): the class-name test accepts any window of that class
    regardless of title; confirm that other applications sharing the class
    cannot be picked up by mistake.

    Returns:
        One dict per matching window with ``hwnd``, ``title``,
        ``class_name`` and ``area`` (width * height of the window rect).
        Empty when GUI automation is unavailable.
    """
    if not HAS_CURSOR_GUI:
        return []

    found: list[dict[str, Any]] = []

    def _visit(hwnd, _extra):
        # Always return True so the enumeration continues.
        if not win32gui.IsWindowVisible(hwnd):
            return True

        title = win32gui.GetWindowText(hwnd) or ""
        class_name = win32gui.GetClassName(hwnd) or ""

        title_hit = "cursor" in title.lower()
        class_hit = bool(class_name) and "chrome_widgetwin" in class_name.lower()

        if title_hit or class_hit:
            left, top, right, bottom = win32gui.GetWindowRect(hwnd)
            found.append(
                {
                    "hwnd": hwnd,
                    "title": title,
                    "class_name": class_name,
                    "area": (right - left) * (bottom - top),
                }
            )
        return True

    win32gui.EnumWindows(_visit, None)
    return found
def find_cursor_window() -> int | None:
    """Return the handle of the Cursor main window.

    The main window is taken to be the candidate with the largest area.

    Returns:
        The window handle, or None when GUI automation is unavailable or
        no candidate window exists (a warning is logged in the latter case).
    """
    if not HAS_CURSOR_GUI:
        return None

    candidates = get_all_cursor_windows()
    if candidates:
        # First window with the maximum area wins.
        largest = max(candidates, key=lambda info: info["area"])
        return largest["hwnd"]

    logger.warning("未找到 Cursor 窗口")
    return None
def activate_window(hwnd: int) -> bool:
    """
    Bring the given window to the foreground.

    According to the original author's notes, Windows only lets
    SetForegroundWindow succeed when one of the following holds:
    1. The calling process is the foreground process
    2. The calling process was started by the foreground process
    3. The target window belongs to the foreground process
    4. No other window is in the foreground
    This function tries several techniques in turn to get around these
    restrictions.

    Args:
        hwnd: Handle of the window to activate.

    Returns:
        True when the window is the foreground window afterwards, or when
        it is at least visible and not minimized; False otherwise, and
        always False when GUI automation is unavailable.
    """
    if not HAS_CURSOR_GUI:
        return False
    try:
        # Technique 1: AttachThreadInput. Attach the current thread to the
        # thread of the current foreground window before calling
        # SetForegroundWindow.
        import ctypes

        user32 = ctypes.windll.user32
        # Thread id of the current foreground window
        foreground_hwnd = user32.GetForegroundWindow()
        foreground_thread_id = user32.GetWindowThreadProcessId(foreground_hwnd, None)
        # Id of the current thread
        current_thread_id = ctypes.windll.kernel32.GetCurrentThreadId()
        attached = False
        # Attach only when the current thread is not already the foreground thread
        if current_thread_id != foreground_thread_id:
            attached = user32.AttachThreadInput(
                current_thread_id, foreground_thread_id, True
            )
        try:
            # Restore the window first if it is minimized
            if win32gui.IsIconic(hwnd):
                win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
                time.sleep(0.2)
            # Raise the window in the Z order
            user32.BringWindowToTop(hwnd)
            # Show the window
            win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
            # First SetForegroundWindow attempt
            result = user32.SetForegroundWindow(hwnd)
            if not result:
                # Technique 2: simulate an Alt key press and release, then
                # retry (intended to make the system treat the request as
                # user-initiated)
                KEYEVENTF_EXTENDEDKEY = 0x0001
                KEYEVENTF_KEYUP = 0x0002
                VK_MENU = 0x12  # Alt key
                # Press and release Alt
                user32.keybd_event(VK_MENU, 0, KEYEVENTF_EXTENDEDKEY, 0)
                user32.keybd_event(
                    VK_MENU, 0, KEYEVENTF_EXTENDEDKEY | KEYEVENTF_KEYUP, 0
                )
                time.sleep(0.1)
                # Retry
                result = user32.SetForegroundWindow(hwnd)
            if not result:
                # Technique 3: ShowWindow with SW_SHOWDEFAULT, then retry
                win32gui.ShowWindow(hwnd, win32con.SW_SHOWDEFAULT)
                time.sleep(0.1)
                result = user32.SetForegroundWindow(hwnd)
            if not result:
                # Technique 4: briefly make the window topmost via SetWindowPos
                SWP_NOMOVE = 0x0002
                SWP_NOSIZE = 0x0001
                SWP_SHOWWINDOW = 0x0040
                HWND_TOPMOST = -1
                HWND_NOTOPMOST = -2
                # Make it topmost first
                user32.SetWindowPos(
                    hwnd,
                    HWND_TOPMOST,
                    0,
                    0,
                    0,
                    0,
                    SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
                )
                time.sleep(0.1)
                # Then drop the topmost flag again (the window is expected
                # to stay in front)
                user32.SetWindowPos(
                    hwnd,
                    HWND_NOTOPMOST,
                    0,
                    0,
                    0,
                    0,
                    SWP_NOMOVE | SWP_NOSIZE | SWP_SHOWWINDOW,
                )
                result = user32.SetForegroundWindow(hwnd)
            time.sleep(0.3)
            # Verify the outcome by asking which window is in the foreground
            current_foreground = user32.GetForegroundWindow()
            if current_foreground == hwnd:
                logger.debug("窗口激活成功")
                return True
            else:
                # Even when SetForegroundWindow reported failure the window
                # may be raised and visible; accept it if it is visible and
                # not minimized so the caller can continue
                if win32gui.IsWindowVisible(hwnd) and not win32gui.IsIconic(hwnd):
                    logger.warning("窗口可能未完全激活到前台,但窗口可见,继续执行...")
                    return True
                else:
                    logger.error("激活窗口失败: 窗口不在前台")
                    return False
        finally:
            # Detach the thread input again if it was attached above
            if attached:
                user32.AttachThreadInput(current_thread_id, foreground_thread_id, False)
    except Exception as e:
        logger.error(f"激活窗口失败: {e}")
        # Last-resort fallback: plain restore + show
        try:
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
            win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
            time.sleep(0.3)
            # Report success when the window is at least visible, so the
            # caller keeps going
            if win32gui.IsWindowVisible(hwnd):
                logger.warning("使用备用方案激活窗口,继续执行...")
                return True
        except Exception:
            # The fallback itself failed; fall through to the False result
            pass
        return False
def open_new_agent() -> bool:
    """Open a new Agent panel in Cursor via the Ctrl+Shift+I shortcut.

    On success the module-level session flag is set and the start time is
    recorded.

    Returns:
        True when the shortcut was sent; False when GUI automation is
        unavailable, no Cursor window was found, the window could not be
        activated, or sending the shortcut raised.
    """
    global AGENT_SESSION_ACTIVE, AGENT_START_TIME

    if not HAS_CURSOR_GUI:
        logger.warning("当前环境不支持 Cursor GUI 自动化")
        return False

    window = find_cursor_window()
    if not window or not activate_window(window):
        return False

    try:
        logger.info("正在打开新的 Agent...")
        pyautogui.hotkey("ctrl", "shift", "i")
        # Give the Agent panel time to open.
        time.sleep(2.0)
        AGENT_SESSION_ACTIVE = True
        AGENT_START_TIME = time.time()
        logger.info("✅ 新的 Agent 已打开")
    except Exception as e:
        logger.error(f"打开 Agent 失败: {e}")
        return False
    return True
def close_current_agent(force: bool = False, max_retries: int = 3) -> bool:
    """Close the current Agent session in Cursor.

    Close steps applied within one attempt:

    1. Press Escape twice.
    2. On a forced close or from the second attempt on, send Ctrl+Shift+I.
    3. On a forced close or from the third attempt on, click the left third
       of the window and press Escape again.

    Args:
        force: Apply every step immediately and run even when no session is
            recorded as active.
        max_retries: Maximum number of attempts.

    Returns:
        True when an attempt ran to completion, or when there was nothing
        to close; False when GUI automation is unavailable, the Cursor
        window is missing, or all attempts failed. The session flag is
        reset to inactive on every path that attempted a close.
    """
    global AGENT_SESSION_ACTIVE

    if not HAS_CURSOR_GUI:
        AGENT_SESSION_ACTIVE = False
        return False

    if not (AGENT_SESSION_ACTIVE or force):
        logger.info("没有活动的 Agent 会话")
        return True

    logger.info("🔄 正在关闭 Agent...")

    for attempt_no in range(1, max_retries + 1):
        try:
            window = find_cursor_window()
            if not window:
                logger.warning("未找到 Cursor 窗口")
                AGENT_SESSION_ACTIVE = False
                return False

            if not activate_window(window):
                logger.warning(f"激活窗口失败 (尝试 {attempt_no}/{max_retries})")
                time.sleep(0.5)
                continue

            # Step 1: Escape, twice.
            logger.debug(f"尝试方法1: Escape 键 (尝试 {attempt_no}/{max_retries})")
            for _ in range(2):
                pyautogui.press("escape")
                time.sleep(0.3)

            # Step 2: toggle the Agent panel with the shortcut.
            if force or attempt_no > 1:
                logger.debug("尝试方法2: Ctrl+Shift+I 切换")
                pyautogui.hotkey("ctrl", "shift", "i")
                time.sleep(0.5)

            # Step 3: click inside the window (one third from the left,
            # vertically centred) and press Escape.
            if force or attempt_no > 2:
                logger.debug("尝试方法3: 点击编辑器区域")
                try:
                    left, top, right, bottom = win32gui.GetWindowRect(window)
                    click_x = left + (right - left) // 3
                    click_y = top + (bottom - top) // 2
                    pyautogui.click(click_x, click_y)
                    time.sleep(0.2)
                    pyautogui.press("escape")
                    time.sleep(0.3)
                except Exception as click_err:
                    logger.debug(f"点击方法失败: {click_err}")

            AGENT_SESSION_ACTIVE = False
            logger.info("✅ Agent 已关闭")
            return True
        except Exception as e:
            logger.warning(f"关闭 Agent 尝试 {attempt_no} 失败: {e}")
            time.sleep(0.5)

    # Reset the flag even after failure so the state cannot get stuck.
    AGENT_SESSION_ACTIVE = False
    logger.warning("⚠️ Agent 关闭可能未完全成功,但已重置状态")
    return False
def force_close_all_agents() -> bool:
    """Force-close any Agent sessions that may be left open in Cursor.

    Sends Escape five times and then Ctrl+Shift+I twice to the Cursor main
    window, and resets the module-level session flag.

    Returns:
        True when the key sequence was sent or no Cursor window exists;
        False when GUI automation is unavailable, the window could not be
        activated, or sending keys raised.
    """
    global AGENT_SESSION_ACTIVE

    if not HAS_CURSOR_GUI:
        return False

    logger.info("🔄 强制关闭所有 Agent 会话...")

    try:
        window = find_cursor_window()
        if not window:
            # Nothing to close.
            AGENT_SESSION_ACTIVE = False
            return True

        activated = activate_window(window)
        if not activated:
            AGENT_SESSION_ACTIVE = False
            return False

        # Repeated Escape presses to dismiss every open panel.
        for _ in range(5):
            pyautogui.press("escape")
            time.sleep(0.2)

        # Send the Agent shortcut twice.
        for _ in range(2):
            pyautogui.hotkey("ctrl", "shift", "i")
            time.sleep(0.3)

        AGENT_SESSION_ACTIVE = False
        logger.info("✅ 所有 Agent 会话已关闭")
        return True
    except Exception as e:
        logger.error(f"强制关闭 Agent 失败: {e}")
        AGENT_SESSION_ACTIVE = False
        return False
def type_message_to_agent(message: str) -> bool:
    """Enter a message into the focused Agent input box and submit it.

    The message is pasted through the clipboard when pyperclip is
    available, and typed key by key otherwise or when pasting raises.
    Enter is pressed afterwards to send it.

    Args:
        message: Text to send.

    Returns:
        True when the message was entered and Enter was pressed; False when
        GUI automation is unavailable or an error occurred.
    """
    if not HAS_CURSOR_GUI:
        return False

    def _type_out() -> None:
        # Key-by-key input of the message.
        pyautogui.write(message, interval=0.03)

    try:
        # Wait for the Agent input box to receive focus.
        time.sleep(0.5)

        if not HAS_PYPERCLIP:
            _type_out()
        else:
            # Clipboard paste is preferred for Chinese and special characters.
            try:
                pyperclip.copy(message)
                pyautogui.hotkey("ctrl", "v")
                time.sleep(0.5)
            except Exception:
                _type_out()

        time.sleep(0.3)
        pyautogui.press("enter")
        logger.info("✅ 消息已发送到 Agent")
    except Exception as e:
        logger.error(f"发送消息到 Agent 失败: {e}")
        return False
    return True
def wait_for_agent_completion(
    timeout: int = 3600,
    check_interval: int = 30,
) -> bool:
    """Wait until the Agent has finished all processing tasks.

    Completion is detected by polling ``get_processing_task_ids()`` until
    it returns nothing. The task list is checked at least once, the wait
    never sleeps past the deadline, and a final check is made when the
    deadline is reached.

    Args:
        timeout: Maximum time to wait, in seconds.
        check_interval: Pause between two polls, in seconds.

    Returns:
        True when no processing task remains; False when the timeout
        expired with tasks still in progress.
    """
    # A monotonic clock keeps the timeout immune to system clock changes.
    started = time.monotonic()
    deadline = started + timeout
    logger.info(f"等待 Agent 完成任务(超时: {timeout}s)...")

    while True:
        processing_ids = get_processing_task_ids()
        elapsed = int(time.monotonic() - started)

        if not processing_ids:
            logger.info(f"✅ 所有任务已完成!耗时: {elapsed}s")
            return True

        time_left = deadline - time.monotonic()
        if time_left <= 0:
            break

        remaining = len(processing_ids)
        logger.info(
            f"仍有 {remaining} 个任务进行中... (已等待 {elapsed}s / {timeout}s)"
        )
        # Never sleep beyond the deadline.
        time.sleep(max(0, min(check_interval, time_left)))

    logger.warning("等待超时,仍有未完成的任务")
    return False
def run_agent_once(
    timeout: int = 3600,
    auto_close: bool = True,
) -> bool:
    """
    Run a single Agent execution cycle.

    Flow:
    1. Sync already completed tasks to the database
    2. Read pending tasks from the database
    3. Mark the new tasks as processing
    4. Generate the execution instructions (covering all processing tasks)
    5. Open the Agent and send the message
    6. Wait for the tasks to complete
    7. Close the Agent
    8. Sync the completed tasks

    Args:
        timeout: Maximum time in seconds to wait for the tasks to complete.
        auto_close: Whether to close the Agent once waiting has ended.

    Returns:
        True when there was nothing to do or all tasks completed within the
        timeout; False when the Agent could not be opened, the message
        could not be sent, or the wait timed out.
    """
    logger.info("=" * 60)
    logger.info("Agent 单次执行模式")
    logger.info("=" * 60)
    # 1. Sync tasks that are already completed
    sync_completed_tasks_to_db()
    # 2. Fetch pending tasks from the database
    logger.info("正在从数据库查询 pending 任务...")
    pending_tasks = get_pending_tasks()
    # 3. Fetch the ids of tasks currently in processing state
    processing_ids = get_processing_task_ids()
    # 4. Nothing pending and nothing processing: done
    if not pending_tasks and not processing_ids:
        logger.info("✅ 没有待执行的任务")
        return True
    if pending_tasks:
        logger.info(f"发现 {len(pending_tasks)} 个新的 pending 任务")
        # 5. Mark the new tasks as processing
        for task in pending_tasks:
            update_task_status(task["task_id"], "processing")
        # 6. Write them to pending_tasks.json
        write_pending_tasks_json(pending_tasks)
    if processing_ids:
        logger.info(f"发现 {len(processing_ids)} 个已有的 processing 任务")
    # 7. Collect every task to execute (with full details) and generate the
    #    execution instructions
    all_tasks_to_execute = get_all_tasks_to_execute()
    if all_tasks_to_execute:
        logger.info(f"共 {len(all_tasks_to_execute)} 个任务需要执行")
        # One instruction file covering all tasks
        create_execute_instructions(all_tasks_to_execute)
    else:
        logger.warning("无法获取任务详细信息,跳过生成执行指令")
    # 8. Update the trigger file (ids are re-read because new tasks may have
    #    been moved to processing above)
    all_processing_ids = get_processing_task_ids()
    if all_processing_ids:
        update_trigger_file(
            task_count=len(all_processing_ids),
            status="有待执行任务",
            task_ids=all_processing_ids,
        )
    # 9. Open the Agent and send the message
    if not open_new_agent():
        logger.error("❌ 无法打开 Agent")
        return False
    if not type_message_to_agent(AGENT_MESSAGE):
        logger.error("❌ 无法发送消息到 Agent")
        close_current_agent()
        return False
    logger.info(f"已发送消息: {AGENT_MESSAGE[:50]}...")
    # 10. Wait for the tasks to complete
    completed = wait_for_agent_completion(timeout=timeout)
    # 11. Close the Agent right away (before syncing)
    logger.info("🔄 任务执行完毕,立即关闭 Agent...")
    if auto_close:
        close_current_agent(force=True)
        time.sleep(1.0)  # give the close a moment to take effect
    # 12. Sync completed tasks to the database (per the original note this
    #     triggers the automatic deployment)
    logger.info("🔄 开始同步和部署...")
    sync_completed_tasks_to_db()
    if completed:
        logger.info("✅ Agent 已完成所有任务")
    else:
        logger.warning("⚠️ Agent 未能在超时时间内完成所有任务")
        # Force-close any Agent that may be left over.
        # NOTE(review): this runs even when auto_close is False — confirm
        # that is intended.
        force_close_all_agents()
    logger.info("=" * 60)
    logger.info("Agent 会话结束")
    logger.info("=" * 60)
    return completed
def run_agent_loop(
    interval: int = 300,
    timeout: int = 3600,
    auto_close: bool = True,
) -> None:
    """
    Agent loop mode.

    Repeats the single Agent cycle until the user stops it with Ctrl+C.
    Full flow of one round:
    1. Sync completed tasks to the database (triggers automatic deployment)
    2. Check for new pending tasks
    3. Generate the execution instruction file
    4. Start the Agent to execute the tasks
    5. Wait for the tasks to complete
    6. Sync the completed tasks and trigger automatic deployment
    7. Sleep and repeat...

    Args:
        interval: Pause in seconds between two rounds (also used as the
            retry delay after an error).
        timeout: Maximum time in seconds to wait for the Agent per round.
        auto_close: Whether to close the Agent once waiting has ended.

    Returns:
        None. KeyboardInterrupt is handled here: the Agent is closed, a
        final sync is run and session statistics are logged.
    """
    global AGENT_SESSION_ACTIVE
    logger.info("=" * 60)
    logger.info("🔄 Agent 循环模式已启动")
    logger.info("=" * 60)
    logger.info(f" 检查间隔: {interval} 秒")
    logger.info(f" 任务超时: {timeout} 秒")
    logger.info(f" 自动部署: {'✅ 已启用' if ENABLE_AUTO_DEPLOY else '❌ 已禁用'}")
    logger.info(f" 自动关闭 Agent: {'✅ 是' if auto_close else '❌ 否'}")
    logger.info("=" * 60)
    logger.info("按 Ctrl+C 停止服务")
    logger.info("=" * 60)
    # Session statistics.
    # NOTE(review): total_deployments is incremented together with
    # total_tasks_completed regardless of ENABLE_AUTO_DEPLOY, and the final
    # sync on shutdown does not add to it — confirm the intended accounting.
    loop_count = 0
    total_tasks_completed = 0
    total_deployments = 0
    try:
        while True:
            try:
                loop_count += 1
                logger.info(f"\n{'=' * 60}")
                logger.info(f"📍 开始第 {loop_count} 轮任务检查...")
                logger.info(f"{'=' * 60}")
                # 1. Sync completed tasks (this triggers automatic deployment)
                logger.info("🔄 检查并同步已完成的任务...")
                synced_count = sync_completed_tasks_to_db()
                if synced_count > 0:
                    total_tasks_completed += synced_count
                    total_deployments += synced_count
                    logger.info(
                        f"✅ 已同步 {synced_count} 个完成的任务(累计: {total_tasks_completed})"
                    )
                # 2. Fetch pending tasks from the database
                logger.info("📡 检查数据库中的 pending 任务...")
                pending_tasks = get_pending_tasks()
                if pending_tasks:
                    logger.info(f"📋 发现 {len(pending_tasks)} 个新的 pending 任务:")
                    for task in pending_tasks:
                        logger.info(f" - [{task['task_id']}] {task['task_name']}")
                    # Mark the tasks as processing
                    for task in pending_tasks:
                        update_task_status(task["task_id"], "processing")
                    # Write them to pending_tasks.json
                    write_pending_tasks_json(pending_tasks)
                # 3. Check for processing tasks
                processing_ids = get_processing_task_ids()
                # 4. With new or processing tasks, generate the execution
                #    instructions covering all of them
                if pending_tasks or processing_ids:
                    all_tasks_to_execute = get_all_tasks_to_execute()
                    if all_tasks_to_execute:
                        logger.info(
                            f"📝 生成执行指令文件,共 {len(all_tasks_to_execute)} 个任务"
                        )
                        create_execute_instructions(all_tasks_to_execute)
                if processing_ids:
                    # An Agent session that is still active is not restarted
                    if AGENT_SESSION_ACTIVE:
                        logger.info(
                            f"⏳ Agent 正在执行中,剩余 {len(processing_ids)} 个任务"
                        )
                    else:
                        logger.info(
                            f"🎯 发现 {len(processing_ids)} 个待处理任务,准备启动 Agent"
                        )
                        # Update the trigger file
                        update_trigger_file(
                            task_count=len(processing_ids),
                            status="有待执行任务",
                            task_ids=processing_ids,
                        )
                        # Start the Agent
                        if open_new_agent():
                            if type_message_to_agent(AGENT_MESSAGE):
                                logger.info("✅ 已启动 Agent 并发送执行提醒")
                                # Wait for the tasks to complete
                                task_completed = wait_for_agent_completion(
                                    timeout=timeout
                                )
                                # Key step: close the Agent right after waiting
                                logger.info("🔄 任务执行完毕,立即关闭 Agent...")
                                if auto_close:
                                    # Forced close so the Agent is really closed
                                    close_current_agent(force=True)
                                    # Short pause to let the close take effect
                                    time.sleep(1.0)
                                # Sync completed tasks (triggers automatic deployment)
                                logger.info("🔄 开始同步和部署...")
                                synced = sync_completed_tasks_to_db()
                                if synced > 0:
                                    total_tasks_completed += synced
                                    total_deployments += synced
                                    logger.info(
                                        f"✅ 本轮完成 {synced} 个任务的同步和部署"
                                    )
                                # Per-round statistics
                                logger.info(f"📊 本轮统计: 完成任务 {synced} 个")
                                if ENABLE_AUTO_DEPLOY:
                                    logger.info(f" 已触发自动部署: {synced} 个")
                                # On timeout, make sure the Agent is closed as well
                                if not task_completed:
                                    logger.warning("⚠️ 任务超时,强制关闭 Agent")
                                    force_close_all_agents()
                            else:
                                logger.warning("❌ 发送消息失败")
                                close_current_agent(force=True)
                        else:
                            logger.warning("❌ 启动 Agent 失败")
                else:
                    logger.info("✅ 当前没有待处理任务")
                # Cumulative statistics
                logger.info(
                    f"\n📈 累计统计: 已完成 {total_tasks_completed} 个任务, "
                    f"已部署 {total_deployments} 个"
                )
                logger.info(f"⏰ {interval} 秒后将进行第 {loop_count + 1} 轮检查...")
                time.sleep(interval)
            except KeyboardInterrupt:
                # Let Ctrl+C reach the shutdown handler below
                raise
            except Exception as e:
                # Any other error: log it and retry after the interval
                logger.error(f"❌ 执行出错: {e}")
                import traceback

                logger.error(traceback.format_exc())
                logger.info(f"⏰ {interval} 秒后重试...")
                time.sleep(interval)
    except KeyboardInterrupt:
        # Close the Agent on exit
        logger.info("\n" + "=" * 60)
        logger.info("⛔ 收到停止信号,正在退出...")
        if AGENT_SESSION_ACTIVE:
            logger.info("🔄 正在关闭 Agent...")
            close_current_agent()
        # One last sync
        logger.info("🔄 执行最终同步...")
        final_synced = sync_completed_tasks_to_db()
        if final_synced > 0:
            total_tasks_completed += final_synced
            logger.info(f"✅ 最终同步了 {final_synced} 个任务")
        logger.info("=" * 60)
        logger.info("📊 会话统计:")
        logger.info(f" 总循环次数: {loop_count}")
        logger.info(f" 总完成任务: {total_tasks_completed}")
        logger.info(f" 总部署次数: {total_deployments}")
        logger.info("=" * 60)
        logger.info("✅ Agent 循环模式已停止")
- # ============================================================================
- # 交互式菜单
- # ============================================================================
def _prompt_interval(default: int = 300) -> int:
    """Ask the user for the loop check interval in seconds.

    Empty input, non-numeric input, and non-positive numbers all fall back
    to *default*: a zero interval would busy-loop and a negative one makes
    ``time.sleep`` raise ``ValueError``.
    """
    raw = input(f"请输入检查间隔(秒,默认{default}): ").strip()
    if not raw:
        return default
    try:
        seconds = int(raw)
    except ValueError:
        return default
    return seconds if seconds > 0 else default


def _run_loop_until_interrupted(interval: int) -> None:
    """Run the agent loop and return to the menu when the user presses Ctrl+C."""
    try:
        run_agent_loop(interval=interval)
    except KeyboardInterrupt:
        print("\n循环已停止")


def _load_completed_local_tasks() -> list:
    """Return the entries of the local task file whose status is "completed".

    Errors from reading or parsing the file are propagated so that each
    caller can report them in its own way.
    """
    with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
        local_tasks = json.load(f)
    return [t for t in local_tasks if t.get("status") == "completed"]


def _show_task_status() -> None:
    """Print pending (database), processing (local) and completed (local) tasks."""
    print("\n当前任务状态:")
    print("-" * 40)

    # Pending tasks come from the database.
    pending_tasks = get_pending_tasks()
    print(f" 数据库中 pending 任务: {len(pending_tasks)} 个")
    for task in pending_tasks:
        print(f" - [{task['task_id']}] {task['task_name']}")

    # Processing task IDs come from the local file.
    processing_ids = get_processing_task_ids()
    print(f" 本地 processing 任务: {len(processing_ids)} 个")
    if processing_ids:
        print(f" 任务 ID: {processing_ids}")

    # Completed tasks are read from the local task file, if there is one.
    if not PENDING_TASKS_FILE.exists():
        return
    try:
        completed_tasks = _load_completed_local_tasks()
    except Exception as e:
        # Previously swallowed silently; tell the user why the section is missing.
        print(f" ⚠️ 读取本地任务文件失败: {e}")
        return
    print(f" 本地 completed 任务: {len(completed_tasks)} 个")
    for task in completed_tasks:
        print(
            f" - [{task.get('task_id')}] {task.get('task_name')} -> {task.get('code_file', 'N/A')}"
        )


def _manual_deploy() -> bool:
    """Let the user pick completed local tasks and deploy them.

    Returns:
        True when the caller should pause ("press Enter") before redrawing
        the menu, False when the menu should be redrawn immediately (the
        user chose "q" or typed an invalid selection).
    """
    global ENABLE_AUTO_DEPLOY

    print("\n手动触发任务部署")
    print("-" * 40)

    if not PENDING_TASKS_FILE.exists():
        print("没有本地任务记录")
        return True

    # Only the file read/parse is reported as a "read task list" failure.
    try:
        completed_tasks = _load_completed_local_tasks()
    except Exception as e:
        print(f"❌ 读取任务列表失败: {e}")
        return True

    if not completed_tasks:
        print("没有已完成的任务可供部署")
        return True

    print("已完成的任务:")
    for idx, task in enumerate(completed_tasks, 1):
        task_id = task.get("task_id", "N/A")
        task_name = task.get("task_name", "未知")
        code_file = task.get("code_file", "N/A")
        print(f" {idx}. [{task_id}] {task_name}")
        print(f" 代码文件: {code_file}")
    print("\n 0. 部署全部")
    print(" q. 返回菜单")

    try:
        selection = input("\n请选择要部署的任务编号: ").strip().lower()
        if selection == "q":
            return False

        if selection == "0":
            tasks_to_deploy = completed_tasks
        else:
            try:
                position = int(selection) - 1
            except ValueError:
                print("❌ 请输入有效的数字")
                return False
            if not 0 <= position < len(completed_tasks):
                print("❌ 无效的编号")
                return False
            tasks_to_deploy = [completed_tasks[position]]

        print(f"\n🚀 开始部署 {len(tasks_to_deploy)} 个任务...")
        # The flag is switched on before deploying, as the original did.
        ENABLE_AUTO_DEPLOY = True
        success_count = 0
        for task in tasks_to_deploy:
            try:
                if auto_deploy_completed_task(task):
                    success_count += 1
            except Exception as e:
                # Report a deployment error as such and keep going with the
                # remaining tasks instead of blaming the task-list read.
                print(f"❌ 部署任务 {task.get('task_id', 'N/A')} 时出错: {e}")
        print(f"\n📊 部署完成: {success_count}/{len(tasks_to_deploy)} 成功")
    except (KeyboardInterrupt, EOFError):
        # Ctrl+C / EOF aborts the selection or the deployment and returns
        # to the menu.
        pass
    return True


def show_interactive_menu() -> None:
    """Show the interactive menu in a loop and run the selected action.

    The loop ends when the user chooses "0" or sends Ctrl+C / EOF at the
    main prompt. Options 2, 3 and 6 update the module-level
    ``ENABLE_AUTO_DEPLOY`` flag.
    """
    global ENABLE_AUTO_DEPLOY

    while True:
        print("\n" + "=" * 60)
        print("自动任务执行调度脚本 - Agent 模式")
        print("=" * 60)
        print("\n请选择操作模式:\n")
        print(" 1. Agent 单次执行")
        print(" 2. Agent 循环模式(含自动部署脚本和n8n工作流)")
        print(" 3. Agent 循环模式(禁用部署)")
        print(" 4. 测试生产服务器连接")
        print(" 5. 查看当前任务状态")
        print(" 6. 手动触发任务部署")
        print(" 7. 强制关闭所有 Agent")
        print(" 0. 退出")
        print("\n" + "-" * 60)

        try:
            # The valid range is 0-7; the prompt used to advertise 0-5.
            choice = input("请输入选项 [0-7]: ").strip()
        except (KeyboardInterrupt, EOFError):
            print("\n再见!")
            break

        if choice == "0":
            print("再见!")
            break
        elif choice == "1":
            print("\n启动 Agent 单次执行模式...")
            run_agent_once(timeout=3600, auto_close=True)
            input("\n按 Enter 键返回菜单...")
        elif choice == "2":
            interval = _prompt_interval()
            print("\n🚀 启动 Agent 循环模式(含自动部署)")
            print(f" 检查间隔: {interval} 秒")
            print(" 自动部署: ✅ 已启用")
            print("\n 任务完成后将自动:")
            print(" - 部署 Python 脚本到生产服务器")
            print(" - 查找并部署相关 n8n 工作流")
            print("\n按 Ctrl+C 停止服务并返回菜单\n")
            ENABLE_AUTO_DEPLOY = True
            _run_loop_until_interrupted(interval)
        elif choice == "3":
            interval = _prompt_interval()
            print(f"\n启动 Agent 循环模式(禁用部署),检查间隔: {interval} 秒")
            print("按 Ctrl+C 停止服务并返回菜单\n")
            ENABLE_AUTO_DEPLOY = False
            _run_loop_until_interrupted(interval)
        elif choice == "4":
            print("\n测试生产服务器连接...")
            if test_ssh_connection():
                print("✅ 连接测试成功")
            else:
                print("❌ 连接测试失败")
            input("\n按 Enter 键返回菜单...")
        elif choice == "5":
            _show_task_status()
            input("\n按 Enter 键返回菜单...")
        elif choice == "6":
            if _manual_deploy():
                input("\n按 Enter 键返回菜单...")
        elif choice == "7":
            print("\n🔄 强制关闭所有 Agent 会话...")
            if HAS_CURSOR_GUI:
                if force_close_all_agents():
                    print("✅ 所有 Agent 会话已关闭")
                else:
                    print("⚠️ 关闭过程中可能出现问题,请检查 Cursor 窗口")
            else:
                print("❌ 当前环境不支持 GUI 自动化")
            input("\n按 Enter 键返回菜单...")
        else:
            print("❌ 无效的选项,请重新选择")
- # ============================================================================
- # 主函数
- # ============================================================================
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with the agent and deployment options."""
    parser = argparse.ArgumentParser(
        description="自动任务执行调度脚本 (Agent 模式)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# Agent 单次执行
python scripts/auto_execute_tasks.py --agent-run
# Agent 循环模式
python scripts/auto_execute_tasks.py --agent-loop
# Agent 循环模式 + 禁用自动部署
python scripts/auto_execute_tasks.py --agent-loop --no-deploy
# 设置 Agent 超时时间
python scripts/auto_execute_tasks.py --agent-run --agent-timeout 7200
# 立即部署指定任务到生产服务器
python scripts/auto_execute_tasks.py --deploy-now 123
# 测试生产服务器连接
python scripts/auto_execute_tasks.py --test-connection
""",
    )

    # Agent mode options
    parser.add_argument(
        "--agent-run",
        action="store_true",
        help="Agent 单次执行模式",
    )
    parser.add_argument(
        "--agent-loop",
        action="store_true",
        help="Agent 循环模式",
    )
    parser.add_argument(
        "--agent-timeout",
        type=int,
        default=3600,
        help="Agent 等待任务完成的超时时间(秒),默认 3600",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=300,
        help="循环模式检查间隔(秒),默认 300",
    )
    parser.add_argument(
        "--no-auto-close",
        action="store_true",
        help="任务完成后不自动关闭 Agent",
    )

    # Deployment options
    parser.add_argument(
        "--no-deploy",
        action="store_true",
        help="禁用自动部署功能",
    )
    parser.add_argument(
        "--deploy-now",
        type=str,
        metavar="TASK_ID",
        help="立即部署指定任务ID的脚本到生产服务器",
    )
    parser.add_argument(
        "--test-connection",
        action="store_true",
        help="测试到生产服务器的 SSH 连接",
    )
    return parser


def _deploy_task_now(raw_task_id: str) -> None:
    """Deploy the task with the given ID, looked up in the local task file.

    Every failure (bad ID, missing or unreadable file, unknown task,
    failed deployment) is logged; nothing is raised for these cases.
    """
    # Only the int() conversion is guarded by this ValueError handler.
    # json.JSONDecodeError is a ValueError subclass, so a wider try block
    # would report a corrupt task file as an invalid task ID.
    try:
        task_id = int(raw_task_id)
    except ValueError:
        logger.error(f"无效的任务ID: {raw_task_id}")
        return

    logger.info(f"开始部署任务 {task_id}...")

    # The task details are looked up in pending_tasks.json.
    if not PENDING_TASKS_FILE.exists():
        logger.error("pending_tasks.json 文件不存在")
        return

    try:
        with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
            tasks = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers JSON syntax errors and undecodable bytes.
        logger.error(f"读取 pending_tasks.json 失败: {e}")
        return

    if not isinstance(tasks, list):
        logger.error("pending_tasks.json 格式无效: 期望任务列表")
        return

    task_found = next(
        (t for t in tasks if isinstance(t, dict) and t.get("task_id") == task_id),
        None,
    )
    if task_found is None:
        logger.error(f"未找到任务 {task_id}")
        return

    if auto_deploy_completed_task(task_found):
        logger.info(f"✅ 任务 {task_id} 部署成功")
    else:
        logger.error(f"❌ 任务 {task_id} 部署失败")


def main() -> None:
    """Parse the command line and run the requested mode.

    Modes are checked in this order and the first match wins:
    ``--test-connection``, ``--deploy-now``, ``--agent-run``,
    ``--agent-loop``. With no arguments at all the interactive menu is
    shown; with arguments that select no mode the help text is printed.
    Also sets the module-level ``ENABLE_AUTO_DEPLOY`` flag from
    ``--no-deploy``.
    """
    global ENABLE_AUTO_DEPLOY

    parser = _build_arg_parser()
    args = parser.parse_args()

    ENABLE_AUTO_DEPLOY = not args.no_deploy
    auto_close = not args.no_auto_close

    # Test the SSH connection
    if args.test_connection:
        if test_ssh_connection():
            logger.info("✅ 连接测试成功")
        else:
            logger.error("❌ 连接测试失败")
        return

    # Deploy one task immediately
    if args.deploy_now:
        _deploy_task_now(args.deploy_now)
        return

    # Single agent run
    if args.agent_run:
        success = run_agent_once(
            timeout=args.agent_timeout,
            auto_close=auto_close,
        )
        if success:
            logger.info("✅ Agent 单次执行完成")
        else:
            logger.error("❌ Agent 单次执行失败")
        return

    # Agent loop mode
    if args.agent_loop:
        run_agent_loop(
            interval=args.interval,
            timeout=args.agent_timeout,
            auto_close=auto_close,
        )
        return

    # No mode selected: menu when run without arguments, help otherwise.
    if len(sys.argv) == 1:
        show_interactive_menu()
    else:
        parser.print_help()
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|