enhanced_redis_api.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. """
  2. enhanced_redis_api.py - 完整的Redis直接访问API
  3. 支持include_tools开关参数,可以控制是否包含工具调用信息
  4. """
  5. import redis
  6. import json
  7. from typing import List, Dict, Any, Optional
  8. from datetime import datetime
  9. import logging
  10. # 导入配置
  11. try:
  12. from . import config
  13. except ImportError:
  14. import config
  15. logger = logging.getLogger(__name__)
  16. def get_conversation_detail_from_redis(thread_id: str, include_tools: bool = False) -> Dict[str, Any]:
  17. """
  18. 直接从Redis获取对话详细信息
  19. Args:
  20. thread_id: 线程ID,格式为 user_id:timestamp
  21. include_tools: 是否包含工具调用信息
  22. - True: 返回所有消息(human/ai/tool/system)
  23. - False: 只返回human和ai消息,且清理ai消息中的工具调用信息
  24. Returns:
  25. 包含对话详细信息的字典
  26. """
  27. try:
  28. # 创建Redis连接
  29. redis_client = redis.Redis(
  30. host=config.REDIS_HOST,
  31. port=config.REDIS_PORT,
  32. db=config.REDIS_DB,
  33. password=config.REDIS_PASSWORD,
  34. decode_responses=True
  35. )
  36. redis_client.ping()
  37. # 扫描该thread的所有checkpoint keys
  38. pattern = f"checkpoint:{thread_id}:*"
  39. logger.info(f"🔍 扫描模式: {pattern}, include_tools: {include_tools}")
  40. keys = []
  41. cursor = 0
  42. while True:
  43. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  44. keys.extend(batch)
  45. if cursor == 0:
  46. break
  47. logger.info(f"📋 找到 {len(keys)} 个keys")
  48. if not keys:
  49. redis_client.close()
  50. return {
  51. "success": False,
  52. "error": f"未找到对话 {thread_id}",
  53. "data": None
  54. }
  55. # 获取最新的checkpoint(按key排序,最大的是最新的)
  56. latest_key = max(keys)
  57. logger.info(f"🔍 使用最新key: {latest_key}")
  58. # 检查key类型并获取数据
  59. key_type = redis_client.type(latest_key)
  60. logger.info(f"🔍 Key类型: {key_type}")
  61. data = None
  62. if key_type == 'string':
  63. data = redis_client.get(latest_key)
  64. elif key_type == 'ReJSON-RL':
  65. # RedisJSON类型
  66. try:
  67. data = redis_client.execute_command('JSON.GET', latest_key)
  68. except Exception as json_error:
  69. logger.error(f"❌ JSON.GET 失败: {json_error}")
  70. redis_client.close()
  71. return {
  72. "success": False,
  73. "error": f"无法读取RedisJSON数据: {json_error}",
  74. "data": None
  75. }
  76. else:
  77. redis_client.close()
  78. return {
  79. "success": False,
  80. "error": f"不支持的key类型: {key_type}",
  81. "data": None
  82. }
  83. if not data:
  84. redis_client.close()
  85. return {
  86. "success": False,
  87. "error": "没有找到有效数据",
  88. "data": None
  89. }
  90. # 解析JSON数据
  91. try:
  92. checkpoint_data = json.loads(data)
  93. logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}")
  94. except json.JSONDecodeError as e:
  95. redis_client.close()
  96. return {
  97. "success": False,
  98. "error": f"JSON解析失败: {e}",
  99. "data": None
  100. }
  101. # 提取消息数据
  102. messages = extract_messages_from_checkpoint(checkpoint_data)
  103. logger.info(f"🔍 找到 {len(messages)} 条原始消息")
  104. # 🔑 关键改进:构建消息ID到时间戳的映射(模仿LangGraph API)
  105. logger.debug(f"🔍 开始构建消息时间戳映射...")
  106. message_timestamps = _build_message_timestamp_map_from_redis(redis_client, thread_id)
  107. # 提取最新checkpoint时间戳作为备用
  108. checkpoint_ts = None
  109. if ('checkpoint' in checkpoint_data and
  110. isinstance(checkpoint_data['checkpoint'], dict) and
  111. 'ts' in checkpoint_data['checkpoint']):
  112. ts_value = checkpoint_data['checkpoint']['ts']
  113. if isinstance(ts_value, str):
  114. try:
  115. # 转换为中国时区格式
  116. from datetime import datetime
  117. import pytz
  118. dt = datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
  119. china_tz = pytz.timezone('Asia/Shanghai')
  120. china_dt = dt.astimezone(china_tz)
  121. checkpoint_ts = china_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
  122. logger.debug(f"🕒 备用checkpoint时间戳: {checkpoint_ts}")
  123. except Exception as e:
  124. logger.warning(f"⚠️ 时间戳转换失败: {e}")
  125. # 解析并过滤消息 - 使用消息时间戳映射
  126. parsed_messages = parse_and_filter_messages(messages, include_tools, checkpoint_ts, message_timestamps)
  127. # 提取用户ID
  128. user_id = thread_id.split(':')[0] if ':' in thread_id else 'unknown'
  129. # 解析created_at时间(从thread_id中提取)
  130. created_at = None
  131. if ':' in thread_id:
  132. try:
  133. timestamp_str = thread_id.split(':')[1]
  134. # 转换为中国时区格式
  135. from datetime import datetime
  136. import pytz
  137. # 假设timestamp是YYYYMMDDHHmmssSSS格式
  138. dt = datetime.strptime(timestamp_str, '%Y%m%d%H%M%S%f')
  139. china_tz = pytz.timezone('Asia/Shanghai')
  140. china_dt = china_tz.localize(dt)
  141. created_at = china_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
  142. logger.info(f"🕒 解析created_at: {created_at}")
  143. except Exception as e:
  144. logger.warning(f"⚠️ 解析created_at失败: {e}")
  145. # 生成对话统计信息
  146. stats = generate_conversation_stats(parsed_messages, include_tools)
  147. redis_client.close()
  148. return {
  149. "success": True,
  150. "data": {
  151. "thread_id": thread_id,
  152. "conversation_id": thread_id, # 添加conversation_id字段
  153. "created_at": created_at, # 添加created_at字段
  154. "user_id": user_id,
  155. "include_tools": include_tools,
  156. "message_count": len(parsed_messages),
  157. "messages": parsed_messages,
  158. "stats": stats,
  159. "metadata": {
  160. "latest_checkpoint_key": latest_key,
  161. "total_raw_messages": len(messages),
  162. "filtered_message_count": len(parsed_messages),
  163. "filter_mode": "full_conversation" if include_tools else "human_ai_only"
  164. }
  165. }
  166. }
  167. except Exception as e:
  168. logger.error(f"❌ 获取对话详情失败: {e}")
  169. import traceback
  170. traceback.print_exc()
  171. return {
  172. "success": False,
  173. "error": str(e),
  174. "data": None
  175. }
  176. def extract_messages_from_checkpoint(checkpoint_data: Dict[str, Any]) -> List[Any]:
  177. """
  178. 从checkpoint数据中提取消息列表
  179. """
  180. messages = []
  181. # 尝试不同的数据结构路径
  182. if 'checkpoint' in checkpoint_data:
  183. checkpoint = checkpoint_data['checkpoint']
  184. if isinstance(checkpoint, dict) and 'channel_values' in checkpoint:
  185. channel_values = checkpoint['channel_values']
  186. if isinstance(channel_values, dict) and 'messages' in channel_values:
  187. messages = channel_values['messages']
  188. # 如果没有找到,尝试直接路径
  189. if not messages and 'channel_values' in checkpoint_data:
  190. channel_values = checkpoint_data['channel_values']
  191. if isinstance(channel_values, dict) and 'messages' in channel_values:
  192. messages = channel_values['messages']
  193. return messages
  194. def _build_message_timestamp_map_from_redis(redis_client, thread_id: str) -> Dict[str, str]:
  195. """
  196. 构建消息ID到时间戳的映射,模仿LangGraph API的逻辑
  197. 遍历所有历史checkpoint,为每条消息记录其首次出现的时间戳
  198. """
  199. message_timestamps = {}
  200. try:
  201. # 获取所有checkpoint keys
  202. pattern = f"checkpoint:{thread_id}:*"
  203. keys = []
  204. cursor = 0
  205. while True:
  206. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  207. keys.extend(batch)
  208. if cursor == 0:
  209. break
  210. if not keys:
  211. logger.warning(f"⚠️ 未找到任何checkpoint keys: {pattern}")
  212. return message_timestamps
  213. # 获取所有checkpoint数据并按时间戳排序
  214. checkpoints_with_ts = []
  215. for key in keys:
  216. try:
  217. # 检查key类型并获取数据
  218. key_type = redis_client.type(key)
  219. data = None
  220. if key_type == 'string':
  221. data = redis_client.get(key)
  222. elif key_type == 'ReJSON-RL':
  223. data = redis_client.execute_command('JSON.GET', key)
  224. else:
  225. continue
  226. if not data:
  227. continue
  228. # 解析checkpoint数据
  229. checkpoint_data = json.loads(data)
  230. # 提取时间戳
  231. checkpoint_ts = None
  232. if ('checkpoint' in checkpoint_data and
  233. isinstance(checkpoint_data['checkpoint'], dict) and
  234. 'ts' in checkpoint_data['checkpoint']):
  235. ts_value = checkpoint_data['checkpoint']['ts']
  236. if isinstance(ts_value, str):
  237. try:
  238. # 转换为中国时区格式
  239. from datetime import datetime
  240. import pytz
  241. dt = datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
  242. china_tz = pytz.timezone('Asia/Shanghai')
  243. china_dt = dt.astimezone(china_tz)
  244. checkpoint_ts = china_dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
  245. except Exception as e:
  246. logger.warning(f"⚠️ 解析时间戳失败 {key}: {e}")
  247. continue
  248. if checkpoint_ts:
  249. checkpoints_with_ts.append({
  250. 'key': key,
  251. 'timestamp': checkpoint_ts,
  252. 'data': checkpoint_data,
  253. 'raw_ts': ts_value # 用于排序
  254. })
  255. except Exception as e:
  256. logger.warning(f"⚠️ 处理checkpoint失败 {key}: {e}")
  257. continue
  258. # 按时间戳排序(最早的在前)
  259. checkpoints_with_ts.sort(key=lambda x: x['raw_ts'])
  260. logger.debug(f"🔍 找到 {len(checkpoints_with_ts)} 个有效checkpoint,按时间排序")
  261. # 遍历每个checkpoint,为新消息分配时间戳
  262. for checkpoint_info in checkpoints_with_ts:
  263. checkpoint_data = checkpoint_info['data']
  264. checkpoint_ts = checkpoint_info['timestamp']
  265. # 提取消息
  266. messages = extract_messages_from_checkpoint(checkpoint_data)
  267. # 为这个checkpoint中的新消息分配时间戳
  268. for msg in messages:
  269. if isinstance(msg, dict) and 'kwargs' in msg:
  270. msg_id = msg['kwargs'].get('id')
  271. if msg_id and msg_id not in message_timestamps:
  272. message_timestamps[msg_id] = checkpoint_ts
  273. logger.debug(f"🕒 消息 {msg_id} 分配时间戳: {checkpoint_ts}")
  274. logger.debug(f"✅ 成功构建消息时间戳映射,包含 {len(message_timestamps)} 条消息")
  275. except Exception as e:
  276. logger.error(f"❌ 构建消息时间戳映射失败: {e}")
  277. import traceback
  278. traceback.print_exc()
  279. return message_timestamps
  280. def parse_and_filter_messages(raw_messages: List[Any], include_tools: bool, checkpoint_ts: str = None, message_timestamps: Dict[str, str] = None) -> List[Dict[str, Any]]:
  281. """
  282. 解析和过滤消息列表 - 关键的开关逻辑实现
  283. Args:
  284. raw_messages: 原始消息列表
  285. include_tools: 是否包含工具消息
  286. - True: 返回所有消息类型
  287. - False: 只返回human/ai,且清理ai消息中的工具信息
  288. Returns:
  289. 解析后的消息列表
  290. """
  291. parsed_messages = []
  292. for msg in raw_messages:
  293. try:
  294. # 获取消息ID,用于查找其真实时间戳
  295. msg_id = None
  296. if isinstance(msg, dict) and 'kwargs' in msg:
  297. msg_id = msg['kwargs'].get('id')
  298. # 使用消息映射中的时间戳,如果没有则使用checkpoint时间戳
  299. msg_timestamp = checkpoint_ts
  300. if message_timestamps and msg_id and msg_id in message_timestamps:
  301. msg_timestamp = message_timestamps[msg_id]
  302. logger.debug(f"🕒 使用消息映射时间戳: {msg_id} -> {msg_timestamp}")
  303. parsed_msg = parse_single_message(msg, msg_timestamp)
  304. if not parsed_msg:
  305. continue
  306. msg_type = parsed_msg['role'] # 使用新的字段名
  307. if include_tools:
  308. # 完整模式:包含所有消息类型
  309. parsed_messages.append(parsed_msg)
  310. logger.debug(f"✅ [完整模式] 包含消息: {msg_type}")
  311. else:
  312. # 简化模式:只包含human和ai消息
  313. if msg_type == 'human':
  314. parsed_messages.append(parsed_msg)
  315. logger.debug(f"✅ [简化模式] 包含human消息")
  316. elif msg_type == 'ai':
  317. # 清理AI消息,移除工具调用信息
  318. cleaned_msg = clean_ai_message_for_simple_mode(parsed_msg)
  319. # 只包含有实际内容的AI消息
  320. if cleaned_msg['content'].strip() and not cleaned_msg.get('is_intermediate_step', False):
  321. parsed_messages.append(cleaned_msg)
  322. logger.debug(f"✅ [简化模式] 包含有内容的ai消息")
  323. else:
  324. logger.debug(f"⏭️ [简化模式] 跳过空的ai消息或中间步骤")
  325. else:
  326. # 跳过tool、system等消息
  327. logger.debug(f"⏭️ [简化模式] 跳过 {msg_type} 消息")
  328. except Exception as e:
  329. logger.warning(f"⚠️ 解析消息失败: {e}")
  330. continue
  331. logger.info(f"📊 解析结果: {len(parsed_messages)} 条消息 (include_tools={include_tools})")
  332. return parsed_messages
  333. def parse_single_message(msg: Any, checkpoint_ts: str = None) -> Optional[Dict[str, Any]]:
  334. """
  335. 解析单个消息,支持LangChain序列化格式
  336. Args:
  337. msg: 原始消息数据
  338. checkpoint_ts: checkpoint的时间戳,用作消息的真实时间(备用)
  339. """
  340. if isinstance(msg, dict):
  341. # LangChain序列化格式
  342. if (msg.get('lc') == 1 and
  343. msg.get('type') == 'constructor' and
  344. 'id' in msg and
  345. isinstance(msg['id'], list) and
  346. 'kwargs' in msg):
  347. kwargs = msg['kwargs']
  348. msg_class = msg['id'][-1] if msg['id'] else 'Unknown'
  349. # 确定消息类型
  350. if msg_class == 'HumanMessage':
  351. msg_type = 'human'
  352. elif msg_class == 'AIMessage':
  353. msg_type = 'ai'
  354. elif msg_class == 'ToolMessage':
  355. msg_type = 'tool'
  356. elif msg_class == 'SystemMessage':
  357. msg_type = 'system'
  358. else:
  359. msg_type = 'unknown'
  360. # 使用传入的时间戳(现在将通过消息ID映射获得)
  361. final_timestamp = checkpoint_ts or datetime.now().isoformat()
  362. # 构建基础消息对象 - 修改字段名称
  363. parsed_msg = {
  364. "role": msg_type, # type -> role
  365. "content": kwargs.get('content', ''),
  366. "message_id": kwargs.get('id'), # id -> message_id
  367. "timestamp": final_timestamp # 使用消息自己的时间戳或备用时间戳
  368. }
  369. # 处理AI消息的特殊字段
  370. if msg_type == 'ai':
  371. # 工具调用信息
  372. tool_calls = kwargs.get('tool_calls', [])
  373. parsed_msg['tool_calls'] = tool_calls
  374. parsed_msg['has_tool_calls'] = len(tool_calls) > 0
  375. # 额外的AI消息元数据
  376. additional_kwargs = kwargs.get('additional_kwargs', {})
  377. if additional_kwargs:
  378. parsed_msg['additional_kwargs'] = additional_kwargs
  379. response_metadata = kwargs.get('response_metadata', {})
  380. if response_metadata:
  381. parsed_msg['response_metadata'] = response_metadata
  382. # 处理工具消息的特殊字段
  383. elif msg_type == 'tool':
  384. parsed_msg['tool_name'] = kwargs.get('name')
  385. parsed_msg['tool_call_id'] = kwargs.get('tool_call_id')
  386. parsed_msg['status'] = kwargs.get('status', 'unknown')
  387. return parsed_msg
  388. # 简单字典格式
  389. elif 'type' in msg:
  390. return {
  391. "role": msg.get('type', 'unknown'), # type -> role
  392. "content": msg.get('content', ''),
  393. "message_id": msg.get('id'), # id -> message_id
  394. "timestamp": checkpoint_ts or datetime.now().isoformat() # 使用真实时间戳
  395. }
  396. return None
  397. def clean_ai_message_for_simple_mode(ai_msg: Dict[str, Any]) -> Dict[str, Any]:
  398. """
  399. 调试版本:清理AI消息用于简化模式
  400. """
  401. original_content = ai_msg.get("content", "")
  402. logger.debug(f"🔍 清理AI消息,原始内容: '{original_content}', 长度: {len(original_content)}")
  403. cleaned_msg = {
  404. "role": ai_msg["role"], # 使用新的字段名
  405. "content": original_content,
  406. "message_id": ai_msg.get("message_id"), # 使用新的字段名
  407. "timestamp": ai_msg.get("timestamp")
  408. }
  409. # 处理内容格式化
  410. content = original_content.strip()
  411. # 注释掉 [Formatted Output] 清理逻辑 - 源头已不生成前缀
  412. # if '[Formatted Output]' in content:
  413. # logger.info(f"🔍 发现 [Formatted Output] 标记")
  414. #
  415. # if content.startswith('[Formatted Output]\n'):
  416. # # 去掉标记,保留后面的实际内容
  417. # actual_content = content.replace('[Formatted Output]\n', '')
  418. # logger.info(f"🔍 去除标记后的内容: '{actual_content}', 长度: {len(actual_content)}")
  419. # cleaned_msg["content"] = actual_content
  420. # content = actual_content
  421. # elif content == '[Formatted Output]' or content == '[Formatted Output]\n':
  422. # # 如果只有标记没有内容
  423. # logger.info(f"🔍 只有标记没有实际内容")
  424. # cleaned_msg["content"] = ""
  425. # cleaned_msg["is_intermediate_step"] = True
  426. # content = ""
  427. # 如果清理后内容为空或只有空白,标记为中间步骤
  428. if not content.strip():
  429. logger.debug(f"🔍 内容为空,标记为中间步骤")
  430. cleaned_msg["is_intermediate_step"] = True
  431. cleaned_msg["content"] = ""
  432. # 添加简化模式标记
  433. cleaned_msg["simplified"] = True
  434. logger.debug(f"🔍 清理结果: '{cleaned_msg['content']}', 是否中间步骤: {cleaned_msg.get('is_intermediate_step', False)}")
  435. return cleaned_msg
  436. def generate_conversation_stats(messages: List[Dict[str, Any]], include_tools: bool) -> Dict[str, Any]:
  437. """
  438. 生成对话统计信息
  439. Args:
  440. messages: 解析后的消息列表
  441. include_tools: 是否包含工具信息(影响统计内容)
  442. Returns:
  443. 统计信息字典
  444. """
  445. stats = {
  446. "total_messages": len(messages),
  447. "human_messages": 0,
  448. "ai_messages": 0,
  449. "conversation_rounds": 0,
  450. "include_tools_mode": include_tools
  451. }
  452. # 添加工具相关统计(仅在include_tools=True时)
  453. if include_tools:
  454. stats.update({
  455. "tool_messages": 0,
  456. "system_messages": 0,
  457. "messages_with_tools": 0,
  458. "unique_tools_used": set()
  459. })
  460. for msg in messages:
  461. msg_type = msg.get('type', 'unknown')
  462. if msg_type == 'human':
  463. stats["human_messages"] += 1
  464. elif msg_type == 'ai':
  465. stats["ai_messages"] += 1
  466. # 工具相关统计
  467. if include_tools and msg.get('has_tool_calls', False):
  468. stats["messages_with_tools"] += 1
  469. # 统计使用的工具
  470. tool_calls = msg.get('tool_calls', [])
  471. for tool_call in tool_calls:
  472. if isinstance(tool_call, dict) and 'name' in tool_call:
  473. stats["unique_tools_used"].add(tool_call['name'])
  474. elif include_tools:
  475. if msg_type == 'tool':
  476. stats["tool_messages"] += 1
  477. # 记录工具名称
  478. tool_name = msg.get('tool_name')
  479. if tool_name:
  480. stats["unique_tools_used"].add(tool_name)
  481. elif msg_type == 'system':
  482. stats["system_messages"] += 1
  483. # 计算对话轮次
  484. stats["conversation_rounds"] = stats["human_messages"]
  485. # 转换set为list(JSON序列化)
  486. if include_tools and "unique_tools_used" in stats:
  487. stats["unique_tools_used"] = list(stats["unique_tools_used"])
  488. return stats
  489. def format_timestamp_readable(timestamp: str) -> str:
  490. """格式化时间戳为可读格式"""
  491. try:
  492. if len(timestamp) >= 14:
  493. year = timestamp[:4]
  494. month = timestamp[4:6]
  495. day = timestamp[6:8]
  496. hour = timestamp[8:10]
  497. minute = timestamp[10:12]
  498. second = timestamp[12:14]
  499. return f"{year}-{month}-{day} {hour}:{minute}:{second}"
  500. except Exception:
  501. pass
  502. return timestamp
  503. # =================== 测试函数 ===================
  504. def test_conversation_detail_with_switch():
  505. """
  506. 测试对话详情获取功能,重点测试include_tools开关
  507. """
  508. print("🧪 测试对话详情获取(开关参数测试)...")
  509. # 测试thread_id(请替换为实际存在的thread_id)
  510. test_thread_id = "wang:20250709195048728323"
  511. print(f"\n1. 测试完整模式(include_tools=True)...")
  512. result_full = get_conversation_detail_from_redis(test_thread_id, include_tools=True)
  513. if result_full['success']:
  514. data = result_full['data']
  515. print(f" ✅ 成功获取完整对话")
  516. print(f" 📊 消息数量: {data['message_count']}")
  517. print(f" 📈 统计信息: {data['stats']}")
  518. print(f" 🔧 包含工具: {data['stats'].get('tool_messages', 0)} 条工具消息")
  519. # 显示消息类型分布
  520. message_types = {}
  521. for msg in data['messages']:
  522. msg_type = msg['type']
  523. message_types[msg_type] = message_types.get(msg_type, 0) + 1
  524. print(f" 📋 消息类型分布: {message_types}")
  525. else:
  526. print(f" ❌ 获取失败: {result_full['error']}")
  527. print(f"\n2. 测试简化模式(include_tools=False)...")
  528. result_simple = get_conversation_detail_from_redis(test_thread_id, include_tools=False)
  529. if result_simple['success']:
  530. data = result_simple['data']
  531. print(f" ✅ 成功获取简化对话")
  532. print(f" 📊 消息数量: {data['message_count']}")
  533. print(f" 📈 统计信息: {data['stats']}")
  534. # 显示消息类型分布
  535. message_types = {}
  536. for msg in data['messages']:
  537. msg_type = msg['type']
  538. message_types[msg_type] = message_types.get(msg_type, 0) + 1
  539. print(f" 📋 消息类型分布: {message_types}")
  540. # 显示前几条消息示例
  541. print(f" 💬 消息示例:")
  542. for i, msg in enumerate(data['messages'][:4]):
  543. content_preview = str(msg['content'])[:50] + "..." if len(str(msg['content'])) > 50 else str(msg['content'])
  544. simplified_mark = " [简化]" if msg.get('simplified') else ""
  545. print(f" [{i+1}] {msg['type']}: {content_preview}{simplified_mark}")
  546. else:
  547. print(f" ❌ 获取失败: {result_simple['error']}")
  548. # 比较两种模式
  549. if result_full['success'] and result_simple['success']:
  550. full_count = result_full['data']['message_count']
  551. simple_count = result_simple['data']['message_count']
  552. difference = full_count - simple_count
  553. print(f"\n3. 模式比较:")
  554. print(f" 📊 完整模式消息数: {full_count}")
  555. print(f" 📊 简化模式消息数: {simple_count}")
  556. print(f" 📊 过滤掉的消息数: {difference}")
  557. print(f" 🎯 过滤效果: {'有效' if difference > 0 else '无差异'}")
  558. if __name__ == "__main__":
  559. test_conversation_detail_with_switch()