#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 自动任务+自动Chat一体化脚本 功能整合: 1. 从 task_list 表中读取所有 pending 任务 2. 在本地生成 .cursor/pending_tasks.json 和 task_execute_instructions.md 3. 自动在 Cursor Chat 中发送「请检查并执行所有待处理任务。」消息 4. 轮询 .cursor/pending_tasks.json 中状态为 completed 的任务,并自动回写 task_list 状态 依赖: - scripts/auto_execute_tasks.py 负责数据库连接和任务状态更新 - scripts/trigger_cursor_execution.py 负责生成 task_execute_instructions.md 和触发文件 - scripts/cursor_auto_chat.py 负责在 Cursor 中自动发送 Chat 消息 使用方式: 1) 单次执行(检查一次并触发一次 Chat): python scripts/auto_tasks_chat_runner.py --once 2) 守护进程模式(推荐): python scripts/auto_tasks_chat_runner.py --daemon --interval 300 """ from __future__ import annotations import json import time import argparse import logging from pathlib import Path from typing import Any, Dict, List, Optional, Tuple # 复用现有脚本中的能力 from scripts.auto_execute_tasks import get_pending_tasks, update_task_status # type: ignore[import] from scripts.trigger_cursor_execution import create_execute_instructions_file # type: ignore[import] from scripts.cursor_auto_chat import CursorAutoChat # type: ignore[import] logger = logging.getLogger("AutoTasksChatRunner") logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) WORKSPACE_ROOT = Path(__file__).parent.parent CURSOR_DIR = WORKSPACE_ROOT / ".cursor" PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json" def write_pending_tasks_file(tasks: List[Dict[str, Any]]) -> None: """ 将数据库中的 pending 任务写入 .cursor/pending_tasks.json 约定: - 初始写入时,status 字段统一设为 'processing' - Cursor / 用户 / 自动化脚本 可以把 status 更新为 'completed' """ CURSOR_DIR.mkdir(parents=True, exist_ok=True) payload: List[Dict[str, Any]] = [] for t in tasks: payload.append( { "task_id": t["task_id"], "task_name": t["task_name"], "task_description": t["task_description"], "code_path": t.get("code_path") or "app/core/data_flow", "code_name": t.get("code_name") or "", "status": "processing", "notified_at": t.get("create_time") and t["create_time"].isoformat() if hasattr(t.get("create_time"), "isoformat") else None, "task_file": t.get("task_file") or "", } ) with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) logger.info("已写入 .cursor/pending_tasks.json,任务数量: %d", len(payload)) def load_pending_tasks_file() -> List[Dict[str, Any]]: """读取 .cursor/pending_tasks.json(如果不存在则返回空列表)""" if not PENDING_TASKS_FILE.exists(): return [] try: with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, list): return data return [] except Exception as exc: # noqa: BLE001 logger.error("读取 pending_tasks.json 失败: %s", exc) return [] def sync_completed_tasks_to_db() -> int: """ 将 .cursor/pending_tasks.json 中 status == 'completed' 的任务,同步回数据库。 返回: 成功更新到数据库的任务数量 """ tasks = load_pending_tasks_file() if not tasks: logger.info("pending_tasks.json 中没有任务记录") return 0 updated = 0 for t in tasks: if t.get("status") != "completed": continue task_id = t.get("task_id") code_name = t.get("code_name") or "" code_path = t.get("code_path") or "" if not task_id: continue ok = update_task_status( task_id=task_id, status="completed", code_name=code_name or None, code_path=code_path or None, ) if ok: updated += 1 logger.info( "已将任务 %s 同步为 completed (code_name=%s, code_path=%s)", task_id, code_name, code_path, ) if updated == 0: logger.info("没有需要同步到数据库的 completed 任务") else: logger.info("本次共同步 %d 个任务状态到数据库", updated) return updated def prepare_tasks_for_execution() -> int: """ 检查数据库中的 pending 任务,生成本地任务文件和 Cursor 指令文件。 返回: 准备好的 pending 任务数量 """ logger.info("开始从数据库读取 pending 任务...") tasks = get_pending_tasks() if not tasks: logger.info("数据库中没有 pending 任务") return 0 logger.info("从数据库读取到 %d 个 pending 任务", len(tasks)) # 写入 .cursor/pending_tasks.json write_pending_tasks_file(tasks) # 生成 task_execute_instructions.md + task_trigger.txt # trigger_cursor_execution.create_execute_instructions_file 期望的是 processing_tasks 列表 create_execute_instructions_file(WORKSPACE_ROOT, tasks) return len(tasks) def send_chat_to_cursor(message: str, input_box_pos: Optional[Tuple[int, int]]) -> bool: """ 使用 CursorAutoChat 在 Cursor 中发送一条 Chat 消息。 """ logger.info("准备向 Cursor Chat 发送消息: %s", message) tool = CursorAutoChat( message=message, interval=300, input_box_pos=input_box_pos, ) return tool.execute_once() def run_once(message: str, input_box_pos: Optional[Tuple[int, int]]) -> None: """ 单次执行流程: 1. 同步已完成任务到数据库 2. 从数据库读取 pending 任务并生成本地文件 3. 如果有任务,向 Cursor 发送 Chat 消息 """ logger.info("=" * 80) logger.info("开始单次自动任务 + Chat 执行流程") # 第一步:先把本地已完成任务同步到数据库 sync_completed_tasks_to_db() # 第二步:准备新的 pending 任务 count = prepare_tasks_for_execution() if count == 0: logger.info("当前没有新的 pending 任务,无需触发 Cursor Chat") logger.info("=" * 80) return # 第三步:通过 Cursor Chat 提醒 AI 执行所有待处理任务 ok = send_chat_to_cursor(message=message, input_box_pos=input_box_pos) if ok: logger.info("已通过 Cursor Chat 发送任务执行指令") else: logger.warning("向 Cursor Chat 发送消息失败,请检查窗口状态") logger.info("=" * 80) def run_daemon( message: str, input_box_pos: Optional[Tuple[int, int]], interval: int, sync_interval: int, ) -> None: """ 守护进程模式: - 每次循环: 1) 同步本地 completed 任务到数据库 2) 检查数据库 pending 任务并生成本地文件 3) 如有任务则触发 Cursor Chat - 之后休眠 interval 秒 """ logger.info("=" * 80) logger.info("自动任务 + 自动Chat 一体化守护进程已启动") logger.info("检查间隔: %d 秒", interval) logger.info("同步 completed 状态间隔(同检查逻辑): %d 秒", sync_interval) logger.info("按 Ctrl+C 结束") logger.info("=" * 80) try: while True: try: # 先同步 completed 状态 sync_completed_tasks_to_db() # 再检查新的 pending 任务并触发 Chat count = prepare_tasks_for_execution() if count > 0: send_chat_to_cursor(message=message, input_box_pos=input_box_pos) logger.info("下次检查将在 %d 秒后进行...", interval) time.sleep(interval) except KeyboardInterrupt: raise except Exception as exc: # noqa: BLE001 logger.error("守护进程循环中出错: %s", exc) logger.info("将在 %d 秒后重试...", interval) time.sleep(interval) except KeyboardInterrupt: logger.info("\n自动任务 + 自动Chat 守护进程已停止") def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="自动任务 + 自动Cursor Chat 一体化工具", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 单次执行:检查pending、生成文件并触发一次Chat python scripts/auto_tasks_chat_runner.py --once # 守护进程模式(每5分钟检查一次) python scripts/auto_tasks_chat_runner.py --daemon --interval 300 # 自定义Chat消息 python scripts/auto_tasks_chat_runner.py --daemon --message "请执行所有待处理任务" # 指定Chat输入框位置 python scripts/auto_tasks_chat_runner.py --daemon --input-box-pos "1180,965" """, ) parser.add_argument( "--once", action="store_true", help="只执行一次,不持续运行", ) parser.add_argument( "--daemon", action="store_true", help="以守护进程模式运行,定期检查任务并触发Chat", ) parser.add_argument( "--interval", type=int, default=300, help="守护进程模式下的检查间隔(秒),默认 300 秒", ) parser.add_argument( "--message", type=str, default="请检查并执行所有待处理任务。", help='发送到 Cursor Chat 的消息内容,默认: "请检查并执行所有待处理任务。"', ) parser.add_argument( "--input-box-pos", type=str, default=None, help='Cursor Chat 输入框位置,格式为 "x,y"(例如 "1180,965"),不指定则自动尝试定位', ) return parser.parse_args() def main() -> None: args = parse_args() # 解析输入框位置 input_box_pos: Optional[Tuple[int, int]] = None if args.input_box_pos: try: x_str, y_str = args.input_box_pos.split(",") input_box_pos = (int(x_str.strip()), int(y_str.strip())) logger.info("使用指定的输入框位置: %s", input_box_pos) except Exception as exc: # noqa: BLE001 logger.warning("解析输入框位置失败 %r: %s", args.input_box_pos, exc) input_box_pos = None if args.once: run_once(message=args.message, input_box_pos=input_box_pos) else: run_daemon( message=args.message, input_box_pos=input_box_pos, interval=args.interval, sync_interval=args.interval, ) if __name__ == "__main__": main()