test_state_history_timestamps.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. #!/usr/bin/env python3
  2. """
  3. 测试LangGraph StateSnapshot的created_at时间戳
  4. """
  5. import asyncio
  6. import sys
  7. from pathlib import Path
  8. from datetime import datetime
  9. # 添加项目路径
  10. project_root = Path(__file__).parent
  11. sys.path.insert(0, str(project_root))
  12. from react_agent.agent import CustomReactAgent
  13. async def test_state_history_timestamps():
  14. """测试StateSnapshot的created_at时间戳"""
  15. print("=" * 60)
  16. print("测试LangGraph StateSnapshot的created_at时间戳")
  17. print("=" * 60)
  18. try:
  19. # 初始化Agent
  20. print("🚀 初始化Agent...")
  21. agent = await CustomReactAgent.create()
  22. thread_id = "wang10:20250717211620915"
  23. print(f"\n📖 分析Thread: {thread_id}")
  24. # 使用get_state_history获取所有历史状态
  25. thread_config = {"configurable": {"thread_id": thread_id}}
  26. print(f"🔍 获取状态历史...")
  27. # agent_executor是编译后的graph
  28. if hasattr(agent, 'agent_executor') and hasattr(agent.agent_executor, 'get_state_history'):
  29. print(f"✅ 找到get_state_history方法")
  30. # 获取前5个历史状态
  31. history_count = 0
  32. for state_snapshot in agent.agent_executor.get_state_history(thread_config):
  33. history_count += 1
  34. if history_count > 5: # 只看前5个状态
  35. break
  36. print(f"\n📊 状态 #{history_count}:")
  37. print(f" 🆔 Checkpoint ID: {state_snapshot.config.get('configurable', {}).get('checkpoint_id', 'N/A')}")
  38. print(f" ⏰ Created At: {state_snapshot.created_at}")
  39. print(f" 📝 Messages Count: {len(state_snapshot.values.get('messages', []))}")
  40. print(f" 🔄 Next: {state_snapshot.next}")
  41. print(f" 📋 Step: {state_snapshot.metadata.get('step', 'N/A')}")
  42. # 解析时间戳
  43. if state_snapshot.created_at:
  44. try:
  45. dt = datetime.fromisoformat(state_snapshot.created_at.replace('Z', '+00:00'))
  46. print(f" 📅 解析后时间: {dt}")
  47. print(f" 🕐 本地时间: {dt.astimezone()}")
  48. except Exception as e:
  49. print(f" ❌ 时间解析失败: {e}")
  50. # 如果有消息,显示最后一条消息的类型
  51. messages = state_snapshot.values.get('messages', [])
  52. if messages:
  53. last_msg = messages[-1]
  54. msg_type = getattr(last_msg, 'type', 'unknown')
  55. content_preview = str(getattr(last_msg, 'content', ''))[:50]
  56. print(f" 💬 最后消息: {msg_type} - {content_preview}...")
  57. if history_count == 0:
  58. print("❌ 没有找到历史状态")
  59. else:
  60. print(f"\n✅ 总共分析了 {history_count} 个历史状态")
  61. else:
  62. print("❌ 未找到get_state_history方法")
  63. print("📝 可用方法:")
  64. if hasattr(agent, 'agent_executor'):
  65. methods = [m for m in dir(agent.agent_executor) if not m.startswith('_')]
  66. print(f" agent_executor methods: {methods}")
  67. await agent.close()
  68. except Exception as e:
  69. print(f"❌ 测试失败: {e}")
  70. import traceback
  71. traceback.print_exc()
  72. async def test_current_state_timestamp():
  73. """测试当前状态的时间戳"""
  74. print(f"\n" + "=" * 60)
  75. print("测试当前状态的时间戳")
  76. print("=" * 60)
  77. try:
  78. # 初始化Agent
  79. agent = await CustomReactAgent.create()
  80. thread_id = "wang10:20250717211620915"
  81. thread_config = {"configurable": {"thread_id": thread_id}}
  82. print(f"🔍 获取当前状态...")
  83. if hasattr(agent, 'agent_executor') and hasattr(agent.agent_executor, 'get_state'):
  84. current_state = agent.agent_executor.get_state(thread_config)
  85. print(f"📊 当前状态:")
  86. print(f" 🆔 Checkpoint ID: {current_state.config.get('configurable', {}).get('checkpoint_id', 'N/A')}")
  87. print(f" ⏰ Created At: {current_state.created_at}")
  88. print(f" 📝 Messages Count: {len(current_state.values.get('messages', []))}")
  89. print(f" 🔄 Next: {current_state.next}")
  90. if current_state.created_at:
  91. try:
  92. dt = datetime.fromisoformat(current_state.created_at.replace('Z', '+00:00'))
  93. print(f" 📅 解析后时间: {dt}")
  94. print(f" 🕐 本地时间: {dt.astimezone()}")
  95. except Exception as e:
  96. print(f" ❌ 时间解析失败: {e}")
  97. else:
  98. print("❌ 未找到get_state方法")
  99. await agent.close()
  100. except Exception as e:
  101. print(f"❌ 测试失败: {e}")
  102. import traceback
  103. traceback.print_exc()
  104. async def test_checkpointer_access():
  105. """测试直接通过checkpointer访问历史"""
  106. print(f"\n" + "=" * 60)
  107. print("测试通过checkpointer访问历史")
  108. print("=" * 60)
  109. try:
  110. # 初始化Agent
  111. agent = await CustomReactAgent.create()
  112. thread_id = "wang10:20250717211620915"
  113. thread_config = {"configurable": {"thread_id": thread_id}}
  114. print(f"🔍 通过checkpointer获取历史...")
  115. if hasattr(agent, 'checkpointer') and hasattr(agent.checkpointer, 'alist'):
  116. print(f"✅ 找到checkpointer.alist方法")
  117. # 获取前5个checkpoint
  118. history_count = 0
  119. async for checkpoint_tuple in agent.checkpointer.alist(thread_config):
  120. history_count += 1
  121. if history_count > 5: # 只看前5个状态
  122. break
  123. print(f"\n📊 Checkpoint #{history_count}:")
  124. print(f" 🆔 Config: {checkpoint_tuple.config}")
  125. print(f" ⏰ Checkpoint TS: {checkpoint_tuple.checkpoint.get('ts', 'N/A')}")
  126. print(f" 📋 Metadata: {checkpoint_tuple.metadata}")
  127. # 解析checkpoint中的时间戳
  128. ts_value = checkpoint_tuple.checkpoint.get('ts')
  129. if ts_value:
  130. try:
  131. dt = datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
  132. print(f" 📅 解析后时间: {dt}")
  133. print(f" 🕐 本地时间: {dt.astimezone()}")
  134. except Exception as e:
  135. print(f" ❌ 时间解析失败: {e}")
  136. # 检查消息
  137. messages = checkpoint_tuple.checkpoint.get('channel_values', {}).get('messages', [])
  138. print(f" 📝 Messages Count: {len(messages)}")
  139. if history_count == 0:
  140. print("❌ 没有找到checkpoint历史")
  141. else:
  142. print(f"\n✅ 总共分析了 {history_count} 个checkpoint")
  143. else:
  144. print("❌ 未找到checkpointer.alist方法")
  145. await agent.close()
  146. except Exception as e:
  147. print(f"❌ 测试失败: {e}")
  148. import traceback
  149. traceback.print_exc()
  150. if __name__ == "__main__":
  151. asyncio.run(test_state_history_timestamps())
  152. asyncio.run(test_current_state_timestamp())
  153. asyncio.run(test_checkpointer_access())