import unittest import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from common.redis_conversation_manager import RedisConversationManager from datetime import datetime import time class TestRedisConversationManager(unittest.TestCase): """Redis对话管理器单元测试""" def setUp(self): """测试前准备""" self.manager = RedisConversationManager() # 清理测试数据 self.test_user_id = "test_user_123" self.test_guest_id = "guest_test_456" def tearDown(self): """测试后清理""" # 清理测试创建的数据 if self.manager.is_available(): # 清理测试用户的对话 try: conversations = self.manager.get_conversations(self.test_user_id) for conv in conversations: conv_id = conv.get('conversation_id') if conv_id: self.manager.redis_client.delete(f"conversation:{conv_id}:meta") self.manager.redis_client.delete(f"conversation:{conv_id}:messages") self.manager.redis_client.delete(f"user:{self.test_user_id}:conversations") # 清理guest用户 conversations = self.manager.get_conversations(self.test_guest_id) for conv in conversations: conv_id = conv.get('conversation_id') if conv_id: self.manager.redis_client.delete(f"conversation:{conv_id}:meta") self.manager.redis_client.delete(f"conversation:{conv_id}:messages") self.manager.redis_client.delete(f"user:{self.test_guest_id}:conversations") except: pass def test_redis_connection(self): """测试Redis连接""" is_available = self.manager.is_available() print(f"[TEST] Redis可用状态: {is_available}") if not is_available: self.skipTest("Redis不可用,跳过测试") def test_user_id_resolution(self): """测试用户ID解析逻辑""" # 测试登录用户ID优先 user_id = self.manager.resolve_user_id( "request_user", "session_123", "127.0.0.1", "login_user" ) self.assertEqual(user_id, "login_user") # 测试请求参数用户ID user_id = self.manager.resolve_user_id( "request_user", "session_123", "127.0.0.1", None ) self.assertEqual(user_id, "request_user") # 测试guest用户生成 user_id = self.manager.resolve_user_id( None, "session_123", "127.0.0.1", None ) self.assertTrue(user_id.startswith("guest_")) # 测试基于IP的临时guest user_id = self.manager.resolve_user_id( None, None, "127.0.0.1", None ) self.assertTrue(user_id.startswith("guest_temp_")) def test_conversation_creation(self): """测试对话创建""" if not self.manager.is_available(): self.skipTest("Redis不可用") conv_id = self.manager.create_conversation(self.test_user_id) print(f"[TEST] 创建的对话ID: {conv_id}") # 验证对话ID格式 self.assertTrue(conv_id.startswith("conv_")) self.assertIn("_", conv_id) # 验证对话元信息 meta = self.manager.get_conversation_meta(conv_id) self.assertEqual(meta.get('user_id'), self.test_user_id) self.assertEqual(meta.get('conversation_id'), conv_id) self.assertIn('created_at', meta) def test_message_saving_and_retrieval(self): """测试消息保存和获取""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建对话 conv_id = self.manager.create_conversation(self.test_user_id) # 保存消息 self.manager.save_message(conv_id, "user", "测试问题") self.manager.save_message(conv_id, "assistant", "测试回答") # 获取消息列表 messages = self.manager.get_conversation_messages(conv_id) self.assertEqual(len(messages), 2) # 验证消息顺序(时间正序) self.assertEqual(messages[0]['role'], 'user') self.assertEqual(messages[0]['content'], '测试问题') self.assertEqual(messages[1]['role'], 'assistant') self.assertEqual(messages[1]['content'], '测试回答') def test_context_generation(self): """测试上下文生成""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建对话并添加多条消息 conv_id = self.manager.create_conversation(self.test_user_id) self.manager.save_message(conv_id, "user", "问题1") self.manager.save_message(conv_id, "assistant", "回答1") self.manager.save_message(conv_id, "user", "问题2") self.manager.save_message(conv_id, "assistant", "回答2") # 获取上下文 context = self.manager.get_context(conv_id, count=2) print(f"[TEST] 生成的上下文:\n{context}") # 验证上下文格式 self.assertIn("用户: 问题1", context) self.assertIn("助手: 回答1", context) self.assertIn("用户: 问题2", context) self.assertIn("助手: 回答2", context) def test_conversation_list(self): """测试用户对话列表""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建多个对话 conv_ids = [] for i in range(3): conv_id = self.manager.create_conversation(self.test_user_id) conv_ids.append(conv_id) time.sleep(0.1) # 确保时间戳不同 # 获取对话列表 conversations = self.manager.get_conversations(self.test_user_id) self.assertEqual(len(conversations), 3) # 验证顺序(最新的在前) self.assertEqual(conversations[0]['conversation_id'], conv_ids[2]) self.assertEqual(conversations[1]['conversation_id'], conv_ids[1]) self.assertEqual(conversations[2]['conversation_id'], conv_ids[0]) def test_cache_functionality(self): """测试缓存功能""" if not self.manager.is_available(): self.skipTest("Redis不可用") question = "测试缓存问题" context = "用户: 之前的问题\n助手: 之前的回答" # 测试缓存未命中 cached = self.manager.get_cached_answer(question, context) self.assertIsNone(cached) # 缓存答案 answer = { "success": True, "data": { "response": "测试答案", "type": "CHAT" } } self.manager.cache_answer(question, answer, context) # 测试缓存命中 cached = self.manager.get_cached_answer(question, context) self.assertIsNotNone(cached) self.assertEqual(cached['data']['response'], '测试答案') # 测试不同上下文的缓存 different_context = "用户: 不同的问题\n助手: 不同的回答" cached = self.manager.get_cached_answer(question, different_context) self.assertIsNone(cached) # 不同上下文应该缓存未命中 def test_conversation_id_resolution(self): """测试对话ID解析""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 测试创建新对话 conv_id, status = self.manager.resolve_conversation_id( self.test_user_id, None, False ) self.assertTrue(conv_id.startswith("conv_")) self.assertEqual(status['status'], 'new') # 测试使用已存在的对话 conv_id2, status2 = self.manager.resolve_conversation_id( self.test_user_id, conv_id, False ) self.assertEqual(conv_id2, conv_id) self.assertEqual(status2['status'], 'existing') # 测试无效的对话ID conv_id3, status3 = self.manager.resolve_conversation_id( self.test_user_id, "invalid_conv_id", False ) self.assertNotEqual(conv_id3, "invalid_conv_id") self.assertEqual(status3['status'], 'invalid_id_new') self.assertEqual(status3['requested_id'], 'invalid_conv_id') def test_statistics(self): """测试统计功能""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建测试数据 conv_id = self.manager.create_conversation(self.test_user_id) self.manager.save_message(conv_id, "user", "统计测试") # 获取统计信息 stats = self.manager.get_stats() print(f"[TEST] 统计信息: {stats}") self.assertTrue(stats['available']) self.assertIn('total_users', stats) self.assertIn('total_conversations', stats) self.assertIn('cached_qa_count', stats) def test_guest_user_limit(self): """测试guest用户对话数量限制""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建多个对话,超过guest用户限制 from app_config import MAX_GUEST_CONVERSATIONS conv_ids = [] for i in range(MAX_GUEST_CONVERSATIONS + 2): conv_id = self.manager.create_conversation(self.test_guest_id) conv_ids.append(conv_id) time.sleep(0.05) # 验证只保留了限制数量的对话 conversations = self.manager.get_conversations(self.test_guest_id) self.assertEqual(len(conversations), MAX_GUEST_CONVERSATIONS) # 验证保留的是最新的对话 retained_ids = [conv['conversation_id'] for conv in conversations] for i in range(MAX_GUEST_CONVERSATIONS): self.assertIn(conv_ids[-(i+1)], retained_ids) def test_cleanup_functionality(self): """测试清理功能""" if not self.manager.is_available(): self.skipTest("Redis不可用") # 创建对话 conv_id = self.manager.create_conversation(self.test_user_id) # 手动删除对话元信息,模拟过期 self.manager.redis_client.delete(f"conversation:{conv_id}:meta") # 执行清理 self.manager.cleanup_expired_conversations() # 验证对话已从用户列表中移除 conversations = self.manager.get_conversations(self.test_user_id) conv_ids = [conv['conversation_id'] for conv in conversations] self.assertNotIn(conv_id, conv_ids) if __name__ == '__main__': unittest.main()