||
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 自动任务+自动Chat一体化脚本
- 功能整合:
- 1. 从 task_list 表中读取所有 pending 任务
- 2. 在本地生成 .cursor/pending_tasks.json 和 task_execute_instructions.md
- 3. 自动在 Cursor Chat 中发送「请检查并执行所有待处理任务。」消息
- 4. 轮询 .cursor/pending_tasks.json 中状态为 completed 的任务,并自动回写 task_list 状态
- 依赖:
- - scripts/auto_execute_tasks.py 负责数据库连接和任务状态更新
- - scripts/trigger_cursor_execution.py 负责生成 task_execute_instructions.md 和触发文件
- - scripts/cursor_auto_chat.py 负责在 Cursor 中自动发送 Chat 消息
- 使用方式:
- 1) 单次执行(检查一次并触发一次 Chat):
- python scripts/auto_tasks_chat_runner.py --once
- 2) 守护进程模式(推荐):
- python scripts/auto_tasks_chat_runner.py --daemon --interval 300
- """
- from __future__ import annotations
- import json
- import time
- import argparse
- import logging
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple
- # 复用现有脚本中的能力
- from scripts.auto_execute_tasks import get_pending_tasks, update_task_status # type: ignore[import]
- from scripts.trigger_cursor_execution import create_execute_instructions_file # type: ignore[import]
- from scripts.cursor_auto_chat import CursorAutoChat # type: ignore[import]
- logger = logging.getLogger("AutoTasksChatRunner")
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
- )
- WORKSPACE_ROOT = Path(__file__).parent.parent
- CURSOR_DIR = WORKSPACE_ROOT / ".cursor"
- PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json"
- def write_pending_tasks_file(tasks: List[Dict[str, Any]]) -> None:
- """
- 将数据库中的 pending 任务写入 .cursor/pending_tasks.json
- 约定:
- - 初始写入时,status 字段统一设为 'processing'
- - Cursor / 用户 / 自动化脚本 可以把 status 更新为 'completed'
- """
- CURSOR_DIR.mkdir(parents=True, exist_ok=True)
- payload: List[Dict[str, Any]] = []
- for t in tasks:
- payload.append(
- {
- "task_id": t["task_id"],
- "task_name": t["task_name"],
- "task_description": t["task_description"],
- "code_path": t.get("code_path") or "app/core/data_flow",
- "code_name": t.get("code_name") or "",
- "status": "processing",
- "notified_at": t.get("create_time") and t["create_time"].isoformat()
- if hasattr(t.get("create_time"), "isoformat")
- else None,
- "task_file": t.get("task_file") or "",
- }
- )
- with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
- json.dump(payload, f, ensure_ascii=False, indent=2)
- logger.info("已写入 .cursor/pending_tasks.json,任务数量: %d", len(payload))
- def load_pending_tasks_file() -> List[Dict[str, Any]]:
- """读取 .cursor/pending_tasks.json(如果不存在则返回空列表)"""
- if not PENDING_TASKS_FILE.exists():
- return []
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- data = json.load(f)
- if isinstance(data, list):
- return data
- return []
- except Exception as exc: # noqa: BLE001
- logger.error("读取 pending_tasks.json 失败: %s", exc)
- return []
- def sync_completed_tasks_to_db() -> int:
- """
- 将 .cursor/pending_tasks.json 中 status == 'completed' 的任务,同步回数据库。
- 返回:
- 成功更新到数据库的任务数量
- """
- tasks = load_pending_tasks_file()
- if not tasks:
- logger.info("pending_tasks.json 中没有任务记录")
- return 0
- updated = 0
- for t in tasks:
- if t.get("status") != "completed":
- continue
- task_id = t.get("task_id")
- code_name = t.get("code_name") or ""
- code_path = t.get("code_path") or ""
- if not task_id:
- continue
- ok = update_task_status(
- task_id=task_id,
- status="completed",
- code_name=code_name or None,
- code_path=code_path or None,
- )
- if ok:
- updated += 1
- logger.info(
- "已将任务 %s 同步为 completed (code_name=%s, code_path=%s)",
- task_id,
- code_name,
- code_path,
- )
- if updated == 0:
- logger.info("没有需要同步到数据库的 completed 任务")
- else:
- logger.info("本次共同步 %d 个任务状态到数据库", updated)
- return updated
- def prepare_tasks_for_execution() -> int:
- """
- 检查数据库中的 pending 任务,生成本地任务文件和 Cursor 指令文件。
- 返回:
- 准备好的 pending 任务数量
- """
- logger.info("开始从数据库读取 pending 任务...")
- tasks = get_pending_tasks()
- if not tasks:
- logger.info("数据库中没有 pending 任务")
- return 0
- logger.info("从数据库读取到 %d 个 pending 任务", len(tasks))
- # 写入 .cursor/pending_tasks.json
- write_pending_tasks_file(tasks)
- # 生成 task_execute_instructions.md + task_trigger.txt
- # trigger_cursor_execution.create_execute_instructions_file 期望的是 processing_tasks 列表
- create_execute_instructions_file(WORKSPACE_ROOT, tasks)
- return len(tasks)
- def send_chat_to_cursor(message: str, input_box_pos: Optional[Tuple[int, int]]) -> bool:
- """
- 使用 CursorAutoChat 在 Cursor 中发送一条 Chat 消息。
- """
- logger.info("准备向 Cursor Chat 发送消息: %s", message)
- tool = CursorAutoChat(
- message=message,
- interval=300,
- input_box_pos=input_box_pos,
- )
- return tool.execute_once()
- def run_once(message: str, input_box_pos: Optional[Tuple[int, int]]) -> None:
- """
- 单次执行流程:
- 1. 同步已完成任务到数据库
- 2. 从数据库读取 pending 任务并生成本地文件
- 3. 如果有任务,向 Cursor 发送 Chat 消息
- """
- logger.info("=" * 80)
- logger.info("开始单次自动任务 + Chat 执行流程")
- # 第一步:先把本地已完成任务同步到数据库
- sync_completed_tasks_to_db()
- # 第二步:准备新的 pending 任务
- count = prepare_tasks_for_execution()
- if count == 0:
- logger.info("当前没有新的 pending 任务,无需触发 Cursor Chat")
- logger.info("=" * 80)
- return
- # 第三步:通过 Cursor Chat 提醒 AI 执行所有待处理任务
- ok = send_chat_to_cursor(message=message, input_box_pos=input_box_pos)
- if ok:
- logger.info("已通过 Cursor Chat 发送任务执行指令")
- else:
- logger.warning("向 Cursor Chat 发送消息失败,请检查窗口状态")
- logger.info("=" * 80)
- def run_daemon(
- message: str,
- input_box_pos: Optional[Tuple[int, int]],
- interval: int,
- sync_interval: int,
- ) -> None:
- """
- 守护进程模式:
- - 每次循环:
- 1) 同步本地 completed 任务到数据库
- 2) 检查数据库 pending 任务并生成本地文件
- 3) 如有任务则触发 Cursor Chat
- - 之后休眠 interval 秒
- """
- logger.info("=" * 80)
- logger.info("自动任务 + 自动Chat 一体化守护进程已启动")
- logger.info("检查间隔: %d 秒", interval)
- logger.info("同步 completed 状态间隔(同检查逻辑): %d 秒", sync_interval)
- logger.info("按 Ctrl+C 结束")
- logger.info("=" * 80)
- try:
- while True:
- try:
- # 先同步 completed 状态
- sync_completed_tasks_to_db()
- # 再检查新的 pending 任务并触发 Chat
- count = prepare_tasks_for_execution()
- if count > 0:
- send_chat_to_cursor(message=message, input_box_pos=input_box_pos)
- logger.info("下次检查将在 %d 秒后进行...", interval)
- time.sleep(interval)
- except KeyboardInterrupt:
- raise
- except Exception as exc: # noqa: BLE001
- logger.error("守护进程循环中出错: %s", exc)
- logger.info("将在 %d 秒后重试...", interval)
- time.sleep(interval)
- except KeyboardInterrupt:
- logger.info("\n自动任务 + 自动Chat 守护进程已停止")
- def parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(
- description="自动任务 + 自动Cursor Chat 一体化工具",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 示例:
- # 单次执行:检查pending、生成文件并触发一次Chat
- python scripts/auto_tasks_chat_runner.py --once
- # 守护进程模式(每5分钟检查一次)
- python scripts/auto_tasks_chat_runner.py --daemon --interval 300
- # 自定义Chat消息
- python scripts/auto_tasks_chat_runner.py --daemon --message "请执行所有待处理任务"
- # 指定Chat输入框位置
- python scripts/auto_tasks_chat_runner.py --daemon --input-box-pos "1180,965"
- """,
- )
- parser.add_argument(
- "--once",
- action="store_true",
- help="只执行一次,不持续运行",
- )
- parser.add_argument(
- "--daemon",
- action="store_true",
- help="以守护进程模式运行,定期检查任务并触发Chat",
- )
- parser.add_argument(
- "--interval",
- type=int,
- default=300,
- help="守护进程模式下的检查间隔(秒),默认 300 秒",
- )
- parser.add_argument(
- "--message",
- type=str,
- default="请检查并执行所有待处理任务。",
- help='发送到 Cursor Chat 的消息内容,默认: "请检查并执行所有待处理任务。"',
- )
- parser.add_argument(
- "--input-box-pos",
- type=str,
- default=None,
- help='Cursor Chat 输入框位置,格式为 "x,y"(例如 "1180,965"),不指定则自动尝试定位',
- )
- return parser.parse_args()
- def main() -> None:
- args = parse_args()
- # 解析输入框位置
- input_box_pos: Optional[Tuple[int, int]] = None
- if args.input_box_pos:
- try:
- x_str, y_str = args.input_box_pos.split(",")
- input_box_pos = (int(x_str.strip()), int(y_str.strip()))
- logger.info("使用指定的输入框位置: %s", input_box_pos)
- except Exception as exc: # noqa: BLE001
- logger.warning("解析输入框位置失败 %r: %s", args.input_box_pos, exc)
- input_box_pos = None
- if args.once:
- run_once(message=args.message, input_box_pos=input_box_pos)
- else:
- run_daemon(
- message=args.message,
- input_box_pos=input_box_pos,
- interval=args.interval,
- sync_interval=args.interval,
- )
- if __name__ == "__main__":
- main()
|