| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773 |
- #!/usr/bin/env python3
- """
- 自动任务执行核心调度脚本
- 工作流程:
- 1. 从 PostgreSQL 数据库 task_list 表中读取 pending 任务
- 2. 生成 .cursor/task_execute_instructions.md 执行指令文件
- 3. 更新任务状态为 processing,并维护 .cursor/pending_tasks.json
- 4. 更新 .cursor/task_trigger.txt 触发器文件
- 5. (可选)向 Cursor Chat 发送执行提醒
- 6. Cursor 完成任务后,将 pending_tasks.json 中的状态改为 completed
- 7. 调度脚本将 completed 状态的任务同步回数据库
- 使用方式:
- # 执行一次任务检查
- python scripts/auto_execute_tasks.py --once
- # 循环模式(每 5 分钟检查)
- python scripts/auto_execute_tasks.py --interval 300
- # 刷新触发器文件(用于现有 processing 任务)
- python scripts/auto_execute_tasks.py --refresh-trigger
- # 立即发送 Chat 消息触发 Cursor 执行
- python scripts/auto_execute_tasks.py --send-chat-now
- # 启动 Chat 自动触发循环(每 60 秒检查,有任务时自动发送消息)
- python scripts/auto_execute_tasks.py --chat-loop --chat-interval 60
- """
- from __future__ import annotations
- import json
- import time
- import argparse
- import logging
- from pathlib import Path
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Tuple
- # ============================================================================
- # 日志配置
- # ============================================================================
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(levelname)s - %(message)s",
- )
- logger = logging.getLogger("AutoExecuteTasks")
- # ============================================================================
- # Windows GUI 自动化依赖(可选)
- # ============================================================================
- HAS_CURSOR_GUI = False
- HAS_PYPERCLIP = False
- try:
- import win32gui
- import win32con
- import pyautogui
- pyautogui.FAILSAFE = True
- pyautogui.PAUSE = 0.5
- HAS_CURSOR_GUI = True
- try:
- import pyperclip
- HAS_PYPERCLIP = True
- except ImportError:
- pass
- except ImportError:
- logger.info(
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui),"
- "将禁用自动 Cursor Chat 功能。"
- )
- # ============================================================================
- # 全局配置
- # ============================================================================
- WORKSPACE_ROOT = Path(__file__).parent.parent
- CURSOR_DIR = WORKSPACE_ROOT / ".cursor"
- PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json"
- INSTRUCTIONS_FILE = CURSOR_DIR / "task_execute_instructions.md"
- TRIGGER_FILE = CURSOR_DIR / "task_trigger.txt"
- # 命令行参数控制的全局变量
- ENABLE_CHAT: bool = False
- CHAT_MESSAGE: str = "请阅读 .cursor/task_execute_instructions.md 并执行任务。"
- CHAT_INPUT_POS: Optional[Tuple[int, int]] = None
- # ============================================================================
- # 数据库操作
- # ============================================================================
- def get_db_connection():
- """获取数据库连接(使用 production 环境配置)"""
- try:
- import psycopg2
- import sys
- sys.path.insert(0, str(WORKSPACE_ROOT))
- from app.config.config import config
- # 强制使用 production 环境的数据库配置
- app_config = config['production']
- db_uri = app_config.SQLALCHEMY_DATABASE_URI
- return psycopg2.connect(db_uri)
- except ImportError as e:
- logger.error(f"导入依赖失败: {e}")
- return None
- except Exception as e:
- logger.error(f"连接数据库失败: {e}")
- return None
- def get_pending_tasks() -> List[Dict[str, Any]]:
- """从数据库获取所有 pending 任务"""
- try:
- from psycopg2.extras import RealDictCursor
- conn = get_db_connection()
- if not conn:
- return []
- cursor = conn.cursor(cursor_factory=RealDictCursor)
- cursor.execute("""
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """)
- tasks = cursor.fetchall()
- cursor.close()
- conn.close()
- return [dict(task) for task in tasks]
- except Exception as e:
- logger.error(f"获取 pending 任务失败: {e}")
- return []
- def update_task_status(
- task_id: int,
- status: str,
- code_name: Optional[str] = None,
- code_path: Optional[str] = None,
- ) -> bool:
- """更新任务状态"""
- try:
- conn = get_db_connection()
- if not conn:
- return False
- cursor = conn.cursor()
- if code_name and code_path:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, code_name = %s, code_path = %s,
- update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, code_name, code_path, task_id))
- else:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, task_id))
- conn.commit()
- updated = cursor.rowcount > 0
- cursor.close()
- conn.close()
- if updated:
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
- return updated
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- # ============================================================================
- # 触发器文件管理
- # ============================================================================
- def update_trigger_file(
- task_count: int = 0,
- status: str = "有待执行任务",
- task_ids: Optional[List[int]] = None,
- ) -> None:
- """
- 更新触发器文件 .cursor/task_trigger.txt
- Cursor Rules 会检测此文件的更新时间,如果在 5 分钟内更新,
- 则触发自动任务执行。
- """
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- task_ids_str = ", ".join(str(tid) for tid in (task_ids or []))
- content = f"""CURSOR_AUTO_EXECUTE_TASK_TRIGGER
- 生成时间: {gen_time}
- 状态: {status}
- 待处理任务数: {task_count}
- 任务ID列表: [{task_ids_str}]
- 此文件用于触发Cursor自动执行任务。
- 当此文件更新时间在5分钟内时,Cursor应该检查 .cursor/task_execute_instructions.md 并执行其中的任务。
- 【重要】Cursor AI 请立即执行以下操作:
- 1. 读取 .cursor/task_execute_instructions.md 文件
- 2. 按照文件中的任务描述,依次完成每个任务
- 3. 完成后更新 .cursor/pending_tasks.json 中对应任务的状态为 completed
- """
- with TRIGGER_FILE.open("w", encoding="utf-8") as f:
- f.write(content)
- logger.info(f"✅ 触发器文件已更新: {TRIGGER_FILE}")
- def get_processing_task_ids() -> List[int]:
- """从 pending_tasks.json 获取所有 processing 状态的任务 ID"""
- if not PENDING_TASKS_FILE.exists():
- return []
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- return [
- t.get("task_id") for t in tasks
- if t.get("status") == "processing" and t.get("task_id")
- ]
- except Exception:
- return []
- # ============================================================================
- # 任务文件生成
- # ============================================================================
- def write_pending_tasks_json(tasks: List[Dict[str, Any]]) -> None:
- """将任务列表写入 .cursor/pending_tasks.json"""
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- # 读取现有任务
- existing_tasks = []
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- existing_tasks = json.load(f)
- except Exception:
- existing_tasks = []
- existing_ids = {t["task_id"] for t in existing_tasks if "task_id" in t}
- # 添加新任务
- for task in tasks:
- if task["task_id"] not in existing_ids:
- task_info = {
- "task_id": task["task_id"],
- "task_name": task["task_name"],
- "code_path": task.get("code_path", ""),
- "code_name": task.get("code_name", ""),
- "status": "processing",
- "notified_at": datetime.now().isoformat(),
- "code_file": task.get("code_file", ""),
- }
- existing_tasks.append(task_info)
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(existing_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"✅ pending_tasks.json 已更新,任务数: {len(existing_tasks)}")
- def create_execute_instructions(tasks: List[Dict[str, Any]]) -> None:
- """生成任务执行指令文件 .cursor/task_execute_instructions.md"""
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# 🤖 Cursor 自动任务执行指令\n\n")
- f.write("**⚠️ 重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 📋 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task['task_id']
- task_name = task['task_name']
- task_desc = task['task_description']
- create_time = task.get("create_time", "")
- if hasattr(create_time, "strftime"):
- create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
- f.write(f"## 🔴 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **创建时间**: {create_time}\n")
- f.write(f"- **创建者**: {task.get('create_by', 'unknown')}\n\n")
- f.write(f"### 📝 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已创建: {INSTRUCTIONS_FILE}")
- # ============================================================================
- # 状态同步
- # ============================================================================
- def sync_completed_tasks_to_db() -> int:
- """将 pending_tasks.json 中 completed 的任务同步到数据库"""
- if not PENDING_TASKS_FILE.exists():
- return 0
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- except Exception as e:
- logger.error(f"读取 pending_tasks.json 失败: {e}")
- return 0
- if not isinstance(tasks, list):
- return 0
- updated = 0
- remaining_tasks = []
- for t in tasks:
- if t.get("status") == "completed":
- task_id = t.get("task_id")
- if not task_id:
- continue
- code_name = t.get("code_name")
- code_path = t.get("code_path")
- if update_task_status(task_id, "completed", code_name, code_path):
- updated += 1
- logger.info(f"已同步任务 {task_id} 为 completed")
- else:
- remaining_tasks.append(t)
- else:
- remaining_tasks.append(t)
- if updated > 0:
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(remaining_tasks, f, indent=2, ensure_ascii=False)
- logger.info(f"本次共同步 {updated} 个 completed 任务到数据库")
- return updated
- # ============================================================================
- # Cursor Chat 自动化
- # ============================================================================
- def find_cursor_window() -> Optional[int]:
- """查找 Cursor 主窗口句柄"""
- if not HAS_CURSOR_GUI:
- return None
- cursor_windows: List[Dict[str, Any]] = []
- def enum_windows_callback(hwnd, _extra):
- if win32gui.IsWindowVisible(hwnd):
- title = win32gui.GetWindowText(hwnd) or ""
- class_name = win32gui.GetClassName(hwnd) or ""
- is_cursor = "cursor" in title.lower()
- if class_name and "chrome_widgetwin" in class_name.lower():
- is_cursor = True
- if is_cursor:
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
- area = (right - left) * (bottom - top)
- cursor_windows.append({"hwnd": hwnd, "area": area})
- return True
- win32gui.EnumWindows(enum_windows_callback, None)
- if not cursor_windows:
- logger.warning("未找到 Cursor 窗口")
- return None
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
- return cursor_windows[0]["hwnd"]
- def send_chat_message(
- message: str, input_pos: Optional[Tuple[int, int]]
- ) -> bool:
- """在 Cursor Chat 中发送消息"""
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化")
- return False
- hwnd = find_cursor_window()
- if not hwnd:
- return False
- try:
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
- time.sleep(0.3)
- win32gui.SetForegroundWindow(hwnd)
- time.sleep(0.5)
- except Exception as e:
- logger.error(f"激活 Cursor 窗口失败: {e}")
- return False
- # 点击输入框或使用快捷键
- if input_pos:
- x, y = input_pos
- pyautogui.click(x, y)
- time.sleep(0.4)
- else:
- pyautogui.hotkey("ctrl", "l")
- time.sleep(1.0)
- pyautogui.hotkey("ctrl", "a")
- time.sleep(0.2)
- # 输入消息
- if HAS_PYPERCLIP:
- try:
- pyperclip.copy(message)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(0.5)
- except Exception:
- pyautogui.write(message, interval=0.03)
- else:
- pyautogui.write(message, interval=0.03)
- time.sleep(0.3)
- pyautogui.press("enter")
- logger.info("✅ 消息已发送到 Cursor Chat")
- return True
- def send_chat_for_tasks(force: bool = False) -> bool:
- """
- 向 Cursor Chat 发送任务提醒
- Args:
- force: 强制发送,忽略 ENABLE_CHAT 设置
- Returns:
- 是否成功发送
- """
- if not force and not ENABLE_CHAT:
- return False
- if not HAS_CURSOR_GUI:
- logger.warning("未安装 GUI 自动化依赖,无法发送 Chat 消息")
- return False
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("没有 processing 任务,跳过发送")
- return False
- logger.info(f"📤 发送任务提醒到 Cursor Chat({len(processing_ids)} 个任务)...")
- return send_chat_message(CHAT_MESSAGE, CHAT_INPUT_POS)
- def chat_trigger_loop(interval: int = 60) -> None:
- """
- 循环模式:定期检查 processing 任务并发送 Chat 消息
- Args:
- interval: 检查间隔(秒),默认 60 秒
- """
- logger.info("=" * 60)
- logger.info("🚀 Cursor Chat 自动触发服务已启动")
- logger.info(f"⏰ 检查间隔: {interval} 秒")
- logger.info(f"📝 发送消息: {CHAT_MESSAGE}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- last_sent_time = 0
- min_send_interval = 120 # 最小发送间隔(秒),避免频繁打扰
- try:
- while True:
- try:
- # 1. 先同步已完成任务
- sync_completed_tasks_to_db()
- # 2. 检查是否有 processing 任务
- processing_ids = get_processing_task_ids()
- if processing_ids:
- current_time = time.time()
- time_since_last = current_time - last_sent_time
- if time_since_last >= min_send_interval:
- logger.info(f"📋 发现 {len(processing_ids)} 个待处理任务")
- # 更新触发器文件
- update_trigger_file(
- task_count=len(processing_ids),
- status="有待执行任务",
- task_ids=processing_ids,
- )
- # 发送 Chat 消息
- if send_chat_for_tasks(force=True):
- last_sent_time = current_time
- logger.info("✅ 已发送执行提醒")
- else:
- logger.warning("⚠️ 发送失败,将在下次重试")
- else:
- remaining = int(min_send_interval - time_since_last)
- logger.info(
- f"⏳ 距离上次发送不足 {min_send_interval}s,"
- f"还需等待 {remaining}s"
- )
- else:
- logger.info("✅ 没有待处理任务")
- logger.info(f"⏳ {interval} 秒后再次检查...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- time.sleep(interval)
- except KeyboardInterrupt:
- logger.info("\n⛔ 服务已停止")
- # ============================================================================
- # 主执行流程
- # ============================================================================
- def auto_execute_tasks_once() -> int:
- """执行一次任务检查和处理"""
- # 1. 先同步已完成任务到数据库
- sync_completed_tasks_to_db()
- # 2. 获取 pending 任务
- logger.info("🔍 检查 pending 任务...")
- tasks = get_pending_tasks()
- # 3. 检查是否有现有的 processing 任务
- existing_processing_ids = get_processing_task_ids()
- if not tasks and not existing_processing_ids:
- logger.info("✅ 没有 pending 或 processing 任务")
- # 更新触发器文件为"已完成"状态
- update_trigger_file(0, "所有任务已完成", [])
- return 0
- if tasks:
- logger.info(f"📋 找到 {len(tasks)} 个 pending 任务")
- # 4. 更新任务状态为 processing
- for task in tasks:
- update_task_status(task["task_id"], "processing")
- # 5. 写入 pending_tasks.json
- write_pending_tasks_json(tasks)
- # 6. 生成执行指令文件
- create_execute_instructions(tasks)
- # 7. 更新触发器文件(关键:触发 Cursor 自动执行)
- all_processing_ids = get_processing_task_ids()
- if all_processing_ids:
- update_trigger_file(
- task_count=len(all_processing_ids),
- status="有待执行任务",
- task_ids=all_processing_ids,
- )
- logger.info(f"🔔 已更新触发器,等待 Cursor 执行 {len(all_processing_ids)} 个任务")
- return len(tasks)
- def auto_execute_tasks_loop(interval: int = 300) -> None:
- """循环执行任务检查"""
- logger.info("=" * 60)
- logger.info("🚀 自动任务执行服务已启动")
- logger.info(f"⏰ 检查间隔: {interval} 秒")
- logger.info(f"💬 自动 Chat: {'已启用' if ENABLE_CHAT else '未启用'}")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 60)
- try:
- while True:
- try:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks()
- logger.info(f"✅ 已处理 {count} 个任务")
- logger.info(f"⏳ {interval} 秒后再次检查...")
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- time.sleep(interval)
- except KeyboardInterrupt:
- logger.info("\n⛔ 服务已停止")
- def refresh_trigger_only() -> None:
- """仅刷新触发器文件(用于现有 processing 任务)"""
- processing_ids = get_processing_task_ids()
- if not processing_ids:
- logger.info("没有 processing 状态的任务")
- update_trigger_file(0, "所有任务已完成", [])
- return
- logger.info(f"📋 发现 {len(processing_ids)} 个 processing 任务")
- # 重新生成指令文件(从 pending_tasks.json 读取)
- if PENDING_TASKS_FILE.exists():
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- all_tasks = json.load(f)
- processing_tasks = [
- t for t in all_tasks
- if t.get("status") == "processing"
- ]
- if processing_tasks:
- # 重新生成执行指令文件
- create_execute_instructions_from_json(processing_tasks)
- except Exception as e:
- logger.error(f"读取任务文件失败: {e}")
- # 更新触发器文件
- update_trigger_file(
- task_count=len(processing_ids),
- status="有待执行任务",
- task_ids=processing_ids,
- )
- logger.info("✅ 触发器已刷新,等待 Cursor 执行任务")
- def create_execute_instructions_from_json(tasks: List[Dict[str, Any]]) -> None:
- """从 pending_tasks.json 格式的任务列表生成执行指令文件"""
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- with INSTRUCTIONS_FILE.open("w", encoding="utf-8") as f:
- f.write("# 🤖 Cursor 自动任务执行指令\n\n")
- f.write("**⚠️ 重要:请立即执行以下任务!**\n\n")
- gen_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- f.write(f"**生成时间**: {gen_time}\n\n")
- f.write(f"**待执行任务数量**: {len(tasks)}\n\n")
- f.write("## 📋 任务完成后的操作\n\n")
- f.write("完成每个任务后,请更新 `.cursor/pending_tasks.json` 中")
- f.write("对应任务的 `status` 为 `completed`,\n")
- f.write("并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。\n\n")
- f.write("调度脚本会自动将完成的任务同步到数据库。\n\n")
- f.write("---\n\n")
- for idx, task in enumerate(tasks, 1):
- task_id = task.get('task_id', 'N/A')
- task_name = task.get('task_name', '未命名任务')
- task_desc = task.get('task_description', '无描述')
- notified_at = task.get('notified_at', '')
- f.write(f"## 🔴 任务 {idx}: {task_name}\n\n")
- f.write(f"- **任务ID**: `{task_id}`\n")
- f.write(f"- **通知时间**: {notified_at}\n")
- f.write(f"- **代码路径**: {task.get('code_path', 'N/A')}\n\n")
- f.write(f"### 📝 任务描述\n\n{task_desc}\n\n")
- f.write("---\n\n")
- logger.info(f"✅ 执行指令文件已更新: {INSTRUCTIONS_FILE}")
- def main() -> None:
- """主函数"""
- parser = argparse.ArgumentParser(
- description="自动任务执行调度脚本",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- )
- parser.add_argument(
- "--once", action="store_true", help="只执行一次"
- )
- parser.add_argument(
- "--interval", type=int, default=300, help="检查间隔(秒)"
- )
- parser.add_argument(
- "--enable-chat", action="store_true", help="启用自动 Cursor Chat"
- )
- parser.add_argument(
- "--chat-input-pos", type=str, help='Chat 输入框位置 "x,y"'
- )
- parser.add_argument(
- "--chat-message", type=str,
- default="请阅读 .cursor/task_execute_instructions.md 并执行任务。",
- help="发送到 Chat 的消息"
- )
- parser.add_argument(
- "--refresh-trigger", action="store_true",
- help="仅刷新触发器文件(用于现有 processing 任务)"
- )
- parser.add_argument(
- "--chat-loop", action="store_true",
- help="启动 Chat 自动触发循环(定期发送执行消息)"
- )
- parser.add_argument(
- "--chat-interval", type=int, default=60,
- help="Chat 触发循环的检查间隔(秒),默认 60"
- )
- parser.add_argument(
- "--send-chat-now", action="store_true",
- help="立即发送一次 Chat 消息(用于手动触发)"
- )
- args = parser.parse_args()
- global ENABLE_CHAT, CHAT_INPUT_POS, CHAT_MESSAGE
- ENABLE_CHAT = bool(args.enable_chat)
- CHAT_MESSAGE = args.chat_message
- if args.chat_input_pos:
- try:
- x, y = args.chat_input_pos.split(",")
- CHAT_INPUT_POS = (int(x.strip()), int(y.strip()))
- except Exception:
- pass
- # 仅刷新触发器
- if args.refresh_trigger:
- refresh_trigger_only()
- return
- # 立即发送一次 Chat 消息
- if args.send_chat_now:
- if send_chat_for_tasks(force=True):
- logger.info("✅ 消息已发送")
- else:
- logger.error("❌ 发送失败")
- return
- # Chat 自动触发循环模式
- if args.chat_loop:
- chat_trigger_loop(interval=args.chat_interval)
- return
- if args.once:
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_tasks()
- logger.info(f"✅ 完成!处理了 {count} 个任务")
- else:
- auto_execute_tasks_loop(interval=args.interval)
- if __name__ == "__main__":
- main()
|