重要更新(2024年):修复了智能缓存的上下文感知问题。原设计中缓存键仅基于conversation_id,无法正确处理同一对话中上下文变化的情况。新设计将缓存键改为基于实际上下文内容的哈希,确保相同问题在不同上下文下能够返回正确的缓存结果。
基于现有的vanna+LangChain+LangGraph项目,为ask_agent() API增加Redis对话管理功能,实现上下文连续对话、对话历史记录和智能缓存功能。不修改现有ask() API和SessionAwareMemoryCache。
app_config.py
位置:文件末尾添加新配置段 修改内容:
# ==================== Redis对话管理配置 ====================
# 对话上下文配置
CONVERSATION_CONTEXT_COUNT = 5 # 传递给LLM的上下文消息条数
CONVERSATION_MAX_LENGTH = 20 # 单个对话最大消息数
USER_MAX_CONVERSATIONS = 5 # 用户最大对话数
# 用户管理配置
DEFAULT_ANONYMOUS_USER_PREFIX = "guest" # 匿名用户前缀
GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天
MAX_GUEST_CONVERSATIONS = 3 # guest用户最多3个对话
MAX_REGISTERED_CONVERSATIONS = 10 # 注册用户最多10个对话
# Redis配置
REDIS_HOST = "localhost"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
# 缓存开关配置
ENABLE_CONVERSATION_CONTEXT = True # 是否启用对话上下文
ENABLE_QUESTION_ANSWER_CACHE = True # 是否启用问答结果缓存
# TTL配置(单位:秒)- 修正TTL逻辑
CONVERSATION_TTL = 7 * 24 * 3600 # 对话保存7天
USER_CONVERSATIONS_TTL = 7 * 24 * 3600 # 用户对话列表保存7天(与对话TTL一致)
QUESTION_ANSWER_TTL = 24 * 3600 # 问答结果缓存24小时
GUEST_USER_TTL = 7 * 24 * 3600 # guest用户数据保存7天
requirements.txt
修改内容:添加Redis依赖
# 在现有依赖基础上添加
redis==5.0.1
common/redis_conversation_manager.py
功能:Redis对话管理器核心类(修正版) 完整代码实现:
import redis
import json
import hashlib
import uuid
import time
from datetime import datetime
from typing import List, Dict, Any, Optional
from app_config import (
REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD,
CONVERSATION_CONTEXT_COUNT, CONVERSATION_MAX_LENGTH, USER_MAX_CONVERSATIONS,
CONVERSATION_TTL, USER_CONVERSATIONS_TTL, QUESTION_ANSWER_TTL,
ENABLE_CONVERSATION_CONTEXT, ENABLE_QUESTION_ANSWER_CACHE,
DEFAULT_ANONYMOUS_USER_PREFIX, MAX_GUEST_CONVERSATIONS, MAX_REGISTERED_CONVERSATIONS,
GUEST_USER_TTL
)
class RedisConversationManager:
"""Redis对话管理器 - 修正版"""
def __init__(self):
"""初始化Redis连接"""
try:
self.redis_client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=True,
socket_connect_timeout=5,
socket_timeout=5
)
# 测试连接
self.redis_client.ping()
print(f"[REDIS_CONV] Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
except Exception as e:
print(f"[ERROR] Redis连接失败: {str(e)}")
self.redis_client = None
def is_available(self) -> bool:
"""检查Redis是否可用"""
try:
return self.redis_client is not None and self.redis_client.ping()
except:
return False
# ==================== 用户ID解析(修正版)====================
def resolve_user_id(self, user_id_from_request: Optional[str],
session_id: Optional[str], request_ip: str,
login_user_id: Optional[str] = None) -> str:
"""
智能解析用户ID - 修正版
Args:
user_id_from_request: 请求参数中的user_id
session_id: 浏览器session_id
request_ip: 请求IP地址
login_user_id: 从Flask session中获取的登录用户ID(在ask_agent中获取)
"""
# 1. 优先使用登录用户ID
if login_user_id:
print(f"[REDIS_CONV] 使用登录用户ID: {login_user_id}")
return login_user_id
# 2. 如果没有登录,尝试从请求参数获取user_id
if user_id_from_request:
print(f"[REDIS_CONV] 使用请求参数user_id: {user_id_from_request}")
return user_id_from_request
# 3. 都没有则为匿名用户(guest)
if session_id:
guest_suffix = hashlib.md5(session_id.encode()).hexdigest()[:8]
guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_{guest_suffix}"
print(f"[REDIS_CONV] 生成稳定guest用户: {guest_id}")
return guest_id
# 4. 最后基于IP的临时guest ID
ip_suffix = hashlib.md5(request_ip.encode()).hexdigest()[:8]
temp_guest_id = f"{DEFAULT_ANONYMOUS_USER_PREFIX}_temp_{ip_suffix}"
print(f"[REDIS_CONV] 生成临时guest用户: {temp_guest_id}")
return temp_guest_id
def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str],
continue_conversation: bool) -> tuple[str, dict]:
"""
智能解析对话ID - 改进版
Returns:
tuple: (conversation_id, status_info)
status_info包含:
- status: "existing" | "new" | "invalid_id_new"
- message: 状态说明
- requested_id: 原始请求的ID(如果有)
"""
# 1. 如果指定了conversation_id,验证后使用
if conversation_id_input:
if self._is_valid_conversation(conversation_id_input, user_id):
print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}")
return conversation_id_input, {
"status": "existing",
"message": "继续已有对话"
}
else:
print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话")
new_conversation_id = self.create_conversation(user_id)
return new_conversation_id, {
"status": "invalid_id_new",
"message": "您请求的对话不存在或无权访问,已为您创建新对话",
"requested_id": conversation_id_input
}
# 2. 如果要继续最近对话
if continue_conversation:
recent_conversation = self._get_recent_conversation(user_id)
if recent_conversation:
print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}")
return recent_conversation, {
"status": "existing",
"message": "继续最近对话"
}
# 3. 创建新对话
new_conversation_id = self.create_conversation(user_id)
print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}")
return new_conversation_id, {
"status": "new",
"message": "创建新对话"
}
def _is_valid_conversation(self, conversation_id: str, user_id: str) -> bool:
"""验证对话是否存在且属于该用户"""
if not self.is_available():
return False
try:
# 检查对话元信息是否存在
meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
if not meta_data:
return False
# 检查是否属于该用户
return meta_data.get('user_id') == user_id
except Exception:
return False
def _get_recent_conversation(self, user_id: str) -> Optional[str]:
"""获取用户最近的对话ID"""
if not self.is_available():
return None
try:
conversations = self.redis_client.lrange(
f"user:{user_id}:conversations", 0, 0
)
return conversations[0] if conversations else None
except Exception:
return None
# ==================== 对话管理 ====================
def create_conversation(self, user_id: str) -> str:
"""创建新对话"""
# 生成包含时间戳的conversation_id
timestamp = int(datetime.now().timestamp())
conversation_id = f"conv_{timestamp}_{uuid.uuid4().hex[:8]}"
if not self.is_available():
return conversation_id # Redis不可用时返回ID,但不存储
try:
# 创建对话元信息
meta_data = {
"conversation_id": conversation_id,
"user_id": user_id,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"message_count": "0"
}
# 保存对话元信息
self.redis_client.hset(
f"conversation:{conversation_id}:meta",
mapping=meta_data
)
self.redis_client.expire(f"conversation:{conversation_id}:meta", CONVERSATION_TTL)
# 添加到用户的对话列表
self._add_conversation_to_user(user_id, conversation_id)
print(f"[REDIS_CONV] 创建对话成功: {conversation_id}")
return conversation_id
except Exception as e:
print(f"[ERROR] 创建对话失败: {str(e)}")
return conversation_id # 返回ID但可能未存储
def save_message(self, conversation_id: str, role: str, content: str,
metadata: Optional[Dict] = None) -> bool:
"""保存消息到对话历史"""
if not self.is_available() or not conversation_id:
return False
try:
message_data = {
"message_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"role": role, # user, assistant
"content": content,
"metadata": metadata or {}
}
# 保存到消息列表(LPUSH添加到头部,最新消息在前)
self.redis_client.lpush(
f"conversation:{conversation_id}:messages",
json.dumps(message_data)
)
# 设置TTL
self.redis_client.expire(f"conversation:{conversation_id}:messages", CONVERSATION_TTL)
# 限制消息数量
self.redis_client.ltrim(
f"conversation:{conversation_id}:messages",
0, CONVERSATION_MAX_LENGTH - 1
)
# 更新元信息
self._update_conversation_meta(conversation_id)
return True
except Exception as e:
print(f"[ERROR] 保存消息失败: {str(e)}")
return False
def get_context(self, conversation_id: str, count: Optional[int] = None) -> str:
"""获取对话上下文,格式化为prompt"""
if not self.is_available() or not ENABLE_CONVERSATION_CONTEXT:
return ""
try:
if count is None:
count = CONVERSATION_CONTEXT_COUNT
# 获取最近的消息(count*2 因为包含用户和助手消息)
message_count = count * 2
messages = self.redis_client.lrange(
f"conversation:{conversation_id}:messages",
0, message_count - 1
)
if not messages:
return ""
# 解析消息并构建上下文(按时间正序)
context_parts = []
for msg_json in reversed(messages): # Redis返回倒序,需要反转
try:
msg_data = json.loads(msg_json)
role = msg_data.get("role", "")
content = msg_data.get("content", "")
if role == "user":
context_parts.append(f"用户: {content}")
elif role == "assistant":
context_parts.append(f"助手: {content}")
except json.JSONDecodeError:
continue
context = "\n".join(context_parts)
print(f"[REDIS_CONV] 获取上下文成功: {len(context_parts)}条消息")
return context
except Exception as e:
print(f"[ERROR] 获取上下文失败: {str(e)}")
return ""
def get_conversation_messages(self, conversation_id: str, limit: Optional[int] = None) -> List[Dict]:
"""获取对话的消息列表"""
if not self.is_available():
return []
try:
if limit:
messages = self.redis_client.lrange(
f"conversation:{conversation_id}:messages", 0, limit - 1
)
else:
messages = self.redis_client.lrange(
f"conversation:{conversation_id}:messages", 0, -1
)
# 解析并按时间正序返回
parsed_messages = []
for msg_json in reversed(messages): # 反转为时间正序
try:
parsed_messages.append(json.loads(msg_json))
except json.JSONDecodeError:
continue
return parsed_messages
except Exception as e:
print(f"[ERROR] 获取对话消息失败: {str(e)}")
return []
def get_conversation_meta(self, conversation_id: str) -> Dict:
"""获取对话元信息"""
if not self.is_available():
return {}
try:
meta_data = self.redis_client.hgetall(f"conversation:{conversation_id}:meta")
return meta_data if meta_data else {}
except Exception as e:
print(f"[ERROR] 获取对话元信息失败: {str(e)}")
return {}
def get_conversations(self, user_id: str, limit: int = None) -> List[Dict]:
"""获取用户的对话列表(按时间倒序)"""
if not self.is_available():
return []
if limit is None:
limit = USER_MAX_CONVERSATIONS
try:
# 获取对话ID列表(已经按时间倒序)
conversation_ids = self.redis_client.lrange(
f"user:{user_id}:conversations", 0, limit - 1
)
conversations = []
for conv_id in conversation_ids:
meta_data = self.get_conversation_meta(conv_id)
if meta_data: # 只返回仍然存在的对话
conversations.append(meta_data)
return conversations
except Exception as e:
print(f"[ERROR] 获取用户对话列表失败: {str(e)}")
return []
# ==================== 智能缓存(修正版)====================
def get_cached_answer(self, question: str, context: str = "") -> Optional[Dict]:
"""检查问答缓存 - 真正上下文感知版"""
if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
return None
try:
cache_key = self._get_cache_key(question, context)
cached_answer = self.redis_client.get(cache_key) # 使用独立key而不是hash
if cached_answer:
print(f"[REDIS_CONV] 缓存命中: {cache_key}")
return json.loads(cached_answer)
return None
except Exception as e:
print(f"[ERROR] 获取缓存答案失败: {str(e)}")
return None
def cache_answer(self, question: str, answer: Dict, context: str = ""):
"""缓存问答结果 - 真正上下文感知版"""
if not self.is_available() or not ENABLE_QUESTION_ANSWER_CACHE:
return
try:
cache_key = self._get_cache_key(question, context)
# 添加缓存时间戳和上下文哈希
answer_with_meta = {
**answer,
"cached_at": datetime.now().isoformat(),
"original_question": question,
"context_hash": hashlib.md5(context.encode()).hexdigest()[:8] if context else ""
}
# 使用独立key,每个缓存项单独设置TTL
self.redis_client.setex(
cache_key,
QUESTION_ANSWER_TTL,
json.dumps(answer_with_meta)
)
print(f"[REDIS_CONV] 缓存答案成功: {cache_key}")
except Exception as e:
print(f"[ERROR] 缓存答案失败: {str(e)}")
def _get_cache_key(self, question: str, context: str = "") -> str:
"""生成真正包含上下文的缓存键"""
if context and ENABLE_CONVERSATION_CONTEXT:
# 使用上下文内容而不是conversation_id
cache_input = f"context:{context}\nquestion:{question}"
else:
cache_input = question
normalized = cache_input.strip().lower()
question_hash = hashlib.md5(normalized.encode('utf-8')).hexdigest()[:16]
return f"qa_cache:{question_hash}"
# ==================== 私有方法 ====================
def _add_conversation_to_user(self, user_id: str, conversation_id: str):
"""添加对话到用户列表,按时间自动排序"""
try:
# 获取用户类型配置
config = self._get_user_type_config(user_id)
# LPUSH添加到列表头部(最新的)
self.redis_client.lpush(f"user:{user_id}:conversations", conversation_id)
# 根据用户类型限制数量
self.redis_client.ltrim(
f"user:{user_id}:conversations",
0, config["max_conversations"] - 1
)
# 设置TTL
self.redis_client.expire(
f"user:{user_id}:conversations",
config["ttl"]
)
except Exception as e:
print(f"[ERROR] 添加对话到用户列表失败: {str(e)}")
def _get_user_type_config(self, user_id: str) -> Dict:
"""根据用户类型返回不同的配置 - 修正版"""
if user_id.startswith(DEFAULT_ANONYMOUS_USER_PREFIX):
return {
"max_conversations": MAX_GUEST_CONVERSATIONS,
"ttl": GUEST_USER_TTL # 使用专门的guest TTL
}
else:
return {
"max_conversations": MAX_REGISTERED_CONVERSATIONS,
"ttl": USER_CONVERSATIONS_TTL
}
def _update_conversation_meta(self, conversation_id: str):
"""更新对话元信息"""
try:
# 获取消息数量
message_count = self.redis_client.llen(f"conversation:{conversation_id}:messages")
# 更新元信息
self.redis_client.hset(
f"conversation:{conversation_id}:meta",
mapping={
"updated_at": datetime.now().isoformat(),
"message_count": str(message_count)
}
)
except Exception as e:
print(f"[ERROR] 更新对话元信息失败: {str(e)}")
# ==================== 管理方法 ====================
def get_stats(self) -> Dict:
"""获取统计信息"""
if not self.is_available():
return {"available": False}
try:
stats = {
"available": True,
"total_users": len(self.redis_client.keys("user:*:conversations")),
"total_conversations": len(self.redis_client.keys("conversation:*:meta")),
"cached_qa_count": len(self.redis_client.keys("qa_cache:*")), # 修正缓存统计
"redis_info": {
"used_memory": self.redis_client.info().get("used_memory_human"),
"connected_clients": self.redis_client.info().get("connected_clients")
}
}
return stats
except Exception as e:
print(f"[ERROR] 获取统计信息失败: {str(e)}")
return {"available": False, "error": str(e)}
def cleanup_expired_conversations(self):
"""清理过期对话(Redis TTL自动处理,这里可添加额外逻辑)"""
if not self.is_available():
return
try:
# 清理用户对话列表中的无效对话ID
user_keys = self.redis_client.keys("user:*:conversations")
cleaned_count = 0
for user_key in user_keys:
conversation_ids = self.redis_client.lrange(user_key, 0, -1)
valid_ids = []
for conv_id in conversation_ids:
# 检查对话是否仍然存在
if self.redis_client.exists(f"conversation:{conv_id}:meta"):
valid_ids.append(conv_id)
else:
cleaned_count += 1
# 如果有无效ID,重建列表
if len(valid_ids) != len(conversation_ids):
self.redis_client.delete(user_key)
if valid_ids:
self.redis_client.lpush(user_key, *reversed(valid_ids))
# 重新设置TTL
self.redis_client.expire(user_key, USER_CONVERSATIONS_TTL)
print(f"[REDIS_CONV] 清理完成,移除了 {cleaned_count} 个无效对话引用")
except Exception as e:
print(f"[ERROR] 清理失败: {str(e)}")
citu_app.py
修改位置1:文件开头导入部分
# 在现有导入基础上添加(文件顶部,避免函数内导入)
from flask import session
from common.redis_conversation_manager import RedisConversationManager
from common.result import (
bad_request_response, service_unavailable_response,
agent_success_response, agent_error_response,
internal_error_response, success_response
)
# 在全局变量区域添加(app实例化后)
redis_conversation_manager = RedisConversationManager()
修改位置2:ask_agent()函数(修正版)
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""
支持对话上下文的ask_agent API - 修正版
"""
req = request.get_json(force=True)
question = req.get("question", None)
browser_session_id = req.get("session_id", None)
# 新增参数解析
user_id_input = req.get("user_id", None)
conversation_id_input = req.get("conversation_id", None)
continue_conversation = req.get("continue_conversation", False)
if not question:
return jsonify(bad_request_response(
response_text="缺少必需参数:question",
missing_params=["question"]
)), 400
try:
# 1. 获取登录用户ID(修正:在函数中获取session信息)
login_user_id = session.get('user_id') if 'user_id' in session else None
# 2. 智能ID解析(修正:传入登录用户ID)
user_id = redis_conversation_manager.resolve_user_id(
user_id_input, browser_session_id, request.remote_addr, login_user_id
)
conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
user_id, conversation_id_input, continue_conversation
)
# 3. 获取上下文(提前到缓存检查之前)
context = redis_conversation_manager.get_context(conversation_id)
# 4. 检查缓存(修正:传入context以实现真正的上下文感知)
cached_answer = redis_conversation_manager.get_cached_answer(question, context)
if cached_answer:
print(f"[AGENT_API] 使用缓存答案")
# 更新对话历史
redis_conversation_manager.save_message(conversation_id, "user", question)
redis_conversation_manager.save_message(
conversation_id, "assistant",
cached_answer.get("data", {}).get("response", ""),
metadata={"from_cache": True}
)
# 添加对话信息到缓存结果
cached_answer["data"]["conversation_id"] = conversation_id
cached_answer["data"]["user_id"] = user_id
cached_answer["data"]["from_cache"] = True
cached_answer["data"].update(conversation_status)
return jsonify(cached_answer)
# 5. 保存用户消息
redis_conversation_manager.save_message(conversation_id, "user", question)
# 6. 构建带上下文的问题
if context:
enhanced_question = f"对话历史:\n{context}\n\n当前问题: {question}"
print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
else:
enhanced_question = question
print(f"[AGENT_API] 新对话,无上下文")
# 7. 现有Agent处理逻辑(保持不变)
try:
agent = get_citu_langraph_agent()
except Exception as e:
print(f"[CRITICAL] Agent初始化失败: {str(e)}")
return jsonify(service_unavailable_response(
response_text="AI服务暂时不可用,请稍后重试",
can_retry=True
)), 503
agent_result = agent.process_question(
question=enhanced_question, # 使用增强后的问题
session_id=browser_session_id
)
# 8. 处理Agent结果
if agent_result.get("success", False):
assistant_response = agent_result.get("data", {}).get("response", "")
# 保存助手回复
redis_conversation_manager.save_message(
conversation_id, "assistant", assistant_response,
metadata={
"type": agent_result.get("data", {}).get("type"),
"sql": agent_result.get("data", {}).get("sql"),
"execution_path": agent_result.get("data", {}).get("execution_path")
}
)
# 缓存成功的答案(修正:传入context实现真正的上下文感知)
cache_data = {
**agent_result,
"conversation_id": conversation_id,
"user_id": user_id,
"from_cache": False
}
redis_conversation_manager.cache_answer(question, cache_data, context)
# 构建返回数据(修正:使用现有的agent_success_response)
result_data = agent_result.get("data", {})
result_data.update({
"conversation_id": conversation_id,
"user_id": user_id,
"is_guest_user": user_id.startswith("guest"),
"context_used": bool(context),
"from_cache": False,
"conversation_status": conversation_status["status"],
"conversation_message": conversation_status["message"],
"requested_conversation_id": conversation_status.get("requested_id")
})
return jsonify(agent_success_response(
response_type=result_data.get("type", "UNKNOWN"),
response_text=result_data.get("response", ""),
data=result_data
))
else:
# 错误处理(修正:确保使用现有的错误响应格式)
error_data = {
"response": agent_result.get("error", "Agent处理失败"),
"conversation_id": conversation_id,
"user_id": user_id,
"error_type": "agent_processing_failed"
}
return jsonify({
"success": False,
"code": agent_result.get("error_code", 500),
"message": "处理失败",
"data": error_data
}), agent_result.get("error_code", 500)
except Exception as e:
print(f"[ERROR] ask_agent执行失败: {str(e)}")
return jsonify(internal_error_response(
response_text="查询处理失败,请稍后重试"
)), 500
修改位置3:新增管理API
# 在文件末尾添加新的管理API
@app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id: str):
"""获取用户的对话列表(按时间倒序)"""
try:
limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
conversations = redis_conversation_manager.get_conversations(user_id, limit)
return jsonify(success_response(
response_text="获取用户对话列表成功",
data={
"user_id": user_id,
"conversations": conversations,
"total_count": len(conversations)
}
))
except Exception as e:
return jsonify(internal_error_response(
response_text="获取对话列表失败,请稍后重试"
)), 500
@app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
def get_conversation_messages(conversation_id: str):
"""获取特定对话的消息历史"""
try:
limit = request.args.get('limit', type=int) # 可选参数
messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
meta = redis_conversation_manager.get_conversation_meta(conversation_id)
return jsonify(success_response(
response_text="获取对话消息成功",
data={
"conversation_id": conversation_id,
"conversation_meta": meta,
"messages": messages,
"message_count": len(messages)
}
))
except Exception as e:
return jsonify(internal_error_response(
response_text="获取对话消息失败"
)), 500
@app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
def get_conversation_context(conversation_id: str):
"""获取对话上下文(格式化用于LLM)"""
try:
count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
context = redis_conversation_manager.get_context(conversation_id, count)
return jsonify(success_response(
response_text="获取对话上下文成功",
data={
"conversation_id": conversation_id,
"context": context,
"context_message_count": count
}
))
except Exception as e:
return jsonify(internal_error_response(
response_text="获取对话上下文失败"
)), 500
@app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
def conversation_stats():
"""获取对话系统统计信息"""
try:
stats = redis_conversation_manager.get_stats()
return jsonify(success_response(
response_text="获取统计信息成功",
data=stats
))
except Exception as e:
return jsonify(internal_error_response(
response_text="获取统计信息失败,请稍后重试"
)), 500
@app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
def conversation_cleanup():
"""手动清理过期对话"""
try:
redis_conversation_manager.cleanup_expired_conversations()
return jsonify(success_response(
response_text="对话清理完成"
))
except Exception as e:
return jsonify(internal_error_response(
response_text="对话清理失败,请稍后重试"
)), 500
# 用户对话列表(按时间倒序,最新在前)
user:{user_id}:conversations → LIST [
"conv_1703123456_a1b2c3d4",
"conv_1703120000_x1y2z3w4"
]
TTL: 7天(与对话TTL保持一致)
# 对话消息历史(按时间倒序,最新在前)
conversation:{conv_id}:messages → LIST [
"{\"message_id\":\"msg_002\",\"timestamp\":\"2024-01-01T10:00:05\",\"role\":\"assistant\",\"content\":\"查询结果...\",\"metadata\":{\"type\":\"DATABASE\"}}",
"{\"message_id\":\"msg_001\",\"timestamp\":\"2024-01-01T10:00:00\",\"role\":\"user\",\"content\":\"查询销售数据\",\"metadata\":{}}"
]
TTL: 7天
# 对话元信息
conversation:{conv_id}:meta → HASH {
"conversation_id": "conv_1703123456_a1b2c3d4",
"user_id": "guest_abc123",
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:05:00",
"message_count": "4"
}
TTL: 7天
# 问答结果缓存(真正上下文感知版)
qa_cache:{context_question_hash} → STRING {
"success": true,
"data": {...},
"cached_at": "2024-01-01T10:00:00",
"original_question": "查询销售数据",
"context_hash": "a1b2c3d4" // 上下文内容的哈希值
}
TTL: 24小时(每个缓存项独立设置)
注意:缓存键基于上下文内容和问题的组合哈希,而非conversation_id
{
"question": "请问当前系统中每个高速服务区的经理是谁?", // 必需
"session_id": "test_session_001", // 可选,用于生成稳定guest_id
"user_id": "john_doe", // 可选,优先级低于登录session
"conversation_id": "conv_1703123456_a1b2c3d4", // 可选,继续特定对话
"continue_conversation": true // 可选,继续最近对话
}
{
"success": true,
"code": 200,
"message": "操作成功",
"data": {
"response": "查询结果...",
"type": "DATABASE",
"sql": "SELECT ...",
"query_result": {
"rows": [...],
"columns": [...],
"row_count": 10
},
"summary": "查询结果显示...",
// 新增字段
"conversation_id": "conv_1703123456_a1b2c3d4",
"user_id": "guest_abc123",
"is_guest_user": true,
"context_used": false,
"from_cache": false,
"conversation_status": "existing",
"conversation_message": "继续已有对话",
"requested_conversation_id": null,
"execution_path": ["classify", "agent_database", "format_response"],
"classification_info": {
"confidence": 0.95,
"reason": "匹配数据库关键词",
"method": "rule_based"
}
}
}
GET /api/v0/user/{user_id}/conversations?limit=10
响应:
{
"success": true,
"data": {
"user_id": "guest_abc123",
"conversations": [
{
"conversation_id": "conv_1703123456_a1b2c3d4",
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:05:00",
"message_count": "6"
}
],
"total_count": 1
}
}
GET /api/v0/conversation/{conversation_id}/messages?limit=20
响应:
{
"success": true,
"data": {
"conversation_id": "conv_1703123456_a1b2c3d4",
"conversation_meta": {
"user_id": "guest_abc123",
"created_at": "2024-01-01T10:00:00",
"message_count": "6"
},
"messages": [
{
"message_id": "msg_001",
"timestamp": "2024-01-01T10:00:00",
"role": "user",
"content": "查询服务区数据"
},
{
"message_id": "msg_002",
"timestamp": "2024-01-01T10:00:05",
"role": "assistant",
"content": "查询结果显示...",
"metadata": {
"type": "DATABASE",
"sql": "SELECT ...",
"execution_path": ["classify", "agent_database"]
}
}
],
"message_count": 2
}
}
class ConversationManager {
constructor() {
this.sessionId = this.generateSessionId();
this.currentConversationId = null;
this.userId = localStorage.getItem('user_id'); // 登录用户ID
}
async ask(question, continueConversation = true) {
const payload = {
question,
session_id: this.sessionId // 保证guest_id稳定
};
// 登录用户传递真实ID
if (this.userId) {
payload.user_id = this.userId;
}
// 继续当前对话
if (continueConversation && this.currentConversationId) {
payload.conversation_id = this.currentConversationId;
}
const response = await fetch('/api/v0/ask_agent', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify(payload)
});
const result = await response.json();
// 记录conversation_id供下次使用
if (result.success) {
this.currentConversationId = result.data.conversation_id;
this.currentUserId = result.data.user_id;
}
return result;
}
async getUserConversations(limit = 5) {
if (!this.currentUserId) return [];
const response = await fetch(`/api/v0/user/${this.currentUserId}/conversations?limit=${limit}`);
const result = await response.json();
return result.success ? result.data.conversations : [];
}
async getConversationMessages(conversationId) {
const response = await fetch(`/api/v0/conversation/${conversationId}/messages`);
const result = await response.json();
return result.success ? result.data : null;
}
startNewConversation() {
this.currentConversationId = null;
}
generateSessionId() {
return 'session_' + Date.now() + '_' + Math.random().toString(36).substring(2);
}
}
// 使用示例
const conv = new ConversationManager();
// 第一次提问(自动创建对话)
const result1 = await conv.ask("查询服务区数据");
console.log("对话ID:", result1.data.conversation_id);
// 第二次提问(自动继续对话)
const result2 = await conv.ask("经理都是谁?");
// 查看对话历史
const conversations = await conv.getUserConversations();
// 查看特定对话详情
const detail = await conv.getConversationMessages(conversations[0].conversation_id);
// 开始新话题
conv.startNewConversation();
const result3 = await conv.ask("查询其他数据");
优先级:P0(必须完成)
app_config.py
添加配置common/redis_conversation_manager.py
requirements.txt
优先级:P0(必须完成)
citu_app.py
的ask_agent()函数优先级:P1(重要)
优先级:P2(可选)
# test_redis_conversation_manager.py
import unittest
from common.redis_conversation_manager import RedisConversationManager
class TestRedisConversationManager(unittest.TestCase):
def setUp(self):
self.manager = RedisConversationManager()
def test_user_id_resolution(self):
# 测试用户ID解析逻辑
user_id = self.manager.resolve_user_id(None, "session_123", "127.0.0.1")
self.assertTrue(user_id.startswith("guest_"))
def test_conversation_creation(self):
# 测试对话创建
conv_id = self.manager.create_conversation("test_user")
self.assertTrue(conv_id.startswith("conv_"))
def test_message_saving(self):
# 测试消息保存
conv_id = self.manager.create_conversation("test_user")
result = self.manager.save_message(conv_id, "user", "test message")
self.assertTrue(result)
# test_ask_agent_integration.py
import requests
import json
def test_ask_agent_with_context():
# 第一次对话
response1 = requests.post('http://localhost:5000/api/v0/ask_agent',
json={"question": "查询服务区数据"})
result1 = response1.json()
# 第二次对话(带上下文)
response2 = requests.post('http://localhost:5000/api/v0/ask_agent',
json={
"question": "经理都是谁?",
"conversation_id": result1["data"]["conversation_id"]
})
result2 = response2.json()
assert result2["data"]["context_used"] == True
# docker-compose.yml
version: '3'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
volumes:
redis_data:
当前设计中,当用户传入无效的 conversation_id
时,系统会静默地创建新对话,用户无法得知他们请求的对话不存在或无权访问。
修改 resolve_conversation_id
方法返回值
def resolve_conversation_id(self, user_id: str, conversation_id_input: Optional[str],
continue_conversation: bool) -> tuple[str, dict]:
"""
智能解析对话ID - 改进版
Returns:
tuple: (conversation_id, status_info)
status_info包含:
- status: "existing" | "new" | "invalid_id_new"
- message: 状态说明
- requested_id: 原始请求的ID(如果有)
"""
# 1. 如果指定了conversation_id,验证后使用
if conversation_id_input:
if self._is_valid_conversation(conversation_id_input, user_id):
print(f"[REDIS_CONV] 使用指定对话: {conversation_id_input}")
return conversation_id_input, {
"status": "existing",
"message": "继续已有对话"
}
else:
print(f"[WARN] 无效的conversation_id: {conversation_id_input},创建新对话")
new_conversation_id = self.create_conversation(user_id)
return new_conversation_id, {
"status": "invalid_id_new",
"message": "您请求的对话不存在或无权访问,已为您创建新对话",
"requested_id": conversation_id_input
}
# 2. 如果要继续最近对话
if continue_conversation:
recent_conversation = self._get_recent_conversation(user_id)
if recent_conversation:
print(f"[REDIS_CONV] 继续最近对话: {recent_conversation}")
return recent_conversation, {
"status": "existing",
"message": "继续最近对话"
}
# 3. 创建新对话
new_conversation_id = self.create_conversation(user_id)
print(f"[REDIS_CONV] 创建新对话: {new_conversation_id}")
return new_conversation_id, {
"status": "new",
"message": "创建新对话"
}
修改 ask_agent
API 响应
```python
conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id( user_id, conversation_id_input, continue_conversation )
result_data.update({ "conversation_id": conversation_id, "user_id": user_id, "is_guest_user": user_id.startswith("guest"), "context_used": bool(context), "from_cache": False, "conversation_status": conversation_status["status"], "conversation_message": conversation_status["message"], "requested_conversation_id": conversation_status.get("requested_id") }) ```
API响应示例 ```json // 情况1:请求无效的conversation_id { "success": true, "data": {
"response": "查询结果...",
"conversation_id": "conv_1703123456_new123",
"conversation_status": "invalid_id_new",
"conversation_message": "您请求的对话不存在或已过期,已为您开启新对话",
"requested_conversation_id": "conv_invalid_xyz789",
// ... 其他字段
} }
// 情况2:成功继续已有对话 {
"success": true,
"data": {
"response": "查询结果...",
"conversation_id": "conv_1703123456_abc123",
"conversation_status": "existing",
"conversation_message": "继续已有对话",
// ... 其他字段
}
}
4. **前端处理示例**
```javascript
async ask(question, conversationId) {
const response = await fetch('/api/v0/ask_agent', {
method: 'POST',
body: JSON.stringify({
question,
conversation_id: conversationId
})
});
const result = await response.json();
// 检查对话状态
if (result.data.conversation_status === 'invalid_id_new') {
// 提示用户
this.showNotification(
'提示:您请求的对话不存在或已过期,已为您开启新对话',
'warning'
);
// 更新本地conversation_id
this.currentConversationId = result.data.conversation_id;
}
return result;
}
对话权限验证增强
对话状态管理
性能优化