Jelajahi Sumber

增强agent写入的redis数据的管理能力,增加一个api,增强原有的cleanup api.

wangxq 2 minggu lalu
induk
melakukan
0d8b9b469f

+ 290 - 2
common/redis_conversation_manager.py

@@ -519,12 +519,13 @@ class RedisConversationManager:
     def cleanup_expired_conversations(self):
         """清理过期对话(Redis TTL自动处理,这里可添加额外逻辑)"""
         if not self.is_available():
-            return
+            return {"processed_users": 0, "cleaned_references": 0}
         
         try:
             # 清理用户对话列表中的无效对话ID
             user_keys = self.redis_client.keys("user:*:conversations")
             cleaned_count = 0
+            processed_users = len(user_keys)
             
             for user_key in user_keys:
                 conversation_ids = self.redis_client.lrange(user_key, 0, -1)
@@ -543,12 +544,299 @@ class RedisConversationManager:
                     if valid_ids:
                         self.redis_client.lpush(user_key, *reversed(valid_ids))
                         # 重新设置TTL
-                        self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL)
+                        if USER_CONVERSATIONS_TTL:
+                            self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL)
             
             self.logger.info(f"清理完成,移除了 {cleaned_count} 个无效对话引用")
+            return {"processed_users": processed_users, "cleaned_references": cleaned_count}
             
         except Exception as e:
             self.logger.error(f"清理失败: {str(e)}")
