Added checkpoint deletion and management features to react_agent.

wangxq, 3 weeks ago
parent
commit
e3d9eef93d
3 files changed with 644 additions and 1 deletion
  1. 268 0
      docs/react_agent_checkpoint_api_usage_guide.md
  2. 4 1
      react_agent/config.py
  3. 372 0
      unified_api.py

+ 268 - 0
docs/react_agent_checkpoint_api_usage_guide.md

@@ -0,0 +1,268 @@
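+# React Agent Checkpoint Management API Usage Guide
+
+## API Endpoints
+
+| Method | URI | Description |
+|--------|-----|-------------|
+| GET | `/api/v0/checkpoint/direct/stats` | Get checkpoint statistics |
+| GET | `/api/v0/checkpoint/direct/stats?user_id={user_id}` | Get checkpoint statistics for a specific user |
+| POST | `/api/v0/checkpoint/direct/cleanup` | Clean up checkpoint data |
+
+## Overview
+
+The React Agent checkpoint management API manages and monitors the checkpoint data that the LangGraph Agent produces while it runs. The endpoints operate on Redis directly to query checkpoint statistics and to clean up checkpoints, so they do not require a running Agent instance.
+
+## API List
+
+| API name | Route | Method | Purpose |
+|----------|-------|--------|---------|
+| Get checkpoint statistics | `/api/v0/checkpoint/direct/stats` | GET | View checkpoint statistics for the system, including the number of users, the number of threads, and the total number of checkpoints |
+| Clean up checkpoints | `/api/v0/checkpoint/direct/cleanup` | POST | Clean up older checkpoint data; supports global, per-user, and per-thread cleanup |
+
+## API Details
+
+### 1. Get Checkpoint Statistics
+
+**API route:** `GET /api/v0/checkpoint/direct/stats`
+
+**Purpose:** Get checkpoint statistics for the whole system or for a specific user, to monitor data volume and storage status.
+
+#### Parameters
+
+- **Query parameters (optional):**
+  - `user_id`: the user ID whose statistics should be returned
+
+#### Usage Examples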
+
+```bash
+# Get statistics for the whole system
+curl http://localhost:8084/api/v0/checkpoint/direct/stats
+
+# Get statistics for a specific user
+curl "http://localhost:8084/api/v0/checkpoint/direct/stats?user_id=wang1"
+```
+
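+#### Response
+
+**System-wide statistics:**
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "获取系统checkpoint统计成功",  // "System checkpoint statistics retrieved successfully"
+  "data": {
+    "operation_type": "system_stats",
+    "total_users": 2,                    // total number of users in the system
+    "total_threads": 4,                  // total number of threads in the system
+    "total_checkpoints": 132,            // total number of checkpoints in the system
+    "users": [                           // per-user details
+      {
+        "user_id": "wang1",              // user ID
+        "thread_count": 3,               // number of threads for this user
+        "total_checkpoints": 116,        // total number of checkpoints for this user
+        "threads": [                     // per-thread details
+          {
+            "thread_id": "wang1:20250729235038043",
+            "checkpoint_count": 36       // number of checkpoints in this thread
+          }
+          // ... more threads
+        ]
+      }
+      // ... more users
+    ],
+    "timestamp": "2025-01-31T10:30:00"   // time the statistics were collected
+  }
+}
+```
+
+**Statistics for a specific user:**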
+```json
+{
+  "code": 200,
+  "success": true,
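+  "message": "获取用户wang1统计成功",    // "Statistics for user wang1 retrieved successfully"
+  "data": {
+    "operation_type": "user_stats",
+    "user_id": "wang1",                  // target user ID
+    "thread_count": 3,                   // number of threads for this user
+    "total_checkpoints": 116,            // total number of checkpoints for this user
+    "threads": [                         // sorted by checkpoint count, descending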
+      {
+        "thread_id": "wang1:20250801171843665",
+        "checkpoint_count": 64
+      },
+      {
+        "thread_id": "wang1:20250729235038043",
+        "checkpoint_count": 36
+      },
+      {
+        "thread_id": "wang1:20250731141657916",
+        "checkpoint_count": 16
+      }
+    ],
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
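+### 2. Clean Up Checkpoints
+
+**API route:** `POST /api/v0/checkpoint/direct/cleanup`
+
+**Purpose:** Clean up older checkpoint data to free Redis storage, keeping the most recent N checkpoints for each thread.
+
+#### Parameters
+
+**Request body parameters (JSON):**
+
+| Parameter | Type | Required | Default | Description |
+|-----------|------|----------|---------|-------------|
+| `keep_count` | int | No | 10 | Number of checkpoints to keep per thread; must be greater than 0 |
+| `user_id` | string | No | - | ID of the user to clean up |
+| `thread_id` | string | No | - | ID of the thread to clean up, in the form `user_id:timestamp` |
+
+**Parameter logic:**
+- No parameters: clean up the checkpoints of every thread
+- `user_id` only: clean up all threads of that user
+- `thread_id` only: clean up that thread
+- Both `user_id` and `thread_id`: `thread_id` takes precedence
+
+#### Usage Examples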
+
+```bash
+# Clean up every thread, keeping 10 checkpoints in each
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"keep_count": 10}'
+
+# Clean up every thread of user wang1, keeping 5 checkpoints in each
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"user_id": "wang1", "keep_count": 5}'
+
+# Clean up a single thread, keeping 8 checkpoints
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"thread_id": "wang1:20250729235038043", "keep_count": 8}'
+```
+
+#### Response
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "Checkpoint清理完成",         // "Checkpoint cleanup completed"
+  "data": {
+    "operation_type": "cleanup_all",        // operation type: cleanup_all|cleanup_user|cleanup_thread
+    "target": "all",                        // operation target: all|user ID|thread ID
+    "keep_count": 10,                       // number of checkpoints kept per thread
+    "total_processed": 15,                  // total number of threads processed
+    "total_deleted": 45,                    // total number of checkpoints deleted
+    "details": {                            // per-thread results
+      "wang1:20250729235038043": {
+        "original_count": 36,               // checkpoints before cleanup
+        "deleted_count": 26,                // checkpoints deleted
+        "remaining_count": 10,              // checkpoints remaining
+        "status": "success"                 // result: success|no_cleanup_needed|failed
+      },
+      "wang1:20250731141657916": {
+        "original_count": 16,
+        "deleted_count": 6,
+        "remaining_count": 10,
+        "status": "success"
+      }
+      // ... more per-thread results
+    },
+    "timestamp": "2025-01-31T10:30:00"      // time of the operation
+  }
+}
+```
+
+## Error Handling
+
+### Common Errors
+
+1. **Redis connection failure**
+2. **Malformed thread_id**
+3. **User does not exist**
+4. **Checkpoint deletion failure**
+
+### Error Response Format
+
+```json
+{
+  "code": 500,
+  "success": false,
+  "message": "请求处理失败",               // "Request processing failed"
+  "data": {
+    "response": "<specific error message>",
+    "error_type": "REDIS_CONNECTION_ERROR",  // error type
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+### Error Types
+
+| Error type | Description |
+|------------|-------------|
+| `REDIS_CONNECTION_ERROR` | The Redis connection failed |
+| `INVALID_THREAD_ID` | The thread ID is malformed |
+| `USER_NOT_FOUND` | The specified user does not exist |
+| `DELETE_FAILED` | Deleting checkpoints failed |
+
+## Usage Scenarios
+
+### 1. Monitoring
+```bash
+# Periodically check the system-wide checkpoint volume
+curl http://localhost:8084/api/v0/checkpoint/direct/stats
+
+# Check the data volume of a specific user
+curl "http://localhost:8084/api/v0/checkpoint/direct/stats?user_id=wang1"
+```
+
+### 2. Maintenance
+```bash
+# System maintenance: clean up older data for every thread
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"keep_count": 10}'
+
+# User maintenance: clean up the data of a specific user
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"user_id": "wang1", "keep_count": 5}'
+```
+
+### 3. Troubleshooting
+```bash
+# Check the checkpoint counts of the problem user's threads
+curl "http://localhost:8084/api/v0/checkpoint/direct/stats?user_id=problem_user"
+
+# Clean up the data of the problem thread
+curl -X POST http://localhost:8084/api/v0/checkpoint/direct/cleanup \
+  -H "Content-Type: application/json" \
+  -d '{"thread_id": "problem_user:20250729235038043", "keep_count": 3}'
+```
+
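+## Best Practices
+
+1. **Monitor regularly**: call the statistics API on a schedule to track checkpoint volume
+2. **Clean up sensibly**: choose a keep count that fits actual usage (5 to 10 is suggested)
+3. **Work in batches**: when there is a lot of data to clean up, process it one user at a time
+4. **Back up first**: consider backing up critical checkpoint data before important operations
+5. **Check the logs**: after an operation, review the logs to confirm the result
+
+## Notes
+
+- Cleanup is irreversible; proceed with care
+- Run large cleanups during off-peak hours
+- Keep at least 3 checkpoints per thread so the Agent can still roll back to earlier states
+- The API operates on Redis directly and does not depend on the state of any Agent instance
+- All timestamps are in ISO 8601 format
+
+---
+
+*This document is a usage guide for the React Agent Checkpoint API, intended to help users manage and maintain checkpoint data effectively.*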
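
Reviewer note: the curl examples in the guide translate directly into a small Python client. The sketch below is a minimal illustration rather than part of this commit. It assumes the `http://localhost:8084` base URL from the guide, and `build_cleanup_request` is a hypothetical helper name. It only builds the request object, so nothing is sent until the caller passes it to `urllib.request.urlopen`.

```python
import json
import urllib.request

# Base URL taken from the guide's curl examples; adjust for your deployment.
BASE_URL = "http://localhost:8084"


def build_cleanup_request(keep_count=10, user_id=None, thread_id=None):
    """Build (but do not send) a POST request for the cleanup endpoint.

    Hypothetical helper: only parameters that were actually supplied are
    included, matching the guide's rule that omitted fields widen the scope.
    """
    payload = {"keep_count": keep_count}
    if user_id is not None:
        payload["user_id"] = user_id
    if thread_id is not None:
        payload["thread_id"] = thread_id
    return urllib.request.Request(
        f"{BASE_URL}/api/v0/checkpoint/direct/cleanup",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Equivalent of the second curl example: keep 5 checkpoints per thread for wang1.
request = build_cleanup_request(keep_count=5, user_id="wang1")
```

To execute the call, pass `request` to `urllib.request.urlopen` and decode the JSON body. Cleanup is irreversible, so it may be worth calling the stats endpoint first.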

+ 4 - 1
react_agent/config.py

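@@ -65,4 +65,7 @@ MAX_LOG_LENGTH = 1000              # Maximum log length outside debug mode
 # --- State management settings ---
 MESSAGE_TRIM_ENABLED = True        # Whether message trimming is enabled
 MESSAGE_TRIM_COUNT = 100          # Trimming triggers when the message count exceeds this value; this many messages are kept afterwards
-MESSAGE_TRIM_SEARCH_LIMIT = 20    # Maximum number of messages to scan when searching for a HumanMessage 
+MESSAGE_TRIM_SEARCH_LIMIT = 20    # Maximum number of messages to scan when searching for a HumanMessage
+
+# --- Checkpoint management settings ---
+CHECKPOINT_KEEP_COUNT = 10         # Number of checkpoints kept per thread (API default) 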
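
Reviewer note: the new `CHECKPOINT_KEEP_COUNT` setting is only a fallback, so a `keep_count` taken from a JSON body still needs validating before it is used as a slice bound. The sketch below shows one way to combine the default with a type and range check. `resolve_keep_count` is a hypothetical helper, not code from this commit, and the constant is redefined locally so the example runs on its own.

```python
# Mirrors the default added to react_agent/config.py in this commit.
CHECKPOINT_KEEP_COUNT = 10


def resolve_keep_count(data: dict) -> int:
    """Return the keep count from a request body, falling back to the default.

    Hypothetical helper. Raises ValueError for anything that is not a
    positive integer, so the caller can map it to a 400 response.
    """
    value = data.get("keep_count", CHECKPOINT_KEEP_COUNT)
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int) or value <= 0:
        raise ValueError("keep_count must be a positive integer")
    return value


default_keep = resolve_keep_count({})                  # falls back to the config value
custom_keep = resolve_keep_count({"keep_count": 5})    # explicit value wins
```

Rejecting strings such as `"5"` up front avoids a `TypeError` when the value is later compared with `0`.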

+ 372 - 0
unified_api.py

@@ -3210,6 +3210,378 @@ def get_conversation_summary_api(thread_id: str):
 
 
 
+# ================== Checkpoint Management API ==================
+
+@app.route('/api/v0/checkpoint/direct/cleanup', methods=['POST'])
+async def cleanup_checkpoints():
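+    """
+    Clean up checkpoints, keeping the most recent N.
+    
+    Request parameters:
+        - keep_count: optional, number to keep; defaults to the configured value
+        - user_id: optional, user ID to clean up
+        - thread_id: optional, thread ID to clean up
+        
+    Parameter logic:
+        - no parameters: clean up the checkpoints of every thread_id
+        - user_id only: clean up all threads of that user
+        - thread_id only: clean up that thread
+        - both user_id and thread_id: thread_id takes precedence
+    """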
+    try:
+        # Read the request parameters
+        data = request.get_json() or {}
+        keep_count = data.get('keep_count', react_agent_config.CHECKPOINT_KEEP_COUNT)
+        user_id = data.get('user_id')
+        thread_id = data.get('thread_id')
+        
+        logger.info(f"🧹 开始checkpoint清理 - keep_count: {keep_count}, user_id: {user_id}, thread_id: {thread_id}")
+        
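+        # Validate parameters (a non-integer keep_count is rejected here instead of raising a TypeError)
+        if not isinstance(keep_count, int) or keep_count <= 0: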
+            return jsonify(bad_request_response(
+                response_text="keep_count必须大于0"
+            )), 400
+        
+        # Validate the thread_id format
+        if thread_id and ':' not in thread_id:
+            return jsonify(bad_request_response(
+                response_text="thread_id格式错误,期望格式: user_id:timestamp"
+            )), 400
+        
+        # Create the Redis connection (async version)
+        redis_client = redis.Redis(
+            host=react_agent_config.REDIS_HOST,
+            port=react_agent_config.REDIS_PORT,
+            db=react_agent_config.REDIS_DB,
+            password=react_agent_config.REDIS_PASSWORD,
+            decode_responses=True
+        )
+        await redis_client.ping()
+        
+        # Determine the scan pattern and operation type
+        if thread_id:
+            # Clean up the specified thread
+            pattern = f"checkpoint:{thread_id}:*"
+            operation_type = "cleanup_thread"
+            target = thread_id
+        elif user_id:
+            # Clean up all threads of the specified user
+            pattern = f"checkpoint:{user_id}:*"
+            operation_type = "cleanup_user"
+            target = user_id
+        else:
+            # Clean up all threads
+            pattern = "checkpoint:*"
+            operation_type = "cleanup_all"
+            target = "all"
+        
+        logger.info(f"   扫描模式: {pattern}")
+        
+        # Scan for matching keys
+        keys = []
+        cursor = 0
+        while True:
+            cursor, batch = await redis_client.scan(cursor=cursor, match=pattern, count=1000)
+            keys.extend(batch)
+            if cursor == 0:
+                break
+        
+        logger.info(f"   找到 {len(keys)} 个checkpoint keys")
+        
+        if not keys:
+            await redis_client.aclose()
+            return jsonify(success_response(
+                response_text="未找到需要清理的checkpoint",
+                data={
+                    "operation_type": operation_type,
+                    "target": target,
+                    "keep_count": keep_count,
+                    "total_processed": 0,
+                    "total_deleted": 0,
+                    "details": {}
+                }
+            )), 200
+        
+        # Group keys by thread_id
+        thread_groups = {}
+        for key in keys:
+            parts = key.split(':')
+            if len(parts) >= 3:
+                key_user_id = parts[1]
+                timestamp = parts[2]
+                key_thread_id = f"{key_user_id}:{timestamp}"
+                
+                if key_thread_id not in thread_groups:
+                    thread_groups[key_thread_id] = []
+                thread_groups[key_thread_id].append(key)
+        
+        logger.info(f"   分组结果: {len(thread_groups)} 个threads")
+        
+        # Clean up the checkpoints of each thread
+        details = {}
+        total_deleted = 0
+        total_processed = 0
+        
+        for tid, tid_keys in thread_groups.items():
+            original_count = len(tid_keys)
+            
+            if original_count <= keep_count:
+                # Nothing to clean up
+                details[tid] = {
+                    "original_count": original_count,
+                    "deleted_count": 0,
+                    "remaining_count": original_count,
+                    "status": "no_cleanup_needed"
+                }
+                total_processed += 1
+                continue
+            
+            # Sort by key (keys embed a timestamp, so lexicographic order is assumed to be chronological)
+            tid_keys.sort()
+            keys_to_delete = tid_keys[:-keep_count]
+            
+            # Delete in bulk with a Redis pipeline
+            deleted_count = 0
+            if keys_to_delete:
+                try:
+                    pipeline = redis_client.pipeline()
+                    for key in keys_to_delete:
+                        pipeline.delete(key)
+                    await pipeline.execute()
+                    deleted_count = len(keys_to_delete)
+                    
+                    logger.info(f"   Thread {tid}: 删除了 {deleted_count} 个checkpoint")
+                    
+                except Exception as e:
+                    logger.error(f"   Thread {tid}: 批量删除失败: {e}")
+                    # Fall back to deleting keys one at a time
+                    for key in keys_to_delete:
+                        try:
+                            await redis_client.delete(key)
+                            deleted_count += 1
+                        except Exception as del_error:
+                            logger.error(f"   删除key失败: {key}, 错误: {del_error}")
+            
+            details[tid] = {
+                "original_count": original_count,
+                "deleted_count": deleted_count,
+                "remaining_count": original_count - deleted_count,
+                "status": "success" if deleted_count > 0 else "failed"
+            }
+            
+            total_deleted += deleted_count
+            total_processed += 1
+        
+        await redis_client.aclose()
+        
+        logger.info(f"✅ Checkpoint清理完成 - 处理{total_processed}个threads,删除{total_deleted}个checkpoints")
+        
+        return jsonify(success_response(
+            response_text=f"Checkpoint清理完成,删除了{total_deleted}个checkpoint",
+            data={
+                "operation_type": operation_type,
+                "target": target,
+                "keep_count": keep_count,
+                "total_processed": total_processed,
+                "total_deleted": total_deleted,
+                "details": details
+            }
+        )), 200
+        
+    except redis.ConnectionError as e:
+        logger.error(f"❌ Redis连接失败: {e}")
+        return jsonify(internal_error_response(
+            response_text="Redis连接失败,请检查Redis服务状态"
+        )), 500
+        
+    except Exception as e:
+        logger.error(f"❌ Checkpoint清理失败: {e}")
+        return jsonify(internal_error_response(
+            response_text=f"Checkpoint清理失败: {str(e)}"
+        )), 500
+
+
+@app.route('/api/v0/checkpoint/direct/stats', methods=['GET'])
+async def get_checkpoint_stats():
+    """
+    Get checkpoint statistics.
+    
+    Query parameters:
+        - user_id: optional, user ID to report on
+        
+    Usage:
+        GET /api/v0/checkpoint/direct/stats                  # statistics for the whole system
+        GET /api/v0/checkpoint/direct/stats?user_id=wang1   # statistics for a specific user
+    """
+    try:
+        user_id = request.args.get('user_id')
+        
+        logger.info(f"📊 获取checkpoint统计 - user_id: {user_id}")
+        
+        # Create the Redis connection (async version)
+        redis_client = redis.Redis(
+            host=react_agent_config.REDIS_HOST,
+            port=react_agent_config.REDIS_PORT,
+            db=react_agent_config.REDIS_DB,
+            password=react_agent_config.REDIS_PASSWORD,
+            decode_responses=True
+        )
+        await redis_client.ping()
+        
+        # Determine the scan pattern
+        if user_id:
+            pattern = f"checkpoint:{user_id}:*"
+            operation_type = "user_stats"
+        else:
+            pattern = "checkpoint:*"
+            operation_type = "system_stats"
+        
+        logger.info(f"   扫描模式: {pattern}")
+        
+        # Scan for matching keys
+        keys = []
+        cursor = 0
+        while True:
+            cursor, batch = await redis_client.scan(cursor=cursor, match=pattern, count=1000)
+            keys.extend(batch)
+            if cursor == 0:
+                break
+        
+        logger.info(f"   找到 {len(keys)} 个checkpoint keys")
+        
+        await redis_client.aclose()
+        
+        if not keys:
+            if user_id:
+                return jsonify(not_found_response(
+                    response_text=f"用户 {user_id} 没有任何checkpoint"
+                )), 404
+            else:
+                return jsonify(success_response(
+                    response_text="系统中暂无checkpoint数据",
+                    data={
+                        "operation_type": operation_type,
+                        "total_users": 0,
+                        "total_threads": 0,
+                        "total_checkpoints": 0,
+                        "users": []
+                    }
+                )), 200
+        
+        # Count checkpoints grouped by user and thread
+        user_stats = {}
+        for key in keys:
+            parts = key.split(':')
+            if len(parts) >= 3:
+                key_user_id = parts[1]
+                timestamp = parts[2]
+                thread_id = f"{key_user_id}:{timestamp}"
+                
+                if key_user_id not in user_stats:
+                    user_stats[key_user_id] = {}
+                
+                if thread_id not in user_stats[key_user_id]:
+                    user_stats[key_user_id][thread_id] = 0
+                
+                user_stats[key_user_id][thread_id] += 1
+        
+        # Build the response data
+        if user_id:
+            # Return statistics for the specified user
+            if user_id not in user_stats:
+                return jsonify(not_found_response(
+                    response_text=f"用户 {user_id} 没有任何checkpoint"
+                )), 404
+            
+            threads = []
+            total_checkpoints = 0
+            for thread_id, count in user_stats[user_id].items():
+                threads.append({
+                    "thread_id": thread_id,
+                    "checkpoint_count": count
+                })
+                total_checkpoints += count
+            
+            # Sort by checkpoint count, descending
+            threads.sort(key=lambda x: x["checkpoint_count"], reverse=True)
+            
+            result_data = {
+                "operation_type": operation_type,
+                "user_id": user_id,
+                "thread_count": len(threads),
+                "total_checkpoints": total_checkpoints,
+                "threads": threads
+            }
+            
+            logger.info(f"✅ 获取用户 {user_id} 统计完成 - {len(threads)} threads, {total_checkpoints} checkpoints")
+            
+            return jsonify(success_response(
+                response_text=f"获取用户{user_id}统计成功",
+                data=result_data
+            )), 200
+        
+        else:
+            # Return statistics for the whole system
+            users = []
+            total_threads = 0
+            total_checkpoints = 0
+            
+            for uid, threads_data in user_stats.items():
+                user_threads = []
+                user_total_checkpoints = 0
+                
+                for thread_id, count in threads_data.items():
+                    user_threads.append({
+                        "thread_id": thread_id,
+                        "checkpoint_count": count
+                    })
+                    user_total_checkpoints += count
+                
+                # Sort by checkpoint count, descending
+                user_threads.sort(key=lambda x: x["checkpoint_count"], reverse=True)
+                
+                users.append({
+                    "user_id": uid,
+                    "thread_count": len(user_threads),
+                    "total_checkpoints": user_total_checkpoints,
+                    "threads": user_threads
+                })
+                
+                total_threads += len(user_threads)
+                total_checkpoints += user_total_checkpoints
+            
+            # Sort users by total checkpoint count, descending
+            users.sort(key=lambda x: x["total_checkpoints"], reverse=True)
+            
+            result_data = {
+                "operation_type": operation_type,
+                "total_users": len(users),
+                "total_threads": total_threads,
+                "total_checkpoints": total_checkpoints,
+                "users": users
+            }
+            
+            logger.info(f"✅ 获取系统统计完成 - {len(users)} users, {total_threads} threads, {total_checkpoints} checkpoints")
+            
+            return jsonify(success_response(
+                response_text="获取系统checkpoint统计成功",
+                data=result_data
+            )), 200
+        
+    except redis.ConnectionError as e:
+        logger.error(f"❌ Redis连接失败: {e}")
+        return jsonify(internal_error_response(
+            response_text="Redis连接失败,请检查Redis服务状态"
+        )), 500
+        
+    except Exception as e:
+        logger.error(f"❌ 获取checkpoint统计失败: {e}")
+        return jsonify(internal_error_response(
+            response_text=f"获取checkpoint统计失败: {str(e)}"
+        )), 500
+
+
 # Data Pipeline global variables - migrated from citu_app.py
 data_pipeline_manager = None
 data_pipeline_file_manager = None
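
Reviewer note: the grouping and retention logic in `cleanup_checkpoints` can be exercised without Redis if it is expressed as pure functions. The sketch below restates that logic under the same assumption the handler makes, namely that keys look like `checkpoint:{user_id}:{timestamp}:...` and that sorting them lexicographically orders them from oldest to newest. The function names are hypothetical and do not exist in `unified_api.py`.

```python
from collections import defaultdict
from typing import Dict, List


def group_keys_by_thread(keys: List[str]) -> Dict[str, List[str]]:
    """Group checkpoint keys by thread ID ("user_id:timestamp").

    Keys with fewer than three colon-separated parts are skipped,
    as in the handler.
    """
    groups: Dict[str, List[str]] = defaultdict(list)
    for key in keys:
        parts = key.split(":")
        if len(parts) >= 3:
            groups[f"{parts[1]}:{parts[2]}"].append(key)
    return dict(groups)


def select_keys_to_delete(thread_keys: List[str], keep_count: int) -> List[str]:
    """Return the keys to delete so that the last keep_count (by sort order) remain.

    keep_count must be positive; the handler rejects other values earlier.
    """
    if len(thread_keys) <= keep_count:
        return []
    return sorted(thread_keys)[:-keep_count]


# Illustrative keys only; the trailing segments are made up for the example.
sample_keys = [
    "checkpoint:wang1:20250729235038043:a3",
    "checkpoint:wang1:20250729235038043:a1",
    "checkpoint:wang1:20250729235038043:a2",
]
grouped = group_keys_by_thread(sample_keys)
doomed = select_keys_to_delete(grouped["wang1:20250729235038043"], keep_count=2)
```

If the key segments after the thread ID are not zero-padded or otherwise sortable as strings, the sort step would need an explicit key function; that detail is not visible in this diff.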