||
- #!/usr/bin/env python3
- """
- 自动任务执行 + Cursor Chat 工具(合并版)
- 本脚本整合了原来的:
- - auto_execute_tasks.py:检查数据库中的 pending 任务,生成本地任务文件和 pending_tasks.json
- - cursor_auto_chat.py:在 Windows 下自动操作 Cursor Chat,发送指定消息
- 合并后,一个脚本即可完成:
- 1. 从 task_list 中读取 pending 任务
- 2. 为任务生成本地 Python 占位文件
- 3. 维护 .cursor/pending_tasks.json
- 4. (可选)在 Cursor Chat 中自动发起「请检查并执行所有待处理任务。」
- 5. 将 .cursor/pending_tasks.json 中 status=completed 的任务状态同步到 task_list
- 使用方式:
- 1. 仅任务调度(不发 Chat):
- python scripts/auto_execute_tasks.py --once
- python scripts/auto_execute_tasks.py --interval 300
- 2. 任务调度 + 自动 Chat:
- python scripts/auto_execute_tasks.py --once --enable-chat --chat-input-pos "1180,965"
- python scripts/auto_execute_tasks.py --interval 300 --enable-chat --chat-input-pos "1180,965"
- """
- from __future__ import annotations
- import json
- import time
- import argparse
- import logging
- import sys
- from pathlib import Path
- from datetime import datetime
- from typing import Any, Dict, List, Optional, Tuple
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s - %(levelname)s - %(message)s",
- )
- logger = logging.getLogger("AutoExecuteTasks")
- # ==== Cursor Chat 相关依赖(Windows GUI 自动化) ====
- try:
- import win32gui
- import win32con
- import win32process
- import pyautogui
- try:
- import pyperclip
- HAS_PYPERCLIP = True
- except ImportError:
- HAS_PYPERCLIP = False
- HAS_CURSOR_GUI = True
- # pyautogui 安全 & 节奏
- pyautogui.FAILSAFE = True
- pyautogui.PAUSE = 0.5
- except ImportError:
- HAS_CURSOR_GUI = False
- HAS_PYPERCLIP = False
- logger.warning(
- "未安装 Windows GUI 自动化依赖(pywin32/pyautogui/pyperclip),"
- "将禁用自动 Cursor Chat 功能。"
- )
- # 全局配置(由命令行参数控制)
- ENABLE_CHAT: bool = False
- CHAT_MESSAGE: str = "请检查并执行所有待处理任务。"
- CHAT_INPUT_POS: Optional[Tuple[int, int]] = None
- WORKSPACE_ROOT = Path(__file__).parent.parent
- CURSOR_DIR = WORKSPACE_ROOT / ".cursor"
- PENDING_TASKS_FILE = CURSOR_DIR / "pending_tasks.json"
- def get_db_connection():
- """
- 获取数据库连接
-
- Returns:
- 数据库连接对象,如果失败返回None
- """
- try:
- import psycopg2
- from psycopg2.extras import RealDictCursor
-
- # 读取数据库配置
- config_file = Path(__file__).parent.parent / 'mcp-servers' / 'task-manager' / 'config.json'
- with open(config_file, 'r', encoding='utf-8') as f:
- config = json.load(f)
-
- db_uri = config['database']['uri']
- conn = psycopg2.connect(db_uri)
- return conn
-
- except Exception as e:
- logger.error(f"连接数据库失败: {e}")
- return None
- def get_pending_tasks():
- """
- 从数据库获取所有pending任务
- """
- try:
- from psycopg2.extras import RealDictCursor
-
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor(cursor_factory=RealDictCursor)
-
- # 查询pending任务
- cursor.execute("""
- SELECT task_id, task_name, task_description, status,
- code_name, code_path, create_time, create_by
- FROM task_list
- WHERE status = 'pending'
- ORDER BY create_time ASC
- """)
-
- tasks = cursor.fetchall()
- cursor.close()
- conn.close()
-
- return [dict(task) for task in tasks]
-
- except Exception as e:
- logger.error(f"获取pending任务失败: {e}")
- return []
- def update_task_status(task_id, status, code_name=None, code_path=None):
- """
- 更新任务状态
-
- Args:
- task_id: 任务ID
- status: 新状态('pending', 'processing', 'completed', 'failed')
- code_name: 代码文件名(可选)
- code_path: 代码文件路径(可选)
-
- Returns:
- 是否更新成功
- """
- try:
- conn = get_db_connection()
- if not conn:
- return False
-
- cursor = conn.cursor()
-
- # 构建更新SQL
- if code_name and code_path:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, code_name = %s, code_path = %s,
- update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, code_name, code_path, task_id))
- else:
- cursor.execute("""
- UPDATE task_list
- SET status = %s, update_time = CURRENT_TIMESTAMP
- WHERE task_id = %s
- """, (status, task_id))
-
- conn.commit()
- updated = cursor.rowcount > 0
- cursor.close()
- conn.close()
-
- if updated:
- logger.info(f"✅ 任务 {task_id} 状态已更新为: {status}")
- else:
- logger.warning(f"⚠️ 任务 {task_id} 状态更新失败(任务不存在)")
-
- return updated
-
- except Exception as e:
- logger.error(f"更新任务状态失败: {e}")
- return False
- # ==== Cursor Chat 辅助函数(简化版,依赖固定输入框坐标) ====
- def _find_cursor_window() -> Optional[int]:
- """
- 查找 Cursor 主窗口句柄(简化版)
- 通过窗口标题 / 类名中包含 'Cursor' / 'Chrome_WidgetWin_1' 来判断。
- """
- if not HAS_CURSOR_GUI:
- return None
- cursor_windows: List[Dict[str, Any]] = []
- def enum_windows_callback(hwnd, _extra):
- if win32gui.IsWindowVisible(hwnd):
- title = win32gui.GetWindowText(hwnd) or ""
- class_name = win32gui.GetClassName(hwnd) or ""
- is_cursor = False
- if "cursor" in title.lower():
- is_cursor = True
- if class_name and "chrome_widgetwin" in class_name.lower():
- is_cursor = True
- if is_cursor:
- left, top, right, bottom = win32gui.GetWindowRect(hwnd)
- width = right - left
- height = bottom - top
- area = width * height
- cursor_windows.append(
- {
- "hwnd": hwnd,
- "title": title,
- "class": class_name,
- "width": width,
- "height": height,
- "area": area,
- }
- )
- return True
- win32gui.EnumWindows(enum_windows_callback, None)
- if not cursor_windows:
- logger.warning("未找到 Cursor 窗口")
- return None
- # 选取面积最大的窗口作为主窗口
- cursor_windows.sort(key=lambda x: x["area"], reverse=True)
- main = cursor_windows[0]
- logger.info(
- "找到 Cursor 主窗口: %s (%s), size=%dx%d, hwnd=%s",
- main["title"],
- main["class"],
- main["width"],
- main["height"],
- main["hwnd"],
- )
- return main["hwnd"]
- def _activate_cursor_window(hwnd: int) -> bool:
- """激活 Cursor 主窗口"""
- if not HAS_CURSOR_GUI:
- return False
- try:
- win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
- time.sleep(0.3)
- win32gui.SetForegroundWindow(hwnd)
- time.sleep(0.5)
- logger.info("Cursor 窗口已激活")
- return True
- except Exception as exc: # noqa: BLE001
- logger.error("激活 Cursor 窗口失败: %s", exc)
- return False
- def _send_chat_message_once(message: str, input_pos: Optional[Tuple[int, int]]) -> bool:
- """
- 在 Cursor Chat 中发送一条消息(单次):
- 1. 激活窗口
- 2. 如果提供了输入框坐标,则移动并点击
- 3. 使用剪贴板粘贴消息
- 4. 按 Enter 提交
- """
- if not HAS_CURSOR_GUI:
- logger.warning("当前环境不支持 Cursor GUI 自动化,跳过自动发 Chat")
- return False
- hwnd = _find_cursor_window()
- if not hwnd:
- return False
- if not _activate_cursor_window(hwnd):
- return False
- # 点击/激活输入框
- if input_pos:
- x, y = input_pos
- logger.info("移动鼠标到输入框位置: (%d, %d)", x, y)
- pyautogui.moveTo(x, y, duration=0.3)
- time.sleep(0.2)
- pyautogui.click(x, y)
- time.sleep(0.4)
- else:
- # 未指定位置时,尝试快捷键打开 Chat(Ctrl+K),然后点击窗口底部中间
- logger.info("未指定输入框位置,尝试使用 Ctrl+K 打开 Chat")
- pyautogui.hotkey("ctrl", "k")
- time.sleep(1.5)
- screen_width, screen_height = pyautogui.size()
- x, y = int(screen_width * 0.6), int(screen_height * 0.9)
- pyautogui.moveTo(x, y, duration=0.3)
- pyautogui.click(x, y)
- time.sleep(0.4)
- # 清空旧内容
- pyautogui.hotkey("ctrl", "a")
- time.sleep(0.3)
- # 再次点击保证焦点
- pyautogui.click()
- time.sleep(0.2)
- logger.info("正在向 Cursor Chat 输入消息: %s", message)
- # 优先使用剪贴板(兼容中文)
- if HAS_PYPERCLIP:
- try:
- old_clipboard = pyperclip.paste()
- except Exception: # noqa: BLE001
- old_clipboard = None
- try:
- pyperclip.copy(message)
- time.sleep(0.3)
- pyautogui.hotkey("ctrl", "v")
- time.sleep(1.0)
- logger.info("使用剪贴板粘贴消息成功")
- except Exception as exc: # noqa: BLE001
- logger.error("剪贴板粘贴失败: %s,退回到直接输入", exc)
- pyautogui.write(message, interval=0.05)
- time.sleep(0.8)
- finally:
- if old_clipboard is not None:
- try:
- pyperclip.copy(old_clipboard)
- except Exception:
- pass
- else:
- pyautogui.write(message, interval=0.05)
- time.sleep(0.8)
- # 提交(Enter)
- logger.info("按 Enter 提交消息")
- pyautogui.press("enter")
- time.sleep(1.0)
- logger.info("消息已提交")
- return True
- def send_chat_for_pending_tasks() -> None:
- """
- 如果启用了 Chat 功能且存在 pending 任务,则向 Cursor Chat 发送一次统一消息。
- """
- if not ENABLE_CHAT:
- return
- # 仅检查 .cursor/pending_tasks.json 中是否存在 status 为 processing 的任务
- if not PENDING_TASKS_FILE.exists():
- logger.info("未找到 .cursor/pending_tasks.json,跳过自动 Chat")
- return
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- data = json.load(f)
- if not isinstance(data, list) or not any(
- t.get("status") == "processing" for t in data
- ):
- logger.info("pending_tasks.json 中没有 processing 任务,跳过自动 Chat")
- return
- except Exception as exc: # noqa: BLE001
- logger.error("读取 pending_tasks.json 失败: %s", exc)
- return
- logger.info("检测到 processing 任务,准备自动向 Cursor Chat 发送提醒消息")
- _send_chat_message_once(CHAT_MESSAGE, CHAT_INPUT_POS)
- def sync_completed_tasks_from_pending_file() -> int:
- """
- 将 .cursor/pending_tasks.json 中 status == 'completed' 的任务,同步到数据库。
- Returns:
- 成功更新的任务数量
- """
- if not PENDING_TASKS_FILE.exists():
- return 0
- try:
- with PENDING_TASKS_FILE.open("r", encoding="utf-8") as f:
- tasks = json.load(f)
- except Exception as exc: # noqa: BLE001
- logger.error("读取 pending_tasks.json 失败: %s", exc)
- return 0
- if not isinstance(tasks, list):
- return 0
- updated = 0
- for t in tasks:
- if t.get("status") != "completed":
- continue
- task_id = t.get("task_id")
- if not task_id:
- continue
- code_name = t.get("code_name")
- code_path = t.get("code_path")
- if update_task_status(task_id, "completed", code_name, code_path):
- updated += 1
- logger.info(
- "已根据 pending_tasks.json 将任务 %s 同步为 completed (code_name=%s, code_path=%s)",
- task_id,
- code_name,
- code_path,
- )
- if updated:
- logger.info("本次共同步 %d 个 completed 任务到数据库", updated)
- return updated
- def print_task_for_cursor_execution(task, task_file_path=None):
- """
- 打印任务信息,供Cursor识别并执行
-
- 这个函数会以特定格式输出任务,Cursor可以识别并自动执行
- """
- print("\n" + "=" * 80)
- print(f"🤖 [AUTO-EXECUTE-TASK] Task ID: {task['task_id']}")
- print("=" * 80)
- print(f"\n**任务名称**: {task['task_name']}")
- print(f"**任务ID**: {task['task_id']}")
- print(f"**状态**: processing(已更新)")
- print(f"**创建时间**: {task['create_time']}")
- print(f"**创建者**: {task['create_by']}")
-
- if task_file_path:
- print(f"**任务文件**: {task_file_path}")
-
- print(f"\n## 任务描述\n")
- print(task['task_description'])
- print(f"\n## 执行指令")
- print(f"\n请Cursor AI根据上述任务描述,自动完成以下步骤:")
- print(f"1. 打开并查看任务文件: {task_file_path or '未创建'}")
- print(f"2. 根据任务描述实现具体功能")
- print(f"3. 确保代码符合项目规范")
- print(f"4. 完成后调用MCP工具更新任务状态:")
- print(f" 工具: update_task_status")
- print(f" 参数: {{")
- print(f" \"task_id\": {task['task_id']},")
- if task_file_path:
- import os
- file_name = os.path.basename(task_file_path)
- file_dir = os.path.dirname(task_file_path).replace(str(Path(__file__).parent.parent), '').strip('\\').strip('/')
- print(f" \"code_name\": \"{file_name}\",")
- print(f" \"code_path\": \"{file_dir}\",")
- print(f" \"status\": \"completed\"")
- print(f" }}")
- print(f"\n任务文件保存路径:{task.get('code_path', 'app/core/data_flow')}")
- print("\n" + "=" * 80)
- print(f"🔚 [END-AUTO-EXECUTE-TASK]")
- print("=" * 80 + "\n")
- def create_task_file(task):
- """
- 在指定目录创建任务文件
-
- Args:
- task: 任务字典
-
- Returns:
- 生成的文件路径,如果失败返回None
- """
- try:
- workspace = Path(__file__).parent.parent
- code_path = task.get('code_path', 'app/core/data_flow')
- target_dir = workspace / code_path
-
- # 确保目录存在
- target_dir.mkdir(parents=True, exist_ok=True)
-
- # 生成文件名(从任务名称或代码名称)
- code_name = task.get('code_name')
- if not code_name:
- # 从任务名称生成文件名
- import re
- task_name = task['task_name']
- # 清理文件名:去除特殊字符,替换为下划线
- safe_name = re.sub(r'[^\w\u4e00-\u9fff]+', '_', task_name)
- safe_name = re.sub(r'_+', '_', safe_name).strip('_')
- code_name = f"{safe_name}.py"
-
- # 确保是.py文件
- if not code_name.endswith('.py'):
- code_name = f"{code_name}.py"
-
- file_path = target_dir / code_name
-
- # 如果文件已存在,添加时间戳
- if file_path.exists():
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- base_name = file_path.stem
- file_path = target_dir / f"{base_name}_{timestamp}.py"
- code_name = file_path.name
-
- # 生成任务文件内容
- file_content = f'''#!/usr/bin/env python3
- """
- {task['task_name']}
- 任务ID: {task['task_id']}
- 创建时间: {task['create_time']}
- 创建者: {task['create_by']}
- 任务描述:
- {task['task_description']}
- 注意:此文件为任务占位符,需要根据任务描述实现具体功能。
- """
- # TODO: 根据任务描述实现功能
- # {task['task_description'][:100]}...
- if __name__ == '__main__':
- print("任务文件已创建,请根据任务描述实现具体功能")
- pass
- '''
-
- # 写入文件
- with open(file_path, 'w', encoding='utf-8') as f:
- f.write(file_content)
-
- logger.info(f"✅ 任务文件已创建: {file_path}")
-
- # 更新数据库中的code_name和code_path
- update_task_status(
- task['task_id'],
- 'processing', # 状态改为processing
- code_name=code_name,
- code_path=code_path
- )
-
- return str(file_path)
-
- except Exception as e:
- logger.error(f"创建任务文件失败: {e}")
- # 即使文件创建失败,也要更新状态为processing
- update_task_status(task['task_id'], 'processing')
- return None
- def notify_cursor_to_execute_task(task, task_file_path=None):
- """
- 通知Cursor执行任务
-
- 通过创建一个标记文件,让Cursor知道有新任务需要执行
- """
- workspace = Path(__file__).parent.parent
- task_trigger_file = workspace / '.cursor' / 'pending_tasks.json'
- task_trigger_file.parent.mkdir(parents=True, exist_ok=True)
-
- # 读取现有的pending tasks
- pending_tasks = []
- if task_trigger_file.exists():
- try:
- with open(task_trigger_file, 'r', encoding='utf-8') as f:
- pending_tasks = json.load(f)
- except:
- pending_tasks = []
-
- # 检查任务是否已存在
- task_exists = any(t['task_id'] == task['task_id'] for t in pending_tasks)
- if not task_exists:
- task_info = {
- 'task_id': task['task_id'],
- 'task_name': task['task_name'],
- 'task_description': task['task_description'],
- 'code_path': task.get('code_path', 'app/core/data_flow'),
- 'code_name': task.get('code_name'),
- 'status': 'processing', # 标记为processing
- 'notified_at': datetime.now().isoformat()
- }
-
- if task_file_path:
- task_info['task_file'] = task_file_path
-
- pending_tasks.append(task_info)
-
- # 写入文件
- with open(task_trigger_file, 'w', encoding='utf-8') as f:
- json.dump(pending_tasks, f, indent=2, ensure_ascii=False)
-
- logger.info(f"✅ Task {task['task_id']} added to pending_tasks.json")
- def auto_execute_tasks_once():
- """
- 执行一次任务检查和通知
- """
- # 先尝试把本地 completed 状态同步到数据库
- sync_completed_tasks_from_pending_file()
- logger.info("🔍 检查pending任务...")
-
- tasks = get_pending_tasks()
-
- if not tasks:
- logger.info("✅ 没有pending任务")
- return 0
-
- logger.info(f"📋 找到 {len(tasks)} 个pending任务")
-
- processed_count = 0
- for task in tasks:
- logger.info(f"\n{'='*80}")
- logger.info(f"处理任务: [{task['task_id']}] {task['task_name']}")
- logger.info(f"{'='*80}")
-
- try:
- # 1. 创建任务文件(同时更新状态为processing)
- task_file_path = create_task_file(task)
-
- if not task_file_path:
- logger.warning(f"⚠️ 任务 {task['task_id']} 文件创建失败,但状态已更新为processing")
- # 即使文件创建失败,也继续通知Cursor
-
- # 2. 打印任务详情,供Cursor识别
- print_task_for_cursor_execution(task, task_file_path)
-
- # 3. 创建通知文件
- notify_cursor_to_execute_task(task, task_file_path)
-
- processed_count += 1
- logger.info(f"✅ 任务 {task['task_id']} 处理完成")
-
- except Exception as e:
- logger.error(f"❌ 处理任务 {task['task_id']} 时出错: {e}")
- # 标记任务为failed
- update_task_status(task['task_id'], 'failed')
-
- return processed_count
- def auto_execute_tasks_loop(interval=300):
- """
- 循环执行任务检查
-
- Args:
- interval: 检查间隔(秒),默认300秒(5分钟)
- """
- logger.info("=" * 80)
- logger.info("🚀 自动任务执行服务已启动")
- logger.info(f"⏰ 检查间隔: {interval}秒 ({interval//60}分钟)")
- logger.info("按 Ctrl+C 停止服务")
- logger.info("=" * 80 + "\n")
-
- try:
- while True:
- try:
- count = auto_execute_tasks_once()
- # 如果有新任务且启用了自动 Chat,则向 Cursor 发送提醒
- if count > 0:
- send_chat_for_pending_tasks()
- if count > 0:
- logger.info(f"\n✅ 已通知 {count} 个任务")
-
- logger.info(f"\n⏳ 下次检查时间: {interval}秒后...")
- time.sleep(interval)
-
- except KeyboardInterrupt:
- raise
- except Exception as e:
- logger.error(f"❌ 执行出错: {e}")
- logger.info(f"⏳ {interval}秒后重试...")
- time.sleep(interval)
-
- except KeyboardInterrupt:
- logger.info("\n" + "=" * 80)
- logger.info("⛔ 用户停止了自动任务执行服务")
- logger.info("=" * 80)
- def main():
- """
- 主函数
- """
- parser = argparse.ArgumentParser(
- description='自动任务执行脚本(含可选Cursor Chat)- 定期检查并执行pending任务'
- )
- parser.add_argument(
- '--once',
- action='store_true',
- help='只执行一次,不循环'
- )
- parser.add_argument(
- '--interval',
- type=int,
- default=300,
- help='检查间隔(秒),默认300秒(5分钟)'
- )
- parser.add_argument(
- '--enable-chat',
- action='store_true',
- help='启用自动 Cursor Chat,在有pending任务时自动向 Cursor 发送提醒消息'
- )
- parser.add_argument(
- '--chat-input-pos',
- type=str,
- default=None,
- help='Cursor Chat 输入框位置,格式 "x,y"(例如 "1180,965"),不指定则自动尝试定位'
- )
-
- args = parser.parse_args()
- # 设置全局 Chat 配置
- global ENABLE_CHAT, CHAT_INPUT_POS # noqa: PLW0603
- ENABLE_CHAT = bool(args.enable_chat)
- CHAT_INPUT_POS = None
- if args.chat_input_pos:
- try:
- x_str, y_str = args.chat_input_pos.split(',')
- CHAT_INPUT_POS = (int(x_str.strip()), int(y_str.strip()))
- logger.info("使用指定的 Chat 输入框位置: %s", CHAT_INPUT_POS)
- except Exception as exc: # noqa: BLE001
- logger.warning("解析 --chat-input-pos 失败 %r: %s", args.chat_input_pos, exc)
-
- if args.once:
- # 执行一次
- count = auto_execute_tasks_once()
- if count > 0:
- send_chat_for_pending_tasks()
- logger.info(f"\n✅ 完成!处理了 {count} 个任务")
- else:
- # 循环执行
- auto_execute_tasks_loop(interval=args.interval)
- if __name__ == '__main__':
- main()
|