+            raise e
+
+    def enforce_conversation_limits(self, user_id: Optional[str] = None, 
+                                  user_max_conversations: Optional[int] = None,
+                                  conversation_max_length: Optional[int] = None,
+                                  dry_run: bool = False) -> Dict[str, Any]:
+        """
+        执行对话限额策略
+        
+        Args:
+            user_id: 指定用户ID,如果为None则处理所有用户
+            user_max_conversations: 用户最大对话数,如果为None则使用配置值
+            conversation_max_length: 对话最大消息数,如果为None则使用配置值
+            dry_run: 是否为试运行模式
+            
+        Returns:
+            执行结果统计
+        """
+        if not self.is_available():
+            raise Exception("Redis连接不可用")
+        
+        # 使用传入参数或默认配置
+        max_conversations = user_max_conversations if user_max_conversations is not None else USER_MAX_CONVERSATIONS
+        max_length = conversation_max_length if conversation_max_length is not None else CONVERSATION_MAX_LENGTH
+        
+        try:
+            start_time = time.time()
+            
+            # 确定要处理的用户
+            if user_id:
+                user_keys = [f"user:{user_id}:conversations"]
+                mode = "user_specific"
+            else:
+                user_keys = self.redis_client.keys("user:*:conversations")
+                mode = "global"
+            
+            processed_users = 0
+            total_conversations_processed = 0
+            total_conversations_deleted = 0
+            total_messages_trimmed = 0
+            execution_summary = []
+            
+            for user_key in user_keys:
+                user_id_from_key = user_key.split(":")[1]
+                conversation_ids = self.redis_client.lrange(user_key, 0, -1)
+                
+                original_conversations = len(conversation_ids)
+                total_conversations_processed += original_conversations
+                
+                # 1. 检查用户对话数量限制
+                conversations_to_keep = []
+                conversations_to_delete = []
+                
+                if len(conversation_ids) > max_conversations:
+                    # 获取对话的创建时间并排序
+                    conversations_with_time = []
+                    for conv_id in conversation_ids:
+                        meta_key = f"conversation:{conv_id}:meta"
+                        if self.redis_client.exists(meta_key):
+                            meta_data = self.redis_client.hgetall(meta_key)
+                            created_at = meta_data.get('created_at', '0')
+                            conversations_with_time.append((conv_id, created_at))
+                    
+                    # 按创建时间降序排序,保留最新的
+                    conversations_with_time.sort(key=lambda x: x[1], reverse=True)
+                    conversations_to_keep = [conv_id for conv_id, _ in conversations_with_time[:max_conversations]]
+                    conversations_to_delete = [conv_id for conv_id, _ in conversations_with_time[max_conversations:]]
+                else:
+                    conversations_to_keep = conversation_ids
+                
+                # 2. 处理要删除的对话
+                if conversations_to_delete and not dry_run:
+                    for conv_id in conversations_to_delete:
+                        self.redis_client.delete(f"conversation:{conv_id}:meta")
+                        self.redis_client.delete(f"conversation:{conv_id}:messages")
+                    
+                    # 更新用户对话列表
+                    self.redis_client.delete(user_key)
+                    if conversations_to_keep:
+                        self.redis_client.lpush(user_key, *reversed(conversations_to_keep))
+                        if USER_CONVERSATIONS_TTL:
+                            self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL)
+                
+                total_conversations_deleted += len(conversations_to_delete)
+                
+                # 3. 检查每个保留对话的消息数量限制
+                messages_trimmed_for_user = 0
+                for conv_id in conversations_to_keep:
+                    messages_key = f"conversation:{conv_id}:messages"
+                    current_length = self.redis_client.llen(messages_key)
+                    
+                    if current_length > max_length:
+                        messages_to_trim = current_length - max_length
+                        if not dry_run:
+                            self.redis_client.ltrim(messages_key, 0, max_length - 1)
+                        messages_trimmed_for_user += messages_to_trim
+                
+                total_messages_trimmed += messages_trimmed_for_user
+                processed_users += 1
+                
+                # 记录用户处理结果
+                execution_summary.append({
+                    "user_id": user_id_from_key,
+                    "original_conversations": original_conversations,
+                    "kept_conversations": len(conversations_to_keep),
+                    "deleted_conversations": len(conversations_to_delete),
+                    "messages_trimmed": messages_trimmed_for_user
+                })
+            
+            execution_time_ms = int((time.time() - start_time) * 1000)
+            
+            return {
+                "mode": mode,
+                "dry_run": dry_run,
+                "parameters": {
+                    "user_max_conversations": max_conversations,
+                    "conversation_max_length": max_length
+                },
+                "processed_users": processed_users,
+                "total_conversations_processed": total_conversations_processed,
+                "total_conversations_deleted": total_conversations_deleted,
+                "total_messages_trimmed": total_messages_trimmed,
+                "execution_summary": execution_summary,
+                "execution_time_ms": execution_time_ms
+            }
+            
+        except Exception as e:
+            self.logger.error(f"执行对话限额策略失败: {str(e)}")
+            raise e
+
+    def delete_user_conversations(self, user_id: str) -> Dict[str, Any]:
+        """
+        删除指定用户的所有对话数据
+        
+        Args:
+            user_id: 用户ID
+            
+        Returns:
+            删除结果统计
+        """
+        if not self.is_available():
+            raise Exception("Redis连接不可用")
+        
+        try:
+            start_time = time.time()
+            
+            user_key = f"user:{user_id}:conversations"
+            conversation_ids = self.redis_client.lrange(user_key, 0, -1)
+            
+            deleted_conversations = 0
+            deleted_messages = 0
+            
+            # 删除每个对话的数据
+            for conv_id in conversation_ids:
+                meta_key = f"conversation:{conv_id}:meta"
+                messages_key = f"conversation:{conv_id}:messages"
+                
+                if self.redis_client.exists(meta_key):
+                    self.redis_client.delete(meta_key)
+                    deleted_conversations += 1
+                
+                if self.redis_client.exists(messages_key):
+                    message_count = self.redis_client.llen(messages_key)
+                    self.redis_client.delete(messages_key)
+                    deleted_messages += message_count
+            
+            # 删除用户对话索引
+            self.redis_client.delete(user_key)
+            
+            execution_time_ms = int((time.time() - start_time) * 1000)
+            
+            return {
+                "user_id": user_id,
+                "deleted_conversations": deleted_conversations,
+                "deleted_messages": deleted_messages,
+                "execution_time_ms": execution_time_ms
+            }
+            
+        except Exception as e:
+            self.logger.error(f"删除用户对话失败: {str(e)}")
+            raise e
+
+    def delete_conversation(self, conversation_id: str) -> Dict[str, Any]:
+        """
+        删除指定的对话
+        
+        Args:
+            conversation_id: 对话ID
+            
+        Returns:
+            删除结果统计
+        """
+        if not self.is_available():
+            raise Exception("Redis连接不可用")
+        
+        try:
+            start_time = time.time()
+            
+            meta_key = f"conversation:{conversation_id}:meta"
+            messages_key = f"conversation:{conversation_id}:messages"
+            
+            # 检查对话是否存在
+            if not self.redis_client.exists(meta_key):
+                # 对话不存在,返回空结果(符合DELETE操作的幂等性原则)
+                return {
+                    "conversation_id": conversation_id,
+                    "user_id": None,
+                    "deleted_messages": 0,
+                    "execution_time_ms": int((time.time() - start_time) * 1000),
+                    "existed": False
+                }
+            
+            # 获取对话所属用户
+            meta_data = self.redis_client.hgetall(meta_key)
+            user_id = meta_data.get('user_id')
+            
+            # 统计要删除的消息数
+            deleted_messages = self.redis_client.llen(messages_key) if self.redis_client.exists(messages_key) else 0
+            
+            # 删除对话数据
+            self.redis_client.delete(meta_key)
+            self.redis_client.delete(messages_key)
+            
+            # 从用户对话列表中移除
+            if user_id:
+                user_key = f"user:{user_id}:conversations"
+                self.redis_client.lrem(user_key, 0, conversation_id)
+            
+            execution_time_ms = int((time.time() - start_time) * 1000)
+            
+            return {
+                "conversation_id": conversation_id,
+                "user_id": user_id,
+                "deleted_messages": deleted_messages,
+                "execution_time_ms": execution_time_ms,
+                "existed": True
+            }
+            
+        except Exception as e:
+            self.logger.error(f"删除对话失败: {str(e)}")
+            raise e
+
+    def clear_all_agent_data(self) -> Dict[str, Any]:
+        """
+        清空所有agent对话数据
+        
+        Returns:
+            删除结果统计
+        """
+        if not self.is_available():
+            raise Exception("Redis连接不可用")
+        
+        try:
+            start_time = time.time()
+            
+            # 扫描并删除所有相关键
+            meta_keys = self.redis_client.keys("conversation:*:meta")
+            messages_keys = self.redis_client.keys("conversation:*:messages")
+            user_keys = self.redis_client.keys("user:*:conversations")
+            
+            deleted_conversation_metas = len(meta_keys)
+            deleted_conversation_messages = len(messages_keys)
+            deleted_user_conversations = len(user_keys)
+            
+            # 批量删除
+            all_keys = meta_keys + messages_keys + user_keys
+            if all_keys:
+                self.redis_client.delete(*all_keys)
+            
+            total_keys_deleted = len(all_keys)
+            execution_time_ms = int((time.time() - start_time) * 1000)
+            
+            self.logger.warning(f"已清空所有agent对话数据,共删除 {total_keys_deleted} 个键")
+            
+            return {
+                "deleted_conversation_metas": deleted_conversation_metas,
+                "deleted_conversation_messages": deleted_conversation_messages,
+                "deleted_user_conversations": deleted_user_conversations,
+                "total_keys_deleted": total_keys_deleted,
+                "execution_time_ms": execution_time_ms
+            }
+            
+        except Exception as e:
+            self.logger.error(f"清空所有agent数据失败: {str(e)}")
+            raise e
     
     # ==================== 问答缓存管理方法 ====================
     

+ 337 - 0
docs/agent_conversation_management_api_design.md

