|
@@ -24,10 +24,13 @@ from core.logging import initialize_logging, get_app_logger
|
|
|
initialize_logging()
|
|
|
|
|
|
# 标准 Flask 导入
|
|
|
-from flask import Flask, request, jsonify, session, send_file
|
|
|
+from flask import Flask, request, jsonify, session, send_file, Response, stream_with_context
|
|
|
import redis.asyncio as redis
|
|
|
from werkzeug.utils import secure_filename
|
|
|
|
|
|
+# 导入标准化响应格式
|
|
|
+from common.result import success_response, internal_error_response, bad_request_response
|
|
|
+
|
|
|
# 基础依赖
|
|
|
import pandas as pd
|
|
|
import json
|
|
@@ -100,6 +103,121 @@ def _format_timestamp_to_china_time(timestamp_str):
|
|
|
logger.warning(f"⚠️ 时间格式化失败: {e}")
|
|
|
return timestamp_str
|
|
|
|
|
|
+def _parse_conversation_created_time(conversation_id: str) -> Optional[str]:
|
|
|
+ """从conversation_id解析创建时间并转换为中国时区格式"""
|
|
|
+ try:
|
|
|
+ # conversation_id格式: "wang10:20250717211620915"
|
|
|
+ if ':' not in conversation_id:
|
|
|
+ return None
|
|
|
+
|
|
|
+ parts = conversation_id.split(':')
|
|
|
+ if len(parts) < 2:
|
|
|
+ return None
|
|
|
+
|
|
|
+ timestamp_str = parts[1] # "20250717211620915"
|
|
|
+
|
|
|
+ # 解析时间戳: YYYYMMDDHHMMSSMMM (17位)
|
|
|
+ if len(timestamp_str) != 17:
|
|
|
+ logger.warning(f"⚠️ conversation_id时间戳长度不正确: {timestamp_str}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ year = timestamp_str[:4]
|
|
|
+ month = timestamp_str[4:6]
|
|
|
+ day = timestamp_str[6:8]
|
|
|
+ hour = timestamp_str[8:10]
|
|
|
+ minute = timestamp_str[10:12]
|
|
|
+ second = timestamp_str[12:14]
|
|
|
+ millisecond = timestamp_str[14:17]
|
|
|
+
|
|
|
+ # 构造datetime对象
|
|
|
+ dt = datetime(
|
|
|
+ int(year), int(month), int(day),
|
|
|
+ int(hour), int(minute), int(second),
|
|
|
+ int(millisecond) * 1000 # 毫秒转微秒
|
|
|
+ )
|
|
|
+
|
|
|
+ # 转换为中国时区
|
|
|
+ china_tz = pytz.timezone('Asia/Shanghai')
|
|
|
+ # 假设原始时间戳是中国时区
|
|
|
+ china_dt = china_tz.localize(dt)
|
|
|
+
|
|
|
+ # 格式化为要求的格式
|
|
|
+ formatted_time = china_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # 保留3位毫秒
|
|
|
+ return formatted_time
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"⚠️ 解析conversation_id时间戳失败: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+def _get_conversation_updated_time(redis_client, thread_id: str) -> Optional[str]:
|
|
|
+ """获取对话的最后更新时间(从Redis checkpoint数据中的ts字段)"""
|
|
|
+ try:
|
|
|
+ # 扫描该thread的所有checkpoint keys
|
|
|
+ pattern = f"checkpoint:{thread_id}:*"
|
|
|
+
|
|
|
+ keys = []
|
|
|
+ cursor = 0
|
|
|
+ while True:
|
|
|
+ cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
|
|
|
+ keys.extend(batch)
|
|
|
+ if cursor == 0:
|
|
|
+ break
|
|
|
+
|
|
|
+ if not keys:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 获取最新的checkpoint(按key排序,最大的是最新的)
|
|
|
+ latest_key = max(keys)
|
|
|
+
|
|
|
+ # 检查key类型并获取数据
|
|
|
+ key_type = redis_client.type(latest_key)
|
|
|
+
|
|
|
+ data = None
|
|
|
+ if key_type == 'string':
|
|
|
+ data = redis_client.get(latest_key)
|
|
|
+ elif key_type == 'ReJSON-RL':
|
|
|
+ # RedisJSON类型
|
|
|
+ try:
|
|
|
+ data = redis_client.execute_command('JSON.GET', latest_key)
|
|
|
+ except Exception as json_error:
|
|
|
+ logger.error(f"❌ JSON.GET失败: {json_error}")
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if not data:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 解析JSON数据
|
|
|
+ try:
|
|
|
+ checkpoint_data = json.loads(data)
|
|
|
+ except json.JSONDecodeError:
|
|
|
+ return None
|
|
|
+
|
|
|
+ # 检查checkpoint中的ts字段
|
|
|
+ if ('checkpoint' in checkpoint_data and
|
|
|
+ isinstance(checkpoint_data['checkpoint'], dict) and
|
|
|
+ 'ts' in checkpoint_data['checkpoint']):
|
|
|
+
|
|
|
+ ts_value = checkpoint_data['checkpoint']['ts']
|
|
|
+
|
|
|
+ # 解析ts字段(应该是ISO格式的时间戳)
|
|
|
+ if isinstance(ts_value, str):
|
|
|
+ try:
|
|
|
+ dt = datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
|
|
|
+ china_tz = pytz.timezone('Asia/Shanghai')
|
|
|
+ china_dt = dt.astimezone(china_tz)
|
|
|
+ formatted_time = china_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
|
|
+ return formatted_time
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"⚠️ 获取对话更新时间失败: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
def validate_request_data(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
"""验证请求数据,并支持从thread_id中推断user_id"""
|
|
|
errors = []
|
|
@@ -350,12 +468,21 @@ def get_user_conversations_simple_sync(user_id: str, limit: int = 10):
|
|
|
preview = content[:50] + "..." if len(content) > 50 else content
|
|
|
break
|
|
|
|
|
|
+ # 解析时间戳
|
|
|
+ created_at = _parse_conversation_created_time(thread_id)
|
|
|
+ updated_at = _get_conversation_updated_time(redis_client, thread_id)
|
|
|
+
|
|
|
+ # 如果无法获取updated_at,使用created_at作为备选
|
|
|
+ if not updated_at:
|
|
|
+ updated_at = created_at
|
|
|
+
|
|
|
conversations.append({
|
|
|
- "thread_id": thread_id,
|
|
|
+ "conversation_id": thread_id, # thread_id -> conversation_id
|
|
|
"user_id": user_id,
|
|
|
- "timestamp": thread_info["timestamp"],
|
|
|
"message_count": len(messages),
|
|
|
- "conversation_preview": preview
|
|
|
+ "conversation_title": preview, # conversation_preview -> conversation_title
|
|
|
+ "created_at": created_at,
|
|
|
+ "updated_at": updated_at
|
|
|
})
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
@@ -2184,17 +2311,14 @@ async def get_user_conversations_react(user_id: str):
|
|
|
response_text=f"获取用户对话列表失败: {str(e)}"
|
|
|
)), 500
|
|
|
|
|
|
-@app.route('/api/v0/react/users/<user_id>/conversations/<thread_id>', methods=['GET'])
|
|
|
-async def get_user_conversation_detail_react(user_id: str, thread_id: str):
|
|
|
+@app.route('/api/v0/react/conversations/<thread_id>', methods=['GET'])
|
|
|
+async def get_user_conversation_detail_react(thread_id: str):
|
|
|
"""异步获取特定对话的详细历史(从 custom_react_agent 迁移)"""
|
|
|
global _react_agent_instance
|
|
|
|
|
|
try:
|
|
|
- # 验证thread_id格式是否匹配user_id
|
|
|
- if not thread_id.startswith(f"{user_id}:"):
|
|
|
- return jsonify(bad_request_response(
|
|
|
- response_text=f"Thread ID {thread_id} 不属于用户 {user_id}"
|
|
|
- )), 400
|
|
|
+ # 从thread_id中提取user_id
|
|
|
+ user_id = thread_id.split(':')[0] if ':' in thread_id else 'unknown'
|
|
|
|
|
|
logger.info(f"📖 异步获取用户 {user_id} 的对话 {thread_id} 详情")
|
|
|
|
|
@@ -2308,35 +2432,31 @@ def test_redis_connection():
|
|
|
|
|
|
@app.route('/api/v0/react/direct/users/<user_id>/conversations', methods=['GET'])
|
|
|
def test_get_user_conversations_simple(user_id: str):
|
|
|
- """测试简单Redis查询获取用户对话列表(从 custom_react_agent 迁移)"""
|
|
|
+ """直接从Redis获取用户对话列表"""
|
|
|
try:
|
|
|
limit = request.args.get('limit', 10, type=int)
|
|
|
limit = max(1, min(limit, 50))
|
|
|
|
|
|
- logger.info(f"🧪 测试获取用户 {user_id} 的对话列表(简单Redis方式)")
|
|
|
+ logger.info(f"📋 获取用户 {user_id} 的对话列表(直接Redis方式)")
|
|
|
|
|
|
# 使用简单Redis查询
|
|
|
conversations = get_user_conversations_simple_sync(user_id, limit)
|
|
|
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "method": "simple_redis_query",
|
|
|
- "data": {
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取用户对话列表成功",
|
|
|
+ data={
|
|
|
"user_id": user_id,
|
|
|
"conversations": conversations,
|
|
|
"total_count": len(conversations),
|
|
|
"limit": limit
|
|
|
- },
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 200
|
|
|
+ }
|
|
|
+ )), 200
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"❌ 测试简单Redis查询失败: {e}")
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": str(e),
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 500
|
|
|
+ logger.error(f"❌ 获取用户对话列表失败: {e}")
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text=f"获取用户对话列表失败: {str(e)}"
|
|
|
+ )), 500
|
|
|
|
|
|
@app.route('/api/v0/react/direct/conversations/<thread_id>', methods=['GET'])
|
|
|
def get_conversation_detail_api(thread_id: str):
|
|
@@ -2370,32 +2490,26 @@ def get_conversation_detail_api(thread_id: str):
|
|
|
# 如果提供了user_id,验证thread_id是否属于该用户
|
|
|
thread_user_id = thread_id.split(':')[0]
|
|
|
if user_id and thread_user_id != user_id:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": f"Thread ID {thread_id} does not belong to user {user_id}",
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 400
|
|
|
+ return jsonify(bad_request_response(
|
|
|
+ response_text=f"Thread ID {thread_id} does not belong to user {user_id}"
|
|
|
+ )), 400
|
|
|
|
|
|
logger.info(f"📖 获取对话详情 - Thread: {thread_id}, Include Tools: {include_tools}")
|
|
|
|
|
|
# 检查enhanced_redis_api是否可用
|
|
|
if get_conversation_detail_from_redis is None:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "enhanced_redis_api 模块不可用",
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 503
|
|
|
+ return jsonify(service_unavailable_response(
|
|
|
+ response_text="enhanced_redis_api 模块不可用"
|
|
|
+ )), 503
|
|
|
|
|
|
# 从Redis获取对话详情(使用我们的新函数)
|
|
|
result = get_conversation_detail_from_redis(thread_id, include_tools)
|
|
|
|
|
|
if not result['success']:
|
|
|
logger.warning(f"⚠️ 获取对话详情失败: {result['error']}")
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": result['error'],
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 404
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text=result['error']
|
|
|
+ )), 404
|
|
|
|
|
|
# 添加API元数据
|
|
|
result['data']['api_metadata'] = {
|
|
@@ -2411,22 +2525,19 @@ def get_conversation_detail_api(thread_id: str):
|
|
|
mode_desc = "完整模式" if include_tools else "简化模式"
|
|
|
logger.info(f"✅ 成功获取对话详情 - Messages: {result['data']['message_count']}, Mode: {mode_desc}")
|
|
|
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "data": result['data'],
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 200
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text=f"获取对话详情成功 ({mode_desc})",
|
|
|
+ data=result['data']
|
|
|
+ )), 200
|
|
|
|
|
|
except Exception as e:
|
|
|
import traceback
|
|
|
logger.error(f"❌ 获取对话详情异常: {e}")
|
|
|
logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
|
|
|
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": str(e),
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 500
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text=f"获取对话详情失败: {str(e)}"
|
|
|
+ )), 500
|
|
|
|
|
|
@app.route('/api/v0/react/direct/conversations/<thread_id>/compare', methods=['GET'])
|
|
|
def compare_conversation_modes_api(thread_id: str):
|
|
@@ -2518,31 +2629,25 @@ def get_conversation_summary_api(thread_id: str):
|
|
|
|
|
|
# 验证thread_id格式
|
|
|
if ':' not in thread_id:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "Invalid thread_id format. Expected format: user_id:timestamp",
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 400
|
|
|
+ return jsonify(bad_request_response(
|
|
|
+ response_text="Invalid thread_id format. Expected format: user_id:timestamp"
|
|
|
+ )), 400
|
|
|
|
|
|
logger.info(f"📊 获取对话摘要 - Thread: {thread_id}, Include Tools: {include_tools}")
|
|
|
|
|
|
# 检查enhanced_redis_api是否可用
|
|
|
if get_conversation_detail_from_redis is None:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "enhanced_redis_api 模块不可用",
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 503
|
|
|
+ return jsonify(service_unavailable_response(
|
|
|
+ response_text="enhanced_redis_api 模块不可用"
|
|
|
+ )), 503
|
|
|
|
|
|
# 获取完整对话信息
|
|
|
result = get_conversation_detail_from_redis(thread_id, include_tools)
|
|
|
|
|
|
if not result['success']:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": result['error'],
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 404
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text=result['error']
|
|
|
+ )), 404
|
|
|
|
|
|
# 只返回摘要信息,不包含具体消息
|
|
|
data = result['data']
|
|
@@ -2563,14 +2668,14 @@ def get_conversation_summary_api(thread_id: str):
|
|
|
if messages:
|
|
|
# 第一条human消息预览
|
|
|
for msg in messages:
|
|
|
- if msg['type'] == 'human':
|
|
|
+ if msg['role'] == 'human':
|
|
|
content = str(msg['content'])
|
|
|
summary['first_message_preview'] = content[:100] + "..." if len(content) > 100 else content
|
|
|
break
|
|
|
|
|
|
# 最后一条ai消息预览
|
|
|
for msg in reversed(messages):
|
|
|
- if msg['type'] == 'ai' and msg.get('content', '').strip():
|
|
|
+ if msg['role'] == 'ai' and msg.get('content', '').strip():
|
|
|
content = str(msg['content'])
|
|
|
summary['last_message_preview'] = content[:100] + "..." if len(content) > 100 else content
|
|
|
break
|
|
@@ -2587,19 +2692,16 @@ def get_conversation_summary_api(thread_id: str):
|
|
|
|
|
|
logger.info(f"✅ 成功获取对话摘要")
|
|
|
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "data": summary,
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 200
|
|
|
+ return jsonify(success_response(
|
|
|
+ response_text="获取对话摘要成功",
|
|
|
+ data=summary
|
|
|
+ )), 200
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"❌ 获取对话摘要失败: {e}")
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": str(e),
|
|
|
- "timestamp": datetime.now().isoformat()
|
|
|
- }), 500
|
|
|
+ return jsonify(internal_error_response(
|
|
|
+ response_text=f"获取对话摘要失败: {str(e)}"
|
|
|
+ )), 500
|
|
|
|
|
|
# ==================== 启动逻辑 ====================
|
|
|
|