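Redis Conversation Management System: Detailed Design and Development Guide (Revised)

Important update (2024): fixed the context-awareness problem in the smart cache. In the original design the cache key was derived only from conversation_id, so it could not handle the context changing within a single conversation. The new design derives the cache key from a hash of the actual context content, so the same question asked under different contexts maps to separate cache entries and returns the correct cached result.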

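1. Project Overview and Implementation Goals

1.1 Background

Building on the existing vanna+LangChain+LangGraph project, this work adds Redis-based conversation management to the ask_agent() API, providing multi-turn context, conversation history, and smart caching. The existing ask() API and SessionAwareMemoryCache are left unchanged.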

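1.2 Core Features

  • Smart user identification: supports logged-in users, users passed as a request parameter, and anonymous guest users
  • Conversation context management: supports multi-turn conversations with a configurable context length
  • Smart cache: caches question-answer results to avoid repeated computation
  • RESTful query API: lists a user's conversations and returns conversation details
  • Fault-tolerant degradation: degrades automatically when Redis is unavailable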

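1.3 Technical Architecture

  • Redis storage layer: stores conversation data and cleans it up automatically through TTLs
  • ask_agent() API enhancement: integrates conversation context and caching
  • Management API: provides conversation queries and statistics
  • Configuration-driven: all parameters are set in app_config.py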

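2. Detailed List of Files to Modify

2.1 Configuration Changes

📝 Modified file: app_config.py

Location: append a new configuration section at the end of the file.

Changes: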

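# ==================== Redis conversation management ====================

# Conversation context
CONVERSATION_CONTEXT_COUNT = 5          # Context rounds passed to the LLM (get_context reads count * 2 messages)
CONVERSATION_MAX_LENGTH = 20            # Maximum number of messages kept per conversation
USER_MAX_CONVERSATIONS = 5              # Maximum conversations per user (default limit when listing)

# User management
DEFAULT_ANONYMOUS_USER_PREFIX = "guest" # Prefix for anonymous users
GUEST_USER_TTL = 7 * 24 * 3600          # Guest user data is kept for 7 days
MAX_GUEST_CONVERSATIONS = 3             # Guest users keep at most 3 conversations
MAX_REGISTERED_CONVERSATIONS = 10       # Registered users keep at most 10 conversations

# Redis connection
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None

# Feature switches
ENABLE_CONVERSATION_CONTEXT = True      # Enable conversation context
ENABLE_QUESTION_ANSWER_CACHE = True     # Enable the question-answer cache

# TTLs (in seconds); GUEST_USER_TTL is defined once, in the user management block above
CONVERSATION_TTL = 7 * 24 * 3600        # Conversations are kept for 7 days
USER_CONVERSATIONS_TTL = 7 * 24 * 3600  # User conversation lists are kept for 7 days (same as CONVERSATION_TTL)
QUESTION_ANSWER_TTL = 24 * 3600         # Cached answers are kept for 24 hours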
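
The settings above depend on each other in ways the comments only hint at: the user list TTL is meant to match the conversation TTL, and the context window has to fit inside the stored history. A minimal sketch of a startup check follows. It assumes a hypothetical helper, `validate_conversation_config`, which is not part of the original design and takes the values as a plain dict:

```python
from typing import Dict, List


def validate_conversation_config(cfg: Dict[str, int]) -> List[str]:
    """Return a list of problems found in the settings (empty list means OK).

    Hypothetical helper for illustration; the keys mirror app_config.py.
    """
    problems: List[str] = []

    # get_context() reads CONVERSATION_CONTEXT_COUNT * 2 messages (user + assistant),
    # so the window cannot exceed what LTRIM keeps in the message list.
    if cfg["CONVERSATION_CONTEXT_COUNT"] * 2 > cfg["CONVERSATION_MAX_LENGTH"]:
        problems.append("context window is larger than the stored history")

    # The user list should live at least as long as the conversations it references.
    if cfg["USER_CONVERSATIONS_TTL"] < cfg["CONVERSATION_TTL"]:
        problems.append("user conversation list expires before its conversations")

    return problems


defaults = {
    "CONVERSATION_CONTEXT_COUNT": 5,
    "CONVERSATION_MAX_LENGTH": 20,
    "CONVERSATION_TTL": 7 * 24 * 3600,
    "USER_CONVERSATIONS_TTL": 7 * 24 * 3600,
}
print(validate_conversation_config(defaults))
```

With the default values both checks pass, so the returned list is empty.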

📝 Modified file: requirements.txt

Change: add the Redis dependency

# Add to the existing dependencies
redis==5.0.1

2.2 Core Component Development

🆕 New file: common/redis_conversation_manager.py

Purpose: core class of the Redis conversation manager (revised).

Full implementation:

import redis
import json
import hashlib
import uuid
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from app_config import (
    REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD,
    CONVERSATION_CONTEXT_COUNT, CONVERSATION_MAX_LENGTH, USER_MAX_CONVERSATIONS,
    CONVERSATION_TTL, USER_CONVERSATIONS_TTL, QUESTION_ANSWER_TTL,
    ENABLE_CONVERSATION_CONTEXT, ENABLE_QUESTION_ANSWER_CACHE,
    DEFAULT_ANONYMOUS_USER_PREFIX, MAX_GUEST_CONVERSATIONS, MAX_REGISTERED_CONVERSATIONS,
    GUEST_USER_TTL
)

class RedisConversationManager:
    """Redis对话管理器 - 修正版"""
    
    def __init__(self):
        """初始化Redis连接"""
        try:
            self.redis_client = redis.Redis(
                host=REDIS_HOST,
                port=REDIS_PORT,
                db=REDIS_DB,
                password=REDIS_PASSWORD,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5
            )
            # 测试连接
            self.redis_client.ping()
            print(f"[REDIS_CONV] Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
        except Exception as e:
            print(f"[ERROR] Redis连接失败: {str(e)}")
            self.redis_client = None
    
    def is_available(self) -> bool:
        """Return True if Redis is reachable."""
        try:
            return self.redis_client is not None and self.redis_client.ping()
        except Exception:  # a bare except would also swallow KeyboardInterrupt/SystemExit
            return False
    
    # ==================== 用户ID解析(修正版)====================
    
    def resolve_user_id(self, user_id_from_request: Optional[str], 
                       session_id: Optional[str], request_ip: str,
                       login_user_id: Optional[str] = None) -> str:
        """
        智能解析用户ID - 修正版
        
        Args:
            user_id_from_request: 请求参数中的user_id
            session_id: 浏览器session_id
            request_ip: 请求IP地址
            login_user_id: 从Flask session中获取的登录用户ID(在ask_agent中获取)
        """
        
        # 1. 优先使用登录用户ID
        if login_user_id:
            print(f"[REDIS_CONV] 使用登录用户ID: {login_user_id}")
            return login_user_id
        
        # 2. 如果没有登录,尝试从请求参数获取user_id
        if user_id_from_request:
            print(f"[REDIS_CONV] 使用请求参数user_id: {user_id_from_request}")
            return user_id_from_request
        
        # 3. 都没有则为匿名用户(guest)
        if session_id:
            guest_suffix = hashlib.md5(session_id.encode()).hexdigest()[:8]
            guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_{guest_suffix}"
            print(f"[REDIS_CONV] 生成稳定guest用户: {guest_id}")
            return guest_id
        
        # 4. 最后基于IP的临时guest ID
        ip_suffix = hashlib.md5(request_ip.encode()).hexdigest()[:8]
        temp_guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_temp_{ip_suffix}"
        print(f"[REDIS_CONV] 生成临时guest用户: {temp_guest_id}")
        return temp_guest_id
    
    def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str], 
                              continue_conversation: bool) -> tuple[str, dict]:
        """
        智能解析对话ID - 改进版
        
        Returns:
            tuple: (conversation_id, status_info)
            status_info包含:
            - status: "existing" | "new" | "invalid_id_new"
            - message: 状态说明
            - requested_id: 原始请求的ID(如果有)
        """
        
        # 1. 如果指定了conversation_id,验证后使用
        if conversation_id_input:
            if self._is_valid_conversation(conversation_id_input, user_id):
                print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}")
                return conversation_id_input, {
                    "status": "existing",
                    "message": "继续已有对话"
                }
            else:
                print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话")
                new_conversation_id = self.create_conversation(user_id)
                return new_conversation_id, {
                    "status": "invalid_id_new",
                    "message": "您请求的对话不存在或无权访问,已为您创建新对话",
                    "requested_id": conversation_id_input
                }
        
        # 2. 如果要继续最近对话
        if continue_conversation:
            recent_conversation = self._get_recent_conversation(user_id)
            if recent_conversation:
                print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}")
                return recent_conversation, {
                    "status": "existing",
                    "message": "继续最近对话"
                }
        
        # 3. 创建新对话
        new_conversation_id = self.create_conversation(user_id)
        print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}")
        return new_conversation_id, {
            "status": "new",
            "message": "创建新对话"
        }
    
    def _is_valid_conversation(self, conversation_id: str, user_id: str) -> bool:
        """验证对话是否存在且属于该用户"""
        if not self.is_available():
            return False
        
        try:
            # 检查对话元信息是否存在
            meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
            if not meta_data:
                return False
            
            # 检查是否属于该用户
            return meta_data.get('user_id') == user_id
            
        except Exception:
            return False
    
    def _get_recent_conversation(self, user_id: str) -> Optional[str]:
        """获取用户最近的对话ID"""
        if not self.is_available():
            return None
        
        try:
            conversations = self.redis_client.lrange(
                f"user:{user_id}:conversations", 0, 0
            )
            return conversations[0] if conversations else None
        except Exception:
            return None
    
    # ==================== 对话管理 ====================
    
    def create_conversation(self, user_id: str) -> str:
        """创建新对话"""
        # 生成包含时间戳的conversation_id
        timestamp = int(datetime.now().timestamp())
        conversation_id = f"conv_{timestamp}_{uuid.uuid4().hex[:8]}"
        
        if not self.is_available():
            return conversation_id  # Redis不可用时返回ID,但不存储
        
        try:
            # 创建对话元信息
            meta_data = {
                "conversation_id": conversation_id,
                "user_id": user_id,
                "created_at": datetime.now().isoformat(),
                "updated_at": datetime.now().isoformat(),
                "message_count": "0"
            }
            
            # 保存对话元信息
            self.redis_client.hset(
                f"conversation:{conversation_id}:meta",
                mapping=meta_data
            )
            self.redis_client.expire(f"conversation:{conversation_id}:meta", CONVERSATION_TTL)
            
            # 添加到用户的对话列表
            self._add_conversation_to_user(user_id, conversation_id)
            
            print(f"[REDIS_CONV] 创建对话成功: {conversation_id}")
            return conversation_id
            
        except Exception as e:
            print(f"[ERROR] 创建对话失败: {str(e)}")
            return conversation_id  # 返回ID但可能未存储
    
    def save_message(self, conversation_id: str, role: str, content: str, 
                    metadata: Optional[Dict] = None) -> bool:
        """保存消息到对话历史"""
        if not self.is_available() or not conversation_id:
            return False
        
        try:
            message_data = {
                "message_id": str(uuid.uuid4()),
                "timestamp": datetime.now().isoformat(),
                "role": role,  # user, assistant
                "content": content,
                "metadata": metadata or {}
            }
            
            # 保存到消息列表(LPUSH添加到头部,最新消息在前)
            self.redis_client.lpush(
                f"conversation:{conversation_id}:messages",
                json.dumps(message_data)
            )
            
            # 设置TTL
            self.redis_client.expire(f"conversation:{conversation_id}:messages", CONVERSATION_TTL)
            
            # 限制消息数量
            self.redis_client.ltrim(
                f"conversation:{conversation_id}:messages",
                0, CONVERSATION_MAX_LENGTH - 1
            )
            
            # 更新元信息
            self._update_conversation_meta(conversation_id)
            
            return True
            
        except Exception as e:
            print(f"[ERROR] 保存消息失败: {str(e)}")
            return False
    
    def get_context(self, conversation_id: str, count: Optional[int] = None) -> str:
        """获取对话上下文,格式化为prompt"""
        if not self.is_available() or not ENABLE_CONVERSATION_CONTEXT:
            return ""
        
        try:
            if count is None:
                count = CONVERSATION_CONTEXT_COUNT
            
            # 获取最近的消息(count*2 因为包含用户和助手消息)
            message_count = count * 2
            messages = self.redis_client.lrange(
                f"conversation:{conversation_id}:messages",
                0, message_count - 1
            )
            
            if not messages:
                return ""
            
            # 解析消息并构建上下文(按时间正序)
            context_parts = []
            for msg_json in reversed(messages):  # Redis返回倒序,需要反转
                try:
                    msg_data = json.loads(msg_json)
                    role = msg_data.get("role", "")
                    content = msg_data.get("content", "")
                    
                    if role == "user":
                        context_parts.append(f"用户: {content}")
                    elif role == "assistant":
                        context_parts.append(f"助手: {content}")
                        
                except json.JSONDecodeError:
                    continue
            
            context = "\n".join(context_parts)
            print(f"[REDIS_CONV] 获取上下文成功: {len(context_parts)}条消息")
            return context
            
        except Exception as e:
            print(f"[ERROR] 获取上下文失败: {str(e)}")
            return ""
    
    def get_conversation_messages(self, conversation_id: str, limit: Optional[int] = None) -> List[Dict]:
        """获取对话的消息列表"""
        if not self.is_available():
            return []
        
        try:
            if limit:
                messages = self.redis_client.lrange(
                    f"conversation:{conversation_id}:messages", 0, limit - 1
                )
            else:
                messages = self.redis_client.lrange(
                    f"conversation:{conversation_id}:messages", 0, -1
                )
            
            # 解析并按时间正序返回
            parsed_messages = []
            for msg_json in reversed(messages):  # 反转为时间正序
                try:
                    parsed_messages.append(json.loads(msg_json))
                except json.JSONDecodeError:
                    continue
                    
            return parsed_messages
            
        except Exception as e:
            print(f"[ERROR] 获取对话消息失败: {str(e)}")
            return []
    
    def get_conversation_meta(self, conversation_id: str) -> Dict:
        """获取对话元信息"""
        if not self.is_available():
            return {}
        
        try:
            meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
            return meta_data if meta_data else {}
        except Exception as e:
            print(f"[ERROR] 获取对话元信息失败: {str(e)}")
            return {}
    
    def get_conversations(self, user_id: str, limit: int = None) -> List[Dict]:
        """获取用户的对话列表(按时间倒序)"""
        if not self.is_available():
            return []
        
        if limit is None:
            limit = USER_MAX_CONVERSATIONS
        
        try:
            # 获取对话ID列表(已经按时间倒序)
            conversation_ids = self.redis_client.lrange(
                f"user:{user_id}:conversations", 0, limit - 1
            )
            
            conversations = []
            for conv_id in conversation_ids:
                meta_data = self.get_conversation_meta(conv_id)
                if meta_data:  # 只返回仍然存在的对话
                    conversations.append(meta_data)
            
            return conversations
            
        except Exception as e:
            print(f"[ERROR] 获取用户对话列表失败: {str(e)}")
            return []
    
    # ==================== 智能缓存(修正版)====================
    
    def get_cached_answer(self, question: str, context: str = "") -> Optional[Dict]:
        """检查问答缓存 - 真正上下文感知版"""
        if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
            return None
        
        try:
            cache_key = self._get_cache_key(question, context)
            cached_answer = self.redis_client.get(cache_key)  # 使用独立key而不是hash
            
            if cached_answer:
                print(f"[REDIS_CONV] 缓存命中: {cache_key}")
                return json.loads(cached_answer)
            
            return None
            
        except Exception as e:
            print(f"[ERROR] 获取缓存答案失败: {str(e)}")
            return None
    
    def cache_answer(self, question: str, answer: Dict, context: str = ""):
        """缓存问答结果 - 真正上下文感知版"""
        if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
            return
        
        try:
            cache_key = self._get_cache_key(question, context)
            
            # 添加缓存时间戳和上下文哈希
            answer_with_meta = {
                **answer,
                "cached_at": datetime.now().isoformat(),
                "original_question": question,
                "context_hash": hashlib.md5(context.encode()).hexdigest()[:8] if context else ""
            }
            
            # 使用独立key,每个缓存项单独设置TTL
            self.redis_client.setex(
                cache_key, 
                QUESTION_ANSWER_TTL,
                json.dumps(answer_with_meta)
            )
            
            print(f"[REDIS_CONV] 缓存答案成功: {cache_key}")
            
        except Exception as e:
            print(f"[ERROR] 缓存答案失败: {str(e)}")
    
    def _get_cache_key(self, question: str, context: str = "") -> str:
        """生成真正包含上下文的缓存键"""
        if context and ENABLE_CONVERSATION_CONTEXT:
            # 使用上下文内容而不是conversation_id
            cache_input = f"context:{context}\nquestion:{question}"
        else:
            cache_input = question
        
        normalized = cache_input.strip().lower()
        question_hash = hashlib.md5(normalized.encode('utf-8')).hexdigest()[:16]
        return f"qa_cache:{question_hash}"
    
    # ==================== 私有方法 ====================
    
    def _add_conversation_to_user(self, user_id: str, conversation_id: str):
        """添加对话到用户列表,按时间自动排序"""
        try:
            # 获取用户类型配置
            config = self._get_user_type_config(user_id)
            
            # LPUSH添加到列表头部(最新的)
            self.redis_client.lpush(f"user:{user_id}:conversations", conversation_id)
            
            # 根据用户类型限制数量
            self.redis_client.ltrim(
                f"user:{user_id}:conversations", 
                0, config["max_conversations"] - 1
            )
            
            # 设置TTL
            self.redis_client.expire(
                f"user:{user_id}:conversations", 
                config["ttl"]
            )
            
        except Exception as e:
            print(f"[ERROR] 添加对话到用户列表失败: {str(e)}")
    
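    def _get_user_type_config(self, user_id: str) -> Dict:
        """Return the limits for the user's type (guest or registered)."""
        # Match "guest_" rather than "guest": generated guest IDs always contain
        # the underscore, and a registered ID such as "guestbook" must not be
        # treated as anonymous.
        if user_id.startswith(f"{DEFAULT_ANONYMOUS_USER_PREFIX}_"):
            return {
                "max_conversations": MAX_GUEST_CONVERSATIONS,
                "ttl": GUEST_USER_TTL  # dedicated guest TTL
            }
        else:
            return {
                "max_conversations": MAX_REGISTERED_CONVERSATIONS,
                "ttl": USER_CONVERSATIONS_TTL
            }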
    
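    def _update_conversation_meta(self, conversation_id: str):
        """Update the conversation metadata after a message is saved."""
        try:
            meta_key = f"conversation:{conversation_id}:meta"
            
            # Current number of stored messages
            message_count = self.redis_client.llen(f"conversation:{conversation_id}:messages")
            
            # Update the metadata
            self.redis_client.hset(
                meta_key,
                mapping={
                    "updated_at": datetime.now().isoformat(),
                    "message_count": str(message_count)
                }
            )
            
            # Refresh the TTL together with the message list (see save_message);
            # otherwise the metadata expires CONVERSATION_TTL after creation even
            # while the conversation is still active.
            self.redis_client.expire(meta_key, CONVERSATION_TTL)
            
        except Exception as e:
            print(f"[ERROR] 更新对话元信息失败: {str(e)}")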
    
    # ==================== 管理方法 ====================
    
    def get_stats(self) -> Dict:
        """Return usage statistics."""
        if not self.is_available():
            return {"available": False}
        
        try:
            # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
            def count_keys(pattern: str) -> int:
                return sum(1 for _ in self.redis_client.scan_iter(match=pattern))
            
            redis_info = self.redis_client.info()  # fetch INFO once and reuse it
            stats = {
                "available": True,
                "total_users": count_keys("user:*:conversations"),
                "total_conversations": count_keys("conversation:*:meta"),
                "cached_qa_count": count_keys("qa_cache:*"),
                "redis_info": {
                    "used_memory": redis_info.get("used_memory_human"),
                    "connected_clients": redis_info.get("connected_clients")
                }
            }
            
            return stats
            
        except Exception as e:
            print(f"[ERROR] 获取统计信息失败: {str(e)}")
            return {"available": False, "error": str(e)}
    
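    def cleanup_expired_conversations(self):
        """Remove stale references; Redis TTLs already expire the conversations themselves."""
        if not self.is_available():
            return
        
        try:
            cleaned_count = 0
            
            # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
            for user_key in self.redis_client.scan_iter(match="user:*:conversations"):
                conversation_ids = self.redis_client.lrange(user_key, 0, -1)
                
                # Keep only the IDs whose metadata still exists
                valid_ids = [
                    conv_id for conv_id in conversation_ids
                    if self.redis_client.exists(f"conversation:{conv_id}:meta")
                ]
                if len(valid_ids) == len(conversation_ids):
                    continue
                cleaned_count += len(conversation_ids) - len(valid_ids)
                
                # Rebuild the list inside one MULTI/EXEC transaction so that
                # readers never observe a deleted or half-built list
                user_id = user_key[len("user:"):-len(":conversations")]
                pipe = self.redis_client.pipeline()
                pipe.delete(user_key)
                if valid_ids:
                    pipe.rpush(user_key, *valid_ids)  # RPUSH preserves the newest-first order
                    # Restore the TTL that matches the user type (guest or registered)
                    pipe.expire(user_key, self._get_user_type_config(user_id)["ttl"])
                pipe.execute()
            
            print(f"[REDIS_CONV] 清理完成,移除了 {cleaned_count} 个无效对话引用")
            
        except Exception as e:
            print(f"[ERROR] 清理失败: {str(e)}")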
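
save_message and get_context rely on three list operations: LPUSH puts the newest message at the head, LTRIM caps the list length, and LRANGE 0..n-1 returns the newest n messages, which are then reversed into chronological order. The sketch below reproduces that flow with a plain Python list so it can be run without a Redis server. `FakeMessageList` and `build_context` are illustrative names, not part of the manager class, and the fake list handles non-negative indices only:

```python
import json
from typing import List


class FakeMessageList:
    """In-memory stand-in for one Redis LIST (illustration only)."""

    def __init__(self) -> None:
        self.items: List[str] = []

    def lpush(self, value: str) -> None:
        self.items.insert(0, value)              # newest element goes to the head

    def ltrim(self, start: int, stop: int) -> None:
        self.items = self.items[start:stop + 1]  # Redis stop index is inclusive

    def lrange(self, start: int, stop: int) -> List[str]:
        return self.items[start:stop + 1]


def build_context(store: FakeMessageList, rounds: int) -> str:
    """Mirror get_context(): take the newest rounds * 2 messages, oldest first."""
    newest_first = store.lrange(0, rounds * 2 - 1)
    lines = []
    for raw in reversed(newest_first):
        msg = json.loads(raw)
        label = "用户" if msg["role"] == "user" else "助手"
        lines.append(f"{label}: {msg['content']}")
    return "\n".join(lines)


store = FakeMessageList()
max_length = 4  # stands in for CONVERSATION_MAX_LENGTH
for i in range(1, 4):
    for role, text in (("user", f"q{i}"), ("assistant", f"a{i}")):
        store.lpush(json.dumps({"role": role, "content": text}))
        store.ltrim(0, max_length - 1)

print(build_context(store, rounds=1))
```

After three rounds with a cap of four messages, the first round has been trimmed away, and a one-round context contains only the latest question and answer.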

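2.3 Main API Changes

📝 Modified file: citu_app.py

Change 1: imports at the top of the file

# Add to the existing imports (at module level, not inside functions)
from flask import session
# Used by the management API below; skip if citu_app.py already imports these names
from app_config import USER_MAX_CONVERSATIONS, CONVERSATION_CONTEXT_COUNT
from common.redis_conversation_manager import RedisConversationManager
from common.result import (
    bad_request_response, service_unavailable_response, 
    agent_success_response, agent_error_response,
    internal_error_response, success_response
)

# Add to the module-level globals (after the app is instantiated)
redis_conversation_manager = RedisConversationManager()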

Change 2: the ask_agent() function (revised)

@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    """
    ask_agent API with conversation context support (revised)
    """
    req = request.get_json(force=True)
    question = req.get("question", None)
    browser_session_id = req.get("session_id", None)
    
    # New request parameters
    user_id_input = req.get("user_id", None)
    conversation_id_input = req.get("conversation_id", None)
    continue_conversation = req.get("continue_conversation", False)
    
    if not question:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:question",
            missing_params=["question"]
        )), 400

    try:
        # 1. Read the logged-in user ID from the Flask session (None if not logged in)
        login_user_id = session.get('user_id')
        
        # 2. 智能ID解析(修正:传入登录用户ID)
        user_id = redis_conversation_manager.resolve_user_id(
            user_id_input, browser_session_id, request.remote_addr, login_user_id
        )
        conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
            user_id, conversation_id_input, continue_conversation
        )
        
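        # 3. Fetch the context (before the cache lookup)
        context = redis_conversation_manager.get_context(conversation_id)
        
        # 4. Check the cache (revised: pass context so the lookup is context-aware)
        cached_answer = redis_conversation_manager.get_cached_answer(question, context)
        if cached_answer:
            print(f"[AGENT_API] 使用缓存答案")
            cached_data = cached_answer.setdefault("data", {})
            
            # Record the exchange in the conversation history
            redis_conversation_manager.save_message(conversation_id, "user", question)
            redis_conversation_manager.save_message(
                conversation_id, "assistant", 
                cached_data.get("response", ""),
                metadata={"from_cache": True}
            )
            
            # Replace the identifiers stored with the cache entry (they belong to
            # the request that created it) and use the same field names as the
            # uncached path
            request_info = {
                "conversation_id": conversation_id,
                "user_id": user_id,
                "from_cache": True
            }
            cached_answer.update(request_info)
            cached_data.update(request_info)
            cached_data.update({
                "conversation_status": conversation_status["status"],
                "conversation_message": conversation_status["message"],
                "requested_conversation_id": conversation_status.get("requested_id")
            })
            
            return jsonify(cached_answer)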
        
        # 5. 保存用户消息
        redis_conversation_manager.save_message(conversation_id, "user", question)
        
        # 6. 构建带上下文的问题
        if context:
            enhanced_question = f"对话历史:\n{context}\n\n当前问题: {question}"
            print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
        else:
            enhanced_question = question
            print(f"[AGENT_API] 新对话,无上下文")
        
        # 7. 现有Agent处理逻辑(保持不变)
        try:
            agent = get_citu_langraph_agent()
        except Exception as e:
            print(f"[CRITICAL] Agent初始化失败: {str(e)}")
            return jsonify(service_unavailable_response(
                response_text="AI服务暂时不可用,请稍后重试",
                can_retry=True
            )), 503
        
        agent_result = agent.process_question(
            question=enhanced_question,  # 使用增强后的问题
            session_id=browser_session_id
        )
        
        # 8. 处理Agent结果
        if agent_result.get("success", False):
            assistant_response = agent_result.get("data", {}).get("response", "")
            
            # 保存助手回复
            redis_conversation_manager.save_message(
                conversation_id, "assistant", assistant_response,
                metadata={
                    "type": agent_result.get("data", {}).get("type"),
                    "sql": agent_result.get("data", {}).get("sql"),
                    "execution_path": agent_result.get("data", {}).get("execution_path")
                }
            )
            
            # 缓存成功的答案(修正:传入context实现真正的上下文感知)
            cache_data = {
                **agent_result,
                "conversation_id": conversation_id,
                "user_id": user_id,
                "from_cache": False
            }
            redis_conversation_manager.cache_answer(question, cache_data, context)
            
            # 构建返回数据(修正:使用现有的agent_success_response)
            result_data = agent_result.get("data", {})
            result_data.update({
                "conversation_id": conversation_id,
                "user_id": user_id,
                "is_guest_user": user_id.startswith("guest"),
                "context_used": bool(context),
                "from_cache": False,
                "conversation_status": conversation_status["status"],
                "conversation_message": conversation_status["message"],
                "requested_conversation_id": conversation_status.get("requested_id")
            })
            
            return jsonify(agent_success_response(
                response_type=result_data.get("type", "UNKNOWN"),
                response_text=result_data.get("response", ""),
                data=result_data
            ))
        else:
            # 错误处理(修正:确保使用现有的错误响应格式)
            error_data = {
                "response": agent_result.get("error", "Agent处理失败"),
                "conversation_id": conversation_id,
                "user_id": user_id,
                "error_type": "agent_processing_failed"
            }
            
            return jsonify({
                "success": False,
                "code": agent_result.get("error_code", 500),
                "message": "处理失败",
                "data": error_data
            }), agent_result.get("error_code", 500)
        
    except Exception as e:
        print(f"[ERROR] ask_agent执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="查询处理失败,请稍后重试"
        )), 500

Change 3: new management API

# Add the new management endpoints at the end of the file

@app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id: str):
    """获取用户的对话列表(按时间倒序)"""
    try:
        limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
        conversations = redis_conversation_manager.get_conversations(user_id, limit)
        
        return jsonify(success_response(
            response_text="获取用户对话列表成功",
            data={
                "user_id": user_id,
                "conversations": conversations,
                "total_count": len(conversations)
            }
        ))
        
    except Exception as e:
        return jsonify(internal_error_response(
            response_text="获取对话列表失败,请稍后重试"
        )), 500

@app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
def get_conversation_messages(conversation_id: str):
    """获取特定对话的消息历史"""
    try:
        limit = request.args.get('limit', type=int)  # 可选参数
        messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
        meta = redis_conversation_manager.get_conversation_meta(conversation_id)
        
        return jsonify(success_response(
            response_text="获取对话消息成功",
            data={
                "conversation_id": conversation_id,
                "conversation_meta": meta,
                "messages": messages,
                "message_count": len(messages)
            }
        ))
        
    except Exception as e:
        return jsonify(internal_error_response(
            response_text="获取对话消息失败"
        )), 500

@app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])  
def get_conversation_context(conversation_id: str):
    """获取对话上下文(格式化用于LLM)"""
    try:
        count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
        context = redis_conversation_manager.get_context(conversation_id, count)
        
        return jsonify(success_response(
            response_text="获取对话上下文成功",
            data={
                "conversation_id": conversation_id,
                "context": context,
                "context_message_count": count
            }
        ))
        
    except Exception as e:
        return jsonify(internal_error_response(
            response_text="获取对话上下文失败"
        )), 500

@app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
def conversation_stats():
    """获取对话系统统计信息"""
    try:
        stats = redis_conversation_manager.get_stats()
        
        return jsonify(success_response(
            response_text="获取统计信息成功",
            data=stats
        ))
        
    except Exception as e:
        return jsonify(internal_error_response(
            response_text="获取统计信息失败,请稍后重试"
        )), 500

@app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
def conversation_cleanup():
    """手动清理过期对话"""
    try:
        redis_conversation_manager.cleanup_expired_conversations()
        
        return jsonify(success_response(
            response_text="对话清理完成"
        ))
        
    except Exception as e:
        return jsonify(internal_error_response(
            response_text="对话清理失败,请稍后重试"
        )), 500

3. Redis Data Structure Design

3.1 Storage Format

# User's conversation list (newest first)
user:{user_id}:conversations → LIST [
  "conv_1703123456_a1b2c3d4",
  "conv_1703120000_x1y2z3w4"
]
TTL: 7 days (same as the conversation TTL)

# Conversation message history (newest first)
conversation:{conv_id}:messages → LIST [
  "{\"message_id\":\"msg_002\",\"timestamp\":\"2024-01-01T10:00:05\",\"role\":\"assistant\",\"content\":\"查询结果...\",\"metadata\":{\"type\":\"DATABASE\"}}",
  "{\"message_id\":\"msg_001\",\"timestamp\":\"2024-01-01T10:00:00\",\"role\":\"user\",\"content\":\"查询销售数据\",\"metadata\":{}}"
]
TTL: 7 days

# Conversation metadata
conversation:{conv_id}:meta → HASH {
  "conversation_id": "conv_1703123456_a1b2c3d4",
  "user_id": "guest_abc123",
  "created_at": "2024-01-01T10:00:00",
  "updated_at": "2024-01-01T10:05:00",
  "message_count": "4"
}
TTL: 7 days

# Question-answer cache (context-aware)
qa_cache:{context_question_hash} → STRING {
  "success": true,
  "data": {...},
  "cached_at": "2024-01-01T10:00:00",
  "original_question": "查询销售数据",
  "context_hash": "a1b2c3d4"  // first 8 hex characters of the MD5 of the context
}
TTL: 24 hours (set per cache entry)
Note: the cache key is a hash of the context content combined with the question, not of conversation_id
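
The effect of hashing the context together with the question can be checked in isolation. The function below is a standalone copy of the key derivation in _get_cache_key, minus the ENABLE_CONVERSATION_CONTEXT switch. It is a sketch for experimentation, not the production code, and `qa_cache_key` is a name chosen for this example:

```python
import hashlib


def qa_cache_key(question: str, context: str = "") -> str:
    """Derive a qa_cache key from the question and, if present, the context."""
    if context:
        cache_input = f"context:{context}\nquestion:{question}"
    else:
        cache_input = question
    # Same normalization as the manager: trim whitespace, lower-case
    normalized = cache_input.strip().lower()
    digest = hashlib.md5(normalized.encode("utf-8")).hexdigest()[:16]
    return f"qa_cache:{digest}"


question = "那上个月呢?"
key_sales = qa_cache_key(question, "用户: 查询本月销售数据\n助手: 本月销售额为...")
key_traffic = qa_cache_key(question, "用户: 查询本月车流量\n助手: 本月车流量为...")

# The same follow-up question under two different contexts gets two keys
print(key_sales)
print(key_traffic)

# Without context, whitespace and case differences collapse to one key
print(qa_cache_key("Show sales") == qa_cache_key("  show SALES "))
```

Because the key covers the full context text, a cache hit requires the preceding messages to match exactly. The key is also not scoped to a user, so two users with identical context and question share one entry.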

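4. API Design

4.1 ask_agent() API (Enhanced)

Request parameters

{
    "question": "请问当前系统中每个高速服务区的经理是谁?",  // required
    "session_id": "test_session_001",                     // optional; used to derive a stable guest_id
    "user_id": "john_doe",                               // optional; lower priority than the login session
    "conversation_id": "conv_1703123456_a1b2c3d4",       // optional; continue a specific conversation
    "continue_conversation": true                         // optional; continue the most recent conversation
}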
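
The priority between the login session, the user_id parameter, and session_id follows resolve_user_id in the manager. A standalone sketch of that order is shown below, with the Redis manager and logging left out. Here `resolve_user_id` is a free function written for illustration, and `GUEST_PREFIX` stands in for DEFAULT_ANONYMOUS_USER_PREFIX:

```python
import hashlib
from typing import Optional

GUEST_PREFIX = "guest"  # mirrors DEFAULT_ANONYMOUS_USER_PREFIX


def resolve_user_id(user_id_param: Optional[str], session_id: Optional[str],
                    request_ip: str, login_user_id: Optional[str] = None) -> str:
    """Pick the user ID in priority order: login > parameter > session > IP."""
    if login_user_id:            # 1. logged-in user from the Flask session
        return login_user_id
    if user_id_param:            # 2. user_id passed in the request body
        return user_id_param
    if session_id:               # 3. stable guest ID derived from session_id
        suffix = hashlib.md5(session_id.encode()).hexdigest()[:8]
        return f"{GUEST_PREFIX}_{suffix}"
    # 4. temporary guest ID derived from the client IP
    suffix = hashlib.md5(request_ip.encode()).hexdigest()[:8]
    return f"{GUEST_PREFIX}_temp_{suffix}"


print(resolve_user_id("john_doe", "test_session_001", "10.0.0.1", login_user_id="alice"))
print(resolve_user_id("john_doe", "test_session_001", "10.0.0.1"))
print(resolve_user_id(None, "test_session_001", "10.0.0.1"))
print(resolve_user_id(None, None, "10.0.0.1"))
```

The same session_id always yields the same guest ID, which is what lets an anonymous browser session find its earlier conversations. The IP-based fallback is shared by every client behind the same address.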

Response format

{
    "success": true,
    "code": 200,
    "message": "操作成功",
    "data": {
        "response": "查询结果...",
        "type": "DATABASE",
        "sql": "SELECT ...",
        "query_result": {
            "rows": [...],
            "columns": [...],
            "row_count": 10
        },
        "summary": "查询结果显示...",
        
        // New fields
        "conversation_id": "conv_1703123456_a1b2c3d4",
        "user_id": "guest_abc123",
        "is_guest_user": true,
        "context_used": false,
        "from_cache": false,
        "conversation_status": "existing",
        "conversation_message": "继续已有对话",
        "requested_conversation_id": null,
        
        "execution_path": ["classify", "agent_database", "format_response"],
        "classification_info": {
            "confidence": 0.95,
            "reason": "匹配数据库关键词",
            "method": "rule_based"
        }
    }
}

4.2 Query API Design

Get a user's conversation list

GET /api/v0/user/{user_id}/conversations?limit=10

Response:
{
    "success": true,
    "data": {
        "user_id": "guest_abc123",
        "conversations": [
            {
                "conversation_id": "conv_1703123456_a1b2c3d4",
                "created_at": "2024-01-01T10:00:00",
                "updated_at": "2024-01-01T10:05:00",
                "message_count": "6"
            }
        ],
        "total_count": 1
    }
}
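
Redis hashes store every field value as a string, which is why `message_count` appears as `"6"` in this response. A client or API layer that wants typed values can convert them after reading the hash. The following is a minimal sketch; the helper name `normalize_conversation_meta` is hypothetical and the set of converted fields is an assumption based on the metadata fields shown in this document.

```python
from datetime import datetime
from typing import Any, Dict


def normalize_conversation_meta(raw_meta: Dict[str, str]) -> Dict[str, Any]:
    """Convert string fields read from the meta hash into typed values.

    Hypothetical helper: returns a new dict and leaves the input untouched.
    """
    meta: Dict[str, Any] = dict(raw_meta)
    # message_count is stored as a decimal string in the hash.
    if "message_count" in meta:
        meta["message_count"] = int(meta["message_count"])
    # Timestamps are ISO 8601 strings such as "2024-01-01T10:00:00".
    for field in ("created_at", "updated_at"):
        if field in meta:
            meta[field] = datetime.fromisoformat(meta[field])
    return meta


if __name__ == "__main__":
    raw = {
        "conversation_id": "conv_1703123456_a1b2c3d4",
        "created_at": "2024-01-01T10:00:00",
        "updated_at": "2024-01-01T10:05:00",
        "message_count": "6",
    }
    print(normalize_conversation_meta(raw))
```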

Get conversation message details

GET /api/v0/conversation/{conversation_id}/messages?limit=20

Response:
{
    "success": true,
    "data": {
        "conversation_id": "conv_1703123456_a1b2c3d4",
        "conversation_meta": {
            "user_id": "guest_abc123",
            "created_at": "2024-01-01T10:00:00",
            "message_count": "6"
        },
        "messages": [
            {
                "message_id": "msg_001",
                "timestamp": "2024-01-01T10:00:00",
                "role": "user",
                "content": "Query service area data"
            },
            {
                "message_id": "msg_002",
                "timestamp": "2024-01-01T10:00:05",
                "role": "assistant",
                "content": "The query results show...",
                "metadata": {
                    "type": "DATABASE",
                    "sql": "SELECT ...",
                    "execution_path": ["classify", "agent_database"]
                }
            }
        ],
        "message_count": 2
    }
}

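5. Frontend Integration Example

5.1 Simplified Conversation Manager

class ConversationManager {
    constructor() {
        this.sessionId = this.generateSessionId();
        this.currentConversationId = null;
        this.currentUserId = null; // user ID resolved by the server, set after the first ask()
        this.userId = localStorage.getItem('user_id'); // ID of the logged-in user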
    }
    
    async ask(question, continueConversation = true) {
        const payload = {
            question,
            session_id: this.sessionId  // keeps guest_id stable
        };

        // Logged-in users pass their real ID
        if (this.userId) {
            payload.user_id = this.userId;
        }

        // Continue the current conversation
        if (continueConversation && this.currentConversationId) {
            payload.conversation_id = this.currentConversationId;
        }
        
        const response = await fetch('/api/v0/ask_agent', {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify(payload)
        });
        
        const result = await response.json();
        
        // Remember conversation_id for the next request
        if (result.success) {
            this.currentConversationId = result.data.conversation_id;
            this.currentUserId = result.data.user_id;
        }
        
        return result;
    }
    
    async getUserConversations(limit = 5) {
        if (!this.currentUserId) return [];
        
        const response = await fetch(`/api/v0/user/${this.currentUserId}/conversations?limit=${limit}`);
        const result = await response.json();
        
        return result.success ? result.data.conversations : [];
    }
    
    async getConversationMessages(conversationId) {
        const response = await fetch(`/api/v0/conversation/${conversationId}/messages`);
        const result = await response.json();
        
        return result.success ? result.data : null;
    }
    
    startNewConversation() {
        this.currentConversationId = null;
    }
    
    generateSessionId() {
        return 'session_' + Date.now() + '_' + Math.random().toString(36).substring(2);
    }
}

// Usage example
const conv = new ConversationManager();

// First question (a conversation is created automatically)
const result1 = await conv.ask("Query service area data");
console.log("Conversation ID:", result1.data.conversation_id);

// Second question (the conversation is continued automatically)
const result2 = await conv.ask("Who are the managers?");

// View the conversation history
const conversations = await conv.getUserConversations();

// View the details of a specific conversation
const detail = await conv.getConversationMessages(conversations[0].conversation_id);

// Start a new topic
conv.startNewConversation();
const result3 = await conv.ask("Query other data");

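6. Implementation Steps and Priorities

Phase 1: Basic Implementation (1-2 days)

Priority: P0 (must complete)

  1. ✅ Add the configuration to app_config.py
  2. ✅ Create common/redis_conversation_manager.py
  3. ✅ Update requirements.txt
  4. ✅ Test the Redis connection

Phase 2: Core Integration (2-3 days)

Priority: P0 (must complete)

  1. ✅ Modify the ask_agent() function in citu_app.py
  2. ✅ Implement the ID resolution logic
  3. ✅ Integrate the conversation context feature
  4. ✅ Implement the basic caching feature
  5. ✅ End-to-end testing

Phase 3: Management API (1-2 days)

Priority: P1 (important)

  1. ✅ Add the user conversation list query API
  2. ✅ Add the conversation message detail query API
  3. ✅ Add the conversation context query API
  4. ✅ Add the statistics API

Phase 4: Refinement and Optimization (1-2 days)

Priority: P2 (optional)

  1. ✅ Add the cleanup management API
  2. ✅ Improve error handling
  3. ✅ Performance optimization
  4. ✅ Complete the documentation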

7. Testing and Verification

7.1 Unit Tests

# test_redis_conversation_manager.py
import unittest
from common.redis_conversation_manager import RedisConversationManager

class TestRedisConversationManager(unittest.TestCase):
    def setUp(self):
        self.manager = RedisConversationManager()
    
    def test_user_id_resolution(self):
        # Test the user ID resolution logic
        user_id = self.manager.resolve_user_id(None, "session_123", "127.0.0.1")
        self.assertTrue(user_id.startswith("guest_"))
    
    def test_conversation_creation(self):
        # Test conversation creation
        conv_id = self.manager.create_conversation("test_user")
        self.assertTrue(conv_id.startswith("conv_"))
    
    def test_message_saving(self):
        # Test message saving
        conv_id = self.manager.create_conversation("test_user")
        result = self.manager.save_message(conv_id, "user", "test message")
        self.assertTrue(result)

if __name__ == "__main__":
    unittest.main()

7.2 Integration Tests

# test_ask_agent_integration.py
import requests

def test_ask_agent_with_context():
    # First turn
    response1 = requests.post('http://localhost:5000/api/v0/ask_agent', 
        json={"question": "Query service area data"})
    result1 = response1.json()
    
    # Second turn (with context)
    response2 = requests.post('http://localhost:5000/api/v0/ask_agent',
        json={
            "question": "Who are the managers?",
            "conversation_id": result1["data"]["conversation_id"]
        })
    result2 = response2.json()
    
    # The second turn must have used the context saved by the first
    assert result2["data"]["context_used"] is True

8. Deployment and Operations

8.1 Redis Deployment

# docker-compose.yml
version: '3'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
    
volumes:
  redis_data:

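9. Suggested Design Improvements

9.1 Better Handling of an Invalid conversation_id

Problem

In the current design, when a user passes an invalid conversation_id, the system silently creates a new conversation. The user has no way of knowing that the requested conversation does not exist or is not accessible to them.

Proposed change

  1. Change the return value of the resolve_conversation_id method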

    def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str],
                                continue_conversation: bool) -> tuple[str, dict]:
        """
        Resolve the conversation ID (improved version).

        Returns:
            tuple: (conversation_id, status_info)
            status_info contains:
            - status: "existing" | "new" | "invalid_id_new"
            - message: description of the status
            - requested_id: the originally requested ID (if any)
        """

        # 1. If a conversation_id was specified, validate it before use
        if conversation_id_input:
            if self._is_valid_conversation(conversation_id_input, user_id):
                print(f"[REDIS_CONV] Using specified conversation: {conversation_id_input}")
                return conversation_id_input, {
                    "status": "existing",
                    "message": "Continuing existing conversation"
                }
            else:
                print(f"[WARN] Invalid conversation_id: {conversation_id_input}; creating a new conversation")
                new_conversation_id = self.create_conversation(user_id)
                return new_conversation_id, {
                    "status": "invalid_id_new",
                    "message": "The requested conversation does not exist or you do not have access to it; a new conversation has been created",
                    "requested_id": conversation_id_input
                }

        # 2. If the most recent conversation should be continued
        if continue_conversation:
            recent_conversation = self._get_recent_conversation(user_id)
            if recent_conversation:
                print(f"[REDIS_CONV] Continuing most recent conversation: {recent_conversation}")
                return recent_conversation, {
                    "status": "existing",
                    "message": "Continuing most recent conversation"
                }

        # 3. Create a new conversation
        new_conversation_id = self.create_conversation(user_id)
        print(f"[REDIS_CONV] Created new conversation: {new_conversation_id}")
        return new_conversation_id, {
            "status": "new",
            "message": "Created new conversation"
        }
    
  2. Update the ask_agent API response

```python
# Inside the ask_agent function
conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
    user_id, conversation_id_input, continue_conversation
)

# Add the status information to the final result
result_data.update({
    "conversation_id": conversation_id,
    "user_id": user_id,
    "is_guest_user": user_id.startswith("guest"),
    "context_used": bool(context),
    "from_cache": False,
    "conversation_status": conversation_status["status"],
    "conversation_message": conversation_status["message"],
    "requested_conversation_id": conversation_status.get("requested_id")
})
```

  3. API response examples

```json
// Case 1: an invalid conversation_id was requested
{
    "success": true,
    "data": {
        "response": "Query results...",
        "conversation_id": "conv_1703123456_new123",
        "conversation_status": "invalid_id_new",
        "conversation_message": "The requested conversation does not exist or you do not have access to it; a new conversation has been created",
        "requested_conversation_id": "conv_invalid_xyz789",
        // ... other fields
    }
}

// Case 2: an existing conversation was continued successfully
{
    "success": true,
    "data": {
        "response": "Query results...",
        "conversation_id": "conv_1703123456_abc123",
        "conversation_status": "existing",
        "conversation_message": "Continuing existing conversation",
        // ... other fields
    }
}
```

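  4. Frontend handling example

```javascript
async ask(question, conversationId) {
    const response = await fetch('/api/v0/ask_agent', {
        method: 'POST',
        // Declare the JSON body, as in the ConversationManager example
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({
            question,
            conversation_id: conversationId
        })
    });
    
    const result = await response.json();
    
    // Check the conversation status
    if (result.success && result.data.conversation_status === 'invalid_id_new') {
        // Notify the user
        this.showNotification(
            'Note: the requested conversation does not exist or has expired; a new conversation has been started',
            'warning'
        );
        // Update the local conversation_id
        this.currentConversationId = result.data.conversation_id;
    }
    
    return result;
}
```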

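9.2 Other Potential Improvements

  1. Stronger conversation permission checks

    • Add ownership verification for conversations
    • Support conversation sharing (generate a share link)
    • Support read/write permission control on conversations
  2. Conversation state management

    • Add active/archived states for conversations
    • Support ending a conversation manually
    • Conversation titles and tags
  3. Performance optimization

    • Paginated loading of the conversation list
    • Lazy loading of messages
    • Cache warm-up mechanism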
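
As an illustration of the paginated conversation list suggested under performance optimization, the sketch below computes the index range for one page. It assumes the conversation IDs are held in a Redis list and read with LRANGE, whose stop index is inclusive. The function names `lrange_bounds` and `paginate` are hypothetical, and a plain Python list stands in for Redis so the example runs on its own.

```python
from typing import List, Tuple


def lrange_bounds(page: int, page_size: int) -> Tuple[int, int]:
    """Return the inclusive (start, stop) indices for a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    start = (page - 1) * page_size
    stop = start + page_size - 1  # LRANGE treats stop as inclusive
    return start, stop


def paginate(conversation_ids: List[str], page: int, page_size: int) -> List[str]:
    """In-memory stand-in for reading one page of IDs with LRANGE."""
    start, stop = lrange_bounds(page, page_size)
    # Python slices exclude the end index, hence stop + 1.
    return conversation_ids[start : stop + 1]


if __name__ == "__main__":
    ids = [f"conv_{n}" for n in range(7)]
    print(paginate(ids, page=1, page_size=3))
    print(paginate(ids, page=3, page_size=3))
```

With a real Redis list, the pair returned by `lrange_bounds` would be passed as the start and stop arguments of LRANGE, so only one page of IDs is transferred per request.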