@@ -0,0 +1,337 @@
+# Agent对话管理API设计文档
+
+## 概述
+
+本文档设计两个API来增强agent模块中Redis对话历史记录的管理能力:
+
+1. **新增API**: `/api/v0/conversation_limit_enforcement` - 对话限额执行API
+2. **增强API**: `/api/v0/conversation_cleanup` - 对话清理API增强版
+
+这两个API专门操作agent模块写入Redis的三种Key:
+- `conversation:{conversation_id}:meta` (对话元数据)
+- `conversation:{conversation_id}:messages` (对话消息列表)
+- `user:{user_id}:conversations` (用户对话索引)
+
+## API 1: 对话限额执行API
+
+### 基本信息
+
+- **路由**: `POST /api/v0/conversation_limit_enforcement`
+- **功能**: 执行对话数量和消息数量的限制策略,确保数据量符合配置要求
+- **作用**: 对现有数据进行"补救性"的限额执行,补充自动 `LTRIM` 策略可能遗漏的情况
+
+### 请求参数 (JSON Body)
+
+| 参数                        | 类型    | 必填 | 描述                                                     |
+| --------------------------- | ------- | ---- | -------------------------------------------------------- |
+| `user_id`                   | String  | 否   | 指定用户ID,如果提供则只处理该用户的数据                 |
+| `user_max_conversations`    | Integer | 否   | 每个用户保留的最大对话数,默认从 `app_config.py` 获取   |
+| `conversation_max_length`   | Integer | 否   | 每个对话保留的最大消息数,默认从 `app_config.py` 获取   |
+| `dry_run`                   | Boolean | 否   | 是否为试运行模式,默认 `false`。为 `true` 时只返回分析结果,不执行实际操作 |
+
+### 处理逻辑
+
+#### 全局模式 (未提供 `user_id`)
+1. 扫描所有 `user:*:conversations` Key
+2. 对每个用户执行限额检查和执行
+3. 返回全局统计信息
+
+#### 指定用户模式 (提供了 `user_id`)
+1. 只处理指定用户的 `user:{user_id}:conversations`
+2. 执行该用户的限额检查和执行
+3. 返回该用户的处理结果
+
+#### 限额执行步骤
+1. **用户对话数限制**:
+   - 获取用户的对话列表 `user:{user_id}:conversations`
+   - 如果对话数超过 `user_max_conversations`
+   - 按时间排序,保留最新的N个对话
+   - 删除多余对话的所有相关数据 (`:meta`, `:messages`)
+   - 更新用户对话列表
+
+2. **单对话消息数限制**:
+   - 对每个保留的对话,检查消息数量
+   - 如果消息数超过 `conversation_max_length`
+   - 使用 `LTRIM` 只保留最新的N条消息
+
+### 返回结果 (JSON)
+
+```json
+{
+  "success": true,
+  "message": "限额执行完成",
+  "data": {
+    "mode": "global|user_specific",
+    "dry_run": false,
+    "parameters": {
+      "user_max_conversations": 5,
+      "conversation_max_length": 10
+    },
+    "processed_users": 15,
+    "total_conversations_processed": 45,
+    "total_conversations_deleted": 8,
+    "total_messages_trimmed": 120,
+    "execution_summary": [
+      {
+        "user_id": "guest",
+        "original_conversations": 7,
+        "kept_conversations": 5,
+        "deleted_conversations": 2,
+        "messages_trimmed": 15
+      }
+    ],
+    "execution_time_ms": 1250
+  }
+}
+```
+
+## API 2: 对话清理API增强版
+
+### 基本信息
+
+- **路由**: `POST /api/v0/conversation_cleanup`
+- **功能**: 增强的对话清理功能,支持精确删除和批量清理
+- **兼容性**: 保持原有无参数调用的功能
+
+### 请求参数 (JSON Body)
+
+| 参数                     | 类型    | 必填 | 描述                                           |
+| ------------------------ | ------- | ---- | ---------------------------------------------- |
+| `user_id`                | String  | 否   | 删除指定用户的所有对话数据                     |
+| `conversation_id`        | String  | 否   | 删除指定的对话 (支持 `conversation_id` 格式)   |
+| `thread_id`              | String  | 否   | 删除指定的对话 (支持 `thread_id` 格式,与 `conversation_id` 等效) |
+| `clear_all_agent_data`   | Boolean | 否   | 是否清空所有agent对话数据,默认 `false`        |
+| `cleanup_invalid_refs`   | Boolean | 否   | 是否只清理无效引用,默认 `false`               |
+
+**注意**: 所有操作模式参数互斥,一次请求只能执行一种操作:
+- `user_id` (模式1)
+- `conversation_id` 或 `thread_id` (模式2)
+- `clear_all_agent_data: true` (模式3)
+- `cleanup_invalid_refs: true` (模式4)
+
+如果同时提供多个操作参数,API将返回参数冲突错误。
+
+### 处理逻辑
+
+#### 模式1: 删除指定用户 (提供 `user_id`)
+1. 获取用户的所有对话 `user:{user_id}:conversations`
+2. 逐个删除每个对话的 `:meta` 和 `:messages`
+3. 删除用户对话索引 `user:{user_id}:conversations`
+4. 返回删除统计
+
+#### 模式2: 删除指定对话 (提供 `conversation_id` 或 `thread_id`)
+1. 验证对话是否存在
+2. 获取对话所属用户
+3. 删除 `conversation:{id}:meta`
+4. 删除 `conversation:{id}:messages`
+5. 从 `user:{user_id}:conversations` 中移除该对话ID
+6. 返回删除结果
+
+#### 模式3: 清空所有agent对话数据 (提供 `clear_all_agent_data: true`)
+**⚠️ 危险操作**: 此模式将完全清空所有agent对话历史数据,不可恢复!
+
+**参数冲突处理**: 如果同时提供了其他操作模式参数(如 `user_id`、`conversation_id`、`thread_id`、`cleanup_invalid_refs`),API将立即返回参数冲突错误,**不会执行任何删除操作**。
+
+**执行步骤**:
+1. 扫描并删除所有 `conversation:*:meta` Key
+2. 扫描并删除所有 `conversation:*:messages` Key  
+3. 扫描并删除所有 `user:*:conversations` Key
+4. 返回删除统计
+
+#### 模式4: 清理无效引用 (提供 `cleanup_invalid_refs: true`)
+1. 扫描所有 `user:*:conversations` 列表
+2. 对每个用户对话列表中的conversation_id,检查对应的 `conversation:{id}:meta` 是否存在
+3. 如果元数据不存在,从用户对话列表中移除该conversation_id
+4. 重建清理后的用户对话列表
+5. 返回清理统计
+
+#### 无参数请求处理
+如果请求中没有提供任何参数,API将返回错误,要求明确指定操作模式。
+
+### 返回结果 (JSON)
+
+#### 参数冲突错误
+```json
+{
+  "success": false,
+  "message": "参数冲突:不能同时提供多个操作模式参数",
+  "error": "检测到冲突参数: user_id 和 clear_all_agent_data",
+  "valid_modes": [
+    "user_id (删除指定用户)",
+    "conversation_id 或 thread_id (删除指定对话,两参数等同)",
+    "clear_all_agent_data (清空所有数据)",
+    "cleanup_invalid_refs (清理无效引用)"
+  ]
+}
+```
+
+#### 删除用户模式
+```json
+{
+  "success": true,
+  "message": "用户对话数据删除完成",
+  "data": {
+    "operation_mode": "delete_user",
+    "user_id": "guest",
+    "deleted_conversations": 5,
+    "deleted_messages": 47,
+    "execution_time_ms": 150
+  }
+}
+```
+
+#### 删除对话模式
+```json
+{
+  "success": true,
+  "message": "对话删除完成",
+  "data": {
+    "operation_mode": "delete_conversation",
+    "conversation_id": "guest:20250125143022155",
+    "user_id": "guest",
+    "deleted_messages": 8,
+    "execution_time_ms": 45
+  }
+}
+```
+
+#### 清空所有数据模式
+```json
+{
+  "success": true,
+  "message": "所有agent对话数据清空完成",
+  "data": {
+    "operation_mode": "clear_all_agent_data",
+    "deleted_conversation_metas": 150,
+    "deleted_conversation_messages": 150,
+    "deleted_user_conversations": 25,
+    "total_keys_deleted": 325,
+    "execution_time_ms": 2500
+  }
+}
+```
+
+#### 清理无效引用模式
+```json
+{
+  "success": true,
+  "message": "无效引用清理完成",
+  "data": {
+    "operation_mode": "cleanup_invalid_refs",
+    "processed_users": 12,
+    "cleaned_references": 3,
+    "execution_time_ms": 200
+  }
+}
+```
+
+## 实现要点
+
+### 1. 数据安全
+- 所有删除操作需要在事务中执行或确保原子性
+- 删除前验证数据的存在性和所有权
+- 提供详细的操作日志
+- **特别注意**: `clear_all_agent_data` 是危险操作,建议增加二次确认机制
+- 参数互斥检查:确保不同操作模式的参数不能同时提供
+
+### 2. 性能考虑
+- 大批量操作时使用管道 (Pipeline) 提高效率
+- 限制单次操作的最大数据量,避免长时间阻塞
+- 对于全局模式,考虑分页处理
+
+### 3. 错误处理
+- **参数互斥检查**:
+  - 不能同时提供多个操作模式的参数
+  - 具体互斥规则:
+    - 共有四组参数组合:`user_id`/ `conversation_id`(`thread_id` ) / `clear_all_agent_data`/`cleanup_invalid_refs`
+    - 这四组参数不能同时传递。
+    - `conversation_id` 和 `thread_id` 完全等同
+  - 无参数时必须返回错误,要求指定操作模式
+- Redis连接异常处理
+- 部分操作失败时的回滚策略
+- 对于 `clear_all_agent_data` 操作,增加额外的安全检查
+
+### 4. 日志记录
+- 记录所有重要操作的详细日志
+- 包含用户ID、对话ID、操作类型等关键信息
+- 便于问题排查和审计
+
+## 使用示例
+
+### 限额执行API
+
+1. **全局限额执行**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"user_max_conversations": 3, "conversation_max_length": 8}' \
+     http://127.0.0.1:5000/api/v0/conversation_limit_enforcement
+```
+
+2. **指定用户限额执行**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"user_id": "guest", "user_max_conversations": 5}' \
+     http://127.0.0.1:5000/api/v0/conversation_limit_enforcement
+```
+
+3. **试运行模式**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"dry_run": true}' \
+     http://127.0.0.1:5000/api/v0/conversation_limit_enforcement
+```
+
+### 对话清理API
+
+1. **删除指定用户**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"user_id": "guest"}' \
+     http://127.0.0.1:5000/api/v0/conversation_cleanup
+```
+
+2. **删除指定对话**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"conversation_id": "guest:20250125143022155"}' \
+     http://127.0.0.1:5000/api/v0/conversation_cleanup
+```
+
+3. **清空所有agent对话数据** (⚠️ 危险操作):
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"clear_all_agent_data": true}' \
+     http://127.0.0.1:5000/api/v0/conversation_cleanup
+```
+
+4. **清理无效引用**:
+```bash
+curl -X POST -H "Content-Type: application/json" \
+     -d '{"cleanup_invalid_refs": true}' \
+     http://127.0.0.1:5000/api/v0/conversation_cleanup
+```
+
+5. **无参数调用** (将返回错误):
+```bash
+curl -X POST http://127.0.0.1:5000/api/v0/conversation_cleanup
+
+# 返回结果:
+{
+  "success": false,
+  "message": "参数错误:必须指定操作模式",
+  "error": "请提供以下参数组合之一: user_id | conversation_id/thread_id | clear_all_agent_data | cleanup_invalid_refs"
+}
+```
+
+## 总结
+
+这两个API的设计目标是:
+
+1. **限额执行API**: 提供"补救性"的数据量控制,处理自动限制策略可能遗漏的历史数据
+2. **增强清理API**: 提供多层次的数据删除能力:
+   - **精确删除**: 删除指定用户或指定对话
+   - **完全清空**: 清空所有agent对话历史数据 (危险操作)
+   - **维护清理**: 清理无效引用,保持数据一致性
+   - **明确操作**: 不支持无参数调用,必须明确指定操作模式,避免误操作
+
+两个API相互补充,形成完整的agent对话数据管理体系,满足从日常维护到紧急清理的各种需求。

