trigger_cursor_execution.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. #!/usr/bin/env python3
  2. """
  3. Cursor任务执行触发器
  4. 这个脚本会读取pending_tasks.json,并以明确格式输出任务信息,
  5. 供Cursor识别并自动执行。
  6. 支持两种模式:
  7. 1. 单次执行:python scripts/trigger_cursor_execution.py --once
  8. 2. 定期执行:python scripts/trigger_cursor_execution.py --interval 300
  9. 定期执行模式下,脚本会:
  10. - 定期检查processing任务
  11. - 自动生成执行指令文件(.cursor/task_execute_instructions.md)
  12. - 输出任务信息供Cursor识别
  13. """
  14. import json
  15. import sys
  16. import time
  17. import argparse
  18. import logging
  19. from pathlib import Path
  20. from datetime import datetime
  21. # 配置日志
  22. logging.basicConfig(
  23. level=logging.INFO,
  24. format='%(asctime)s - %(levelname)s - %(message)s'
  25. )
  26. logger = logging.getLogger('CursorTaskTrigger')
  27. def get_task_file_path(task, workspace):
  28. """获取任务文件路径"""
  29. import os
  30. import re
  31. task_file = task.get('task_file')
  32. if not task_file and task.get('code_path'):
  33. # 从任务名称生成文件名
  34. task_name = task['task_name']
  35. safe_name = re.sub(r'[^\w\u4e00-\u9fff]+', '_', task_name)
  36. safe_name = re.sub(r'_+', '_', safe_name).strip('_')
  37. code_path = task.get('code_path', 'app/core/data_flow')
  38. task_file = os.path.join(workspace, code_path, f"{safe_name}.py")
  39. if not os.path.exists(task_file):
  40. task_file = None
  41. return task_file
  42. def create_execute_instructions_file(workspace, processing_tasks):
  43. """
  44. 创建任务执行指令文件,供Cursor自动读取
  45. Args:
  46. workspace: 工作区路径
  47. processing_tasks: 待处理任务列表
  48. """
  49. instructions_file = workspace / '.cursor' / 'task_execute_instructions.md'
  50. instructions_file.parent.mkdir(parents=True, exist_ok=True)
  51. with open(instructions_file, 'w', encoding='utf-8') as f:
  52. f.write(f"# 🤖 Cursor自动任务执行指令\n\n")
  53. f.write(f"**⚠️ 重要:Cursor AI请立即执行此文件中的任务!**\n\n")
  54. f.write(f"**生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
  55. f.write(f"**待执行任务数量**: {len(processing_tasks)}\n\n")
  56. f.write(f"**执行状态**: ⏳ 等待执行\n\n")
  57. f.write("---\n\n")
  58. f.write(f"## 📋 执行指令\n\n")
  59. f.write(f"**Cursor AI:请按照以下步骤执行任务:**\n\n")
  60. f.write(f"1. 依次处理每个任务\n")
  61. f.write(f"2. 打开任务文件并实现功能\n")
  62. f.write(f"3. 完成后调用MCP工具更新状态\n\n")
  63. f.write("---\n\n")
  64. for idx, task in enumerate(processing_tasks, 1):
  65. task_file = get_task_file_path(task, workspace)
  66. f.write(f"## 🔴 任务 {idx}/{len(processing_tasks)}: [{task['task_id']}] {task['task_name']}\n\n")
  67. f.write(f"**执行状态**: ⏳ 待执行\n\n")
  68. f.write(f"**任务文件**: `{task_file or '未创建'}`\n\n")
  69. f.write(f"### 📝 任务描述\n\n{task['task_description']}\n\n")
  70. f.write(f"### ✅ 执行步骤(Cursor AI请按此执行)\n\n")
  71. f.write(f"#### 步骤1:打开任务文件\n")
  72. f.write(f"- 文件路径: `{task_file or 'N/A'}`\n")
  73. f.write(f"- 如果文件不存在,需要先创建\n\n")
  74. f.write(f"#### 步骤2:实现功能\n")
  75. f.write(f"- 仔细阅读上述任务描述\n")
  76. f.write(f"- 根据描述实现完整的功能\n")
  77. f.write(f"- 确保代码符合项目规范(PEP 8)\n")
  78. f.write(f"- 添加必要的错误处理和日志记录\n\n")
  79. f.write(f"#### 步骤3:验证代码\n")
  80. f.write(f"- 确保代码可以正常运行\n")
  81. f.write(f"- 检查代码逻辑是否正确\n\n")
  82. f.write(f"#### 步骤4:更新任务状态\n")
  83. f.write(f"- 调用MCP工具 `update_task_status`\n")
  84. f.write(f"- 使用以下参数:\n\n")
  85. f.write(f"```json\n")
  86. f.write(f"{{\n")
  87. f.write(f" \"tool\": \"update_task_status\",\n")
  88. f.write(f" \"task_id\": {task['task_id']},\n")
  89. if task_file:
  90. import os
  91. file_name = os.path.basename(task_file)
  92. file_dir = os.path.dirname(task_file).replace(str(workspace), '').strip('\\').strip('/').replace('\\', '/')
  93. f.write(f" \"code_name\": \"{file_name}\",\n")
  94. f.write(f" \"code_path\": \"{file_dir}\",\n")
  95. f.write(f" \"status\": \"completed\"\n")
  96. f.write(f"}}\n")
  97. f.write(f"```\n\n")
  98. f.write(f"**⚠️ 重要:必须完成上述所有步骤!**\n\n")
  99. f.write("---\n\n")
  100. logger.info(f"✅ 任务执行指令文件已创建: {instructions_file}")
  101. # 创建触发器文件,更新时间戳
  102. trigger_file = workspace / '.cursor' / 'task_trigger.txt'
  103. with open(trigger_file, 'w', encoding='utf-8') as f:
  104. f.write(f"CURSOR_AUTO_EXECUTE_TASK_TRIGGER\n")
  105. f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
  106. f.write(f"状态: 有 {len(processing_tasks)} 个任务待执行\n\n")
  107. f.write(f"此文件用于触发Cursor自动执行任务。\n")
  108. f.write(f"当此文件更新时间在5分钟内时,Cursor应该检查 .cursor/task_execute_instructions.md 并执行其中的任务。\n")
  109. logger.info(f"✅ 触发器文件已更新: {trigger_file}")
  110. def execute_tasks_once():
  111. """
  112. 执行一次任务检查和触发
  113. Returns:
  114. 找到的任务数量
  115. """
  116. workspace = Path(__file__).parent.parent
  117. pending_tasks_file = workspace / '.cursor' / 'pending_tasks.json'
  118. if not pending_tasks_file.exists():
  119. logger.info("⚠️ 没有找到pending任务文件")
  120. return 0
  121. try:
  122. with open(pending_tasks_file, 'r', encoding='utf-8') as f:
  123. pending_tasks = json.load(f)
  124. if not pending_tasks:
  125. logger.info("✅ 没有pending任务")
  126. return 0
  127. # 处理所有任务(如果没有status字段,默认为processing)
  128. # 只处理状态为processing或没有status的任务
  129. processing_tasks = [
  130. t for t in pending_tasks
  131. if t.get('status', 'processing') == 'processing' or 'status' not in t
  132. ]
  133. if not processing_tasks:
  134. logger.info("✅ 没有processing状态的任务")
  135. return 0
  136. logger.info(f"📋 找到 {len(processing_tasks)} 个processing任务,需要执行")
  137. # 创建执行指令文件
  138. create_execute_instructions_file(workspace, processing_tasks)
  139. # 输出任务信息供Cursor识别
  140. print("\n" + "=" * 80)
  141. print(f"🤖 [CURSOR-AUTO-EXECUTE] 发现 {len(processing_tasks)} 个待执行任务")
  142. print("=" * 80 + "\n")
  143. for task in processing_tasks:
  144. task_file = get_task_file_path(task, workspace)
  145. print(f"\n🤖 [CURSOR-EXECUTE-TASK]")
  146. print(f"任务ID: {task['task_id']}")
  147. print(f"任务名称: {task['task_name']}")
  148. print(f"任务文件: {task_file or '未创建'}")
  149. print(f"\n任务描述:\n{task['task_description']}")
  150. print(f"\n执行指令:")
  151. print(f"1. 打开任务文件: {task_file or task.get('code_path', 'app/core/data_flow') + '/' + task['task_name'] + '.py'}")
  152. print(f"2. 根据任务描述实现功能")
  153. print(f"3. 调用MCP工具更新状态:")
  154. print(f" 工具: update_task_status")
  155. print(f" 参数: {{")
  156. print(f" \"task_id\": {task['task_id']},")
  157. if task_file:
  158. import os
  159. file_path = task_file
  160. file_name = os.path.basename(file_path)
  161. file_dir = os.path.dirname(file_path).replace(str(workspace), '').strip('\\').strip('/').replace('\\', '/')
  162. print(f" \"code_name\": \"{file_name}\",")
  163. print(f" \"code_path\": \"{file_dir}\",")
  164. print(f" \"status\": \"completed\"")
  165. print(f" }}")
  166. print(f"\n🔚 [END-CURSOR-EXECUTE-TASK]\n")
  167. print("=" * 80)
  168. print(f"\n💡 提示:任务执行指令已保存到 .cursor/task_execute_instructions.md")
  169. print(f"💡 Cursor可以自动读取此文件并执行任务\n")
  170. return len(processing_tasks)
  171. except Exception as e:
  172. logger.error(f"❌ 读取任务文件失败: {e}")
  173. return 0
  174. def execute_tasks_loop(interval=300):
  175. """
  176. 循环执行任务检查和触发
  177. Args:
  178. interval: 检查间隔(秒),默认300秒(5分钟)
  179. """
  180. logger.info("=" * 80)
  181. logger.info("🚀 Cursor任务执行触发器已启动(定期模式)")
  182. logger.info(f"⏰ 检查间隔: {interval}秒 ({interval//60}分钟)")
  183. logger.info("按 Ctrl+C 停止服务")
  184. logger.info("=" * 80 + "\n")
  185. try:
  186. while True:
  187. try:
  188. count = execute_tasks_once()
  189. if count > 0:
  190. logger.info(f"\n✅ 已触发 {count} 个任务执行")
  191. logger.info(f"💡 任务指令已保存到 .cursor/task_execute_instructions.md")
  192. logger.info(f"💡 Cursor将自动读取并执行任务\n")
  193. logger.info(f"\n⏳ 下次检查时间: {interval}秒后...")
  194. time.sleep(interval)
  195. except KeyboardInterrupt:
  196. raise
  197. except Exception as e:
  198. logger.error(f"❌ 执行出错: {e}")
  199. logger.info(f"⏳ {interval}秒后重试...")
  200. time.sleep(interval)
  201. except KeyboardInterrupt:
  202. logger.info("\n" + "=" * 80)
  203. logger.info("⛔ 用户停止了Cursor任务执行触发器")
  204. logger.info("=" * 80)
  205. def main():
  206. """
  207. 主函数
  208. """
  209. parser = argparse.ArgumentParser(
  210. description='Cursor任务执行触发器 - 定期检查并触发任务执行'
  211. )
  212. parser.add_argument(
  213. '--once',
  214. action='store_true',
  215. help='只执行一次检查,不持续运行'
  216. )
  217. parser.add_argument(
  218. '--interval',
  219. type=int,
  220. default=300,
  221. help='检查间隔(秒),默认300秒(5分钟)'
  222. )
  223. args = parser.parse_args()
  224. if args.once:
  225. # 执行一次
  226. count = execute_tasks_once()
  227. logger.info(f"\n✅ 完成!找到 {count} 个待执行任务")
  228. else:
  229. # 循环执行
  230. execute_tasks_loop(interval=args.interval)
  231. if __name__ == '__main__':
  232. main()