Browse files

react_agent: prepare to add checkpoint deletion functionality.

wangxq 3 weeks ago
parent
commit
443b9c7c06
2 changed files with 1208 additions and 0 deletions
  1. 845 0
      docs/react_agent checkpoint 删除策略参考.md
  2. 363 0
      docs/react_agent_checkpoint_management_api_design.md

+ 845 - 0
docs/react_agent checkpoint 删除策略参考.md

@@ -0,0 +1,845 @@
+Your observation is spot on: for most application scenarios, keeping only the most recent few checkpoints is enough, and the large number of intermediate checkpoints just wastes storage space. As your screenshot shows, `wang1:20250729235038043` has 36 checkpoints, yet only the latest few are actually meaningful. TTL really is a poor fit for this scenario, because it indiscriminately deletes every expired checkpoint.
+
+## Core Problem
+
+**Limitations of TTL:**
+
+- ✅ Cleans up old data automatically
+- ❌ But it deletes everything that has expired instead of keeping the most recent N
+- ❌ No fine-grained control over the retention policy
+
+**Actual requirement:**
+
+- Keep the most recent 5-10 checkpoints (to support rollback and debugging)
+- Delete earlier checkpoints (to save storage space)
+
+## Recommended Solutions
+
+### Option 1: Periodic Cleanup in the Application Layer (Recommended)
+
+```Python
+def cleanup_old_checkpoints(checkpointer, thread_id: str, keep_count: int = 10):
+    """保留最近N个checkpoint,删除其余的"""
+    try:
+        config = {"configurable": {"thread_id": thread_id}}
+        
+        # Fetch all checkpoints (already sorted newest first)
+        all_checkpoints = list(checkpointer.list(config))
+        
+        print(f"Thread {thread_id}: {len(all_checkpoints)} checkpoints in total")
+        
+        if len(all_checkpoints) <= keep_count:
+            print("No cleanup needed")
+            return
+        
+        # Keep the newest keep_count checkpoints and delete the rest
+        to_delete = all_checkpoints[keep_count:]
+        
+        print(f"About to delete {len(to_delete)} old checkpoints")
+        
+        for checkpoint_tuple in to_delete:
+            try:
+                # Delete the checkpoint
+                delete_config = {
+                    "configurable": {
+                        "thread_id": thread_id,
+                        "checkpoint_id": checkpoint_tuple.checkpoint["id"]
+                    }
+                }
+                
+                checkpointer.delete(delete_config)
+                print(f"已删除: {checkpoint_tuple.checkpoint['id'][:8]}...")
+                
+            except Exception as e:
+                print(f"删除失败: {e}")
+                
+    except Exception as e:
+        print(f"清理失败: {e}")
+
+# 立即清理你的thread
+checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+cleanup_old_checkpoints(checkpointer, "wang1:20250729235038043", keep_count=5)
+```
+
+### Option 2: Integrate Cleanup into the Application Flow
+
+```Python
+class AutoCleanupCheckpointer:
+    """自动清理的Checkpointer包装器"""
+    
+    def __init__(self, base_checkpointer, keep_count=10, cleanup_frequency=20):
+        self.base_checkpointer = base_checkpointer
+        self.keep_count = keep_count
+        self.cleanup_frequency = cleanup_frequency
+        self.operation_count = 0
+    
+    def put(self, config, checkpoint, metadata, new_versions):
+        # Save the checkpoint as usual
+        result = self.base_checkpointer.put(config, checkpoint, metadata, new_versions)
+        
+        # Trigger a cleanup every N operations
+        self.operation_count += 1
+        if self.operation_count % self.cleanup_frequency == 0:
+            thread_id = config.get("configurable", {}).get("thread_id")
+            if thread_id:
+                self._auto_cleanup(thread_id)
+        
+        return result
+    
+    def _auto_cleanup(self, thread_id):
+        """自动清理该thread的旧checkpoint"""
+        try:
+            config = {"configurable": {"thread_id": thread_id}}
+            all_checkpoints = list(self.base_checkpointer.list(config))
+            
+            if len(all_checkpoints) > self.keep_count:
+                to_delete = all_checkpoints[self.keep_count:]
+                
+                for cp in to_delete:
+                    delete_config = {
+                        "configurable": {
+                            "thread_id": thread_id,
+                            "checkpoint_id": cp.checkpoint["id"]
+                        }
+                    }
+                    try:
+                        self.base_checkpointer.delete(delete_config)
+                    except Exception:
+                        pass  # Ignore individual delete errors
+                
+                print(f"Auto-cleanup for {thread_id}: deleted {len(to_delete)} old checkpoints")
+        except Exception:
+            pass  # Ignore cleanup errors
+    
+    def __getattr__(self, name):
+        return getattr(self.base_checkpointer, name)
+
+# Use the auto-cleanup checkpointer
+base_checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+auto_checkpointer = AutoCleanupCheckpointer(base_checkpointer, keep_count=10)
+
+graph = builder.compile(checkpointer=auto_checkpointer)
+```
+
+### Option 3: Cleanup via a Scheduled Task
+
+```Python
+import threading
+import time
+
+def start_checkpoint_cleanup_service(checkpointer, keep_count=10, interval_hours=1):
+    """启动后台清理服务"""
+    
+    def cleanup_worker():
+        while True:
+            try:
+                print("开始定期checkpoint清理...")
+                
+                # 这里需要获取所有thread_id
+                # 你可以维护一个thread_id列表,或者从Redis扫描
+                thread_ids = get_all_thread_ids()  # 需要你实现这个函数
+                
+                for thread_id in thread_ids:
+                    cleanup_old_checkpoints(checkpointer, thread_id, keep_count)
+                
+                print("定期清理完成")
+                
+            except Exception as e:
+                print(f"定期清理出错: {e}")
+            
+            # 等待下次清理
+            time.sleep(interval_hours * 3600)
+    
+    # 启动后台线程
+    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
+    cleanup_thread.start()
+    
+    print(f"后台清理服务已启动,每{interval_hours}小时清理一次")
+    return cleanup_thread
+```
+
+## Immediate Action Items
+
+### 1. Clean Up Your Existing Checkpoints Now
+
+```Python
+# Clean up your thread, keeping only the newest 5 checkpoints
+checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+cleanup_old_checkpoints(checkpointer, "wang1:20250729235038043", keep_count=5)
+
+# Goes from 36 checkpoints down to 5 and saves a lot of storage!
+```
+
+### 2. Set Up a Long-Term Policy
+
+```Python
+# Pick one of the following approaches:
+
+# Approach A: integrate into the application (recommended)
+auto_checkpointer = AutoCleanupCheckpointer(checkpointer, keep_count=10)
+graph = builder.compile(checkpointer=auto_checkpointer)
+
+# Approach B: standalone scheduled cleanup
+start_checkpoint_cleanup_service(checkpointer, keep_count=10, interval_hours=2)
+```
+
+## Summary
+
+Your observation is accurate:
+
+1. **Of the 36 checkpoints, only the latest few are useful**
+2. **TTL deletes everything, which does not match the requirement**
+3. **What you need is a "keep the most recent N" policy**
+
+**Recommended approach: keep the most recent 10 checkpoints and clean up automatically on use**
+
+This supports the rollback and debugging you need while keeping storage costs under control.
+
+Your concern is entirely reasonable: running checkpoint deletion inside the graph flow really can hurt response time, especially when there are many checkpoints to delete.
+
+## Performance Impact Analysis
+
+```Python
+# The response time users expect
+def chat_node(state):
+    response = llm.invoke(state["messages"])  # 1-3 seconds
+    return {"messages": [response]}
+
+# With a cleanup step added
+def chat_node_with_cleanup(state):
+    response = llm.invoke(state["messages"])  # 1-3 seconds
+    
+    # Cleaning up 36 checkpoints can take an extra 1-5 seconds!
+    cleanup_old_checkpoints(checkpointer, thread_id, keep_count=10)
+    
+    return {"messages": [response]}
+    
+# What the user feels: a 2-second reply now takes 6 seconds!
+```
+
+## A Better Solution: Asynchronous Background Cleanup
+
+### Recommended: A Standalone Background Cleanup Service
+
+```Python
+import threading
+import queue
+
+class BackgroundCheckpointCleaner:
+    """后台checkpoint清理服务 - 不影响用户响应"""
+    
+    def __init__(self, checkpointer, keep_count=10):
+        self.checkpointer = checkpointer
+        self.keep_count = keep_count
+        self.cleanup_queue = queue.Queue()
+        self.worker_thread = None
+        
+    def request_cleanup(self, thread_id: str):
+        """请求清理(非阻塞,立即返回)"""
+        self.cleanup_queue.put(thread_id)
+        print(f"已加入清理队列: {thread_id}")
+    
+    def start_worker(self):
+        """启动后台工作线程"""
+        def worker():
+            while True:
+                try:
+                    thread_id = self.cleanup_queue.get(timeout=1)
+                    print(f"后台清理开始: {thread_id}")
+                    
+                    # 在后台执行清理,不影响用户请求
+                    self._do_cleanup(thread_id)
+                    
+                    print(f"后台清理完成: {thread_id}")
+                    self.cleanup_queue.task_done()
+                    
+                except queue.Empty:
+                    continue
+                except Exception as e:
+                    print(f"后台清理出错: {e}")
+        
+        self.worker_thread = threading.Thread(target=worker, daemon=True)
+        self.worker_thread.start()
+        print("后台清理服务已启动")
+    
+    def _do_cleanup(self, thread_id: str):
+        """实际清理操作(在后台线程中执行)"""
+        try:
+            config = {"configurable": {"thread_id": thread_id}}
+            all_checkpoints = list(self.checkpointer.list(config))
+            
+            if len(all_checkpoints) <= self.keep_count:
+                return
+            
+            to_delete = all_checkpoints[self.keep_count:]
+            
+            for cp in to_delete:
+                delete_config = {
+                    "configurable": {
+                        "thread_id": thread_id,
+                        "checkpoint_id": cp.checkpoint["id"]
+                    }
+                }
+                self.checkpointer.delete(delete_config)
+            
+            print(f"后台删除了{len(to_delete)}个checkpoint")
+            
+        except Exception as e:
+            print(f"后台清理失败: {e}")
+
+# 全局清理服务
+cleanup_service = BackgroundCheckpointCleaner(checkpointer, keep_count=10)
+cleanup_service.start_worker()
+```
+
+### Threshold-Based Smart Triggering
+
+```Python
+class SmartCheckpointer:
+    """智能checkpointer:达到阈值才触发后台清理"""
+    
+    def __init__(self, base_checkpointer, cleanup_service, threshold=30):
+        self.base_checkpointer = base_checkpointer
+        self.cleanup_service = cleanup_service
+        self.threshold = threshold
+        self.checkpoint_counts = {}
+    
+    def put(self, config, checkpoint, metadata, new_versions):
+        # Save as usual (no performance impact)
+        result = self.base_checkpointer.put(config, checkpoint, metadata, new_versions)
+        
+        # Count checkpoints and trigger a background cleanup once the threshold is reached
+        thread_id = config.get("configurable", {}).get("thread_id")
+        if thread_id:
+            count = self.checkpoint_counts.get(thread_id, 0) + 1
+            self.checkpoint_counts[thread_id] = count
+            
+            # Trigger a background cleanup once 30 checkpoints have accumulated
+            if count >= self.threshold:
+                self.cleanup_service.request_cleanup(thread_id)  # non-blocking
+                self.checkpoint_counts[thread_id] = 10  # reset the counter to the kept count
+        
+        return result
+    
+    def __getattr__(self, name):
+        return getattr(self.base_checkpointer, name)
+```
+
+## Deployment in Practice
+
+### 1. Initialize at Application Startup
+
+```Python
+# main.py
+from langgraph.checkpoint.redis import RedisSaver
+
+# Initialize the checkpointer
+checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+
+# Start the background cleanup service
+cleanup_service = BackgroundCheckpointCleaner(checkpointer, keep_count=10)
+cleanup_service.start_worker()
+
+# Use the smart checkpointer
+smart_checkpointer = SmartCheckpointer(checkpointer, cleanup_service, threshold=30)
+
+# Compile the graph
+graph = builder.compile(checkpointer=smart_checkpointer)
+```
+
+### 2. Keep the User-Facing API Fast
+
+```Python
+def chat_api(user_input: str, thread_id: str):
+    """聊天API - 快速响应,不被清理阻塞"""
+    
+    config = {"configurable": {"thread_id": thread_id}}
+    
+    # 快速响应用户(1-3秒)
+    result = graph.invoke({"messages": [HumanMessage(user_input)]}, config)
+    
+    # 清理操作在后台异步执行,不影响响应时间
+    
+    return result["messages"][-1].content
+```
+
+### 3. Scheduled Batch Cleanup (Optional)
+
+```Python
+import schedule
+import threading
+import time
+
+def setup_scheduled_cleanup():
+    """设置定时清理:每天凌晨2点执行"""
+    
+    def daily_cleanup():
+        print("开始每日checkpoint清理...")
+        # 获取活跃的thread_id列表
+        active_threads = ["wang1:20250729235038043"]  # 你的thread列表
+        
+        for thread_id in active_threads:
+            cleanup_service.request_cleanup(thread_id)
+        
+        print("每日清理任务已提交到后台队列")
+    
+    schedule.every().day.at("02:00").do(daily_cleanup)
+    
+    def run_scheduler():
+        while True:
+            schedule.run_pending()
+            time.sleep(3600)  # check once an hour
+    
+    threading.Thread(target=run_scheduler, daemon=True).start()
+    print("Scheduled cleanup is set: runs every day at 2:00 AM")
+```
+
+## Clean Up the Existing Data Now
+
+```Python
+# One-off cleanup script (run offline)
+from langgraph.checkpoint.redis import RedisSaver
+
+def cleanup_existing_checkpoints():
+    """Clean up the existing redundant checkpoints"""
+    checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+    
+    thread_id = "wang1:20250729235038043"
+    config = {"configurable": {"thread_id": thread_id}}
+    
+    all_checkpoints = list(checkpointer.list(config))
+    print(f"当前有{len(all_checkpoints)}个checkpoint")
+    
+    if len(all_checkpoints) > 10:
+        to_delete = all_checkpoints[10:]  # 保留最新10个
+        
+        for cp in to_delete:
+            delete_config = {
+                "configurable": {
+                    "thread_id": thread_id,
+                    "checkpoint_id": cp.checkpoint["id"]
+                }
+            }
+            checkpointer.delete(delete_config)
+        
+        print(f"删除了{len(to_delete)}个旧checkpoint")
+    else:
+        print("无需清理")
+
+# 运行一次立即清理
+if __name__ == "__main__":
+    cleanup_existing_checkpoints()
+```
+
+## Summary
+
+Your instinct is right: **running checkpoint deletion inside the graph flow noticeably hurts the user experience**.
+
+**Recommended strategy:**
+
+1. **Asynchronous background cleanup** - user responses are unaffected
+2. **Threshold-based triggering** - clean up only when necessary
+3. **Scheduled batch cleanup** - run it during off-peak hours
+
+**Core principle: user responses come first; cleanup runs in the background**
+
+This keeps responses fast while still managing storage effectively.
+
+# Scheduled Checkpoint Cleanup in a Flask Application
+
+## Option Comparison
+
+| Option                 | Pros                            | Cons                                  | Recommended For                |
+| ---------------------- | ------------------------------- | ------------------------------------- | ------------------------------ |
+| Flask built-in timer   | Simple, tightly integrated      | Runs in the main process, not robust  | Development and testing        |
+| External script + cron | Separate process, stable        | Needs extra setup                     | **Recommended for production** |
+| Celery                 | Mature and powerful             | Complex setup, needs Redis/RabbitMQ   | Large applications             |
+| APScheduler            | Feature-rich, easy to integrate | Runs inside the Flask process         | Small/medium applications      |
+
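+The comparison above mentions APScheduler without showing it. Purely as a hedged illustration (not part of the original plan, and `schedule_inprocess_cleanup` is a hypothetical name), an in-process variant could look roughly like this, assuming the `cleanup_thread_checkpoints` and `get_all_thread_ids` helpers defined further below:
+
+```Python
+# Hypothetical sketch only: APScheduler-based in-process cleanup.
+from apscheduler.schedulers.background import BackgroundScheduler
+
+def schedule_inprocess_cleanup(keep_count: int = 10):
+    """Run the checkpoint cleanup every day at 02:00 inside the Flask process."""
+    scheduler = BackgroundScheduler()
+
+    def job():
+        # Assumes the helpers defined in the Flask app below.
+        for thread_id in get_all_thread_ids():
+            cleanup_thread_checkpoints(thread_id, keep_count=keep_count)
+
+    scheduler.add_job(job, "cron", hour=2, minute=0)
+    scheduler.start()
+    return scheduler
+```
+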
+## Recommended Approach: External Script + API Calls
+
+### 1. The Flask App Exposes Cleanup APIs
+
+```Python
+# app.py
+from flask import Flask, jsonify, request
+from langgraph.checkpoint.redis import RedisSaver
+import time
+import threading
+
+app = Flask(__name__)
+
+# Initialize the checkpointer
+checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
+
+def cleanup_thread_checkpoints(thread_id: str, keep_count: int = 10):
+    """Clean up old checkpoints for a single thread"""
+    try:
+        config = {"configurable": {"thread_id": thread_id}}
+        all_checkpoints = list(checkpointer.list(config))
+        
+        if len(all_checkpoints) <= keep_count:
+            return {"status": "no_cleanup_needed", "total": len(all_checkpoints)}
+        
+        to_delete = all_checkpoints[keep_count:]
+        deleted_count = 0
+        
+        for checkpoint_tuple in to_delete:
+            try:
+                delete_config = {
+                    "configurable": {
+                        "thread_id": thread_id,
+                        "checkpoint_id": checkpoint_tuple.checkpoint["id"]
+                    }
+                }
+                checkpointer.delete(delete_config)
+                deleted_count += 1
+            except Exception as e:
+                print(f"删除checkpoint失败: {e}")
+        
+        return {
+            "status": "success",
+            "total_checkpoints": len(all_checkpoints),
+            "deleted_count": deleted_count,
+            "remaining_count": len(all_checkpoints) - deleted_count
+        }
+        
+    except Exception as e:
+        return {"status": "error", "message": str(e)}
+
+def get_all_thread_ids():
+    """Get all thread_ids - implement this to fit your setup"""
+    # Method 1: if you keep a thread_id list in your database
+    # return db.query("SELECT DISTINCT thread_id FROM conversations")
+    
+    # Method 2: scan Redis
+    try:
+        import redis
+        redis_client = redis.from_url("redis://localhost:6379")
+        
+        thread_ids = set()
+        for key in redis_client.scan_iter(match="checkpoint:*"):
+            key_str = key.decode('utf-8')
+            parts = key_str.split(':')
+            if len(parts) >= 3:
+                thread_id = parts[1]
+                thread_ids.add(thread_id)
+        
+        return list(thread_ids)
+    except Exception as e:
+        print(f"获取thread_id失败: {e}")
+        return []
+
+@app.route('/api/cleanup/thread/<thread_id>', methods=['POST'])
+def cleanup_single_thread(thread_id):
+    """清理单个thread的checkpoint"""
+    keep_count = request.json.get('keep_count', 10) if request.json else 10
+    
+    result = cleanup_thread_checkpoints(thread_id, keep_count)
+    return jsonify(result)
+
+@app.route('/api/cleanup/all', methods=['POST'])
+def cleanup_all_threads():
+    """清理所有thread的checkpoint"""
+    keep_count = request.json.get('keep_count', 10) if request.json else 10
+    
+    thread_ids = get_all_thread_ids()
+    results = {}
+    total_deleted = 0
+    
+    for thread_id in thread_ids:
+        result = cleanup_thread_checkpoints(thread_id, keep_count)
+        results[thread_id] = result
+        if result["status"] == "success":
+            total_deleted += result["deleted_count"]
+    
+    return jsonify({
+        "status": "completed",
+        "processed_threads": len(thread_ids),
+        "total_deleted": total_deleted,
+        "results": results
+    })
+
+@app.route('/api/cleanup/stats', methods=['GET'])
+def cleanup_stats():
+    """获取checkpoint统计信息"""
+    thread_ids = get_all_thread_ids()
+    stats = {}
+    total_checkpoints = 0
+    
+    for thread_id in thread_ids:
+        try:
+            config = {"configurable": {"thread_id": thread_id}}
+            checkpoints = list(checkpointer.list(config))
+            count = len(checkpoints)
+            stats[thread_id] = count
+            total_checkpoints += count
+        except Exception as e:
+            stats[thread_id] = f"error: {e}"
+    
+    return jsonify({
+        "total_threads": len(thread_ids),
+        "total_checkpoints": total_checkpoints,
+        "thread_stats": stats
+    })
+
+# Your other API routes...
+@app.route('/api/chat', methods=['POST'])
+def chat():
+    # Your chat logic
+    pass
+
+if __name__ == '__main__':
+    app.run(debug=True)
+```
+
+### 2. A Standalone Cleanup Script
+
+```Python
+# cleanup_scheduler.py
+import requests
+import time
+import schedule
+import logging
+from datetime import datetime
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+class CheckpointCleanupScheduler:
+    def __init__(self, flask_api_url="http://localhost:5000", keep_count=10):
+        self.api_url = flask_api_url
+        self.keep_count = keep_count
+    
+    def cleanup_all_checkpoints(self):
+        """Call the Flask API to clean up all checkpoints"""
+        try:
+            logger.info("Starting the scheduled cleanup of all checkpoints...")
+            
+            # Call the Flask API
+            response = requests.post(
+                f"{self.api_url}/api/cleanup/all",
+                json={"keep_count": self.keep_count},
+                timeout=300  # 5-minute timeout
+            )
+            
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"Cleanup finished: processed {result['processed_threads']} threads, "
+                           f"deleted {result['total_deleted']} checkpoints")
+                return True
+            else:
+                logger.error(f"清理失败: HTTP {response.status_code}")
+                return False
+                
+        except Exception as e:
+            logger.error(f"清理出错: {e}")
+            return False
+    
+    def get_cleanup_stats(self):
+        """获取清理统计信息"""
+        try:
+            response = requests.get(f"{self.api_url}/api/cleanup/stats", timeout=30)
+            if response.status_code == 200:
+                stats = response.json()
+                logger.info(f"当前状态: {stats['total_threads']}个thread,"
+                           f"共{stats['total_checkpoints']}个checkpoint")
+                return stats
+            else:
+                logger.error(f"获取统计失败: HTTP {response.status_code}")
+                return None
+        except Exception as e:
+            logger.error(f"获取统计出错: {e}")
+            return None
+    
+    def cleanup_specific_thread(self, thread_id: str):
+        """清理特定thread"""
+        try:
+            response = requests.post(
+                f"{self.api_url}/api/cleanup/thread/{thread_id}",
+                json={"keep_count": self.keep_count},
+                timeout=60
+            )
+            
+            if response.status_code == 200:
+                result = response.json()
+                logger.info(f"清理thread {thread_id}: {result}")
+                return True
+            else:
+                logger.error(f"清理thread {thread_id}失败: HTTP {response.status_code}")
+                return False
+                
+        except Exception as e:
+            logger.error(f"清理thread {thread_id}出错: {e}")
+            return False
+
+# Create the scheduler instance
+scheduler = CheckpointCleanupScheduler(
+    flask_api_url="http://localhost:5000",  # your Flask app address
+    keep_count=10  # keep the most recent 10 checkpoints
+)
+
+# Set up scheduled tasks
+def daily_cleanup():
+    """Daily cleanup task"""
+    logger.info("=== Starting the daily checkpoint cleanup ===")
+    
+    # First fetch the statistics
+    stats = scheduler.get_cleanup_stats()
+    
+    # Run the cleanup
+    success = scheduler.cleanup_all_checkpoints()
+    
+    if success:
+        logger.info("=== Daily cleanup finished ===")
+    else:
+        logger.error("=== Daily cleanup failed ===")
+
+def weekly_stats():
+    """Weekly statistics report"""
+    logger.info("=== Weekly checkpoint statistics ===")
+    scheduler.get_cleanup_stats()
+
+# Schedule the jobs
+schedule.every().day.at("02:00").do(daily_cleanup)       # clean up every day at 2:00 AM
+schedule.every().monday.at("09:00").do(weekly_stats)     # report stats every Monday at 9:00 AM
+
+# Main loop
+def main():
+    logger.info("Checkpoint cleanup scheduler started")
+    logger.info("Cleanup schedule: every day at 02:00")
+    logger.info("Stats schedule: every Monday at 09:00")
+    
+    while True:
+        try:
+            schedule.run_pending()
+            time.sleep(60)  # check once a minute
+        except KeyboardInterrupt:
+            logger.info("Scheduler stopped")
+            break
+        except Exception as e:
+            logger.error(f"Scheduler error: {e}")
+            time.sleep(60)
+
+if __name__ == "__main__":
+    main()
+```
+
+### 3. System-Level Deployment
+
+#### Method A: systemd Service (Recommended)
+
+```Ini
+# /etc/systemd/system/checkpoint-cleanup.service
+[Unit]
+Description=Checkpoint Cleanup Scheduler
+After=network.target
+
+[Service]
+Type=simple
+User=your_user
+WorkingDirectory=/path/to/your/project
+Environment=PATH=/path/to/your/venv/bin
+ExecStart=/path/to/your/venv/bin/python cleanup_scheduler.py
+Restart=always
+RestartSec=10
+
+[Install]
+WantedBy=multi-user.target
+```
+
+```Bash
+# Enable the service
+sudo systemctl daemon-reload
+sudo systemctl enable checkpoint-cleanup.service
+sudo systemctl start checkpoint-cleanup.service
+
+# Check its status
+sudo systemctl status checkpoint-cleanup.service
+```
+
+#### Method B: cron + a Simple Script
+
+```Python
+# cleanup_once.py - a run-once cleanup script
+import requests
+import sys
+
+def cleanup_once():
+    try:
+        response = requests.post(
+            "http://localhost:5000/api/cleanup/all",
+            json={"keep_count": 10},
+            timeout=300
+        )
+        
+        if response.status_code == 200:
+            result = response.json()
+            print(f"Cleanup succeeded: deleted {result['total_deleted']} checkpoints")
+            return 0
+        else:
+            print(f"Cleanup failed: HTTP {response.status_code}")
+            return 1
+            
+    except Exception as e:
+        print(f"Cleanup error: {e}")
+        return 1
+
+if __name__ == "__main__":
+    sys.exit(cleanup_once())
+```
+
+```Bash
+# Add it to crontab
+crontab -e
+
+# Run the cleanup every day at 2:00 AM
+0 2 * * * /path/to/venv/bin/python /path/to/cleanup_once.py >> /var/log/checkpoint_cleanup.log 2>&1
+```
+
+## Option 4: Flask's Built-in Timer (Simple Scenarios)
+
+```Python
+# app.py - if you insist on scheduling inside Flask
+import threading
+import time
+
+def start_cleanup_thread():
+    """Start the cleanup thread"""
+    def cleanup_worker():
+        while True:
+            try:
+                # Wait 24 hours
+                time.sleep(24 * 3600)
+                
+                # Run the cleanup
+                thread_ids = get_all_thread_ids()
+                for thread_id in thread_ids:
+                    cleanup_thread_checkpoints(thread_id, keep_count=10)
+                
+                print("Scheduled cleanup finished")
+                
+            except Exception as e:
+                print(f"Scheduled cleanup error: {e}")
+    
+    cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
+    cleanup_thread.start()
+    print("内置清理线程已启动")
+
+# 在Flask启动时调用
+if __name__ == '__main__':
+    start_cleanup_thread()  # 启动清理线程
+    app.run(debug=True)
+```
+
+## Recommended Deployment
+
+**For production, I strongly recommend:**
+
+1. **Flask exposes the API endpoints** (Section 1)
+2. **A standalone Python scheduler script** (Section 2)
+3. **Managed as a systemd service** (Section 3, Method A)
+
+With this architecture:
+
+- ✅ Flask stays focused on business logic
+- ✅ Cleanup logic is isolated and does not affect the main application
+- ✅ The cleanup policy can be adjusted flexibly
+- ✅ Monitoring and debugging are easier
+- ✅ Restarting one service does not disturb the scheduled tasks
+

+ 363 - 0
docs/react_agent_checkpoint_management_api_design.md

@@ -0,0 +1,363 @@
+# React Agent Checkpoint Management API Design
+
+## Overview
+
+This document describes the design of the React Agent checkpoint management API. It provides checkpoint cleanup and statistics features, implemented by operating on Redis directly rather than going through the LangGraph workflow.
+
+## Design Goals
+
+- **Minimal surface**: only 2 APIs, with parameters selecting the behavior
+- **Direct operation**: connect to Redis directly instead of going through an Agent instance
+- **Flexible scope**: supports global, per-user, and per-thread operations
+- **Manual invocation**: triggered manually via the API, never run automatically
+
+## API Design
+
+### API 1: Checkpoint Cleanup
+
+**Route:** `POST /api/v0/checkpoint/direct/cleanup`
+
+**Purpose:** Clean up checkpoints, keeping the most recent N
+
+**Request parameters:**
+```json
+{
+  "keep_count": 10,           // optional; number to keep, defaults to the configured value
+  "user_id": "wang1",         // optional; target user ID
+  "thread_id": "wang1:20250729235038043"  // optional; target thread ID
+}
+```
+
+**Parameter logic** (see the sketch after this list):
+- No parameters: clean up checkpoints for all thread_ids
+- Only `user_id`: clean up all threads belonging to that user
+- Only `thread_id`: clean up that specific thread
+- Both `user_id` and `thread_id` present: `thread_id` takes precedence
+
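+As a hedged illustration of this precedence rule (not prescribed by the design; `resolve_cleanup_scope` is a hypothetical helper name), the dispatch could look like this, reusing the key patterns defined in the "Core Operation Logic" section:
+
+```python
+# Hypothetical sketch: map request parameters to an operation type and a Redis scan pattern.
+def resolve_cleanup_scope(payload: dict) -> tuple:
+    thread_id = payload.get("thread_id")
+    user_id = payload.get("user_id")
+    if thread_id:                                   # thread_id wins when both are given
+        return "cleanup_thread", f"checkpoint:{thread_id}:*"
+    if user_id:
+        return "cleanup_user", f"checkpoint:{user_id}:*"
+    return "cleanup_all", "checkpoint:*"
+```
+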
+**Response format:**
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "Checkpoint清理完成",
+  "data": {
+    "operation_type": "cleanup_all|cleanup_user|cleanup_thread",
+    "target": "all|wang1|wang1:20250729235038043",
+    "keep_count": 10,
+    "total_processed": 15,
+    "total_deleted": 45,
+    "details": {
+      "wang1:20250729235038043": {
+        "original_count": 36,
+        "deleted_count": 26,
+        "remaining_count": 10,
+        "status": "success"
+      },
+      "wang1:20250731141657916": {
+        "original_count": 16,
+        "deleted_count": 6,
+        "remaining_count": 10,
+        "status": "success"
+      }
+    },
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+### API 2: Checkpoint Statistics
+
+**Route:** `GET /api/v0/checkpoint/direct/stats`
+
+**Purpose:** Retrieve checkpoint statistics
+
+**Query parameters:**
+- `user_id`: optional; target user ID
+
+**How to call:**
+```bash
+# Get statistics for everything
+GET /api/v0/checkpoint/direct/stats
+
+# Get statistics for a specific user
+GET /api/v0/checkpoint/direct/stats?user_id=wang1
+```
+
+**Response format:**
+
+**System-wide statistics:**
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "System checkpoint statistics retrieved successfully",
+  "data": {
+    "operation_type": "system_stats",
+    "total_users": 2,
+    "total_threads": 4,
+    "total_checkpoints": 132,
+    "users": [
+      {
+        "user_id": "wang1",
+        "thread_count": 3,
+        "total_checkpoints": 116,
+        "threads": [
+          {
+            "thread_id": "wang1:20250729235038043",
+            "checkpoint_count": 36
+          },
+          {
+            "thread_id": "wang1:20250731141657916", 
+            "checkpoint_count": 16
+          },
+          {
+            "thread_id": "wang1:20250801171843665",
+            "checkpoint_count": 64
+          }
+        ]
+      },
+      {
+        "user_id": "wang2",
+        "thread_count": 1,
+        "total_checkpoints": 16,
+        "threads": [
+          {
+            "thread_id": "wang2:20250731141659949",
+            "checkpoint_count": 16
+          }
+        ]
+      }
+    ],
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+**Per-user statistics:**
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "获取用户wang1统计成功",
+  "data": {
+    "operation_type": "user_stats",
+    "user_id": "wang1",
+    "thread_count": 3,
+    "total_checkpoints": 116,
+    "threads": [
+      {
+        "thread_id": "wang1:20250801171843665",
+        "checkpoint_count": 64
+      },
+      {
+        "thread_id": "wang1:20250729235038043", 
+        "checkpoint_count": 36
+      },
+      {
+        "thread_id": "wang1:20250731141657916",
+        "checkpoint_count": 16
+      }
+    ],
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+## Technical Implementation
+
+### Standardized Response Format
+
+Use the standardized response format from `common/result.py`:
+
+```python
+from common.result import success_response, error_response, internal_error_response
+
+# Success response
+return jsonify(success_response(
+    response_text="Checkpoint cleanup finished",
+    data={
+        "operation_type": "cleanup_all",
+        "total_deleted": 45,
+        # ... other fields
+    }
+))
+
+# Error response
+return jsonify(internal_error_response(
+    response_text="Redis connection failed"
+))
+```
+
+### Redis Connection
+
+Follow the pattern used in `react_agent/enhanced_redis_api.py`:
+
+```python
+# Create a Redis connection directly
+redis_client = redis.Redis(
+    host=config.REDIS_HOST,
+    port=config.REDIS_PORT, 
+    db=config.REDIS_DB,
+    password=config.REDIS_PASSWORD,
+    decode_responses=True
+)
+```
+
+### Checkpoint Key Format
+
+Based on the existing system, checkpoint keys have the format:
+```
+checkpoint:user_id:timestamp:checkpoint_id
+```
+
+Examples:
+```
+checkpoint:wang1:20250729235038043:01936451-dd24-641c-8005-c07e5896ad38
+checkpoint:wang1:20250729235038043:01936451-dd29-624b-8006-fc1f3a83e4f5
+checkpoint:wang2:20250731141659949:01936462-72a1-6e5c-8009-378fd98058aa
+```
+
+### Core Operation Logic
+
+**Scanning keys:**
+```python
+# Scan all checkpoints
+pattern = "checkpoint:*"
+# Scan a specific user
+pattern = f"checkpoint:{user_id}:*"  
+# Scan a specific thread
+pattern = f"checkpoint:{thread_id}:*"
+
+keys = []
+cursor = 0
+while True:
+    cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
+    keys.extend(batch)
+    if cursor == 0:
+        break
+```
+
+**Grouping the data:**
+```python
+# Group keys by thread_id
+thread_groups = {}
+for key in keys:
+    parts = key.split(':')
+    if len(parts) >= 3:
+        user_id = parts[1]
+        timestamp = parts[2] 
+        thread_id = f"{user_id}:{timestamp}"
+        
+        if thread_id not in thread_groups:
+            thread_groups[thread_id] = []
+        thread_groups[thread_id].append(key)
+```
+
+**Batch cleanup:**
+```python
+# Keep the most recent N checkpoints and bulk-delete the rest in Redis
+for thread_id, keys in thread_groups.items():
+    if len(keys) > keep_count:
+        # Sort keys (the trailing checkpoint_id is time-ordered, so lexicographic order matches age)
+        keys.sort()
+        # Delete the older keys
+        keys_to_delete = keys[:-keep_count]
+        
+        # Use a Redis pipeline to batch the deletes for better performance
+        if keys_to_delete:
+            pipeline = redis_client.pipeline()
+            for key in keys_to_delete:
+                pipeline.delete(key)
+            pipeline.execute()  # execute the batched delete commands
+```
+
+## Configuration Parameters
+
+Add to `react_agent/config.py`:
+
+```python
+# --- Checkpoint management settings ---
+CHECKPOINT_KEEP_COUNT = 10         # number of checkpoints to keep per thread (API default)
+```
+
+## API Integration Point
+
+- **File:** `unified_api.py` (see the route sketch after this list)
+- **Route prefix:** `/api/v0/checkpoint/direct/`
+- **Dependencies:** `react_agent.config`, `redis`, `common.result`
+
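+As a hedged sketch only (the handler bodies are not specified here; the routes and `success_response` follow this document, everything else is an assumption), the wiring inside `unified_api.py` could look roughly like this:
+
+```python
+# Hypothetical route skeleton for unified_api.py.
+from flask import Flask, jsonify, request
+from common.result import success_response
+
+app = Flask(__name__)
+
+@app.route("/api/v0/checkpoint/direct/cleanup", methods=["POST"])
+def checkpoint_direct_cleanup():
+    payload = request.get_json(silent=True) or {}
+    keep_count = payload.get("keep_count", 10)  # the real implementation falls back to CHECKPOINT_KEEP_COUNT
+    # Resolve the scope, scan Redis, and delete via a pipeline as described above,
+    # then return the collected details in the documented response shape.
+    data = {"operation_type": "cleanup_all", "keep_count": keep_count}
+    return jsonify(success_response(response_text="Checkpoint cleanup finished", data=data))
+
+@app.route("/api/v0/checkpoint/direct/stats", methods=["GET"])
+def checkpoint_direct_stats():
+    user_id = request.args.get("user_id")
+    operation_type = "user_stats" if user_id else "system_stats"
+    # Collect the per-user / per-thread checkpoint counts here before returning them.
+    return jsonify(success_response(response_text="Checkpoint statistics retrieved", data={"operation_type": operation_type}))
+```
+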
+## Error Handling
+
+### Common error cases:
+
+1. **Redis connection failure**
+2. **Malformed thread_id**
+3. **User does not exist**
+4. **Checkpoint deletion failure**
+
+### Error response format:
+
+Use the standardized error responses from `common/result.py`:
+
+```json
+{
+  "code": 500,
+  "success": false,
+  "message": "请求处理失败",
+  "data": {
+    "response": "具体错误信息",
+    "error_type": "REDIS_CONNECTION_ERROR|INVALID_THREAD_ID|USER_NOT_FOUND|DELETE_FAILED",
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+## Usage Examples
+
+### Cleanup API calls
+
+```bash
+# 1. Clean up all threads, keeping 10 checkpoints each
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"keep_count": 10}'
+
+# 2. Clean up all of user wang1's threads, keeping 5 each
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"user_id": "wang1", "keep_count": 5}'
+
+# 3. Clean up a specific thread, keeping 8
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"thread_id": "wang1:20250729235038043", "keep_count": 8}'
+```
+
+### Statistics API calls
+
+```bash
+# 4. Get system-wide statistics
+curl http://localhost:8084/api/v0/checkpoint/direct/stats
+
+# 5. Get statistics for user wang1
+curl "http://localhost:8084/api/v0/checkpoint/direct/stats?user_id=wang1"
+```
+
+## Performance Considerations
+
+1. **Batched scanning:** use the `scan` command to avoid blocking Redis
+2. **Batched deletion:** delete keys via a Redis pipeline for better throughput
+3. **Connection management:** close the Redis connection promptly once the operation completes (see the sketch after this list)
+4. **Logging:** record each operation to ease debugging and monitoring
+5. **Single-threaded processing:** process threads sequentially in a single thread to avoid concurrency complexity
+
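+As a hedged illustration of point 3 (a pattern suggestion, not part of the design; `run_with_redis` is a hypothetical helper), the connection can be released in a `finally` block:
+
+```python
+# Hypothetical sketch: open a connection per operation and always close it.
+import redis
+
+def run_with_redis(operation):
+    client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
+    try:
+        return operation(client)
+    finally:
+        client.close()  # release the connection as soon as the operation finishes
+```
+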
+## Security Considerations
+
+1. **Parameter validation:** validate the user_id and thread_id formats (see the sketch after this list)
+2. **Access control:** consider adding user authentication
+3. **Audit logging:** record who performed a cleanup and when
+4. **Rollback:** consider taking a backup before destructive operations
+
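+As a hedged sketch of point 1 (the regexes are assumptions inferred from the key examples above, e.g. a 17-digit timestamp; the helper and the INVALID_USER_ID code are hypothetical):
+
+```python
+# Hypothetical sketch: validate cleanup parameters before touching Redis.
+import re
+
+USER_ID_RE = re.compile(r"^[A-Za-z0-9_-]+$")
+THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_-]+:\d{17}$")  # e.g. wang1:20250729235038043
+
+def validate_cleanup_params(user_id=None, thread_id=None):
+    """Return an error code string, or None if the parameters look valid."""
+    if user_id and not USER_ID_RE.match(user_id):
+        return "INVALID_USER_ID"
+    if thread_id and not THREAD_ID_RE.match(thread_id):
+        return "INVALID_THREAD_ID"
+    return None
+```
+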
+---
+
+*This document describes the complete design of the checkpoint management API and provides the detailed technical specification for implementation.*