enhanced_redis_api.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. """
  2. enhanced_redis_api.py - 完整的Redis直接访问API
  3. 支持include_tools开关参数,可以控制是否包含工具调用信息
  4. """
  5. import redis
  6. import json
  7. from typing import List, Dict, Any, Optional
  8. from datetime import datetime
  9. import logging
  10. logger = logging.getLogger(__name__)
  11. def get_conversation_detail_from_redis(thread_id: str, include_tools: bool = False) -> Dict[str, Any]:
  12. """
  13. 直接从Redis获取对话详细信息
  14. Args:
  15. thread_id: 线程ID,格式为 user_id:timestamp
  16. include_tools: 是否包含工具调用信息
  17. - True: 返回所有消息(human/ai/tool/system)
  18. - False: 只返回human和ai消息,且清理ai消息中的工具调用信息
  19. Returns:
  20. 包含对话详细信息的字典
  21. """
  22. try:
  23. # 创建Redis连接
  24. redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  25. redis_client.ping()
  26. # 扫描该thread的所有checkpoint keys
  27. pattern = f"checkpoint:{thread_id}:*"
  28. logger.info(f"🔍 扫描模式: {pattern}, include_tools: {include_tools}")
  29. keys = []
  30. cursor = 0
  31. while True:
  32. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  33. keys.extend(batch)
  34. if cursor == 0:
  35. break
  36. logger.info(f"📋 找到 {len(keys)} 个keys")
  37. if not keys:
  38. redis_client.close()
  39. return {
  40. "success": False,
  41. "error": f"未找到对话 {thread_id}",
  42. "data": None
  43. }
  44. # 获取最新的checkpoint(按key排序,最大的是最新的)
  45. latest_key = max(keys)
  46. logger.info(f"🔍 使用最新key: {latest_key}")
  47. # 检查key类型并获取数据
  48. key_type = redis_client.type(latest_key)
  49. logger.info(f"🔍 Key类型: {key_type}")
  50. data = None
  51. if key_type == 'string':
  52. data = redis_client.get(latest_key)
  53. elif key_type == 'ReJSON-RL':
  54. # RedisJSON类型
  55. try:
  56. data = redis_client.execute_command('JSON.GET', latest_key)
  57. except Exception as json_error:
  58. logger.error(f"❌ JSON.GET 失败: {json_error}")
  59. redis_client.close()
  60. return {
  61. "success": False,
  62. "error": f"无法读取RedisJSON数据: {json_error}",
  63. "data": None
  64. }
  65. else:
  66. redis_client.close()
  67. return {
  68. "success": False,
  69. "error": f"不支持的key类型: {key_type}",
  70. "data": None
  71. }
  72. if not data:
  73. redis_client.close()
  74. return {
  75. "success": False,
  76. "error": "没有找到有效数据",
  77. "data": None
  78. }
  79. # 解析JSON数据
  80. try:
  81. checkpoint_data = json.loads(data)
  82. logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}")
  83. except json.JSONDecodeError as e:
  84. redis_client.close()
  85. return {
  86. "success": False,
  87. "error": f"JSON解析失败: {e}",
  88. "data": None
  89. }
  90. # 提取消息数据
  91. messages = extract_messages_from_checkpoint(checkpoint_data)
  92. logger.info(f"🔍 找到 {len(messages)} 条原始消息")
  93. # 解析并过滤消息 - 这里是关键的开关逻辑
  94. parsed_messages = parse_and_filter_messages(messages, include_tools)
  95. # 提取用户ID
  96. user_id = thread_id.split(':')[0] if ':' in thread_id else 'unknown'
  97. # 生成对话统计信息
  98. stats = generate_conversation_stats(parsed_messages, include_tools)
  99. redis_client.close()
  100. return {
  101. "success": True,
  102. "data": {
  103. "thread_id": thread_id,
  104. "user_id": user_id,
  105. "include_tools": include_tools,
  106. "message_count": len(parsed_messages),
  107. "messages": parsed_messages,
  108. "stats": stats,
  109. "metadata": {
  110. "latest_checkpoint_key": latest_key,
  111. "total_raw_messages": len(messages),
  112. "filtered_message_count": len(parsed_messages),
  113. "filter_mode": "full_conversation" if include_tools else "human_ai_only"
  114. }
  115. }
  116. }
  117. except Exception as e:
  118. logger.error(f"❌ 获取对话详情失败: {e}")
  119. import traceback
  120. traceback.print_exc()
  121. return {
  122. "success": False,
  123. "error": str(e),
  124. "data": None
  125. }
  126. def extract_messages_from_checkpoint(checkpoint_data: Dict[str, Any]) -> List[Any]:
  127. """
  128. 从checkpoint数据中提取消息列表
  129. """
  130. messages = []
  131. # 尝试不同的数据结构路径
  132. if 'checkpoint' in checkpoint_data:
  133. checkpoint = checkpoint_data['checkpoint']
  134. if isinstance(checkpoint, dict) and 'channel_values' in checkpoint:
  135. channel_values = checkpoint['channel_values']
  136. if isinstance(channel_values, dict) and 'messages' in channel_values:
  137. messages = channel_values['messages']
  138. # 如果没有找到,尝试直接路径
  139. if not messages and 'channel_values' in checkpoint_data:
  140. channel_values = checkpoint_data['channel_values']
  141. if isinstance(channel_values, dict) and 'messages' in channel_values:
  142. messages = channel_values['messages']
  143. return messages
  144. def parse_and_filter_messages(raw_messages: List[Any], include_tools: bool) -> List[Dict[str, Any]]:
  145. """
  146. 解析和过滤消息列表 - 关键的开关逻辑实现
  147. Args:
  148. raw_messages: 原始消息列表
  149. include_tools: 是否包含工具消息
  150. - True: 返回所有消息类型
  151. - False: 只返回human/ai,且清理ai消息中的工具信息
  152. Returns:
  153. 解析后的消息列表
  154. """
  155. parsed_messages = []
  156. for msg in raw_messages:
  157. try:
  158. parsed_msg = parse_single_message(msg)
  159. if not parsed_msg:
  160. continue
  161. msg_type = parsed_msg['type']
  162. if include_tools:
  163. # 完整模式:包含所有消息类型
  164. parsed_messages.append(parsed_msg)
  165. logger.debug(f"✅ [完整模式] 包含消息: {msg_type}")
  166. else:
  167. # 简化模式:只包含human和ai消息
  168. if msg_type == 'human':
  169. parsed_messages.append(parsed_msg)
  170. logger.debug(f"✅ [简化模式] 包含human消息")
  171. elif msg_type == 'ai':
  172. # 清理AI消息,移除工具调用信息
  173. cleaned_msg = clean_ai_message_for_simple_mode(parsed_msg)
  174. # 只包含有实际内容的AI消息
  175. if cleaned_msg['content'].strip() and not cleaned_msg.get('is_intermediate_step', False):
  176. parsed_messages.append(cleaned_msg)
  177. logger.debug(f"✅ [简化模式] 包含有内容的ai消息")
  178. else:
  179. logger.debug(f"⏭️ [简化模式] 跳过空的ai消息或中间步骤")
  180. else:
  181. # 跳过tool、system等消息
  182. logger.debug(f"⏭️ [简化模式] 跳过 {msg_type} 消息")
  183. except Exception as e:
  184. logger.warning(f"⚠️ 解析消息失败: {e}")
  185. continue
  186. logger.info(f"📊 解析结果: {len(parsed_messages)} 条消息 (include_tools={include_tools})")
  187. return parsed_messages
  188. def parse_single_message(msg: Any) -> Optional[Dict[str, Any]]:
  189. """
  190. 解析单个消息,支持LangChain序列化格式
  191. """
  192. if isinstance(msg, dict):
  193. # LangChain序列化格式
  194. if (msg.get('lc') == 1 and
  195. msg.get('type') == 'constructor' and
  196. 'id' in msg and
  197. isinstance(msg['id'], list) and
  198. 'kwargs' in msg):
  199. kwargs = msg['kwargs']
  200. msg_class = msg['id'][-1] if msg['id'] else 'Unknown'
  201. # 确定消息类型
  202. if msg_class == 'HumanMessage':
  203. msg_type = 'human'
  204. elif msg_class == 'AIMessage':
  205. msg_type = 'ai'
  206. elif msg_class == 'ToolMessage':
  207. msg_type = 'tool'
  208. elif msg_class == 'SystemMessage':
  209. msg_type = 'system'
  210. else:
  211. msg_type = 'unknown'
  212. # 构建基础消息对象
  213. parsed_msg = {
  214. "type": msg_type,
  215. "content": kwargs.get('content', ''),
  216. "id": kwargs.get('id'),
  217. "timestamp": datetime.now().isoformat()
  218. }
  219. # 处理AI消息的特殊字段
  220. if msg_type == 'ai':
  221. # 工具调用信息
  222. tool_calls = kwargs.get('tool_calls', [])
  223. parsed_msg['tool_calls'] = tool_calls
  224. parsed_msg['has_tool_calls'] = len(tool_calls) > 0
  225. # 额外的AI消息元数据
  226. additional_kwargs = kwargs.get('additional_kwargs', {})
  227. if additional_kwargs:
  228. parsed_msg['additional_kwargs'] = additional_kwargs
  229. response_metadata = kwargs.get('response_metadata', {})
  230. if response_metadata:
  231. parsed_msg['response_metadata'] = response_metadata
  232. # 处理工具消息的特殊字段
  233. elif msg_type == 'tool':
  234. parsed_msg['tool_name'] = kwargs.get('name')
  235. parsed_msg['tool_call_id'] = kwargs.get('tool_call_id')
  236. parsed_msg['status'] = kwargs.get('status', 'unknown')
  237. return parsed_msg
  238. # 简单字典格式
  239. elif 'type' in msg:
  240. return {
  241. "type": msg.get('type', 'unknown'),
  242. "content": msg.get('content', ''),
  243. "id": msg.get('id'),
  244. "timestamp": datetime.now().isoformat()
  245. }
  246. return None
  247. def clean_ai_message_for_simple_mode(ai_msg: Dict[str, Any]) -> Dict[str, Any]:
  248. """
  249. 调试版本:清理AI消息用于简化模式
  250. """
  251. original_content = ai_msg.get("content", "")
  252. logger.info(f"🔍 清理AI消息,原始内容: '{original_content}', 长度: {len(original_content)}")
  253. cleaned_msg = {
  254. "type": ai_msg["type"],
  255. "content": original_content,
  256. "id": ai_msg.get("id"),
  257. "timestamp": ai_msg.get("timestamp")
  258. }
  259. # 处理内容格式化
  260. content = original_content.strip()
  261. # 注释掉 [Formatted Output] 清理逻辑 - 源头已不生成前缀
  262. # if '[Formatted Output]' in content:
  263. # logger.info(f"🔍 发现 [Formatted Output] 标记")
  264. #
  265. # if content.startswith('[Formatted Output]\n'):
  266. # # 去掉标记,保留后面的实际内容
  267. # actual_content = content.replace('[Formatted Output]\n', '')
  268. # logger.info(f"🔍 去除标记后的内容: '{actual_content}', 长度: {len(actual_content)}")
  269. # cleaned_msg["content"] = actual_content
  270. # content = actual_content
  271. # elif content == '[Formatted Output]' or content == '[Formatted Output]\n':
  272. # # 如果只有标记没有内容
  273. # logger.info(f"🔍 只有标记没有实际内容")
  274. # cleaned_msg["content"] = ""
  275. # cleaned_msg["is_intermediate_step"] = True
  276. # content = ""
  277. # 如果清理后内容为空或只有空白,标记为中间步骤
  278. if not content.strip():
  279. logger.info(f"🔍 内容为空,标记为中间步骤")
  280. cleaned_msg["is_intermediate_step"] = True
  281. cleaned_msg["content"] = ""
  282. # 添加简化模式标记
  283. cleaned_msg["simplified"] = True
  284. logger.info(f"🔍 清理结果: '{cleaned_msg['content']}', 是否中间步骤: {cleaned_msg.get('is_intermediate_step', False)}")
  285. return cleaned_msg
  286. def generate_conversation_stats(messages: List[Dict[str, Any]], include_tools: bool) -> Dict[str, Any]:
  287. """
  288. 生成对话统计信息
  289. Args:
  290. messages: 解析后的消息列表
  291. include_tools: 是否包含工具信息(影响统计内容)
  292. Returns:
  293. 统计信息字典
  294. """
  295. stats = {
  296. "total_messages": len(messages),
  297. "human_messages": 0,
  298. "ai_messages": 0,
  299. "conversation_rounds": 0,
  300. "include_tools_mode": include_tools
  301. }
  302. # 添加工具相关统计(仅在include_tools=True时)
  303. if include_tools:
  304. stats.update({
  305. "tool_messages": 0,
  306. "system_messages": 0,
  307. "messages_with_tools": 0,
  308. "unique_tools_used": set()
  309. })
  310. for msg in messages:
  311. msg_type = msg.get('type', 'unknown')
  312. if msg_type == 'human':
  313. stats["human_messages"] += 1
  314. elif msg_type == 'ai':
  315. stats["ai_messages"] += 1
  316. # 工具相关统计
  317. if include_tools and msg.get('has_tool_calls', False):
  318. stats["messages_with_tools"] += 1
  319. # 统计使用的工具
  320. tool_calls = msg.get('tool_calls', [])
  321. for tool_call in tool_calls:
  322. if isinstance(tool_call, dict) and 'name' in tool_call:
  323. stats["unique_tools_used"].add(tool_call['name'])
  324. elif include_tools:
  325. if msg_type == 'tool':
  326. stats["tool_messages"] += 1
  327. # 记录工具名称
  328. tool_name = msg.get('tool_name')
  329. if tool_name:
  330. stats["unique_tools_used"].add(tool_name)
  331. elif msg_type == 'system':
  332. stats["system_messages"] += 1
  333. # 计算对话轮次
  334. stats["conversation_rounds"] = stats["human_messages"]
  335. # 转换set为list(JSON序列化)
  336. if include_tools and "unique_tools_used" in stats:
  337. stats["unique_tools_used"] = list(stats["unique_tools_used"])
  338. return stats
  339. def format_timestamp_readable(timestamp: str) -> str:
  340. """格式化时间戳为可读格式"""
  341. try:
  342. if len(timestamp) >= 14:
  343. year = timestamp[:4]
  344. month = timestamp[4:6]
  345. day = timestamp[6:8]
  346. hour = timestamp[8:10]
  347. minute = timestamp[10:12]
  348. second = timestamp[12:14]
  349. return f"{year}-{month}-{day} {hour}:{minute}:{second}"
  350. except Exception:
  351. pass
  352. return timestamp
  353. # =================== 测试函数 ===================
  354. def test_conversation_detail_with_switch():
  355. """
  356. 测试对话详情获取功能,重点测试include_tools开关
  357. """
  358. print("🧪 测试对话详情获取(开关参数测试)...")
  359. # 测试thread_id(请替换为实际存在的thread_id)
  360. test_thread_id = "wang:20250709195048728323"
  361. print(f"\n1. 测试完整模式(include_tools=True)...")
  362. result_full = get_conversation_detail_from_redis(test_thread_id, include_tools=True)
  363. if result_full['success']:
  364. data = result_full['data']
  365. print(f" ✅ 成功获取完整对话")
  366. print(f" 📊 消息数量: {data['message_count']}")
  367. print(f" 📈 统计信息: {data['stats']}")
  368. print(f" 🔧 包含工具: {data['stats'].get('tool_messages', 0)} 条工具消息")
  369. # 显示消息类型分布
  370. message_types = {}
  371. for msg in data['messages']:
  372. msg_type = msg['type']
  373. message_types[msg_type] = message_types.get(msg_type, 0) + 1
  374. print(f" 📋 消息类型分布: {message_types}")
  375. else:
  376. print(f" ❌ 获取失败: {result_full['error']}")
  377. print(f"\n2. 测试简化模式(include_tools=False)...")
  378. result_simple = get_conversation_detail_from_redis(test_thread_id, include_tools=False)
  379. if result_simple['success']:
  380. data = result_simple['data']
  381. print(f" ✅ 成功获取简化对话")
  382. print(f" 📊 消息数量: {data['message_count']}")
  383. print(f" 📈 统计信息: {data['stats']}")
  384. # 显示消息类型分布
  385. message_types = {}
  386. for msg in data['messages']:
  387. msg_type = msg['type']
  388. message_types[msg_type] = message_types.get(msg_type, 0) + 1
  389. print(f" 📋 消息类型分布: {message_types}")
  390. # 显示前几条消息示例
  391. print(f" 💬 消息示例:")
  392. for i, msg in enumerate(data['messages'][:4]):
  393. content_preview = str(msg['content'])[:50] + "..." if len(str(msg['content'])) > 50 else str(msg['content'])
  394. simplified_mark = " [简化]" if msg.get('simplified') else ""
  395. print(f" [{i+1}] {msg['type']}: {content_preview}{simplified_mark}")
  396. else:
  397. print(f" ❌ 获取失败: {result_simple['error']}")
  398. # 比较两种模式
  399. if result_full['success'] and result_simple['success']:
  400. full_count = result_full['data']['message_count']
  401. simple_count = result_simple['data']['message_count']
  402. difference = full_count - simple_count
  403. print(f"\n3. 模式比较:")
  404. print(f" 📊 完整模式消息数: {full_count}")
  405. print(f" 📊 简化模式消息数: {simple_count}")
  406. print(f" 📊 过滤掉的消息数: {difference}")
  407. print(f" 🎯 过滤效果: {'有效' if difference > 0 else '无差异'}")
# Script entry point: run the manual include_tools smoke test when this file
# is executed directly rather than imported.
if __name__ == "__main__":
    test_conversation_detail_with_switch()