|
@@ -0,0 +1,1365 @@
|
|
|
+# Redis对话管理系统详细设计与开发文档(修正版)
|
|
|
+
|
|
|
+> **重要更新(2024年)**:修复了智能缓存的上下文感知问题。原设计中缓存键仅基于conversation_id,无法正确处理同一对话中上下文变化的情况。新设计将缓存键改为基于实际上下文内容的哈希,确保相同问题在不同上下文下能够返回正确的缓存结果。
|
|
|
+
|
|
|
+## 1. 项目概述与实施目标
|
|
|
+
|
|
|
+### 1.1 项目背景
|
|
|
+基于现有的vanna+LangChain+LangGraph项目,为ask_agent() API增加Redis对话管理功能,实现上下文连续对话、对话历史记录和智能缓存功能。**不修改现有ask() API和SessionAwareMemoryCache**。
|
|
|
+
|
|
|
+### 1.2 核心功能
|
|
|
+- **智能用户识别**:支持登录用户、请求传参用户、匿名guest用户
|
|
|
+- **对话上下文管理**:支持多轮连续对话,可配置上下文长度
|
|
|
+- **智能缓存系统**:问答结果缓存,避免重复计算
|
|
|
+- **RESTful查询API**:支持查询用户对话列表和对话详情
|
|
|
+- **容错降级设计**:Redis不可用时自动降级
|
|
|
+
|
|
|
+### 1.3 技术架构
|
|
|
+- **Redis存储层**:对话数据持久化和TTL自动清理
|
|
|
+- **ask_agent() API增强**:集成对话上下文和缓存功能
|
|
|
+- **管理API**:提供对话查询和统计功能
|
|
|
+- **配置化管理**:所有参数可通过app_config.py配置
|
|
|
+
|
|
|
+## 2. 需要修改的文件详细清单
|
|
|
+
|
|
|
+### 2.1 配置文件修改
|
|
|
+
|
|
|
+#### 📝 修改文件:`app_config.py`
|
|
|
+**位置**:文件末尾添加新配置段
|
|
|
+**修改内容**:
|
|
|
+```python
|
|
|
+# ==================== Redis对话管理配置 ====================
|
|
|
+
|
|
|
+# 对话上下文配置
|
|
|
+CONVERSATION_CONTEXT_COUNT = 5 # 传递给LLM的上下文消息条数
|
|
|
+CONVERSATION_MAX_LENGTH = 20 # 单个对话最大消息数
|
|
|
+USER_MAX_CONVERSATIONS = 5 # 用户最大对话数
|
|
|
+
|
|
|
+# 用户管理配置
|
|
|
+DEFAULT_ANONYMOUS_USER_PREFIX = "guest" # 匿名用户前缀
|
|
|
+GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天
|
|
|
+MAX_GUEST_CONVERSATIONS = 3 # guest用户最多3个对话
|
|
|
+MAX_REGISTERED_CONVERSATIONS = 10 # 注册用户最多10个对话
|
|
|
+
|
|
|
+# Redis配置
|
|
|
+REDIS_HOST = "localhost"
|
|
|
+REDIS_PORT = 6379
|
|
|
+REDIS_DB = 0
|
|
|
+REDIS_PASSWORD = None
|
|
|
+
|
|
|
+# 缓存开关配置
|
|
|
+ENABLE_CONVERSATION_CONTEXT = True # 是否启用对话上下文
|
|
|
+ENABLE_QUESTION_ANSWER_CACHE = True # 是否启用问答结果缓存
|
|
|
+
|
|
|
+# TTL配置(单位:秒)- 修正TTL逻辑
|
|
|
+CONVERSATION_TTL = 7 * 24 * 3600 # 对话保存7天
|
|
|
+USER_CONVERSATIONS_TTL = 7 * 24 * 3600 # 用户对话列表保存7天(与对话TTL一致)
|
|
|
+QUESTION_ANSWER_TTL = 24 * 3600 # 问答结果缓存24小时
|
|
|
+GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天
|
|
|
+```
|
|
|
+
|
|
|
+#### 📝 修改文件:`requirements.txt`
|
|
|
+**修改内容**:添加Redis依赖
|
|
|
+```txt
|
|
|
+# 在现有依赖基础上添加
|
|
|
+redis==5.0.1
|
|
|
+```
|
|
|
+
|
|
|
+### 2.2 核心组件开发
|
|
|
+
|
|
|
+#### 🆕 新增文件:`common/redis_conversation_manager.py`
|
|
|
+**功能**:Redis对话管理器核心类(修正版)
|
|
|
+**完整代码实现**:
|
|
|
+
|
|
|
+```python
|
|
|
+import redis
|
|
|
+import json
|
|
|
+import hashlib
|
|
|
+import uuid
|
|
|
+import time
|
|
|
+from datetime import datetime
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
+from app_config import (
|
|
|
+ REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD,
|
|
|
+ CONVERSATION_CONTEXT_COUNT, CONVERSATION_MAX_LENGTH, USER_MAX_CONVERSATIONS,
|
|
|
+ CONVERSATION_TTL, USER_CONVERSATIONS_TTL, QUESTION_ANSWER_TTL,
|
|
|
+ ENABLE_CONVERSATION_CONTEXT, ENABLE_QUESTION_ANSWER_CACHE,
|
|
|
+ DEFAULT_ANONYMOUS_USER_PREFIX, MAX_GUEST_CONVERSATIONS, MAX_REGISTERED_CONVERSATIONS,
|
|
|
+ GUEST_USER_TTL
|
|
|
+)
|
|
|
+
|
|
|
+class RedisConversationManager:
|
|
|
+ """Redis对话管理器 - 修正版"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ """初始化Redis连接"""
|
|
|
+ try:
|
|
|
+ self.redis_client = redis.Redis(
|
|
|
+ host=REDIS_HOST,
|
|
|
+ port=REDIS_PORT,
|
|
|
+ db=REDIS_DB,
|
|
|
+ password=REDIS_PASSWORD,
|
|
|
+ decode_responses=True,
|
|
|
+ socket_connect_timeout=5,
|
|
|
+ socket_timeout=5
|
|
|
+ )
|
|
|
+ # 测试连接
|
|
|
+ self.redis_client.ping()
|
|
|
+ print(f"[REDIS_CONV] Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] Redis连接失败: {str(e)}")
|
|
|
+ self.redis_client = None
|
|
|
+
|
|
|
+ def is_available(self) -> bool:
|
|
|
+ """检查Redis是否可用"""
|
|
|
+ try:
|
|
|
+ return self.redis_client is not None and self.redis_client.ping()
|
|
|
+ except:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # ==================== 用户ID解析(修正版)====================
|
|
|
+
|
|
|
+ def resolve_user_id(self, user_id_from_request: Optional[str],
|
|
|
+ session_id: Optional[str], request_ip: str,
|
|
|
+ login_user_id: Optional[str] = None) -> str:
|
|
|
+ """
|
|
|
+ 智能解析用户ID - 修正版
|
|
|
+
|
|
|
+ Args:
|
|
|
+ user_id_from_request: 请求参数中的user_id
|
|
|
+ session_id: 浏览器session_id
|
|
|
+ request_ip: 请求IP地址
|
|
|
+ login_user_id: 从Flask session中获取的登录用户ID(在ask_agent中获取)
|
|
|
+ """
|
|
|
+
|
|
|
+ # 1. 优先使用登录用户ID
|
|
|
+ if login_user_id:
|
|
|
+ print(f"[REDIS_CONV] 使用登录用户ID: {login_user_id}")
|
|
|
+ return login_user_id
|
|
|
+
|
|
|
+ # 2. 如果没有登录,尝试从请求参数获取user_id
|
|
|
+ if user_id_from_request:
|
|
|
+ print(f"[REDIS_CONV] 使用请求参数user_id: {user_id_from_request}")
|
|
|
+ return user_id_from_request
|
|
|
+
|
|
|
+ # 3. 都没有则为匿名用户(guest)
|
|
|
+ if session_id:
|
|
|
+ guest_suffix = hashlib.md5(session_id.encode()).hexdigest()[:8]
|
|
|
+ guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_{guest_suffix}"
|
|
|
+ print(f"[REDIS_CONV] 生成稳定guest用户: {guest_id}")
|
|
|
+ return guest_id
|
|
|
+
|
|
|
+ # 4. 最后基于IP的临时guest ID
|
|
|
+ ip_suffix = hashlib.md5(request_ip.encode()).hexdigest()[:8]
|
|
|
+ temp_guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_temp_{ip_suffix}"
|
|
|
+ print(f"[REDIS_CONV] 生成临时guest用户: {temp_guest_id}")
|
|
|
+ return temp_guest_id
|
|
|
+
|
|
|
+ def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str],
|
|
|
+ continue_conversation: bool) -> tuple[str, dict]:
|
|
|
+ """
|
|
|
+ 智能解析对话ID - 改进版
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ tuple: (conversation_id, status_info)
|
|
|
+ status_info包含:
|
|
|
+ - status: "existing" | "new" | "invalid_id_new"
|
|
|
+ - message: 状态说明
|
|
|
+ - requested_id: 原始请求的ID(如果有)
|
|
|
+ """
|
|
|
+
|
|
|
+ # 1. 如果指定了conversation_id,验证后使用
|
|
|
+ if conversation_id_input:
|
|
|
+ if self._is_valid_conversation(conversation_id_input, user_id):
|
|
|
+ print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}")
|
|
|
+ return conversation_id_input, {
|
|
|
+ "status": "existing",
|
|
|
+ "message": "继续已有对话"
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话")
|
|
|
+ new_conversation_id = self.create_conversation(user_id)
|
|
|
+ return new_conversation_id, {
|
|
|
+ "status": "invalid_id_new",
|
|
|
+ "message": "您请求的对话不存在或无权访问,已为您创建新对话",
|
|
|
+ "requested_id": conversation_id_input
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2. 如果要继续最近对话
|
|
|
+ if continue_conversation:
|
|
|
+ recent_conversation = self._get_recent_conversation(user_id)
|
|
|
+ if recent_conversation:
|
|
|
+ print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}")
|
|
|
+ return recent_conversation, {
|
|
|
+ "status": "existing",
|
|
|
+ "message": "继续最近对话"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 3. 创建新对话
|
|
|
+ new_conversation_id = self.create_conversation(user_id)
|
|
|
+ print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}")
|
|
|
+ return new_conversation_id, {
|
|
|
+ "status": "new",
|
|
|
+ "message": "创建新对话"
|
|
|
+ }
|
|
|
+
|
|
|
+ def _is_valid_conversation(self, conversation_id: str, user_id: str) -> bool:
|
|
|
+ """验证对话是否存在且属于该用户"""
|
|
|
+ if not self.is_available():
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 检查对话元信息是否存在
|
|
|
+ meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
|
|
|
+ if not meta_data:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检查是否属于该用户
|
|
|
+ return meta_data.get('user_id') == user_id
|
|
|
+
|
|
|
+ except Exception:
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _get_recent_conversation(self, user_id: str) -> Optional[str]:
|
|
|
+ """获取用户最近的对话ID"""
|
|
|
+ if not self.is_available():
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ conversations = self.redis_client.lrange(
|
|
|
+ f"user:{user_id}:conversations", 0, 0
|
|
|
+ )
|
|
|
+ return conversations[0] if conversations else None
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # ==================== 对话管理 ====================
|
|
|
+
|
|
|
+ def create_conversation(self, user_id: str) -> str:
|
|
|
+ """创建新对话"""
|
|
|
+ # 生成包含时间戳的conversation_id
|
|
|
+ timestamp = int(datetime.now().timestamp())
|
|
|
+ conversation_id = f"conv_{timestamp}_{uuid.uuid4().hex[:8]}"
|
|
|
+
|
|
|
+ if not self.is_available():
|
|
|
+ return conversation_id # Redis不可用时返回ID,但不存储
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 创建对话元信息
|
|
|
+ meta_data = {
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "user_id": user_id,
|
|
|
+ "created_at": datetime.now().isoformat(),
|
|
|
+ "updated_at": datetime.now().isoformat(),
|
|
|
+ "message_count": "0"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 保存对话元信息
|
|
|
+ self.redis_client.hset(
|
|
|
+ f"conversation:{conversation_id}:meta",
|
|
|
+ mapping=meta_data
|
|
|
+ )
|
|
|
+ self.redis_client.expire(f"conversation:{conversation_id}:meta", CONVERSATION_TTL)
|
|
|
+
|
|
|
+ # 添加到用户的对话列表
|
|
|
+ self._add_conversation_to_user(user_id, conversation_id)
|
|
|
+
|
|
|
+ print(f"[REDIS_CONV] 创建对话成功: {conversation_id}")
|
|
|
+ return conversation_id
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 创建对话失败: {str(e)}")
|
|
|
+ return conversation_id # 返回ID但可能未存储
|
|
|
+
|
|
|
+ def save_message(self, conversation_id: str, role: str, content: str,
|
|
|
+ metadata: Optional[Dict] = None) -> bool:
|
|
|
+ """保存消息到对话历史"""
|
|
|
+ if not self.is_available() or not conversation_id:
|
|
|
+ return False
|
|
|
+
|
|
|
+ try:
|
|
|
+ message_data = {
|
|
|
+ "message_id": str(uuid.uuid4()),
|
|
|
+ "timestamp": datetime.now().isoformat(),
|
|
|
+ "role": role, # user, assistant
|
|
|
+ "content": content,
|
|
|
+ "metadata": metadata or {}
|
|
|
+ }
|
|
|
+
|
|
|
+ # 保存到消息列表(LPUSH添加到头部,最新消息在前)
|
|
|
+ self.redis_client.lpush(
|
|
|
+ f"conversation:{conversation_id}:messages",
|
|
|
+ json.dumps(message_data)
|
|
|
+ )
|
|
|
+
|
|
|
+ # 设置TTL
|
|
|
+ self.redis_client.expire(f"conversation:{conversation_id}:messages", CONVERSATION_TTL)
|
|
|
+
|
|
|
+ # 限制消息数量
|
|
|
+ self.redis_client.ltrim(
|
|
|
+ f"conversation:{conversation_id}:messages",
|
|
|
+ 0, CONVERSATION_MAX_LENGTH - 1
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新元信息
|
|
|
+ self._update_conversation_meta(conversation_id)
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 保存消息失败: {str(e)}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def get_context(self, conversation_id: str, count: Optional[int] = None) -> str:
|
|
|
+ """获取对话上下文,格式化为prompt"""
|
|
|
+ if not self.is_available() or not ENABLE_CONVERSATION_CONTEXT:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ try:
|
|
|
+ if count is None:
|
|
|
+ count = CONVERSATION_CONTEXT_COUNT
|
|
|
+
|
|
|
+ # 获取最近的消息(count*2 因为包含用户和助手消息)
|
|
|
+ message_count = count * 2
|
|
|
+ messages = self.redis_client.lrange(
|
|
|
+ f"conversation:{conversation_id}:messages",
|
|
|
+ 0, message_count - 1
|
|
|
+ )
|
|
|
+
|
|
|
+ if not messages:
|
|
|
+ return ""
|
|
|
+
|
|
|
+ # 解析消息并构建上下文(按时间正序)
|
|
|
+ context_parts = []
|
|
|
+ for msg_json in reversed(messages): # Redis返回倒序,需要反转
|
|
|
+ try:
|
|
|
+ msg_data = json.loads(msg_json)
|
|
|
+ role = msg_data.get("role", "")
|
|
|
+ content = msg_data.get("content", "")
|
|
|
+
|
|
|
+ if role == "user":
|
|
|
+ context_parts.append(f"用户: {content}")
|
|
|
+ elif role == "assistant":
|
|
|
+ context_parts.append(f"助手: {content}")
|
|
|
+
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ continue
|
|
|
+
|
|
|
+ context = "\n".join(context_parts)
|
|
|
+ print(f"[REDIS_CONV] 获取上下文成功: {len(context_parts)}条消息")
|
|
|
+ return context
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取上下文失败: {str(e)}")
|
|
|
+ return ""
|
|
|
+
|
|
|
+ def get_conversation_messages(self, conversation_id: str, limit: Optional[int] = None) -> List[Dict]:
|
|
|
+ """获取对话的消息列表"""
|
|
|
+ if not self.is_available():
|
|
|
+ return []
|
|
|
+
|
|
|
+ try:
|
|
|
+ if limit:
|
|
|
+ messages = self.redis_client.lrange(
|
|
|
+ f"conversation:{conversation_id}:messages", 0, limit - 1
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ messages = self.redis_client.lrange(
|
|
|
+ f"conversation:{conversation_id}:messages", 0, -1
|
|
|
+ )
|
|
|
+
|
|
|
+ # 解析并按时间正序返回
|
|
|
+ parsed_messages = []
|
|
|
+ for msg_json in reversed(messages): # 反转为时间正序
|
|
|
+ try:
|
|
|
+ parsed_messages.append(json.loads(msg_json))
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ continue
|
|
|
+
|
|
|
+ return parsed_messages
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取对话消息失败: {str(e)}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ def get_conversation_meta(self, conversation_id: str) -> Dict:
|
|
|
+ """获取对话元信息"""
|
|
|
+ if not self.is_available():
|
|
|
+ return {}
|
|
|
+
|
|
|
+ try:
|
|
|
+ meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
|
|
|
+ return meta_data if meta_data else {}
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取对话元信息失败: {str(e)}")
|
|
|
+ return {}
|
|
|
+
|
|
|
+ def get_conversations(self, user_id: str, limit: int = None) -> List[Dict]:
|
|
|
+ """获取用户的对话列表(按时间倒序)"""
|
|
|
+ if not self.is_available():
|
|
|
+ return []
|
|
|
+
|
|
|
+ if limit is None:
|
|
|
+ limit = USER_MAX_CONVERSATIONS
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 获取对话ID列表(已经按时间倒序)
|
|
|
+ conversation_ids = self.redis_client.lrange(
|
|
|
+ f"user:{user_id}:conversations", 0, limit - 1
|
|
|
+ )
|
|
|
+
|
|
|
+ conversations = []
|
|
|
+ for conv_id in conversation_ids:
|
|
|
+ meta_data = self.get_conversation_meta(conv_id)
|
|
|
+ if meta_data: # 只返回仍然存在的对话
|
|
|
+ conversations.append(meta_data)
|
|
|
+
|
|
|
+ return conversations
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取用户对话列表失败: {str(e)}")
|
|
|
+ return []
|
|
|
+
|
|
|
+ # ==================== 智能缓存(修正版)====================
|
|
|
+
|
|
|
+ def get_cached_answer(self, question: str, context: str = "") -> Optional[Dict]:
|
|
|
+ """检查问答缓存 - 真正上下文感知版"""
|
|
|
+ if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
|
|
|
+ return None
|
|
|
+
|
|
|
+ try:
|
|
|
+ cache_key = self._get_cache_key(question, context)
|
|
|
+ cached_answer = self.redis_client.get(cache_key) # 使用独立key而不是hash
|
|
|
+
|
|
|
+ if cached_answer:
|
|
|
+ print(f"[REDIS_CONV] 缓存命中: {cache_key}")
|
|
|
+ return json.loads(cached_answer)
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取缓存答案失败: {str(e)}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def cache_answer(self, question: str, answer: Dict, context: str = ""):
|
|
|
+ """缓存问答结果 - 真正上下文感知版"""
|
|
|
+ if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ cache_key = self._get_cache_key(question, context)
|
|
|
+
|
|
|
+ # 添加缓存时间戳和上下文哈希
|
|
|
+ answer_with_meta = {
|
|
|
+ **answer,
|
|
|
+ "cached_at": datetime.now().isoformat(),
|
|
|
+ "original_question": question,
|
|
|
+ "context_hash": hashlib.md5(context.encode()).hexdigest()[:8] if context else ""
|
|
|
+ }
|
|
|
+
|
|
|
+ # 使用独立key,每个缓存项单独设置TTL
|
|
|
+ self.redis_client.setex(
|
|
|
+ cache_key,
|
|
|
+ QUESTION_ANSWER_TTL,
|
|
|
+ json.dumps(answer_with_meta)
|
|
|
+ )
|
|
|
+
|
|
|
+ print(f"[REDIS_CONV] 缓存答案成功: {cache_key}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 缓存答案失败: {str(e)}")
|
|
|
+
|
|
|
+ def _get_cache_key(self, question: str, context: str = "") -> str:
|
|
|
+ """生成真正包含上下文的缓存键"""
|
|
|
+ if context and ENABLE_CONVERSATION_CONTEXT:
|
|
|
+ # 使用上下文内容而不是conversation_id
|
|
|
+ cache_input = f"context:{context}\nquestion:{question}"
|
|
|
+ else:
|
|
|
+ cache_input = question
|
|
|
+
|
|
|
+ normalized = cache_input.strip().lower()
|
|
|
+ question_hash = hashlib.md5(normalized.encode('utf-8')).hexdigest()[:16]
|
|
|
+ return f"qa_cache:{question_hash}"
|
|
|
+
|
|
|
+ # ==================== 私有方法 ====================
|
|
|
+
|
|
|
+ def _add_conversation_to_user(self, user_id: str, conversation_id: str):
|
|
|
+ """添加对话到用户列表,按时间自动排序"""
|
|
|
+ try:
|
|
|
+ # 获取用户类型配置
|
|
|
+ config = self._get_user_type_config(user_id)
|
|
|
+
|
|
|
+ # LPUSH添加到列表头部(最新的)
|
|
|
+ self.redis_client.lpush(f"user:{user_id}:conversations", conversation_id)
|
|
|
+
|
|
|
+ # 根据用户类型限制数量
|
|
|
+ self.redis_client.ltrim(
|
|
|
+ f"user:{user_id}:conversations",
|
|
|
+ 0, config["max_conversations"] - 1
|
|
|
+ )
|
|
|
+
|
|
|
+ # 设置TTL
|
|
|
+ self.redis_client.expire(
|
|
|
+ f"user:{user_id}:conversations",
|
|
|
+ config["ttl"]
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 添加对话到用户列表失败: {str(e)}")
|
|
|
+
|
|
|
+ def _get_user_type_config(self, user_id: str) -> Dict:
|
|
|
+ """根据用户类型返回不同的配置 - 修正版"""
|
|
|
+ if user_id.startswith(DEFAULT_ANONYMOUS_USER_PREFIX):
|
|
|
+ return {
|
|
|
+ "max_conversations": MAX_GUEST_CONVERSATIONS,
|
|
|
+ "ttl": GUEST_USER_TTL # 使用专门的guest TTL
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ return {
|
|
|
+ "max_conversations": MAX_REGISTERED_CONVERSATIONS,
|
|
|
+ "ttl": USER_CONVERSATIONS_TTL
|
|
|
+ }
|
|
|
+
|
|
|
+ def _update_conversation_meta(self, conversation_id: str):
|
|
|
+ """更新对话元信息"""
|
|
|
+ try:
|
|
|
+ # 获取消息数量
|
|
|
+ message_count = self.redis_client.llen(f"conversation:{conversation_id}:messages")
|
|
|
+
|
|
|
+ # 更新元信息
|
|
|
+ self.redis_client.hset(
|
|
|
+ f"conversation:{conversation_id}:meta",
|
|
|
+ mapping={
|
|
|
+ "updated_at": datetime.now().isoformat(),
|
|
|
+ "message_count": str(message_count)
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 更新对话元信息失败: {str(e)}")
|
|
|
+
|
|
|
+ # ==================== 管理方法 ====================
|
|
|
+
|
|
|
+ def get_stats(self) -> Dict:
|
|
|
+ """获取统计信息"""
|
|
|
+ if not self.is_available():
|
|
|
+ return {"available": False}
|
|
|
+
|
|
|
+ try:
|
|
|
+ stats = {
|
|
|
+ "available": True,
|
|
|
+ "total_users": len(self.redis_client.keys("user:*:conversations")),
|
|
|
+ "total_conversations": len(self.redis_client.keys("conversation:*:meta")),
|
|
|
+ "cached_qa_count": len(self.redis_client.keys("qa_cache:*")), # 修正缓存统计
|
|
|
+ "redis_info": {
|
|
|
+ "used_memory": self.redis_client.info().get("used_memory_human"),
|
|
|
+ "connected_clients": self.redis_client.info().get("connected_clients")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return stats
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 获取统计信息失败: {str(e)}")
|
|
|
+ return {"available": False, "error": str(e)}
|
|
|
+
|
|
|
+ def cleanup_expired_conversations(self):
|
|
|
+ """清理过期对话(Redis TTL自动处理,这里可添加额外逻辑)"""
|
|
|
+ if not self.is_available():
|
|
|
+ return
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 清理用户对话列表中的无效对话ID
|
|
|
+ user_keys = self.redis_client.keys("user:*:conversations")
|
|
|
+ cleaned_count = 0
|
|
|
+
|
|
|
+ for user_key in user_keys:
|
|
|
+ conversation_ids = self.redis_client.lrange(user_key, 0, -1)
|
|
|
+ valid_ids = []
|
|
|
+
|
|
|
+ for conv_id in conversation_ids:
|
|
|
+ # 检查对话是否仍然存在
|
|
|
+ if self.redis_client.exists(f"conversation:{conv_id}:meta"):
|
|
|
+ valid_ids.append(conv_id)
|
|
|
+ else:
|
|
|
+ cleaned_count += 1
|
|
|
+
|
|
|
+ # 如果有无效ID,重建列表
|
|
|
+ if len(valid_ids) != len(conversation_ids):
|
|
|
+ self.redis_client.delete(user_key)
|
|
|
+ if valid_ids:
|
|
|
+ self.redis_client.lpush(user_key, *reversed(valid_ids))
|
|
|
+ # 重新设置TTL
|
|
|
+ self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL)
|
|
|
+
|
|
|
+ print(f"[REDIS_CONV] 清理完成,移除了 {cleaned_count} 个无效对话引用")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] 清理失败: {str(e)}")
|
|
|
+```
|
|
|
+
|
|
|
+### 2.3 主要API修改
|
|
|
+
|
|
|
+#### 📝 修改文件:`citu_app.py`
|
|
|
+**修改位置1**:文件开头导入部分
|
|
|
+```python
|
|
|
+# 在现有导入基础上添加(文件顶部,避免函数内导入)
|
|
|
+from flask import session
|
|
|
+from common.redis_conversation_manager import RedisConversationManager
|
|
|
+from common.result import (
|
|
|
+ bad_request_response, service_unavailable_response,
|
|
|
+ agent_success_response, agent_error_response,
|
|
|
+ internal_error_response, success_response
|
|
|
+)
|
|
|
+
|
|
|
+# 在全局变量区域添加(app实例化后)
|
|
|
+redis_conversation_manager = RedisConversationManager()
|
|
|
+```
|
|
|
+
|
|
|
+**修改位置2**:ask_agent()函数(修正版)
|
|
|
+```python
|
|
|
+@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
|
|
|
+def ask_agent():
|
|
|
+ """
|
|
|
+ 支持对话上下文的ask_agent API - 修正版
|
|
|
+ """
|
|
|
+ req = request.get_json(force=True)
|
|
|
+ question = req.get("question", None)
|
|
|
+ browser_session_id = req.get("session_id", None)
|
|
|
+
|
|
|
+ # 新增参数解析
|
|
|
+ user_id_input = req.get("user_id", None)
|
|
|
+ conversation_id_input = req.get("conversation_id", None)
|
|
|
+ continue_conversation = req.get("continue_conversation", False)
|
|
|
+
|
|
|
+ if not question:
|
|
|
+ return jsonify(bad_request_response(
|
|
|
+ response_text="缺少必需参数:question",
|
|
|
+ missing_params=["question"]
|
|
|
+ )), 400
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 1. 获取登录用户ID(修正:在函数中获取session信息)
|
|
|
+ login_user_id = session.get('user_id') if 'user_id' in session else None
|
|
|
+
|
|
|
+ # 2. 智能ID解析(修正:传入登录用户ID)
|
|
|
+ user_id = redis_conversation_manager.resolve_user_id(
|
|
|
+ user_id_input, browser_session_id, request.remote_addr, login_user_id
|
|
|
+ )
|
|
|
+ conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
|
|
|
+ user_id, conversation_id_input, continue_conversation
|
|
|
+ )
|
|
|
+
|
|
|
+ # 3. 获取上下文(提前到缓存检查之前)
|
|
|
+ context = redis_conversation_manager.get_context(conversation_id)
|
|
|
+
|
|
|
+ # 4. 检查缓存(修正:传入context以实现真正的上下文感知)
|
|
|
+ cached_answer = redis_conversation_manager.get_cached_answer(question, context)
|
|
|
+ if cached_answer:
|
|
|
+ print(f"[AGENT_API] 使用缓存答案")
|
|
|
+
|
|
|
+ # 更新对话历史
|
|
|
+ redis_conversation_manager.save_message(conversation_id, "user", question)
|
|
|
+ redis_conversation_manager.save_message(
|
|
|
+ conversation_id, "assistant",
|
|
|
+ cached_answer.get("data", {}).get("response", ""),
|
|
|
+ metadata={"from_cache": True}
|
|
|
+ )
|
|
|
+
|
|
|
+ # 添加对话信息到缓存结果
|
|
|
+ cached_answer["data"]["conversation_id"] = conversation_id
|
|
|
+ cached_answer["data"]["user_id"] = user_id
|
|
|
+ cached_answer["data"]["from_cache"] = True
|
|
|
+ cached_answer["data"].update(conversation_status)
|
|
|
+
|
|
|
+ return jsonify(cached_answer)
|
|
|
+
|
|
|
+ # 5. 保存用户消息
|
|
|
+ redis_conversation_manager.save_message(conversation_id, "user", question)
|
|
|
+
|
|
|
+ # 6. 构建带上下文的问题
|
|
|
+ if context:
|
|
|
+ enhanced_question = f"对话历史:\n{context}\n\n当前问题: {question}"
|
|
|
+ print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
|
|
|
+ else:
|
|
|
+ enhanced_question = question
|
|
|
+ print(f"[AGENT_API] 新对话,无上下文")
|
|
|
+
|
|
|
+ # 7. 现有Agent处理逻辑(保持不变)
|
|
|
+ try:
|
|
|
+ agent = get_citu_langraph_agent()
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[CRITICAL] Agent初始化失败: {str(e)}")
|
|
|
+ return jsonify(service_unavailable_response(
|
|
|
+ response_text="AI服务暂时不可用,请稍后重试",
|
|
|
+ can_retry=True
|
|
|
+ )), 503
|
|
|
+
|
|
|
+ agent_result = agent.process_question(
|
|
|
+ question=enhanced_question, # 使用增强后的问题
|
|
|
+ session_id=browser_session_id
|
|
|
+ )
|
|
|
+
|
|
|
+ # 8. 处理Agent结果
|
|
|
+ if agent_result.get("success", False):
|
|
|
+ assistant_response = agent_result.get("data", {}).get("response", "")
|
|
|
+
|
|
|
+ # 保存助手回复
|
|
|
+ redis_conversation_manager.save_message(
|
|
|
+ conversation_id, "assistant", assistant_response,
|
|
|
+ metadata={
|
|
|
+ "type": agent_result.get("data", {}).get("type"),
|
|
|
+ "sql": agent_result.get("data", {}).get("sql"),
|
|
|
+ "execution_path": agent_result.get("data", {}).get("execution_path")
|
|
|
+ }
|
|
|
+ )
|
|
|
+
|
|
|
+ # 缓存成功的答案(修正:传入context实现真正的上下文感知)
|
|
|
+ cache_data = {
|
|
|
+ **agent_result,
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "user_id": user_id,
|
|
|
+ "from_cache": False
|
|
|
+ }
|
|
|
+ redis_conversation_manager.cache_answer(question, cache_data, context)
|
|
|
+
|
|
|
+ # 构建返回数据(修正:使用现有的agent_success_response)
|
|
|
+ result_data = agent_result.get("data", {})
|
|
|
+ result_data.update({
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "user_id": user_id,
|
|
|
+ "is_guest_user": user_id.startswith("guest"),
|
|
|
+ "context_used": bool(context),
|
|
|
+ "from_cache": False,
|
|
|
+ "conversation_status": conversation_status["status"],
|
|
|
+ "conversation_message": conversation_status["message"],
|
|
|
+ "requested_conversation_id": conversation_status.get("requested_id")
|
|
|
+ })
|
|
|
+
|
|
|
+ return jsonify(agent_success_response(
|
|
|
+ response_type=result_data.get("type", "UNKNOWN"),
|
|
|
+ response_text=result_data.get("response", ""),
|
|
|
+ data=result_data
|
|
|
+ ))
|
|
|
+ else:
|
|
|
+ # 错误处理(修正:确保使用现有的错误响应格式)
|
|
|
+ error_data = {
|
|
|
+ "response": agent_result.get("error", "Agent处理失败"),
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "user_id": user_id,
|
|
|
+ "error_type": "agent_processing_failed"
|
|
|
+ }
|
|
|
+
|
|
|
+ return jsonify({
|
|
|
+ "success": False,
|
|
|
+ "code": agent_result.get("error_code", 500),
|
|
|
+ "message": "处理失败",
|
|
|
+ "data": error_data
|
|
|
+ }), agent_result.get("error_code", 500)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"[ERROR] ask_agent执行失败: {str(e)}")
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="查询处理失败,请稍后重试"
|
|
|
+ )), 500
|
|
|
+```
|
|
|
+
|
|
|
+**修改位置3**:新增管理API
|
|
|
+```python
|
|
|
+# 在文件末尾添加新的管理API
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
|
|
|
+def get_user_conversations(user_id: str):
|
|
|
+ """获取用户的对话列表(按时间倒序)"""
|
|
|
+ try:
|
|
|
+ limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
|
|
|
+ conversations = redis_conversation_manager.get_conversations(user_id, limit)
|
|
|
+
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取用户对话列表成功",
|
|
|
+ data={
|
|
|
+ "user_id": user_id,
|
|
|
+ "conversations": conversations,
|
|
|
+ "total_count": len(conversations)
|
|
|
+ }
|
|
|
+ ))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="获取对话列表失败,请稍后重试"
|
|
|
+ )), 500
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
|
|
|
+def get_conversation_messages(conversation_id: str):
|
|
|
+ """获取特定对话的消息历史"""
|
|
|
+ try:
|
|
|
+ limit = request.args.get('limit', type=int) # 可选参数
|
|
|
+ messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
|
|
|
+ meta = redis_conversation_manager.get_conversation_meta(conversation_id)
|
|
|
+
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取对话消息成功",
|
|
|
+ data={
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "conversation_meta": meta,
|
|
|
+ "messages": messages,
|
|
|
+ "message_count": len(messages)
|
|
|
+ }
|
|
|
+ ))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="获取对话消息失败"
|
|
|
+ )), 500
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
|
|
|
+def get_conversation_context(conversation_id: str):
|
|
|
+ """获取对话上下文(格式化用于LLM)"""
|
|
|
+ try:
|
|
|
+ count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
|
|
|
+ context = redis_conversation_manager.get_context(conversation_id, count)
|
|
|
+
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取对话上下文成功",
|
|
|
+ data={
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "context": context,
|
|
|
+ "context_message_count": count
|
|
|
+ }
|
|
|
+ ))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="获取对话上下文失败"
|
|
|
+ )), 500
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
|
|
|
+def conversation_stats():
|
|
|
+ """获取对话系统统计信息"""
|
|
|
+ try:
|
|
|
+ stats = redis_conversation_manager.get_stats()
|
|
|
+
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取统计信息成功",
|
|
|
+ data=stats
|
|
|
+ ))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="获取统计信息失败,请稍后重试"
|
|
|
+ )), 500
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
|
|
|
+def conversation_cleanup():
|
|
|
+ """手动清理过期对话"""
|
|
|
+ try:
|
|
|
+ redis_conversation_manager.cleanup_expired_conversations()
|
|
|
+
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="对话清理完成"
|
|
|
+ ))
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text="对话清理失败,请稍后重试"
|
|
|
+ )), 500
|
|
|
+```
|
|
|
+
|
|
|
+## 3. Redis数据结构设计
|
|
|
+
|
|
|
+### 3.1 数据存储格式
|
|
|
+```redis
|
|
|
+# 用户对话列表(按时间倒序,最新在前)
|
|
|
+user:{user_id}:conversations → LIST [
|
|
|
+ "conv_1703123456_a1b2c3d4",
|
|
|
+ "conv_1703120000_x1y2z3w4"
|
|
|
+]
|
|
|
+TTL: 7天(与对话TTL保持一致)
|
|
|
+
|
|
|
+# 对话消息历史(按时间倒序,最新在前)
|
|
|
+conversation:{conv_id}:messages → LIST [
|
|
|
+ "{\"message_id\":\"msg_002\",\"timestamp\":\"2024-01-01T10:00:05\",\"role\":\"assistant\",\"content\":\"查询结果...\",\"metadata\":{\"type\":\"DATABASE\"}}",
|
|
|
+ "{\"message_id\":\"msg_001\",\"timestamp\":\"2024-01-01T10:00:00\",\"role\":\"user\",\"content\":\"查询销售数据\",\"metadata\":{}}"
|
|
|
+]
|
|
|
+TTL: 7天
|
|
|
+
|
|
|
+# 对话元信息
|
|
|
+conversation:{conv_id}:meta → HASH {
|
|
|
+ "conversation_id": "conv_1703123456_a1b2c3d4",
|
|
|
+ "user_id": "guest_abc123",
|
|
|
+ "created_at": "2024-01-01T10:00:00",
|
|
|
+ "updated_at": "2024-01-01T10:05:00",
|
|
|
+ "message_count": "4"
|
|
|
+}
|
|
|
+TTL: 7天
|
|
|
+
|
|
|
+# 问答结果缓存(真正上下文感知版)
|
|
|
+qa_cache:{context_question_hash} → STRING {
|
|
|
+ "success": true,
|
|
|
+ "data": {...},
|
|
|
+ "cached_at": "2024-01-01T10:00:00",
|
|
|
+ "original_question": "查询销售数据",
|
|
|
+ "context_hash": "a1b2c3d4" // 上下文内容的哈希值
|
|
|
+}
|
|
|
+TTL: 24小时(每个缓存项独立设置)
|
|
|
+注意:缓存键基于上下文内容和问题的组合哈希,而非conversation_id
|
|
|
+```
|
|
|
+
|
|
|
+## 4. API接口设计
|
|
|
+
|
|
|
+### 4.1 ask_agent() API(增强版)
|
|
|
+
|
|
|
+#### 请求参数
|
|
|
+```json
|
|
|
+{
|
|
|
+ "question": "请问当前系统中每个高速服务区的经理是谁?", // 必需
|
|
|
+ "session_id": "test_session_001", // 可选,用于生成稳定guest_id
|
|
|
+ "user_id": "john_doe", // 可选,优先级低于登录session
|
|
|
+ "conversation_id": "conv_1703123456_a1b2c3d4", // 可选,继续特定对话
|
|
|
+ "continue_conversation": true // 可选,继续最近对话
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 响应格式
|
|
|
+```json
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "code": 200,
|
|
|
+ "message": "操作成功",
|
|
|
+ "data": {
|
|
|
+ "response": "查询结果...",
|
|
|
+ "type": "DATABASE",
|
|
|
+ "sql": "SELECT ...",
|
|
|
+ "query_result": {
|
|
|
+ "rows": [...],
|
|
|
+ "columns": [...],
|
|
|
+ "row_count": 10
|
|
|
+ },
|
|
|
+ "summary": "查询结果显示...",
|
|
|
+
|
|
|
+ // 新增字段
|
|
|
+ "conversation_id": "conv_1703123456_a1b2c3d4",
|
|
|
+ "user_id": "guest_abc123",
|
|
|
+ "is_guest_user": true,
|
|
|
+ "context_used": false,
|
|
|
+ "from_cache": false,
|
|
|
+ "conversation_status": "existing",
|
|
|
+ "conversation_message": "继续已有对话",
|
|
|
+ "requested_conversation_id": null,
|
|
|
+
|
|
|
+ "execution_path": ["classify", "agent_database", "format_response"],
|
|
|
+ "classification_info": {
|
|
|
+ "confidence": 0.95,
|
|
|
+ "reason": "匹配数据库关键词",
|
|
|
+ "method": "rule_based"
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 4.2 查询API设计
|
|
|
+
|
|
|
+#### 获取用户对话列表
|
|
|
+```http
|
|
|
+GET /api/v0/user/{user_id}/conversations?limit=10
|
|
|
+
|
|
|
+响应:
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "data": {
|
|
|
+ "user_id": "guest_abc123",
|
|
|
+ "conversations": [
|
|
|
+ {
|
|
|
+ "conversation_id": "conv_1703123456_a1b2c3d4",
|
|
|
+ "created_at": "2024-01-01T10:00:00",
|
|
|
+ "updated_at": "2024-01-01T10:05:00",
|
|
|
+ "message_count": "6"
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "total_count": 1
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+#### 获取对话消息详情
|
|
|
+```http
|
|
|
+GET /api/v0/conversation/{conversation_id}/messages?limit=20
|
|
|
+
|
|
|
+响应:
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "data": {
|
|
|
+ "conversation_id": "conv_1703123456_a1b2c3d4",
|
|
|
+ "conversation_meta": {
|
|
|
+ "user_id": "guest_abc123",
|
|
|
+ "created_at": "2024-01-01T10:00:00",
|
|
|
+ "message_count": "6"
|
|
|
+ },
|
|
|
+ "messages": [
|
|
|
+ {
|
|
|
+ "message_id": "msg_001",
|
|
|
+ "timestamp": "2024-01-01T10:00:00",
|
|
|
+ "role": "user",
|
|
|
+ "content": "查询服务区数据"
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "message_id": "msg_002",
|
|
|
+ "timestamp": "2024-01-01T10:00:05",
|
|
|
+ "role": "assistant",
|
|
|
+ "content": "查询结果显示...",
|
|
|
+ "metadata": {
|
|
|
+ "type": "DATABASE",
|
|
|
+ "sql": "SELECT ...",
|
|
|
+ "execution_path": ["classify", "agent_database"]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "message_count": 2
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+## 5. 前端集成示例
|
|
|
+
|
|
|
+### 5.1 简化的对话管理器
|
|
|
+```javascript
|
|
|
+class ConversationManager {
|
|
|
+ constructor() {
|
|
|
+ this.sessionId = this.generateSessionId();
|
|
|
+ this.currentConversationId = null;
|
|
|
+ this.userId = localStorage.getItem('user_id'); // 登录用户ID
|
|
|
+ }
|
|
|
+
|
|
|
+ async ask(question, continueConversation = true) {
|
|
|
+ const payload = {
|
|
|
+ question,
|
|
|
+ session_id: this.sessionId // 保证guest_id稳定
|
|
|
+ };
|
|
|
+
|
|
|
+ // 登录用户传递真实ID
|
|
|
+ if (this.userId) {
|
|
|
+ payload.user_id = this.userId;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 继续当前对话
|
|
|
+ if (continueConversation && this.currentConversationId) {
|
|
|
+ payload.conversation_id = this.currentConversationId;
|
|
|
+ }
|
|
|
+
|
|
|
+ const response = await fetch('/api/v0/ask_agent', {
|
|
|
+ method: 'POST',
|
|
|
+ headers: {'Content-Type': 'application/json'},
|
|
|
+ body: JSON.stringify(payload)
|
|
|
+ });
|
|
|
+
|
|
|
+ const result = await response.json();
|
|
|
+
|
|
|
+ // 记录conversation_id供下次使用
|
|
|
+ if (result.success) {
|
|
|
+ this.currentConversationId = result.data.conversation_id;
|
|
|
+ this.currentUserId = result.data.user_id;
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ async getUserConversations(limit = 5) {
|
|
|
+ if (!this.currentUserId) return [];
|
|
|
+
|
|
|
+ const response = await fetch(`/api/v0/user/${this.currentUserId}/conversations?limit=${limit}`);
|
|
|
+ const result = await response.json();
|
|
|
+
|
|
|
+ return result.success ? result.data.conversations : [];
|
|
|
+ }
|
|
|
+
|
|
|
+ async getConversationMessages(conversationId) {
|
|
|
+ const response = await fetch(`/api/v0/conversation/${conversationId}/messages`);
|
|
|
+ const result = await response.json();
|
|
|
+
|
|
|
+ return result.success ? result.data : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ startNewConversation() {
|
|
|
+ this.currentConversationId = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ generateSessionId() {
|
|
|
+ return 'session_' + Date.now() + '_' + Math.random().toString(36).substring(2);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 使用示例
|
|
|
+const conv = new ConversationManager();
|
|
|
+
|
|
|
+// 第一次提问(自动创建对话)
|
|
|
+const result1 = await conv.ask("查询服务区数据");
|
|
|
+console.log("对话ID:", result1.data.conversation_id);
|
|
|
+
|
|
|
+// 第二次提问(自动继续对话)
|
|
|
+const result2 = await conv.ask("经理都是谁?");
|
|
|
+
|
|
|
+// 查看对话历史
|
|
|
+const conversations = await conv.getUserConversations();
|
|
|
+
|
|
|
+// 查看特定对话详情
|
|
|
+const detail = await conv.getConversationMessages(conversations[0].conversation_id);
|
|
|
+
|
|
|
+// 开始新话题
|
|
|
+conv.startNewConversation();
|
|
|
+const result3 = await conv.ask("查询其他数据");
|
|
|
+```
|
|
|
+
|
|
|
+## 6. 实施步骤和优先级
|
|
|
+
|
|
|
+### 阶段1:基础实施(1-2天)
|
|
|
+**优先级:P0(必须完成)**
|
|
|
+1. ✅ 修改`app_config.py`添加配置
|
|
|
+2. ✅ 创建`common/redis_conversation_manager.py`
|
|
|
+3. ✅ 更新`requirements.txt`
|
|
|
+4. ✅ Redis连接测试
|
|
|
+
|
|
|
+### 阶段2:核心集成(2-3天)
|
|
|
+**优先级:P0(必须完成)**
|
|
|
+1. ✅ 修改`citu_app.py`的ask_agent()函数
|
|
|
+2. ✅ 实现智能ID解析逻辑
|
|
|
+3. ✅ 集成对话上下文功能
|
|
|
+4. ✅ 实现基础缓存功能
|
|
|
+5. ✅ 端到端测试
|
|
|
+
|
|
|
+### 阶段3:管理API(1-2天)
|
|
|
+**优先级:P1(重要)**
|
|
|
+1. ✅ 添加用户对话列表查询API
|
|
|
+2. ✅ 添加对话消息详情查询API
|
|
|
+3. ✅ 添加对话上下文查询API
|
|
|
+4. ✅ 添加统计信息API
|
|
|
+
|
|
|
+### 阶段4:完善优化(1-2天)
|
|
|
+**优先级:P2(可选)**
|
|
|
+1. ✅ 添加清理管理API
|
|
|
+2. ✅ 完善错误处理
|
|
|
+3. ✅ 性能优化
|
|
|
+4. ✅ 文档完善
|
|
|
+
|
|
|
+## 7. 测试验证
|
|
|
+
|
|
|
+### 7.1 单元测试
|
|
|
+```python
|
|
|
+# test_redis_conversation_manager.py
|
|
|
+import unittest
|
|
|
+from common.redis_conversation_manager import RedisConversationManager
|
|
|
+
|
|
|
+class TestRedisConversationManager(unittest.TestCase):
|
|
|
+ def setUp(self):
|
|
|
+ self.manager = RedisConversationManager()
|
|
|
+
|
|
|
+ def test_user_id_resolution(self):
|
|
|
+ # 测试用户ID解析逻辑
|
|
|
+ user_id = self.manager.resolve_user_id(None, "session_123", "127.0.0.1")
|
|
|
+ self.assertTrue(user_id.startswith("guest_"))
|
|
|
+
|
|
|
+ def test_conversation_creation(self):
|
|
|
+ # 测试对话创建
|
|
|
+ conv_id = self.manager.create_conversation("test_user")
|
|
|
+ self.assertTrue(conv_id.startswith("conv_"))
|
|
|
+
|
|
|
+ def test_message_saving(self):
|
|
|
+ # 测试消息保存
|
|
|
+ conv_id = self.manager.create_conversation("test_user")
|
|
|
+ result = self.manager.save_message(conv_id, "user", "test message")
|
|
|
+ self.assertTrue(result)
|
|
|
+```
|
|
|
+
|
|
|
+### 7.2 集成测试
|
|
|
+```python
|
|
|
+# test_ask_agent_integration.py
|
|
|
+import requests
|
|
|
+import json
|
|
|
+
|
|
|
+def test_ask_agent_with_context():
|
|
|
+ # 第一次对话
|
|
|
+ response1 = requests.post('http://localhost:5000/api/v0/ask_agent',
|
|
|
+ json={"question": "查询服务区数据"})
|
|
|
+ result1 = response1.json()
|
|
|
+
|
|
|
+ # 第二次对话(带上下文)
|
|
|
+ response2 = requests.post('http://localhost:5000/api/v0/ask_agent',
|
|
|
+ json={
|
|
|
+ "question": "经理都是谁?",
|
|
|
+ "conversation_id": result1["data"]["conversation_id"]
|
|
|
+ })
|
|
|
+ result2 = response2.json()
|
|
|
+
|
|
|
+ assert result2["data"]["context_used"] == True
|
|
|
+```
|
|
|
+
|
|
|
+## 8. 部署和运维
|
|
|
+
|
|
|
+### 8.1 Redis部署
|
|
|
+```yaml
|
|
|
+# docker-compose.yml
|
|
|
+version: '3'
|
|
|
+services:
|
|
|
+ redis:
|
|
|
+ image: redis:7-alpine
|
|
|
+ ports:
|
|
|
+ - "6379:6379"
|
|
|
+ volumes:
|
|
|
+ - redis_data:/data
|
|
|
+ command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
|
|
|
+
|
|
|
+volumes:
|
|
|
+ redis_data:
|
|
|
+```
|
|
|
+
|
|
|
+## 9. 设计改进建议
|
|
|
+
|
|
|
+### 9.1 无效conversation_id的处理优化
|
|
|
+
|
|
|
+#### 问题描述
|
|
|
+当前设计中,当用户传入无效的 `conversation_id` 时,系统会静默地创建新对话,用户无法得知他们请求的对话不存在或无权访问。
|
|
|
+
|
|
|
+#### 改进方案
|
|
|
+
|
|
|
+1. **修改 `resolve_conversation_id` 方法返回值**
|
|
|
+```python
|
|
|
+def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str],
|
|
|
+ continue_conversation: bool) -> tuple[str, dict]:
|
|
|
+ """
|
|
|
+ 智能解析对话ID - 改进版
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ tuple: (conversation_id, status_info)
|
|
|
+ status_info包含:
|
|
|
+ - status: "existing" | "new" | "invalid_id_new"
|
|
|
+ - message: 状态说明
|
|
|
+ - requested_id: 原始请求的ID(如果有)
|
|
|
+ """
|
|
|
+
|
|
|
+ # 1. 如果指定了conversation_id,验证后使用
|
|
|
+ if conversation_id_input:
|
|
|
+ if self._is_valid_conversation(conversation_id_input, user_id):
|
|
|
+ print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}")
|
|
|
+ return conversation_id_input, {
|
|
|
+ "status": "existing",
|
|
|
+ "message": "继续已有对话"
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话")
|
|
|
+ new_conversation_id = self.create_conversation(user_id)
|
|
|
+ return new_conversation_id, {
|
|
|
+ "status": "invalid_id_new",
|
|
|
+ "message": "您请求的对话不存在或无权访问,已为您创建新对话",
|
|
|
+ "requested_id": conversation_id_input
|
|
|
+ }
|
|
|
+
|
|
|
+ # 2. 如果要继续最近对话
|
|
|
+ if continue_conversation:
|
|
|
+ recent_conversation = self._get_recent_conversation(user_id)
|
|
|
+ if recent_conversation:
|
|
|
+ print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}")
|
|
|
+ return recent_conversation, {
|
|
|
+ "status": "existing",
|
|
|
+ "message": "继续最近对话"
|
|
|
+ }
|
|
|
+
|
|
|
+ # 3. 创建新对话
|
|
|
+ new_conversation_id = self.create_conversation(user_id)
|
|
|
+ print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}")
|
|
|
+ return new_conversation_id, {
|
|
|
+ "status": "new",
|
|
|
+ "message": "创建新对话"
|
|
|
+ }
|
|
|
+```
|
|
|
+
|
|
|
+2. **修改 `ask_agent` API 响应**
|
|
|
+```python
|
|
|
+# 在ask_agent函数中
|
|
|
+conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
|
|
|
+ user_id, conversation_id_input, continue_conversation
|
|
|
+)
|
|
|
+
|
|
|
+# 在最终返回结果中添加状态信息
|
|
|
+result_data.update({
|
|
|
+ "conversation_id": conversation_id,
|
|
|
+ "user_id": user_id,
|
|
|
+ "is_guest_user": user_id.startswith("guest"),
|
|
|
+ "context_used": bool(context),
|
|
|
+ "from_cache": False,
|
|
|
+ "conversation_status": conversation_status["status"],
|
|
|
+ "conversation_message": conversation_status["message"],
|
|
|
+ "requested_conversation_id": conversation_status.get("requested_id")
|
|
|
+})
|
|
|
+```
|
|
|
+
|
|
|
+3. **API响应示例**
|
|
|
+```json
|
|
|
+// 情况1:请求无效的conversation_id
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "data": {
|
|
|
+ "response": "查询结果...",
|
|
|
+ "conversation_id": "conv_1703123456_new123",
|
|
|
+ "conversation_status": "invalid_id_new",
|
|
|
+ "conversation_message": "您请求的对话不存在或已过期,已为您开启新对话",
|
|
|
+ "requested_conversation_id": "conv_invalid_xyz789",
|
|
|
+ // ... 其他字段
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 情况2:成功继续已有对话
|
|
|
+{
|
|
|
+ "success": true,
|
|
|
+ "data": {
|
|
|
+ "response": "查询结果...",
|
|
|
+ "conversation_id": "conv_1703123456_abc123",
|
|
|
+ "conversation_status": "existing",
|
|
|
+ "conversation_message": "继续已有对话",
|
|
|
+ // ... 其他字段
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+4. **前端处理示例**
|
|
|
+```javascript
|
|
|
+async ask(question, conversationId) {
|
|
|
+ const response = await fetch('/api/v0/ask_agent', {
|
|
|
+ method: 'POST',
|
|
|
+ body: JSON.stringify({
|
|
|
+ question,
|
|
|
+ conversation_id: conversationId
|
|
|
+ })
|
|
|
+ });
|
|
|
+
|
|
|
+ const result = await response.json();
|
|
|
+
|
|
|
+ // 检查对话状态
|
|
|
+ if (result.data.conversation_status === 'invalid_id_new') {
|
|
|
+ // 提示用户
|
|
|
+ this.showNotification(
|
|
|
+ '提示:您请求的对话不存在或已过期,已为您开启新对话',
|
|
|
+ 'warning'
|
|
|
+ );
|
|
|
+ // 更新本地conversation_id
|
|
|
+ this.currentConversationId = result.data.conversation_id;
|
|
|
+ }
|
|
|
+
|
|
|
+ return result;
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 9.2 其他潜在改进
|
|
|
+
|
|
|
+1. **对话权限验证增强**
|
|
|
+ - 添加对话的所有权验证
|
|
|
+ - 支持对话分享功能(生成分享链接)
|
|
|
+ - 支持对话的读/写权限控制
|
|
|
+
|
|
|
+2. **对话状态管理**
|
|
|
+ - 添加对话的活跃/归档状态
|
|
|
+ - 支持手动结束对话
|
|
|
+ - 对话标题和标签功能
|
|
|
+
|
|
|
+3. **性能优化**
|
|
|
+ - 对话列表分页加载
|
|
|
+ - 消息懒加载
|
|
|
+ - 缓存预热机制
|