+ 430 - 0
docs/agent_redis_history_management.md

@@ -0,0 +1,430 @@
+# Agent模块对话历史记录管理说明 (Redis)
+
+本文档旨在详细说明 `agent` 模块(及其依赖的通用模块)如何管理存储在 Redis 中的对话历史记录,包括数据结构、自动保留策略、过期机制以及手动清理功能。
+
+## 核心架构
+
+`agent` 模块本身不直接执行 Redis 操作。实际的 Redis 交互由一个通用的核心模块 `common/redis_conversation_manager.py` 负责。API 层(如 `unified_api.py`)在接收到请求后,会调用这个通用模块来创建、更新或获取对话数据。
+
+## Redis 中的数据结构
+
+系统使用以下三种类型的 Redis Key 来存储和管理对话历史:
+
+1.  **对话元数据 (Hash)**: `conversation:{conversation_id}:meta`
+    - **用途**: 存储对话的基本信息,如创建时间、最后更新时间、所属用户ID和消息总数。
+    - **类型**: `Hash`
+
+2.  **对话消息列表 (List)**: `conversation:{conversation_id}:messages`
+    - **用途**: 存储一个对话中所有的消息内容。每条消息是一个 JSON 字符串,包含 `role`, `content`, `timestamp` 等字段。
+    - **类型**: `List`
+
+3.  **用户对话索引 (List)**: `user:{user_id}:conversations`
+    - **用途**: 存储一个用户所有对话的 `conversation_id`。此列表按时间倒序排列,最新的对话ID在列表的最前面。
+    - **类型**: `List`
+
+## 数据保留与上限策略(自动)
+
+系统采用一种“主动限制”(Proactive Capping)的机制,在数据写入时就自动维护数据的数量上限,无需额外的清理脚本。
+
+### 用户对话数上限
+
+-   **机制**: 当一个新对话被创建时,系统会将新的 `conversation_id` 通过 `LPUSH` 命令添加到该用户对话列表的头部。紧接着,系统会立即使用 `LTRIM` 命令修剪这个列表,只保留最新的 N 个对话。
+-   **配置**: 保留的对话数量由 `app_config.py` 中的 `USER_MAX_CONVERSATIONS` 参数控制。
+    ```python
+    # app_config.py
+    USER_MAX_CONVERSATIONS = 5  # 每个用户最多保留的对话数
+    ```
+
+### 单对话消息数上限
+
+-   **机制**: 当一条新消息被保存时,它会被 `LPUSH` 到对应对话消息列表的头部。同样,系统会立即使用 `LTRIM` 命令修剪该列表,只保留最新的 N 条消息。
+-   **配置**: 保留的消息数量由 `app_config.py` 中的 `CONVERSATION_MAX_LENGTH` 参数控制。
+    ```python
+    # app_config.py
+    CONVERSATION_MAX_LENGTH = 10  # 单个对话最大消息数
+    ```
+
+## 数据过期策略 (TTL)
+
+系统支持为对话数据设置自动过期时间。
+
+-   **机制**: 当一个新对话被创建或有新消息加入时,系统会为该对话的 `:meta` 和 `:messages` 这两个 Key 设置或续期 TTL(Time-To-Live)。
+-   **配置**: TTL 的值由 `app_config.py` 中的 `CONVERSATION_TTL` 参数(单位为秒)控制。
+    ```python
+    # app_config.py
+    CONVERSATION_TTL = 7 * 24 * 3600  # 默认为7天
+    ```
+-   **如何禁用 TTL**: 将 `CONVERSATION_TTL` 的值设置为 `None` 即可禁用 TTL。`redis-py` 客户端在接收到 `None` 作为过期时间时,会忽略该命令,从而使 Key 永久存储。
+    ```python
+    # app_config.py - 禁用TTL的示例
+    CONVERSATION_TTL = None
+    ```
+
+## 对话历史管理 API
+
+系统提供了一套完整的API来管理和操作存储在Redis中的agent对话历史记录。
+
+### 1. 对话查询API
+
+**`GET /api/v0/user/<user_id>/conversations`**
+- **功能**: 获取指定用户的对话列表
+- **操作的Redis Key**: `user:{user_id}:conversations` 和 `conversation:{conversation_id}:meta`
+- **参数**: 
+  - `user_id` (路径参数): 用户ID
+  - `limit` (查询参数, 可选): 返回对话数量限制,默认为 `USER_MAX_CONVERSATIONS`
+- **返回**: 用户的对话列表,包含对话ID、创建时间、更新时间、对话标题等信息
+
+**`GET /api/v0/conversation/<conversation_id>/messages`**
+- **功能**: 获取特定对话的消息历史和元数据
+- **操作的Redis Key**: `conversation:{conversation_id}:messages` 和 `conversation:{conversation_id}:meta`
+- **参数**: 
+  - `conversation_id` (路径参数): 对话ID
+  - `limit` (查询参数, 可选): 返回消息数量限制
+- **返回**: 对话的消息列表、对话元数据、消息总数等信息
+
+### 2. 统计信息API
+
+**`GET /api/v0/conversation_stats`**
+- **功能**: 获取对话系统的统计信息
+- **操作的Redis Key**: 统计 `user:*:conversations`、`conversation:*:meta`、`qa_cache:*` 等Key
+- **参数**: 无
+- **返回**: Redis中对话数据的统计信息,包括:
+  - 总用户数
+  - 总对话数
+  - 问答缓存数量
+  - Redis内存使用情况
+
+### 3. 对话限额执行API
+
+**`POST /api/v0/conversation_limit_enforcement`**
+- **功能**: 执行对话限额策略,对用户对话数量和消息数量进行"补救性"控制
+- **操作的Redis Key**: `user:*:conversations`, `conversation:*:meta`, `conversation:*:messages`
+- **使用场景**: 
+  - 系统配置变更后需要追溯应用新的限额策略
+  - 定期清理以释放Redis内存空间
+  - 数据迁移或维护时的批量整理
+- **参数**: 
+  - `user_id` (可选): 指定用户ID,如果不提供则处理所有用户
+  - `user_max_conversations` (可选): 每用户最大对话数,默认使用 `USER_MAX_CONVERSATIONS` 配置
+  - `conversation_max_length` (可选): 每对话最大消息数,默认使用 `CONVERSATION_MAX_LENGTH` 配置
+  - `dry_run` (可选): 试运行模式,默认为 `false`。设为 `true` 时只模拟执行,不实际删除数据
+- **返回**: 详细的执行统计信息,包括:
+  - `mode`: 执行模式("global"或"user_specific")
+  - `dry_run`: 是否为试运行模式
+  - `processed_users`: 处理的用户数
+  - `total_conversations_processed`: 处理的对话总数
+  - `total_conversations_deleted`: 删除的对话总数
+  - `total_messages_trimmed`: 裁剪的消息总数
+  - `execution_summary`: 每个用户的详细处理结果
+  - `execution_time_ms`: 执行时间(毫秒)
+
+### 4. 手动数据清理API
+
+**`POST /api/v0/conversation_cleanup`**
+- **功能**: 多模式对话清理API,支持精确的数据清理操作
+- **操作的Redis Key**: `user:*:conversations`, `conversation:*:meta`, `conversation:*:messages`
+- **使用场景**:
+  - 清理数据不一致问题(无效引用)
+  - 删除特定用户或对话的数据
+  - 系统维护和数据重置
+- **参数** (四种互斥的操作模式,必须且只能选择一种):
+  
+  **模式1: 删除指定用户**
+  ```json
+  {"user_id": "guest"}
+  ```
+  - 删除指定用户的所有对话数据,包括元数据、消息和用户对话索引
+  
+  **模式2: 删除指定对话**
+  ```json
+  {"conversation_id": "guest:20250125143022155"}
+  // 或
+  {"thread_id": "guest:20250125143022155"}
+  ```
+  - 删除指定的单个对话(conversation_id和thread_id等效)
+  - 同时从所属用户的对话列表中移除引用
+  
+  **模式3: 清空所有agent对话数据** ⚠️ **危险操作**
+  ```json
+  {"clear_all_agent_data": true}
+  ```
+  - 完全清空所有agent模块的对话数据
+  - 删除所有 `conversation:*:meta`、`conversation:*:messages`、`user:*:conversations` Key
+  - **注意**: 此操作不可恢复,仅限于系统重置或测试环境使用
+  
+  **模式4: 清理无效引用**
+  ```json
+  {"cleanup_invalid_refs": true}
+  ```
+  - 清理用户对话列表中指向已不存在对话的无效引用
+  - 维护数据一致性,不删除实际对话数据
+
+- **返回**: 根据操作模式返回相应的统计信息,包括删除的对话数、消息数、处理时间等
+
+### 使用示例
+
+1.  **获取用户对话列表**:
+    ```bash
+    curl -X GET http://127.0.0.1:8084/api/v0/user/guest/conversations
+    ```
+
+2.  **获取特定对话的消息**:
+    ```bash
+    curl -X GET http://127.0.0.1:8084/api/v0/conversation/guest:20250125143022155/messages
+    ```
+
+3.  **获取系统统计信息**:
+    ```bash
+    curl -X GET http://127.0.0.1:8084/api/v0/conversation_stats
+    ```
+
+4.  **对话限额执行示例**:
+
+    **试运行模式 - 查看会删除什么**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"dry_run": true}' \
+         http://127.0.0.1:8084/api/v0/conversation_limit_enforcement
+    ```
+
+    **为特定用户执行限额策略**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"user_id": "guest", "user_max_conversations": 3}' \
+         http://127.0.0.1:8084/api/v0/conversation_limit_enforcement
+    ```
+
+    **全局执行自定义限额策略**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"user_max_conversations": 5, "conversation_max_length": 20}' \
+         http://127.0.0.1:8084/api/v0/conversation_limit_enforcement
+    ```
+
+5.  **对话清理示例**:
+
+    **清理无效引用**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"cleanup_invalid_refs": true}' \
+         http://127.0.0.1:8084/api/v0/conversation_cleanup
+    ```
+
+    **删除特定用户的所有对话**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"user_id": "test_user"}' \
+         http://127.0.0.1:8084/api/v0/conversation_cleanup
+    ```
+
+    **删除特定对话**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"conversation_id": "guest:20250125143022155"}' \
+         http://127.0.0.1:8084/api/v0/conversation_cleanup
+    ```
+
+    **清空所有agent对话数据** ⚠️ **危险操作**:
+    ```bash
+    curl -X POST -H "Content-Type: application/json" \
+         -d '{"clear_all_agent_data": true}' \
+         http://127.0.0.1:8084/api/v0/conversation_cleanup
+    ```
+
+### API响应示例
+
+#### 对话限额执行API响应示例
+
+**试运行模式响应**:
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "操作成功",
+  "data": {
+    "mode": "global",
+    "dry_run": true,
+    "parameters": {
+      "user_max_conversations": 5,
+      "conversation_max_length": 10
+    },
+    "processed_users": 2,
+    "total_conversations_processed": 6,
+    "total_conversations_deleted": 2,
+    "total_messages_trimmed": 0,
+    "execution_summary": [
+      {
+        "user_id": "test_user",
+        "original_conversations": 1,
+        "kept_conversations": 1,
+        "deleted_conversations": 0,
+        "messages_trimmed": 0
+      },
+      {
+        "user_id": "wang11",
+        "original_conversations": 5,
+        "kept_conversations": 3,
+        "deleted_conversations": 2,
+        "messages_trimmed": 0
+      }
+    ],
+    "execution_time_ms": 15,
+    "timestamp": "2025-08-07T22:54:27.941807",
+    "response": "限额执行完成"
+  }
+}
+```
+
+#### 对话清理API响应示例
+
+**清理无效引用响应**:
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "操作成功",
+  "data": {
+    "operation_mode": "cleanup_invalid_refs",
+    "processed_users": 2,
+    "cleaned_references": 0,
+    "response": "无效引用清理完成",
+    "timestamp": "2025-08-07T22:54:27.975516"
+  }
+}
+```
+
+**删除指定对话响应**:
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "操作成功",
+  "data": {
+    "operation_mode": "delete_conversation",
+    "conversation_id": "guest:20250125143022155",
+    "user_id": "guest",
+    "deleted_messages": 8,
+    "execution_time_ms": 2,
+    "existed": true,
+    "response": "对话删除完成",
+    "timestamp": "2025-08-07T22:54:27.990033"
+  }
+}
+```
+
+**参数错误响应示例**:
+```json
+{
+  "code": 400,
+  "success": false,
+  "message": "请求参数错误",
+  "data": {
+    "error_type": "missing_required_params",
+    "response": "参数错误:必须指定操作模式",
+    "error": "请提供以下参数组合之一: user_id | conversation_id/thread_id | clear_all_agent_data | cleanup_invalid_refs",
+    "timestamp": "2025-08-07T22:54:28.002030"
+  }
+}
+```
+
+## 自动与手动策略的关系
+
+系统的对话管理由三层策略共同保障:自动化的实时保留策略、补救性的限额执行API和精确的数据清理API。这三者是互补关系,各司其职。
+
+### 1. 自动保留策略 (`LTRIM` 操作)
+
+-   **角色**: "日常门卫",负责高频的数量控制。
+-   **机制**: 在数据写入的瞬间,通过 `LTRIM` 命令实时确保每个用户的对话数和每个对话的消息数不超过预设上限 (`USER_MAX_CONVERSATIONS` 和 `CONVERSATION_MAX_LENGTH`)。
+-   **特点**: 高效、实时,保证了 Redis 数据的基本健康和数量可控。但它只关心"新旧顺序",不关心对话的实际"质量"(例如,一个仅有一两句话的废弃对话和一个有价值的长对话,它只看谁更新)。
+-   **局限性**: 只在新数据写入时生效,对于已存在的历史数据无能为力。
+
+### 2. 补救性限额执行 (`/api/v0/conversation_limit_enforcement`)
+
+-   **角色**: "历史数据整理师",负责对存量数据进行批量限额控制。
+-   **机制**: 
+    -   扫描所有用户的对话数据,按照指定的限额策略进行裁剪
+    -   支持全局处理或指定用户处理
+    -   可以自定义限额参数,不局限于配置文件中的值
+    -   支持试运行模式,可以预览操作结果而不实际执行
+-   **特点**: 
+    -   **批量处理**: 一次性处理大量历史数据
+    -   **灵活配置**: 可临时调整限额参数
+    -   **安全预览**: 试运行模式确保操作安全
+    -   **详细统计**: 提供完整的执行报告
+-   **使用场景**:
+    -   系统配置更改后,需要追溯应用新的限额策略
+    -   定期维护,释放Redis内存空间
+    -   数据迁移前的批量整理
+
+### 3. 精确数据清理 (`/api/v0/conversation_cleanup`)
+
+-   **角色**: "精确手术刀",负责特定目标的数据清理和维护。
+-   **机制**: 提供四种精确的操作模式:
+    -   **清理无效引用**: 维护数据一致性,清理TTL过期后的孤立引用
+    -   **删除指定用户**: 完全移除某个用户的所有对话数据
+    -   **删除指定对话**: 精确删除单个对话及相关引用
+    -   **清空所有数据**: 系统重置或测试环境的完全清理
+-   **特点**: 
+    -   **操作精确**: 每种模式都有明确的作用范围
+    -   **参数互斥**: 严格的参数验证,避免误操作
+    -   **安全保护**: 危险操作有明确标识和警告
+    -   **数据完整性**: 确保删除操作的原子性和一致性
+-   **使用场景**:
+    -   日常维护中的数据一致性保障
+    -   用户数据删除请求的处理
+    -   系统故障后的数据修复
+    -   开发测试环境的数据重置
+
+### 三层策略的协同工作
+
+1. **预防为主**: 自动保留策略在源头控制数据增长
+2. **定期维护**: 限额执行API处理历史数据积累问题  
+3. **精确清理**: 数据清理API处理特定的维护需求和异常情况
+
+这种分层设计确保了系统在各种场景下都能有效管理Redis中的对话数据,既保证了性能,又提供了灵活性和安全性。
+
+## 安全注意事项与最佳实践
+
+### ⚠️ 危险操作警告
+
+1. **`clear_all_agent_data` 操作**:
+   - 这是一个**不可逆**的操作,会删除所有agent对话数据
+   - 仅限于系统重置、测试环境清理或紧急维护使用
+   - 生产环境使用前务必进行数据备份
+
+2. **限额执行操作**:
+   - 非试运行模式下会实际删除数据
+   - 建议先用 `dry_run: true` 预览操作结果
+   - 重要数据删除前请确保已备份
+
+### 🔒 最佳实践建议
+
+1. **API使用顺序**:
+   ```
+   查看统计 → 试运行预览 → 实际执行 → 验证结果
+   ```
+
+2. **定期维护建议**:
+   - 每周运行一次 `cleanup_invalid_refs` 清理无效引用
+   - 根据Redis内存使用情况,定期执行限额策略
+   - 监控 `/api/v0/conversation_stats` 统计信息
+
+3. **参数验证**:
+   - 所有API都有严格的参数验证
+   - 参数冲突会返回400错误和详细说明
+   - 仔细阅读错误信息以正确使用API
+
+4. **权限控制**:
+   - 危险操作建议添加额外的权限验证
+   - 生产环境建议限制这些API的访问权限
+   - 记录所有管理操作的审计日志
+
+### 📊 监控建议
+
+定期检查以下指标:
+- Redis内存使用量变化
+- 对话数据增长趋势  
+- 无效引用的数量
+- API执行时间和频率
+
+通过合理使用这些API,可以有效地管理和维护Redis中的对话历史数据,确保系统的稳定性和性能。

+ 156 - 4
unified_api.py

@@ -31,7 +31,7 @@ import redis.asyncio as redis
 from werkzeug.utils import secure_filename
 
 # 导入标准化响应格式
-from common.result import success_response, internal_error_response, bad_request_response
+from common.result import success_response, internal_error_response, bad_request_response, error_response
 
 # 基础依赖
 import pandas as pd
@@ -2388,17 +2388,169 @@ def conversation_stats():
             response_text="获取统计信息失败,请稍后重试"
         )), 500
 
+@app.route('/api/v0/conversation_limit_enforcement', methods=['POST'])
+def conversation_limit_enforcement():
+    """对话限额执行API"""
+    try:
+        data = request.get_json() or {}
+        
+        # 获取参数
+        user_id = data.get('user_id')
+        user_max_conversations = data.get('user_max_conversations')
+        conversation_max_length = data.get('conversation_max_length')
+        dry_run = data.get('dry_run', False)
+        
+        # 参数验证
+        if user_max_conversations is not None and (not isinstance(user_max_conversations, int) or user_max_conversations < 1):
+            return jsonify(bad_request_response(
+                response_text="user_max_conversations 必须是大于0的整数"
+            )), 400
+            
+        if conversation_max_length is not None and (not isinstance(conversation_max_length, int) or conversation_max_length < 1):
+            return jsonify(bad_request_response(
+                response_text="conversation_max_length 必须是大于0的整数"
+            )), 400
+        
+        if dry_run is not None and not isinstance(dry_run, bool):
+            return jsonify(bad_request_response(
+                response_text="dry_run 必须是布尔值"
+            )), 400
+        
+        # 执行对话限额策略
+        result = redis_conversation_manager.enforce_conversation_limits(
+            user_id=user_id,
+            user_max_conversations=user_max_conversations,
+            conversation_max_length=conversation_max_length,
+            dry_run=dry_run
+        )
+        
+        return jsonify(success_response(
+            response_text="限额执行完成",
+            data=result
+        ))
+        
+    except Exception as e:
+        logger.error(f"对话限额执行失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="对话限额执行失败,请稍后重试"
+        )), 500
+
 @app.route('/api/v0/conversation_cleanup', methods=['POST'])
 def conversation_cleanup():
-    """手动清理过期对话"""
+    """增强的对话清理API"""
     try:
-        redis_conversation_manager.cleanup_expired_conversations()
+        # 处理请求数据,包括非JSON请求的情况
+        try:
+            data = request.get_json() or {}
+        except Exception as e:
+            # 非JSON请求
+            return jsonify(error_response(
+                response_text="请求格式错误:需要JSON格式的数据",
+                error_type="missing_required_params",
+                message="请求参数错误",
+                code=400
+            )), 400
+        
+        # 获取参数
+        user_id = data.get('user_id')
+        conversation_id = data.get('conversation_id')
+        thread_id = data.get('thread_id')
+        clear_all_agent_data = data.get('clear_all_agent_data', False)
+        cleanup_invalid_refs = data.get('cleanup_invalid_refs', False)
+        
+        # 参数互斥检查
+        provided_params = []
+        if user_id:
+            provided_params.append('user_id')
+        if conversation_id:
+            provided_params.append('conversation_id')
+        if thread_id:
+            provided_params.append('thread_id')
+        if clear_all_agent_data:
+            provided_params.append('clear_all_agent_data')
+        if cleanup_invalid_refs:
+            provided_params.append('cleanup_invalid_refs')
+        
+        # conversation_id 和 thread_id 是等效的
+        if conversation_id and thread_id:
+            return jsonify(error_response(
+                response_text="参数冲突:conversation_id 和 thread_id 不能同时提供",
+                error_type="missing_required_params",
+                message="请求参数错误",
+                code=400
+            )), 400
+        
+        # 检查参数冲突
+        if len(provided_params) > 1:
+            # 简化逻辑:只要不是conversation_id和thread_id的组合,就报错
+            if not (len(provided_params) == 2 and 
+                   'conversation_id' in provided_params and 'thread_id' in provided_params):
+                return jsonify(error_response(
+                    response_text=f"参数冲突:不能同时提供多个操作模式参数,检测到: {', '.join(provided_params)}",
+                    error_type="missing_required_params",
+                    message="请求参数错误",
+                    code=400,
+                    data={
+                        "conflicting_params": provided_params,
+                        "valid_modes": [
+                            "user_id (删除指定用户)",
+                            "conversation_id 或 thread_id (删除指定对话,两参数等同)",
+                            "clear_all_agent_data (清空所有数据)",
+                            "cleanup_invalid_refs (清理无效引用)"
+                        ]
+                    }
+                )), 400
+        
+        # 检查是否没有提供任何参数
+        if len(provided_params) == 0:
+            return jsonify(error_response(
+                response_text="参数错误:必须指定操作模式",
+                error_type="missing_required_params",
+                message="请求参数错误",
+                code=400,
+                data={
+                    "error": "请提供以下参数组合之一: user_id | conversation_id/thread_id | clear_all_agent_data | cleanup_invalid_refs"
+                }
+            )), 400
+        
+        # 执行相应的操作
+        start_time = time.time()
+        
+        if user_id:
+            # 模式1: 删除指定用户
+            result = redis_conversation_manager.delete_user_conversations(user_id)
+            operation_mode = "delete_user"
+            message = "用户对话数据删除完成"
+            
+        elif conversation_id or thread_id:
+            # 模式2: 删除指定对话
+            target_id = conversation_id or thread_id
+            result = redis_conversation_manager.delete_conversation(target_id)
+            operation_mode = "delete_conversation"
+            message = "对话删除完成"
+            
+        elif clear_all_agent_data:
+            # 模式3: 清空所有agent对话数据
+            result = redis_conversation_manager.clear_all_agent_data()
+            operation_mode = "clear_all_agent_data"
+            message = "所有agent对话数据清空完成"
+            
+        elif cleanup_invalid_refs:
+            # 模式4: 清理无效引用
+            result = redis_conversation_manager.cleanup_expired_conversations()
+            operation_mode = "cleanup_invalid_refs"
+            message = "无效引用清理完成"
+        
+        # 添加操作模式到结果中
+        result["operation_mode"] = operation_mode
         
         return jsonify(success_response(
-            response_text="对话清理完成"
+            response_text=message,
+            data=result
         ))
         
     except Exception as e:
+        logger.error(f"对话清理失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="对话清理失败,请稍后重试"
         )), 500