auto_tasks_chat_runner.py 11 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 自动任务+自动Chat一体化脚本
  5. 功能整合:
  6. 1. 从 task_list 表中读取所有 pending 任务
  7. 2. 在本地生成 .cursor/pending_tasks.json 和 task_execute_instructions.md
  8. 3. 自动在 Cursor Chat 中发送「请检查并执行所有待处理任务。」消息
  9. 4. 轮询 .cursor/pending_tasks.json 中状态为 completed 的任务,并自动回写 task_list 状态
  10. 依赖:
  11. - scripts/auto_execute_tasks.py 负责数据库连接和任务状态更新
  12. - scripts/trigger_cursor_execution.py 负责生成 task_execute_instructions.md 和触发文件
  13. - scripts/cursor_auto_chat.py 负责在 Cursor 中自动发送 Chat 消息
  14. 使用方式:
  15. 1) 单次执行(检查一次并触发一次 Chat):
  16. python scripts/auto_tasks_chat_runner.py --once
  17. 2) 守护进程模式(推荐):
  18. python scripts/auto_tasks_chat_runner.py --daemon --interval 300
  19. """
  20. from __future__ import annotations
  21. import json
  22. import time
  23. import argparse
  24. import logging
  25. from pathlib import Path
  26. from typing import Any, Dict, List, Optional, Tuple
  27. # 复用现有脚本中的能力
  28. from scripts.auto_execute_tasks import get_pending_tasks, update_task_status # type: ignore[import]
  29. from scripts.trigger_cursor_execution import create_execute_instructions_file # type: ignore[import]
  30. from scripts.cursor_auto_chat import CursorAutoChat # type: ignore[import]
  31. logger = logging.getLogger("AutoTasksChatRunner")
  32. logging.basicConfig(
  33. level=logging.INFO,
  34. format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  35. )
  36. WORKSPACE_ROOT = Path(__file__).parent.parent
  37. CURSOR_DIR = WORKSPACE_ROOT / ".cursor"
  38. PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json"
  39. def write_pending_tasks_file(tasks: List[Dict[str, Any]]) -> None:
  40. """
  41. 将数据库中的 pending 任务写入 .cursor/pending_tasks.json
  42. 约定:
  43. - 初始写入时,status 字段统一设为 'processing'
  44. - Cursor / 用户 / 自动化脚本 可以把 status 更新为 'completed'
  45. """
  46. CURSOR_DIR.mkdir(parents=True, exist_ok=True)
  47. payload: List[Dict[str, Any]] = []
  48. for t in tasks:
  49. payload.append(
  50. {
  51. "task_id": t["task_id"],
  52. "task_name": t["task_name"],
  53. "task_description": t["task_description"],
  54. "code_path": t.get("code_path") or "app/core/data_flow",
  55. "code_name": t.get("code_name") or "",
  56. "status": "processing",
  57. "notified_at": t.get("create_time") and t["create_time"].isoformat()
  58. if hasattr(t.get("create_time"), "isoformat")
  59. else None,
  60. "task_file": t.get("task_file") or "",
  61. }
  62. )
  63. with PENDING_TASKS_FILE.open("w", encoding="utf-8") as f:
  64. json.dump(payload, f, ensure_ascii=False, indent=2)
  65. logger.info("已写入 .cursor/pending_tasks.json,任务数量: %d", len(payload))
  66. def load_pending_tasks_file() -> List[Dict[str, Any]]:
  67. """读取 .cursor/pending_tasks.json(如果不存在则返回空列表)"""
  68. if not PENDING_TASKS_FILE.exists():
  69. return []
  70. try:
  71. with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
  72. data = json.load(f)
  73. if isinstance(data, list):
  74. return data
  75. return []
  76. except Exception as exc: # noqa: BLE001
  77. logger.error("读取 pending_tasks.json 失败: %s", exc)
  78. return []
  79. def sync_completed_tasks_to_db() -> int:
  80. """
  81. 将 .cursor/pending_tasks.json 中 status == 'completed' 的任务,同步回数据库。
  82. 返回:
  83. 成功更新到数据库的任务数量
  84. """
  85. tasks = load_pending_tasks_file()
  86. if not tasks:
  87. logger.info("pending_tasks.json 中没有任务记录")
  88. return 0
  89. updated = 0
  90. for t in tasks:
  91. if t.get("status") != "completed":
  92. continue
  93. task_id = t.get("task_id")
  94. code_name = t.get("code_name") or ""
  95. code_path = t.get("code_path") or ""
  96. if not task_id:
  97. continue
  98. ok = update_task_status(
  99. task_id=task_id,
  100. status="completed",
  101. code_name=code_name or None,
  102. code_path=code_path or None,
  103. )
  104. if ok:
  105. updated += 1
  106. logger.info(
  107. "已将任务 %s 同步为 completed (code_name=%s, code_path=%s)",
  108. task_id,
  109. code_name,
  110. code_path,
  111. )
  112. if updated == 0:
  113. logger.info("没有需要同步到数据库的 completed 任务")
  114. else:
  115. logger.info("本次共同步 %d 个任务状态到数据库", updated)
  116. return updated
  117. def prepare_tasks_for_execution() -> int:
  118. """
  119. 检查数据库中的 pending 任务,生成本地任务文件和 Cursor 指令文件。
  120. 返回:
  121. 准备好的 pending 任务数量
  122. """
  123. logger.info("开始从数据库读取 pending 任务...")
  124. tasks = get_pending_tasks()
  125. if not tasks:
  126. logger.info("数据库中没有 pending 任务")
  127. return 0
  128. logger.info("从数据库读取到 %d 个 pending 任务", len(tasks))
  129. # 写入 .cursor/pending_tasks.json
  130. write_pending_tasks_file(tasks)
  131. # 生成 task_execute_instructions.md + task_trigger.txt
  132. # trigger_cursor_execution.create_execute_instructions_file 期望的是 processing_tasks 列表
  133. create_execute_instructions_file(WORKSPACE_ROOT, tasks)
  134. return len(tasks)
  135. def send_chat_to_cursor(message: str, input_box_pos: Optional[Tuple[int, int]]) -> bool:
  136. """
  137. 使用 CursorAutoChat 在 Cursor 中发送一条 Chat 消息。
  138. """
  139. logger.info("准备向 Cursor Chat 发送消息: %s", message)
  140. tool = CursorAutoChat(
  141. message=message,
  142. interval=300,
  143. input_box_pos=input_box_pos,
  144. )
  145. return tool.execute_once()
  146. def run_once(message: str, input_box_pos: Optional[Tuple[int, int]]) -> None:
  147. """
  148. 单次执行流程:
  149. 1. 同步已完成任务到数据库
  150. 2. 从数据库读取 pending 任务并生成本地文件
  151. 3. 如果有任务,向 Cursor 发送 Chat 消息
  152. """
  153. logger.info("=" * 80)
  154. logger.info("开始单次自动任务 + Chat 执行流程")
  155. # 第一步:先把本地已完成任务同步到数据库
  156. sync_completed_tasks_to_db()
  157. # 第二步:准备新的 pending 任务
  158. count = prepare_tasks_for_execution()
  159. if count == 0:
  160. logger.info("当前没有新的 pending 任务,无需触发 Cursor Chat")
  161. logger.info("=" * 80)
  162. return
  163. # 第三步:通过 Cursor Chat 提醒 AI 执行所有待处理任务
  164. ok = send_chat_to_cursor(message=message, input_box_pos=input_box_pos)
  165. if ok:
  166. logger.info("已通过 Cursor Chat 发送任务执行指令")
  167. else:
  168. logger.warning("向 Cursor Chat 发送消息失败,请检查窗口状态")
  169. logger.info("=" * 80)
  170. def run_daemon(
  171. message: str,
  172. input_box_pos: Optional[Tuple[int, int]],
  173. interval: int,
  174. sync_interval: int,
  175. ) -> None:
  176. """
  177. 守护进程模式:
  178. - 每次循环:
  179. 1) 同步本地 completed 任务到数据库
  180. 2) 检查数据库 pending 任务并生成本地文件
  181. 3) 如有任务则触发 Cursor Chat
  182. - 之后休眠 interval 秒
  183. """
  184. logger.info("=" * 80)
  185. logger.info("自动任务 + 自动Chat 一体化守护进程已启动")
  186. logger.info("检查间隔: %d 秒", interval)
  187. logger.info("同步 completed 状态间隔(同检查逻辑): %d 秒", sync_interval)
  188. logger.info("按 Ctrl+C 结束")
  189. logger.info("=" * 80)
  190. try:
  191. while True:
  192. try:
  193. # 先同步 completed 状态
  194. sync_completed_tasks_to_db()
  195. # 再检查新的 pending 任务并触发 Chat
  196. count = prepare_tasks_for_execution()
  197. if count > 0:
  198. send_chat_to_cursor(message=message, input_box_pos=input_box_pos)
  199. logger.info("下次检查将在 %d 秒后进行...", interval)
  200. time.sleep(interval)
  201. except KeyboardInterrupt:
  202. raise
  203. except Exception as exc: # noqa: BLE001
  204. logger.error("守护进程循环中出错: %s", exc)
  205. logger.info("将在 %d 秒后重试...", interval)
  206. time.sleep(interval)
  207. except KeyboardInterrupt:
  208. logger.info("\n自动任务 + 自动Chat 守护进程已停止")
  209. def parse_args() -> argparse.Namespace:
  210. parser = argparse.ArgumentParser(
  211. description="自动任务 + 自动Cursor Chat 一体化工具",
  212. formatter_class=argparse.RawDescriptionHelpFormatter,
  213. epilog="""
  214. 示例:
  215. # 单次执行:检查pending、生成文件并触发一次Chat
  216. python scripts/auto_tasks_chat_runner.py --once
  217. # 守护进程模式(每5分钟检查一次)
  218. python scripts/auto_tasks_chat_runner.py --daemon --interval 300
  219. # 自定义Chat消息
  220. python scripts/auto_tasks_chat_runner.py --daemon --message "请执行所有待处理任务"
  221. # 指定Chat输入框位置
  222. python scripts/auto_tasks_chat_runner.py --daemon --input-box-pos "1180,965"
  223. """,
  224. )
  225. parser.add_argument(
  226. "--once",
  227. action="store_true",
  228. help="只执行一次,不持续运行",
  229. )
  230. parser.add_argument(
  231. "--daemon",
  232. action="store_true",
  233. help="以守护进程模式运行,定期检查任务并触发Chat",
  234. )
  235. parser.add_argument(
  236. "--interval",
  237. type=int,
  238. default=300,
  239. help="守护进程模式下的检查间隔(秒),默认 300 秒",
  240. )
  241. parser.add_argument(
  242. "--message",
  243. type=str,
  244. default="请检查并执行所有待处理任务。",
  245. help='发送到 Cursor Chat 的消息内容,默认: "请检查并执行所有待处理任务。"',
  246. )
  247. parser.add_argument(
  248. "--input-box-pos",
  249. type=str,
  250. default=None,
  251. help='Cursor Chat 输入框位置,格式为 "x,y"(例如 "1180,965"),不指定则自动尝试定位',
  252. )
  253. return parser.parse_args()
  254. def main() -> None:
  255. args = parse_args()
  256. # 解析输入框位置
  257. input_box_pos: Optional[Tuple[int, int]] = None
  258. if args.input_box_pos:
  259. try:
  260. x_str, y_str = args.input_box_pos.split(",")
  261. input_box_pos = (int(x_str.strip()), int(y_str.strip()))
  262. logger.info("使用指定的输入框位置: %s", input_box_pos)
  263. except Exception as exc: # noqa: BLE001
  264. logger.warning("解析输入框位置失败 %r: %s", args.input_box_pos, exc)
  265. input_box_pos = None
  266. if args.once:
  267. run_once(message=args.message, input_box_pos=input_box_pos)
  268. else:
  269. run_daemon(
  270. message=args.message,
  271. input_box_pos=input_box_pos,
  272. interval=args.interval,
  273. sync_interval=args.interval,
  274. )
  275. if __name__ == "__main__":
  276. main()