123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- #!/usr/bin/env python3
"""
Inspect the created_at timestamps of LangGraph StateSnapshot objects.
"""
- import asyncio
- import sys
- from pathlib import Path
- from datetime import datetime
# Add the project root to sys.path
- project_root = Path(__file__).parent
- sys.path.insert(0, str(project_root))
- from react_agent.agent import CustomReactAgent
async def test_state_history_timestamps():
    """Print ``created_at`` for the first few state snapshots of a thread.

    Creates a ``CustomReactAgent``, iterates
    ``agent.agent_executor.get_state_history`` for a fixed thread id and, for
    at most five snapshots, prints the checkpoint id, ``created_at``, message
    count, ``next`` and step.  Each non-empty ``created_at`` is also parsed
    with ``datetime.fromisoformat`` and shown converted via ``astimezone()``.

    If ``get_state_history`` is not available, the public attribute names of
    ``agent_executor`` are printed instead.

    The agent is closed even when inspection fails.  Any exception is caught,
    reported and its traceback printed; nothing is returned or re-raised.
    """
    max_states = 5  # only the first few snapshots are inspected

    print("=" * 60)
    print("测试LangGraph StateSnapshot的created_at时间戳")
    print("=" * 60)

    try:
        # Initialise the agent
        print("🚀 初始化Agent...")
        agent = await CustomReactAgent.create()

        # Inner try/finally so the agent is released on every exit path;
        # a failure inside close() is still reported by the outer handler.
        try:
            thread_id = "wang10:20250717211620915"
            print(f"\n📖 分析Thread: {thread_id}")

            # Fetch all historical states through get_state_history
            thread_config = {"configurable": {"thread_id": thread_id}}

            print("🔍 获取状态历史...")
            # agent_executor is the compiled graph
            if hasattr(agent, 'agent_executor') and hasattr(agent.agent_executor, 'get_state_history'):
                print("✅ 找到get_state_history方法")

                history_count = 0
                for state_snapshot in agent.agent_executor.get_state_history(thread_config):
                    # Stop *before* counting, so the summary below reports
                    # exactly the number of snapshots that were printed.
                    if history_count >= max_states:
                        break
                    history_count += 1

                    messages = state_snapshot.values.get('messages', [])

                    print(f"\n📊 状态 #{history_count}:")
                    print(f" 🆔 Checkpoint ID: {state_snapshot.config.get('configurable', {}).get('checkpoint_id', 'N/A')}")
                    print(f" ⏰ Created At: {state_snapshot.created_at}")
                    print(f" 📝 Messages Count: {len(messages)}")
                    print(f" 🔄 Next: {state_snapshot.next}")
                    print(f" 📋 Step: {state_snapshot.metadata.get('step', 'N/A')}")

                    # Parse the timestamp
                    if state_snapshot.created_at:
                        try:
                            # A trailing 'Z' is rewritten to an explicit UTC
                            # offset before parsing.
                            dt = datetime.fromisoformat(state_snapshot.created_at.replace('Z', '+00:00'))
                            print(f" 📅 解析后时间: {dt}")
                            print(f" 🕐 本地时间: {dt.astimezone()}")
                        except Exception as e:
                            print(f" ❌ 时间解析失败: {e}")

                    # If there are messages, show the type of the last one
                    if messages:
                        last_msg = messages[-1]
                        msg_type = getattr(last_msg, 'type', 'unknown')
                        content_preview = str(getattr(last_msg, 'content', ''))[:50]
                        print(f" 💬 最后消息: {msg_type} - {content_preview}...")

                if history_count == 0:
                    print("❌ 没有找到历史状态")
                else:
                    print(f"\n✅ 总共分析了 {history_count} 个历史状态")

            else:
                print("❌ 未找到get_state_history方法")
                print("📝 可用方法:")
                if hasattr(agent, 'agent_executor'):
                    methods = [m for m in dir(agent.agent_executor) if not m.startswith('_')]
                    print(f" agent_executor methods: {methods}")
        finally:
            await agent.close()

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
async def test_current_state_timestamp():
    """Print the ``created_at`` timestamp of a thread's current state.

    Creates a ``CustomReactAgent``, calls ``agent.agent_executor.get_state``
    for a fixed thread id and prints the checkpoint id, ``created_at``,
    message count and ``next``.  A non-empty ``created_at`` is also parsed
    with ``datetime.fromisoformat`` and shown converted via ``astimezone()``.

    The agent is closed even when inspection fails.  Any exception is caught,
    reported and its traceback printed; nothing is returned or re-raised.
    """
    print("\n" + "=" * 60)
    print("测试当前状态的时间戳")
    print("=" * 60)

    try:
        # Initialise the agent
        agent = await CustomReactAgent.create()

        # Inner try/finally so the agent is released on every exit path;
        # a failure inside close() is still reported by the outer handler.
        try:
            thread_id = "wang10:20250717211620915"
            thread_config = {"configurable": {"thread_id": thread_id}}

            print("🔍 获取当前状态...")
            if hasattr(agent, 'agent_executor') and hasattr(agent.agent_executor, 'get_state'):
                current_state = agent.agent_executor.get_state(thread_config)

                print("📊 当前状态:")
                print(f" 🆔 Checkpoint ID: {current_state.config.get('configurable', {}).get('checkpoint_id', 'N/A')}")
                print(f" ⏰ Created At: {current_state.created_at}")
                print(f" 📝 Messages Count: {len(current_state.values.get('messages', []))}")
                print(f" 🔄 Next: {current_state.next}")

                if current_state.created_at:
                    try:
                        # A trailing 'Z' is rewritten to an explicit UTC
                        # offset before parsing.
                        dt = datetime.fromisoformat(current_state.created_at.replace('Z', '+00:00'))
                        print(f" 📅 解析后时间: {dt}")
                        print(f" 🕐 本地时间: {dt.astimezone()}")
                    except Exception as e:
                        print(f" ❌ 时间解析失败: {e}")
            else:
                print("❌ 未找到get_state方法")
        finally:
            await agent.close()

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
async def test_checkpointer_access():
    """Print timestamps of a thread's checkpoints read via the checkpointer.

    Creates a ``CustomReactAgent``, iterates ``agent.checkpointer.alist`` for
    a fixed thread id and, for at most five checkpoint tuples, prints the
    config, the checkpoint's ``ts`` value, the metadata and the number of
    entries under ``channel_values['messages']``.  A non-empty ``ts`` is also
    parsed with ``datetime.fromisoformat`` and shown converted via
    ``astimezone()``.

    The agent is closed even when inspection fails.  Any exception is caught,
    reported and its traceback printed; nothing is returned or re-raised.
    """
    max_checkpoints = 5  # only the first few checkpoints are inspected

    print("\n" + "=" * 60)
    print("测试通过checkpointer访问历史")
    print("=" * 60)

    try:
        # Initialise the agent
        agent = await CustomReactAgent.create()

        # Inner try/finally so the agent is released on every exit path;
        # a failure inside close() is still reported by the outer handler.
        try:
            thread_id = "wang10:20250717211620915"
            thread_config = {"configurable": {"thread_id": thread_id}}

            print("🔍 通过checkpointer获取历史...")
            if hasattr(agent, 'checkpointer') and hasattr(agent.checkpointer, 'alist'):
                print("✅ 找到checkpointer.alist方法")

                history_count = 0
                async for checkpoint_tuple in agent.checkpointer.alist(thread_config):
                    # Stop *before* counting, so the summary below reports
                    # exactly the number of checkpoints that were printed.
                    if history_count >= max_checkpoints:
                        break
                    history_count += 1

                    ts_value = checkpoint_tuple.checkpoint.get('ts')

                    print(f"\n📊 Checkpoint #{history_count}:")
                    print(f" 🆔 Config: {checkpoint_tuple.config}")
                    print(f" ⏰ Checkpoint TS: {checkpoint_tuple.checkpoint.get('ts', 'N/A')}")
                    print(f" 📋 Metadata: {checkpoint_tuple.metadata}")

                    # Parse the timestamp stored in the checkpoint
                    if ts_value:
                        try:
                            # A trailing 'Z' is rewritten to an explicit UTC
                            # offset before parsing.
                            dt = datetime.fromisoformat(ts_value.replace('Z', '+00:00'))
                            print(f" 📅 解析后时间: {dt}")
                            print(f" 🕐 本地时间: {dt.astimezone()}")
                        except Exception as e:
                            print(f" ❌ 时间解析失败: {e}")

                    # Inspect the messages channel
                    messages = checkpoint_tuple.checkpoint.get('channel_values', {}).get('messages', [])
                    print(f" 📝 Messages Count: {len(messages)}")

                if history_count == 0:
                    print("❌ 没有找到checkpoint历史")
                else:
                    print(f"\n✅ 总共分析了 {history_count} 个checkpoint")
            else:
                print("❌ 未找到checkpointer.alist方法")
        finally:
            await agent.close()

    except Exception as e:
        print(f"❌ 测试失败: {e}")
        import traceback
        traceback.print_exc()
if __name__ == "__main__":
    # Run the checks one after another, each in its own asyncio.run() call.
    for check in (
        test_state_history_timestamps,
        test_current_state_timestamp,
        test_checkpointer_access,
    ):
        asyncio.run(check())
|