# Redis对话管理系统详细设计与开发文档(修正版) > **重要更新(2024年)**:修复了智能缓存的上下文感知问题。原设计中缓存键仅基于conversation_id,无法正确处理同一对话中上下文变化的情况。新设计将缓存键改为基于实际上下文内容的哈希,确保相同问题在不同上下文下能够返回正确的缓存结果。 ## 1. 项目概述与实施目标 ### 1.1 项目背景 基于现有的vanna+LangChain+LangGraph项目,为ask_agent() API增加Redis对话管理功能,实现上下文连续对话、对话历史记录和智能缓存功能。**不修改现有ask() API和SessionAwareMemoryCache**。 ### 1.2 核心功能 - **智能用户识别**:支持登录用户、请求传参用户、匿名guest用户 - **对话上下文管理**:支持多轮连续对话,可配置上下文长度 - **智能缓存系统**:问答结果缓存,避免重复计算 - **RESTful查询API**:支持查询用户对话列表和对话详情 - **容错降级设计**:Redis不可用时自动降级 ### 1.3 技术架构 - **Redis存储层**:对话数据持久化和TTL自动清理 - **ask_agent() API增强**:集成对话上下文和缓存功能 - **管理API**:提供对话查询和统计功能 - **配置化管理**:所有参数可通过app_config.py配置 ## 2. 需要修改的文件详细清单 ### 2.1 配置文件修改 #### 📝 修改文件:`app_config.py` **位置**:文件末尾添加新配置段 **修改内容**: ```python # ==================== Redis对话管理配置 ==================== # 对话上下文配置 CONVERSATION_CONTEXT_COUNT = 5 # 传递给LLM的上下文消息条数 CONVERSATION_MAX_LENGTH = 20 # 单个对话最大消息数 USER_MAX_CONVERSATIONS = 5 # 用户最大对话数 # 用户管理配置 DEFAULT_ANONYMOUS_USER_PREFIX = "guest" # 匿名用户前缀 GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天 MAX_GUEST_CONVERSATIONS = 3 # guest用户最多3个对话 MAX_REGISTERED_CONVERSATIONS = 10 # 注册用户最多10个对话 # Redis配置 REDIS_HOST = "localhost" REDIS_PORT = 6379 REDIS_DB = 0 REDIS_PASSWORD = None # 缓存开关配置 ENABLE_CONVERSATION_CONTEXT = True # 是否启用对话上下文 ENABLE_QUESTION_ANSWER_CACHE = True # 是否启用问答结果缓存 # TTL配置(单位:秒)- 修正TTL逻辑 CONVERSATION_TTL = 7 * 24 * 3600 # 对话保存7天 USER_CONVERSATIONS_TTL = 7 * 24 * 3600 # 用户对话列表保存7天(与对话TTL一致) QUESTION_ANSWER_TTL = 24 * 3600 # 问答结果缓存24小时 GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天 ``` #### 📝 修改文件:`requirements.txt` **修改内容**:添加Redis依赖 ```txt # 在现有依赖基础上添加 redis==5.0.1 ``` ### 2.2 核心组件开发 #### 🆕 新增文件:`common/redis_conversation_manager.py` **功能**:Redis对话管理器核心类(修正版) **完整代码实现**: ```python import redis import json import hashlib import uuid import time from datetime import datetime from typing import List, Dict, Any, Optional from app_config import ( REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, CONVERSATION_CONTEXT_COUNT, CONVERSATION_MAX_LENGTH, USER_MAX_CONVERSATIONS, CONVERSATION_TTL, USER_CONVERSATIONS_TTL, QUESTION_ANSWER_TTL, ENABLE_CONVERSATION_CONTEXT, ENABLE_QUESTION_ANSWER_CACHE, DEFAULT_ANONYMOUS_USER_PREFIX, MAX_GUEST_CONVERSATIONS, MAX_REGISTERED_CONVERSATIONS, GUEST_USER_TTL ) class RedisConversationManager: """Redis对话管理器 - 修正版""" def __init__(self): """初始化Redis连接""" try: self.redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD, decode_responses=True, socket_connect_timeout=5, socket_timeout=5 ) # 测试连接 self.redis_client.ping() print(f"[REDIS_CONV] Redis连接成功: {REDIS_HOST}:{REDIS_PORT}") except Exception as e: print(f"[ERROR] Redis连接失败: {str(e)}") self.redis_client = None def is_available(self) -> bool: """检查Redis是否可用""" try: return self.redis_client is not None and self.redis_client.ping() except: return False # ==================== 用户ID解析(修正版)==================== def resolve_user_id(self, user_id_from_request: Optional[str], session_id: Optional[str], request_ip: str, login_user_id: Optional[str] = None) -> str: """ 智能解析用户ID - 修正版 Args: user_id_from_request: 请求参数中的user_id session_id: 浏览器session_id request_ip: 请求IP地址 login_user_id: 从Flask session中获取的登录用户ID(在ask_agent中获取) """ # 1. 优先使用登录用户ID if login_user_id: print(f"[REDIS_CONV] 使用登录用户ID: {login_user_id}") return login_user_id # 2. 如果没有登录,尝试从请求参数获取user_id if user_id_from_request: print(f"[REDIS_CONV] 使用请求参数user_id: {user_id_from_request}") return user_id_from_request # 3. 都没有则为匿名用户(guest) if session_id: guest_suffix = hashlib.md5(session_id.encode()).hexdigest()[:8] guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_{guest_suffix}" print(f"[REDIS_CONV] 生成稳定guest用户: {guest_id}") return guest_id # 4. 最后基于IP的临时guest ID ip_suffix = hashlib.md5(request_ip.encode()).hexdigest()[:8] temp_guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_temp_{ip_suffix}" print(f"[REDIS_CONV] 生成临时guest用户: {temp_guest_id}") return temp_guest_id def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str], continue_conversation: bool) -> tuple[str, dict]: """ 智能解析对话ID - 改进版 Returns: tuple: (conversation_id, status_info) status_info包含: - status: "existing" | "new" | "invalid_id_new" - message: 状态说明 - requested_id: 原始请求的ID(如果有) """ # 1. 如果指定了conversation_id,验证后使用 if conversation_id_input: if self._is_valid_conversation(conversation_id_input, user_id): print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}") return conversation_id_input, { "status": "existing", "message": "继续已有对话" } else: print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话") new_conversation_id = self.create_conversation(user_id) return new_conversation_id, { "status": "invalid_id_new", "message": "您请求的对话不存在或无权访问,已为您创建新对话", "requested_id": conversation_id_input } # 2. 如果要继续最近对话 if continue_conversation: recent_conversation = self._get_recent_conversation(user_id) if recent_conversation: print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}") return recent_conversation, { "status": "existing", "message": "继续最近对话" } # 3. 创建新对话 new_conversation_id = self.create_conversation(user_id) print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}") return new_conversation_id, { "status": "new", "message": "创建新对话" } def _is_valid_conversation(self, conversation_id: str, user_id: str) -> bool: """验证对话是否存在且属于该用户""" if not self.is_available(): return False try: # 检查对话元信息是否存在 meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta") if not meta_data: return False # 检查是否属于该用户 return meta_data.get('user_id') == user_id except Exception: return False def _get_recent_conversation(self, user_id: str) -> Optional[str]: """获取用户最近的对话ID""" if not self.is_available(): return None try: conversations = self.redis_client.lrange( f"user:{user_id}:conversations", 0, 0 ) return conversations[0] if conversations else None except Exception: return None # ==================== 对话管理 ==================== def create_conversation(self, user_id: str) -> str: """创建新对话""" # 生成包含时间戳的conversation_id timestamp = int(datetime.now().timestamp()) conversation_id = f"conv_{timestamp}_{uuid.uuid4().hex[:8]}" if not self.is_available(): return conversation_id # Redis不可用时返回ID,但不存储 try: # 创建对话元信息 meta_data = { "conversation_id": conversation_id, "user_id": user_id, "created_at": datetime.now().isoformat(), "updated_at": datetime.now().isoformat(), "message_count": "0" } # 保存对话元信息 self.redis_client.hset( f"conversation:{conversation_id}:meta", mapping=meta_data ) self.redis_client.expire(f"conversation:{conversation_id}:meta", CONVERSATION_TTL) # 添加到用户的对话列表 self._add_conversation_to_user(user_id, conversation_id) print(f"[REDIS_CONV] 创建对话成功: {conversation_id}") return conversation_id except Exception as e: print(f"[ERROR] 创建对话失败: {str(e)}") return conversation_id # 返回ID但可能未存储 def save_message(self, conversation_id: str, role: str, content: str, metadata: Optional[Dict] = None) -> bool: """保存消息到对话历史""" if not self.is_available() or not conversation_id: return False try: message_data = { "message_id": str(uuid.uuid4()), "timestamp": datetime.now().isoformat(), "role": role, # user, assistant "content": content, "metadata": metadata or {} } # 保存到消息列表(LPUSH添加到头部,最新消息在前) self.redis_client.lpush( f"conversation:{conversation_id}:messages", json.dumps(message_data) ) # 设置TTL self.redis_client.expire(f"conversation:{conversation_id}:messages", CONVERSATION_TTL) # 限制消息数量 self.redis_client.ltrim( f"conversation:{conversation_id}:messages", 0, CONVERSATION_MAX_LENGTH - 1 ) # 更新元信息 self._update_conversation_meta(conversation_id) return True except Exception as e: print(f"[ERROR] 保存消息失败: {str(e)}") return False def get_context(self, conversation_id: str, count: Optional[int] = None) -> str: """获取对话上下文,格式化为prompt""" if not self.is_available() or not ENABLE_CONVERSATION_CONTEXT: return "" try: if count is None: count = CONVERSATION_CONTEXT_COUNT # 获取最近的消息(count*2 因为包含用户和助手消息) message_count = count * 2 messages = self.redis_client.lrange( f"conversation:{conversation_id}:messages", 0, message_count - 1 ) if not messages: return "" # 解析消息并构建上下文(按时间正序) context_parts = [] for msg_json in reversed(messages): # Redis返回倒序,需要反转 try: msg_data = json.loads(msg_json) role = msg_data.get("role", "") content = msg_data.get("content", "") if role == "user": context_parts.append(f"用户: {content}") elif role == "assistant": context_parts.append(f"助手: {content}") except json.JSONDecodeError: continue context = "\n".join(context_parts) print(f"[REDIS_CONV] 获取上下文成功: {len(context_parts)}条消息") return context except Exception as e: print(f"[ERROR] 获取上下文失败: {str(e)}") return "" def get_conversation_messages(self, conversation_id: str, limit: Optional[int] = None) -> List[Dict]: """获取对话的消息列表""" if not self.is_available(): return [] try: if limit: messages = self.redis_client.lrange( f"conversation:{conversation_id}:messages", 0, limit - 1 ) else: messages = self.redis_client.lrange( f"conversation:{conversation_id}:messages", 0, -1 ) # 解析并按时间正序返回 parsed_messages = [] for msg_json in reversed(messages): # 反转为时间正序 try: parsed_messages.append(json.loads(msg_json)) except json.JSONDecodeError: continue return parsed_messages except Exception as e: print(f"[ERROR] 获取对话消息失败: {str(e)}") return [] def get_conversation_meta(self, conversation_id: str) -> Dict: """获取对话元信息""" if not self.is_available(): return {} try: meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta") return meta_data if meta_data else {} except Exception as e: print(f"[ERROR] 获取对话元信息失败: {str(e)}") return {} def get_conversations(self, user_id: str, limit: int = None) -> List[Dict]: """获取用户的对话列表(按时间倒序)""" if not self.is_available(): return [] if limit is None: limit = USER_MAX_CONVERSATIONS try: # 获取对话ID列表(已经按时间倒序) conversation_ids = self.redis_client.lrange( f"user:{user_id}:conversations", 0, limit - 1 ) conversations = [] for conv_id in conversation_ids: meta_data = self.get_conversation_meta(conv_id) if meta_data: # 只返回仍然存在的对话 conversations.append(meta_data) return conversations except Exception as e: print(f"[ERROR] 获取用户对话列表失败: {str(e)}") return [] # ==================== 智能缓存(修正版)==================== def get_cached_answer(self, question: str, context: str = "") -> Optional[Dict]: """检查问答缓存 - 真正上下文感知版""" if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE: return None try: cache_key = self._get_cache_key(question, context) cached_answer = self.redis_client.get(cache_key) # 使用独立key而不是hash if cached_answer: print(f"[REDIS_CONV] 缓存命中: {cache_key}") return json.loads(cached_answer) return None except Exception as e: print(f"[ERROR] 获取缓存答案失败: {str(e)}") return None def cache_answer(self, question: str, answer: Dict, context: str = ""): """缓存问答结果 - 真正上下文感知版""" if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE: return try: cache_key = self._get_cache_key(question, context) # 添加缓存时间戳和上下文哈希 answer_with_meta = { **answer, "cached_at": datetime.now().isoformat(), "original_question": question, "context_hash": hashlib.md5(context.encode()).hexdigest()[:8] if context else "" } # 使用独立key,每个缓存项单独设置TTL self.redis_client.setex( cache_key, QUESTION_ANSWER_TTL, json.dumps(answer_with_meta) ) print(f"[REDIS_CONV] 缓存答案成功: {cache_key}") except Exception as e: print(f"[ERROR] 缓存答案失败: {str(e)}") def _get_cache_key(self, question: str, context: str = "") -> str: """生成真正包含上下文的缓存键""" if context and ENABLE_CONVERSATION_CONTEXT: # 使用上下文内容而不是conversation_id cache_input = f"context:{context}\nquestion:{question}" else: cache_input = question normalized = cache_input.strip().lower() question_hash = hashlib.md5(normalized.encode('utf-8')).hexdigest()[:16] return f"qa_cache:{question_hash}" # ==================== 私有方法 ==================== def _add_conversation_to_user(self, user_id: str, conversation_id: str): """添加对话到用户列表,按时间自动排序""" try: # 获取用户类型配置 config = self._get_user_type_config(user_id) # LPUSH添加到列表头部(最新的) self.redis_client.lpush(f"user:{user_id}:conversations", conversation_id) # 根据用户类型限制数量 self.redis_client.ltrim( f"user:{user_id}:conversations", 0, config["max_conversations"] - 1 ) # 设置TTL self.redis_client.expire( f"user:{user_id}:conversations", config["ttl"] ) except Exception as e: print(f"[ERROR] 添加对话到用户列表失败: {str(e)}") def _get_user_type_config(self, user_id: str) -> Dict: """根据用户类型返回不同的配置 - 修正版""" if user_id.startswith(DEFAULT_ANONYMOUS_USER_PREFIX): return { "max_conversations": MAX_GUEST_CONVERSATIONS, "ttl": GUEST_USER_TTL # 使用专门的guest TTL } else: return { "max_conversations": MAX_REGISTERED_CONVERSATIONS, "ttl": USER_CONVERSATIONS_TTL } def _update_conversation_meta(self, conversation_id: str): """更新对话元信息""" try: # 获取消息数量 message_count = self.redis_client.llen(f"conversation:{conversation_id}:messages") # 更新元信息 self.redis_client.hset( f"conversation:{conversation_id}:meta", mapping={ "updated_at": datetime.now().isoformat(), "message_count": str(message_count) } ) except Exception as e: print(f"[ERROR] 更新对话元信息失败: {str(e)}") # ==================== 管理方法 ==================== def get_stats(self) -> Dict: """获取统计信息""" if not self.is_available(): return {"available": False} try: stats = { "available": True, "total_users": len(self.redis_client.keys("user:*:conversations")), "total_conversations": len(self.redis_client.keys("conversation:*:meta")), "cached_qa_count": len(self.redis_client.keys("qa_cache:*")), # 修正缓存统计 "redis_info": { "used_memory": self.redis_client.info().get("used_memory_human"), "connected_clients": self.redis_client.info().get("connected_clients") } } return stats except Exception as e: print(f"[ERROR] 获取统计信息失败: {str(e)}") return {"available": False, "error": str(e)} def cleanup_expired_conversations(self): """清理过期对话(Redis TTL自动处理,这里可添加额外逻辑)""" if not self.is_available(): return try: # 清理用户对话列表中的无效对话ID user_keys = self.redis_client.keys("user:*:conversations") cleaned_count = 0 for user_key in user_keys: conversation_ids = self.redis_client.lrange(user_key, 0, -1) valid_ids = [] for conv_id in conversation_ids: # 检查对话是否仍然存在 if self.redis_client.exists(f"conversation:{conv_id}:meta"): valid_ids.append(conv_id) else: cleaned_count += 1 # 如果有无效ID,重建列表 if len(valid_ids) != len(conversation_ids): self.redis_client.delete(user_key) if valid_ids: self.redis_client.lpush(user_key, *reversed(valid_ids)) # 重新设置TTL self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL) print(f"[REDIS_CONV] 清理完成,移除了 {cleaned_count} 个无效对话引用") except Exception as e: print(f"[ERROR] 清理失败: {str(e)}") ``` ### 2.3 主要API修改 #### 📝 修改文件:`citu_app.py` **修改位置1**:文件开头导入部分 ```python # 在现有导入基础上添加(文件顶部,避免函数内导入) from flask import session from common.redis_conversation_manager import RedisConversationManager from common.result import ( bad_request_response, service_unavailable_response, agent_success_response, agent_error_response, internal_error_response, success_response ) # 在全局变量区域添加(app实例化后) redis_conversation_manager = RedisConversationManager() ``` **修改位置2**:ask_agent()函数(修正版) ```python @app.flask_app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """ 支持对话上下文的ask_agent API - 修正版 """ req = request.get_json(force=True) question = req.get("question", None) browser_session_id = req.get("session_id", None) # 新增参数解析 user_id_input = req.get("user_id", None) conversation_id_input = req.get("conversation_id", None) continue_conversation = req.get("continue_conversation", False) if not question: return jsonify(bad_request_response( response_text="缺少必需参数:question", missing_params=["question"] )), 400 try: # 1. 获取登录用户ID(修正:在函数中获取session信息) login_user_id = session.get('user_id') if 'user_id' in session else None # 2. 智能ID解析(修正:传入登录用户ID) user_id = redis_conversation_manager.resolve_user_id( user_id_input, browser_session_id, request.remote_addr, login_user_id ) conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id( user_id, conversation_id_input, continue_conversation ) # 3. 获取上下文(提前到缓存检查之前) context = redis_conversation_manager.get_context(conversation_id) # 4. 检查缓存(修正:传入context以实现真正的上下文感知) cached_answer = redis_conversation_manager.get_cached_answer(question, context) if cached_answer: print(f"[AGENT_API] 使用缓存答案") # 更新对话历史 redis_conversation_manager.save_message(conversation_id, "user", question) redis_conversation_manager.save_message( conversation_id, "assistant", cached_answer.get("data", {}).get("response", ""), metadata={"from_cache": True} ) # 添加对话信息到缓存结果 cached_answer["data"]["conversation_id"] = conversation_id cached_answer["data"]["user_id"] = user_id cached_answer["data"]["from_cache"] = True cached_answer["data"].update(conversation_status) return jsonify(cached_answer) # 5. 保存用户消息 redis_conversation_manager.save_message(conversation_id, "user", question) # 6. 构建带上下文的问题 if context: enhanced_question = f"对话历史:\n{context}\n\n当前问题: {question}" print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符") else: enhanced_question = question print(f"[AGENT_API] 新对话,无上下文") # 7. 现有Agent处理逻辑(保持不变) try: agent = get_citu_langraph_agent() except Exception as e: print(f"[CRITICAL] Agent初始化失败: {str(e)}") return jsonify(service_unavailable_response( response_text="AI服务暂时不可用,请稍后重试", can_retry=True )), 503 agent_result = agent.process_question( question=enhanced_question, # 使用增强后的问题 session_id=browser_session_id ) # 8. 处理Agent结果 if agent_result.get("success", False): assistant_response = agent_result.get("data", {}).get("response", "") # 保存助手回复 redis_conversation_manager.save_message( conversation_id, "assistant", assistant_response, metadata={ "type": agent_result.get("data", {}).get("type"), "sql": agent_result.get("data", {}).get("sql"), "execution_path": agent_result.get("data", {}).get("execution_path") } ) # 缓存成功的答案(修正:传入context实现真正的上下文感知) cache_data = { **agent_result, "conversation_id": conversation_id, "user_id": user_id, "from_cache": False } redis_conversation_manager.cache_answer(question, cache_data, context) # 构建返回数据(修正:使用现有的agent_success_response) result_data = agent_result.get("data", {}) result_data.update({ "conversation_id": conversation_id, "user_id": user_id, "is_guest_user": user_id.startswith("guest"), "context_used": bool(context), "from_cache": False, "conversation_status": conversation_status["status"], "conversation_message": conversation_status["message"], "requested_conversation_id": conversation_status.get("requested_id") }) return jsonify(agent_success_response( response_type=result_data.get("type", "UNKNOWN"), response_text=result_data.get("response", ""), data=result_data )) else: # 错误处理(修正:确保使用现有的错误响应格式) error_data = { "response": agent_result.get("error", "Agent处理失败"), "conversation_id": conversation_id, "user_id": user_id, "error_type": "agent_processing_failed" } return jsonify({ "success": False, "code": agent_result.get("error_code", 500), "message": "处理失败", "data": error_data }), agent_result.get("error_code", 500) except Exception as e: print(f"[ERROR] ask_agent执行失败: {str(e)}") return jsonify(internal_error_response( response_text="查询处理失败,请稍后重试" )), 500 ``` **修改位置3**:新增管理API ```python # 在文件末尾添加新的管理API @app.flask_app.route('/api/v0/user//conversations', methods=['GET']) def get_user_conversations(user_id: str): """获取用户的对话列表(按时间倒序)""" try: limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int) conversations = redis_conversation_manager.get_conversations(user_id, limit) return jsonify(success_response( response_text="获取用户对话列表成功", data={ "user_id": user_id, "conversations": conversations, "total_count": len(conversations) } )) except Exception as e: return jsonify(internal_error_response( response_text="获取对话列表失败,请稍后重试" )), 500 @app.flask_app.route('/api/v0/conversation//messages', methods=['GET']) def get_conversation_messages(conversation_id: str): """获取特定对话的消息历史""" try: limit = request.args.get('limit', type=int) # 可选参数 messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit) meta = redis_conversation_manager.get_conversation_meta(conversation_id) return jsonify(success_response( response_text="获取对话消息成功", data={ "conversation_id": conversation_id, "conversation_meta": meta, "messages": messages, "message_count": len(messages) } )) except Exception as e: return jsonify(internal_error_response( response_text="获取对话消息失败" )), 500 @app.flask_app.route('/api/v0/conversation//context', methods=['GET']) def get_conversation_context(conversation_id: str): """获取对话上下文(格式化用于LLM)""" try: count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int) context = redis_conversation_manager.get_context(conversation_id, count) return jsonify(success_response( response_text="获取对话上下文成功", data={ "conversation_id": conversation_id, "context": context, "context_message_count": count } )) except Exception as e: return jsonify(internal_error_response( response_text="获取对话上下文失败" )), 500 @app.flask_app.route('/api/v0/conversation_stats', methods=['GET']) def conversation_stats(): """获取对话系统统计信息""" try: stats = redis_conversation_manager.get_stats() return jsonify(success_response( response_text="获取统计信息成功", data=stats )) except Exception as e: return jsonify(internal_error_response( response_text="获取统计信息失败,请稍后重试" )), 500 @app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST']) def conversation_cleanup(): """手动清理过期对话""" try: redis_conversation_manager.cleanup_expired_conversations() return jsonify(success_response( response_text="对话清理完成" )) except Exception as e: return jsonify(internal_error_response( response_text="对话清理失败,请稍后重试" )), 500 ``` ## 3. Redis数据结构设计 ### 3.1 数据存储格式 ```redis # 用户对话列表(按时间倒序,最新在前) user:{user_id}:conversations → LIST [ "conv_1703123456_a1b2c3d4", "conv_1703120000_x1y2z3w4" ] TTL: 7天(与对话TTL保持一致) # 对话消息历史(按时间倒序,最新在前) conversation:{conv_id}:messages → LIST [ "{\"message_id\":\"msg_002\",\"timestamp\":\"2024-01-01T10:00:05\",\"role\":\"assistant\",\"content\":\"查询结果...\",\"metadata\":{\"type\":\"DATABASE\"}}", "{\"message_id\":\"msg_001\",\"timestamp\":\"2024-01-01T10:00:00\",\"role\":\"user\",\"content\":\"查询销售数据\",\"metadata\":{}}" ] TTL: 7天 # 对话元信息 conversation:{conv_id}:meta → HASH { "conversation_id": "conv_1703123456_a1b2c3d4", "user_id": "guest_abc123", "created_at": "2024-01-01T10:00:00", "updated_at": "2024-01-01T10:05:00", "message_count": "4" } TTL: 7天 # 问答结果缓存(真正上下文感知版) qa_cache:{context_question_hash} → STRING { "success": true, "data": {...}, "cached_at": "2024-01-01T10:00:00", "original_question": "查询销售数据", "context_hash": "a1b2c3d4" // 上下文内容的哈希值 } TTL: 24小时(每个缓存项独立设置) 注意:缓存键基于上下文内容和问题的组合哈希,而非conversation_id ``` ## 4. API接口设计 ### 4.1 ask_agent() API(增强版) #### 请求参数 ```json { "question": "请问当前系统中每个高速服务区的经理是谁?", // 必需 "session_id": "test_session_001", // 可选,用于生成稳定guest_id "user_id": "john_doe", // 可选,优先级低于登录session "conversation_id": "conv_1703123456_a1b2c3d4", // 可选,继续特定对话 "continue_conversation": true // 可选,继续最近对话 } ``` #### 响应格式 ```json { "success": true, "code": 200, "message": "操作成功", "data": { "response": "查询结果...", "type": "DATABASE", "sql": "SELECT ...", "query_result": { "rows": [...], "columns": [...], "row_count": 10 }, "summary": "查询结果显示...", // 新增字段 "conversation_id": "conv_1703123456_a1b2c3d4", "user_id": "guest_abc123", "is_guest_user": true, "context_used": false, "from_cache": false, "conversation_status": "existing", "conversation_message": "继续已有对话", "requested_conversation_id": null, "execution_path": ["classify", "agent_database", "format_response"], "classification_info": { "confidence": 0.95, "reason": "匹配数据库关键词", "method": "rule_based" } } } ``` ### 4.2 查询API设计 #### 获取用户对话列表 ```http GET /api/v0/user/{user_id}/conversations?limit=10 响应: { "success": true, "data": { "user_id": "guest_abc123", "conversations": [ { "conversation_id": "conv_1703123456_a1b2c3d4", "created_at": "2024-01-01T10:00:00", "updated_at": "2024-01-01T10:05:00", "message_count": "6" } ], "total_count": 1 } } ``` #### 获取对话消息详情 ```http GET /api/v0/conversation/{conversation_id}/messages?limit=20 响应: { "success": true, "data": { "conversation_id": "conv_1703123456_a1b2c3d4", "conversation_meta": { "user_id": "guest_abc123", "created_at": "2024-01-01T10:00:00", "message_count": "6" }, "messages": [ { "message_id": "msg_001", "timestamp": "2024-01-01T10:00:00", "role": "user", "content": "查询服务区数据" }, { "message_id": "msg_002", "timestamp": "2024-01-01T10:00:05", "role": "assistant", "content": "查询结果显示...", "metadata": { "type": "DATABASE", "sql": "SELECT ...", "execution_path": ["classify", "agent_database"] } } ], "message_count": 2 } } ``` ## 5. 前端集成示例 ### 5.1 简化的对话管理器 ```javascript class ConversationManager { constructor() { this.sessionId = this.generateSessionId(); this.currentConversationId = null; this.userId = localStorage.getItem('user_id'); // 登录用户ID } async ask(question, continueConversation = true) { const payload = { question, session_id: this.sessionId // 保证guest_id稳定 }; // 登录用户传递真实ID if (this.userId) { payload.user_id = this.userId; } // 继续当前对话 if (continueConversation && this.currentConversationId) { payload.conversation_id = this.currentConversationId; } const response = await fetch('/api/v0/ask_agent', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify(payload) }); const result = await response.json(); // 记录conversation_id供下次使用 if (result.success) { this.currentConversationId = result.data.conversation_id; this.currentUserId = result.data.user_id; } return result; } async getUserConversations(limit = 5) { if (!this.currentUserId) return []; const response = await fetch(`/api/v0/user/${this.currentUserId}/conversations?limit=${limit}`); const result = await response.json(); return result.success ? result.data.conversations : []; } async getConversationMessages(conversationId) { const response = await fetch(`/api/v0/conversation/${conversationId}/messages`); const result = await response.json(); return result.success ? result.data : null; } startNewConversation() { this.currentConversationId = null; } generateSessionId() { return 'session_' + Date.now() + '_' + Math.random().toString(36).substring(2); } } // 使用示例 const conv = new ConversationManager(); // 第一次提问(自动创建对话) const result1 = await conv.ask("查询服务区数据"); console.log("对话ID:", result1.data.conversation_id); // 第二次提问(自动继续对话) const result2 = await conv.ask("经理都是谁?"); // 查看对话历史 const conversations = await conv.getUserConversations(); // 查看特定对话详情 const detail = await conv.getConversationMessages(conversations[0].conversation_id); // 开始新话题 conv.startNewConversation(); const result3 = await conv.ask("查询其他数据"); ``` ## 6. 实施步骤和优先级 ### 阶段1:基础实施(1-2天) **优先级:P0(必须完成)** 1. ✅ 修改`app_config.py`添加配置 2. ✅ 创建`common/redis_conversation_manager.py` 3. ✅ 更新`requirements.txt` 4. ✅ Redis连接测试 ### 阶段2:核心集成(2-3天) **优先级:P0(必须完成)** 1. ✅ 修改`citu_app.py`的ask_agent()函数 2. ✅ 实现智能ID解析逻辑 3. ✅ 集成对话上下文功能 4. ✅ 实现基础缓存功能 5. ✅ 端到端测试 ### 阶段3:管理API(1-2天) **优先级:P1(重要)** 1. ✅ 添加用户对话列表查询API 2. ✅ 添加对话消息详情查询API 3. ✅ 添加对话上下文查询API 4. ✅ 添加统计信息API ### 阶段4:完善优化(1-2天) **优先级:P2(可选)** 1. ✅ 添加清理管理API 2. ✅ 完善错误处理 3. ✅ 性能优化 4. ✅ 文档完善 ## 7. 测试验证 ### 7.1 单元测试 ```python # test_redis_conversation_manager.py import unittest from common.redis_conversation_manager import RedisConversationManager class TestRedisConversationManager(unittest.TestCase): def setUp(self): self.manager = RedisConversationManager() def test_user_id_resolution(self): # 测试用户ID解析逻辑 user_id = self.manager.resolve_user_id(None, "session_123", "127.0.0.1") self.assertTrue(user_id.startswith("guest_")) def test_conversation_creation(self): # 测试对话创建 conv_id = self.manager.create_conversation("test_user") self.assertTrue(conv_id.startswith("conv_")) def test_message_saving(self): # 测试消息保存 conv_id = self.manager.create_conversation("test_user") result = self.manager.save_message(conv_id, "user", "test message") self.assertTrue(result) ``` ### 7.2 集成测试 ```python # test_ask_agent_integration.py import requests import json def test_ask_agent_with_context(): # 第一次对话 response1 = requests.post('http://localhost:5000/api/v0/ask_agent', json={"question": "查询服务区数据"}) result1 = response1.json() # 第二次对话(带上下文) response2 = requests.post('http://localhost:5000/api/v0/ask_agent', json={ "question": "经理都是谁?", "conversation_id": result1["data"]["conversation_id"] }) result2 = response2.json() assert result2["data"]["context_used"] == True ``` ## 8. 部署和运维 ### 8.1 Redis部署 ```yaml # docker-compose.yml version: '3' services: redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru volumes: redis_data: ``` ## 9. 设计改进建议 ### 9.1 无效conversation_id的处理优化 #### 问题描述 当前设计中,当用户传入无效的 `conversation_id` 时,系统会静默地创建新对话,用户无法得知他们请求的对话不存在或无权访问。 #### 改进方案 1. **修改 `resolve_conversation_id` 方法返回值** ```python def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str], continue_conversation: bool) -> tuple[str, dict]: """ 智能解析对话ID - 改进版 Returns: tuple: (conversation_id, status_info) status_info包含: - status: "existing" | "new" | "invalid_id_new" - message: 状态说明 - requested_id: 原始请求的ID(如果有) """ # 1. 如果指定了conversation_id,验证后使用 if conversation_id_input: if self._is_valid_conversation(conversation_id_input, user_id): print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}") return conversation_id_input, { "status": "existing", "message": "继续已有对话" } else: print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话") new_conversation_id = self.create_conversation(user_id) return new_conversation_id, { "status": "invalid_id_new", "message": "您请求的对话不存在或无权访问,已为您创建新对话", "requested_id": conversation_id_input } # 2. 如果要继续最近对话 if continue_conversation: recent_conversation = self._get_recent_conversation(user_id) if recent_conversation: print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}") return recent_conversation, { "status": "existing", "message": "继续最近对话" } # 3. 创建新对话 new_conversation_id = self.create_conversation(user_id) print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}") return new_conversation_id, { "status": "new", "message": "创建新对话" } ``` 2. **修改 `ask_agent` API 响应** ```python # 在ask_agent函数中 conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id( user_id, conversation_id_input, continue_conversation ) # 在最终返回结果中添加状态信息 result_data.update({ "conversation_id": conversation_id, "user_id": user_id, "is_guest_user": user_id.startswith("guest"), "context_used": bool(context), "from_cache": False, "conversation_status": conversation_status["status"], "conversation_message": conversation_status["message"], "requested_conversation_id": conversation_status.get("requested_id") }) ``` 3. **API响应示例** ```json // 情况1:请求无效的conversation_id { "success": true, "data": { "response": "查询结果...", "conversation_id": "conv_1703123456_new123", "conversation_status": "invalid_id_new", "conversation_message": "您请求的对话不存在或已过期,已为您开启新对话", "requested_conversation_id": "conv_invalid_xyz789", // ... 其他字段 } } // 情况2:成功继续已有对话 { "success": true, "data": { "response": "查询结果...", "conversation_id": "conv_1703123456_abc123", "conversation_status": "existing", "conversation_message": "继续已有对话", // ... 其他字段 } } ``` 4. **前端处理示例** ```javascript async ask(question, conversationId) { const response = await fetch('/api/v0/ask_agent', { method: 'POST', body: JSON.stringify({ question, conversation_id: conversationId }) }); const result = await response.json(); // 检查对话状态 if (result.data.conversation_status === 'invalid_id_new') { // 提示用户 this.showNotification( '提示:您请求的对话不存在或已过期,已为您开启新对话', 'warning' ); // 更新本地conversation_id this.currentConversationId = result.data.conversation_id; } return result; } ``` ### 9.2 其他潜在改进 1. **对话权限验证增强** - 添加对话的所有权验证 - 支持对话分享功能(生成分享链接) - 支持对话的读/写权限控制 2. **对话状态管理** - 添加对话的活跃/归档状态 - 支持手动结束对话 - 对话标题和标签功能 3. **性能优化** - 对话列表分页加载 - 消息懒加载 - 缓存预热机制