simple_redis_query.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #!/usr/bin/env python3
  2. """
  3. 简单的Redis查询脚本,绕过LangGraph的复杂异步机制
  4. """
  5. import asyncio
  6. import redis
  7. import json
  8. from typing import List, Dict, Any
  9. async def get_user_conversations_simple(user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
  10. """
  11. 直接从Redis获取用户对话,不使用LangGraph
  12. """
  13. # 创建Redis连接
  14. redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  15. try:
  16. # 扫描用户的checkpoint keys
  17. pattern = f"checkpoint:{user_id}:*"
  18. print(f"🔍 扫描模式: {pattern}")
  19. # 获取所有匹配的keys
  20. keys = []
  21. cursor = 0
  22. while True:
  23. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  24. keys.extend(batch)
  25. if cursor == 0:
  26. break
  27. print(f"📋 找到 {len(keys)} 个keys")
  28. # 解析thread信息
  29. thread_data = {}
  30. for key in keys:
  31. try:
  32. # key格式: checkpoint:user_id:timestamp:status:uuid
  33. parts = key.split(':')
  34. if len(parts) >= 4:
  35. thread_id = f"{parts[1]}:{parts[2]}" # user_id:timestamp
  36. timestamp = parts[2]
  37. if thread_id not in thread_data:
  38. thread_data[thread_id] = {
  39. "thread_id": thread_id,
  40. "timestamp": timestamp,
  41. "keys": []
  42. }
  43. thread_data[thread_id]["keys"].append(key)
  44. except Exception as e:
  45. print(f"解析key失败 {key}: {e}")
  46. continue
  47. print(f"📊 找到 {len(thread_data)} 个thread")
  48. # 按时间戳排序
  49. sorted_threads = sorted(
  50. thread_data.values(),
  51. key=lambda x: x["timestamp"],
  52. reverse=True
  53. )[:limit]
  54. # 获取每个thread的详细信息
  55. conversations = []
  56. for thread_info in sorted_threads:
  57. try:
  58. thread_id = thread_info["thread_id"]
  59. # 获取该thread的最新checkpoint数据
  60. latest_key = None
  61. for key in thread_info["keys"]:
  62. if latest_key is None or key > latest_key:
  63. latest_key = key
  64. if latest_key:
  65. # 直接从Redis获取数据
  66. data = redis_client.get(latest_key)
  67. if data:
  68. try:
  69. # 尝试解析JSON数据
  70. checkpoint_data = json.loads(data)
  71. # 提取消息信息
  72. messages = checkpoint_data.get('channel_values', {}).get('messages', [])
  73. # 生成对话预览
  74. preview = "空对话"
  75. if messages:
  76. for msg in messages:
  77. if isinstance(msg, dict) and msg.get('type') == 'human':
  78. content = str(msg.get('content', ''))
  79. preview = content[:50] + "..." if len(content) > 50 else content
  80. break
  81. conversations.append({
  82. "thread_id": thread_id,
  83. "user_id": user_id,
  84. "timestamp": thread_info["timestamp"],
  85. "message_count": len(messages),
  86. "conversation_preview": preview,
  87. "formatted_time": format_timestamp(thread_info["timestamp"])
  88. })
  89. except json.JSONDecodeError:
  90. print(f"❌ 解析JSON失败: {latest_key}")
  91. continue
  92. except Exception as e:
  93. print(f"❌ 处理数据失败: {e}")
  94. continue
  95. except Exception as e:
  96. print(f"❌ 处理thread {thread_info['thread_id']} 失败: {e}")
  97. continue
  98. print(f"✅ 返回 {len(conversations)} 个对话")
  99. return conversations
  100. finally:
  101. redis_client.close()
  102. def format_timestamp(timestamp: str) -> str:
  103. """格式化时间戳"""
  104. try:
  105. if len(timestamp) >= 14:
  106. year = timestamp[:4]
  107. month = timestamp[4:6]
  108. day = timestamp[6:8]
  109. hour = timestamp[8:10]
  110. minute = timestamp[10:12]
  111. second = timestamp[12:14]
  112. return f"{year}-{month}-{day} {hour}:{minute}:{second}"
  113. except Exception:
  114. pass
  115. return timestamp
  116. async def test_simple_query():
  117. """测试简单查询"""
  118. print("🧪 测试简单Redis查询...")
  119. try:
  120. conversations = await get_user_conversations_simple("doudou", 10)
  121. print(f"📋 查询结果: {len(conversations)} 个对话")
  122. for conv in conversations:
  123. print(f" - {conv['thread_id']}: {conv['conversation_preview']}")
  124. return conversations
  125. except Exception as e:
  126. print(f"❌ 查询失败: {e}")
  127. import traceback
  128. traceback.print_exc()
  129. return []
  130. if __name__ == "__main__":
  131. asyncio.run(test_simple_query())