# simple_redis_api.py (9.5 KB)
  1. """
  2. 简单Redis查询API函数,替换复杂的LangGraph方法
  3. """
  4. import redis
  5. import json
  6. from typing import List, Dict, Any
  7. from datetime import datetime
  8. def get_user_conversations_simple_sync(user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
  9. """
  10. 直接从Redis获取用户对话,不使用LangGraph
  11. 同步版本,避免事件循环问题
  12. """
  13. try:
  14. # 创建Redis连接
  15. redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  16. # 测试连接
  17. redis_client.ping()
  18. # 扫描用户的checkpoint keys
  19. pattern = f"checkpoint:{user_id}:*"
  20. print(f"🔍 扫描模式: {pattern}")
  21. # 获取所有匹配的keys
  22. keys = []
  23. cursor = 0
  24. while True:
  25. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  26. keys.extend(batch)
  27. if cursor == 0:
  28. break
  29. print(f"📋 找到 {len(keys)} 个keys")
  30. # 解析thread信息
  31. thread_data = {}
  32. for key in keys:
  33. try:
  34. # key格式: checkpoint:user_id:timestamp:status:uuid
  35. parts = key.split(':')
  36. if len(parts) >= 4:
  37. thread_id = f"{parts[1]}:{parts[2]}" # user_id:timestamp
  38. timestamp = parts[2]
  39. if thread_id not in thread_data:
  40. thread_data[thread_id] = {
  41. "thread_id": thread_id,
  42. "timestamp": timestamp,
  43. "keys": []
  44. }
  45. thread_data[thread_id]["keys"].append(key)
  46. except Exception as e:
  47. print(f"解析key失败 {key}: {e}")
  48. continue
  49. print(f"📊 找到 {len(thread_data)} 个thread")
  50. # 按时间戳排序
  51. sorted_threads = sorted(
  52. thread_data.values(),
  53. key=lambda x: x["timestamp"],
  54. reverse=True
  55. )[:limit]
  56. # 获取每个thread的详细信息
  57. conversations = []
  58. for thread_info in sorted_threads:
  59. try:
  60. thread_id = thread_info["thread_id"]
  61. # 获取该thread的最新checkpoint数据
  62. latest_key = None
  63. for key in thread_info["keys"]:
  64. if latest_key is None or key > latest_key:
  65. latest_key = key
  66. if latest_key:
  67. # 直接从Redis获取数据
  68. data = redis_client.get(latest_key)
  69. if data:
  70. try:
  71. # 尝试解析JSON数据
  72. checkpoint_data = json.loads(data)
  73. # 提取消息信息
  74. messages = checkpoint_data.get('channel_values', {}).get('messages', [])
  75. # 生成对话预览
  76. preview = "空对话"
  77. if messages:
  78. for msg in messages:
  79. # 处理不同的消息格式
  80. if isinstance(msg, dict):
  81. msg_type = msg.get('type', '')
  82. if msg_type == 'human':
  83. content = str(msg.get('content', ''))
  84. preview = content[:50] + "..." if len(content) > 50 else content
  85. break
  86. elif hasattr(msg, 'content') and hasattr(msg, '__class__'):
  87. # LangChain消息对象
  88. if msg.__class__.__name__ == 'HumanMessage':
  89. content = str(msg.content)
  90. preview = content[:50] + "..." if len(content) > 50 else content
  91. break
  92. conversations.append({
  93. "thread_id": thread_id,
  94. "user_id": user_id,
  95. "timestamp": thread_info["timestamp"],
  96. "message_count": len(messages),
  97. "conversation_preview": preview,
  98. "formatted_time": format_timestamp_simple(thread_info["timestamp"])
  99. })
  100. except json.JSONDecodeError:
  101. print(f"❌ 解析JSON失败: {latest_key}")
  102. continue
  103. except Exception as e:
  104. print(f"❌ 处理数据失败: {e}")
  105. continue
  106. except Exception as e:
  107. print(f"❌ 处理thread {thread_info['thread_id']} 失败: {e}")
  108. continue
  109. redis_client.close()
  110. print(f"✅ 返回 {len(conversations)} 个对话")
  111. return conversations
  112. except Exception as e:
  113. print(f"❌ Redis查询失败: {e}")
  114. return []
  115. def get_conversation_history_simple_sync(thread_id: str) -> List[Dict[str, Any]]:
  116. """
  117. 直接从Redis获取对话历史,不使用LangGraph
  118. """
  119. try:
  120. # 创建Redis连接
  121. redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  122. # 扫描该thread的所有checkpoint keys
  123. pattern = f"checkpoint:{thread_id}:*"
  124. keys = []
  125. cursor = 0
  126. while True:
  127. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  128. keys.extend(batch)
  129. if cursor == 0:
  130. break
  131. if not keys:
  132. redis_client.close()
  133. return []
  134. # 获取最新的checkpoint
  135. latest_key = max(keys)
  136. data = redis_client.get(latest_key)
  137. if not data:
  138. redis_client.close()
  139. return []
  140. # 解析数据
  141. checkpoint_data = json.loads(data)
  142. messages = checkpoint_data.get('channel_values', {}).get('messages', [])
  143. # 转换消息格式
  144. history = []
  145. for msg in messages:
  146. if isinstance(msg, dict):
  147. # 已经是字典格式
  148. msg_type = msg.get('type', 'unknown')
  149. if msg_type == 'human':
  150. role = "human"
  151. elif msg_type == 'tool':
  152. role = "tool"
  153. else:
  154. role = "ai"
  155. history.append({
  156. "type": role,
  157. "content": msg.get('content', ''),
  158. "tool_calls": msg.get('tool_calls', None)
  159. })
  160. elif hasattr(msg, '__class__'):
  161. # LangChain消息对象
  162. class_name = msg.__class__.__name__
  163. if class_name == 'HumanMessage':
  164. role = "human"
  165. elif class_name == 'ToolMessage':
  166. role = "tool"
  167. else:
  168. role = "ai"
  169. history.append({
  170. "type": role,
  171. "content": getattr(msg, 'content', ''),
  172. "tool_calls": getattr(msg, 'tool_calls', None)
  173. })
  174. redis_client.close()
  175. return history
  176. except Exception as e:
  177. print(f"❌ 获取对话历史失败: {e}")
  178. return []
  179. def format_timestamp_simple(timestamp: str) -> str:
  180. """格式化时间戳"""
  181. try:
  182. if len(timestamp) >= 14:
  183. year = timestamp[:4]
  184. month = timestamp[4:6]
  185. day = timestamp[6:8]
  186. hour = timestamp[8:10]
  187. minute = timestamp[10:12]
  188. second = timestamp[12:14]
  189. return f"{year}-{month}-{day} {hour}:{minute}:{second}"
  190. except Exception:
  191. pass
  192. return timestamp
  193. # 测试函数
  194. def test_simple_redis_functions():
  195. """测试简单Redis函数"""
  196. print("🧪 测试简单Redis函数...")
  197. try:
  198. # 测试获取对话列表
  199. print("1. 测试获取用户对话列表...")
  200. conversations = get_user_conversations_simple_sync("doudou", 5)
  201. print(f" 结果: {len(conversations)} 个对话")
  202. if conversations:
  203. for conv in conversations:
  204. print(f" - {conv['thread_id']}: {conv['conversation_preview']}")
  205. # 测试获取对话详情
  206. print("2. 测试获取对话详情...")
  207. first_thread = conversations[0]['thread_id']
  208. history = get_conversation_history_simple_sync(first_thread)
  209. print(f" 结果: {len(history)} 条消息")
  210. for i, msg in enumerate(history[:3]):
  211. print(f" [{i+1}] {msg['type']}: {str(msg['content'])[:50]}...")
  212. print("✅ 测试完成")
  213. return True
  214. except Exception as e:
  215. print(f"❌ 测试失败: {e}")
  216. import traceback
  217. traceback.print_exc()
  218. return False
  219. if __name__ == "__main__":
  220. test_simple_redis_functions()