# test_redis_conversation_manager.py (11 KB)
  1. import unittest
  2. import sys
  3. import os
  4. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  5. from common.redis_conversation_manager import RedisConversationManager
  6. from datetime import datetime
  7. import time
  8. class TestRedisConversationManager(unittest.TestCase):
  9. """Redis对话管理器单元测试"""
  10. def setUp(self):
  11. """测试前准备"""
  12. self.manager = RedisConversationManager()
  13. # 清理测试数据
  14. self.test_user_id = "test_user_123"
  15. self.test_guest_id = "guest_test_456"
  16. def tearDown(self):
  17. """测试后清理"""
  18. # 清理测试创建的数据
  19. if self.manager.is_available():
  20. # 清理测试用户的对话
  21. try:
  22. conversations = self.manager.get_conversations(self.test_user_id)
  23. for conv in conversations:
  24. conv_id = conv.get('conversation_id')
  25. if conv_id:
  26. self.manager.redis_client.delete(f"conversation:{conv_id}:meta")
  27. self.manager.redis_client.delete(f"conversation:{conv_id}:messages")
  28. self.manager.redis_client.delete(f"user:{self.test_user_id}:conversations")
  29. # 清理guest用户
  30. conversations = self.manager.get_conversations(self.test_guest_id)
  31. for conv in conversations:
  32. conv_id = conv.get('conversation_id')
  33. if conv_id:
  34. self.manager.redis_client.delete(f"conversation:{conv_id}:meta")
  35. self.manager.redis_client.delete(f"conversation:{conv_id}:messages")
  36. self.manager.redis_client.delete(f"user:{self.test_guest_id}:conversations")
  37. except:
  38. pass
  39. def test_redis_connection(self):
  40. """测试Redis连接"""
  41. is_available = self.manager.is_available()
  42. print(f"[TEST] Redis可用状态: {is_available}")
  43. if not is_available:
  44. self.skipTest("Redis不可用,跳过测试")
  45. def test_user_id_resolution(self):
  46. """测试用户ID解析逻辑"""
  47. # 测试登录用户ID优先
  48. user_id = self.manager.resolve_user_id(
  49. "request_user", "session_123", "127.0.0.1", "login_user"
  50. )
  51. self.assertEqual(user_id, "login_user")
  52. # 测试请求参数用户ID
  53. user_id = self.manager.resolve_user_id(
  54. "request_user", "session_123", "127.0.0.1", None
  55. )
  56. self.assertEqual(user_id, "request_user")
  57. # 测试guest用户生成
  58. user_id = self.manager.resolve_user_id(
  59. None, "session_123", "127.0.0.1", None
  60. )
  61. self.assertTrue(user_id.startswith("guest_"))
  62. # 测试基于IP的临时guest
  63. user_id = self.manager.resolve_user_id(
  64. None, None, "127.0.0.1", None
  65. )
  66. self.assertTrue(user_id.startswith("guest_temp_"))
  67. def test_conversation_creation(self):
  68. """测试对话创建"""
  69. if not self.manager.is_available():
  70. self.skipTest("Redis不可用")
  71. conv_id = self.manager.create_conversation(self.test_user_id)
  72. print(f"[TEST] 创建的对话ID: {conv_id}")
  73. # 验证对话ID格式
  74. self.assertTrue(conv_id.startswith("conv_"))
  75. self.assertIn("_", conv_id)
  76. # 验证对话元信息
  77. meta = self.manager.get_conversation_meta(conv_id)
  78. self.assertEqual(meta.get('user_id'), self.test_user_id)
  79. self.assertEqual(meta.get('conversation_id'), conv_id)
  80. self.assertIn('created_at', meta)
  81. def test_message_saving_and_retrieval(self):
  82. """测试消息保存和获取"""
  83. if not self.manager.is_available():
  84. self.skipTest("Redis不可用")
  85. # 创建对话
  86. conv_id = self.manager.create_conversation(self.test_user_id)
  87. # 保存消息
  88. self.manager.save_message(conv_id, "user", "测试问题")
  89. self.manager.save_message(conv_id, "assistant", "测试回答")
  90. # 获取消息列表
  91. messages = self.manager.get_conversation_messages(conv_id)
  92. self.assertEqual(len(messages), 2)
  93. # 验证消息顺序(时间正序)
  94. self.assertEqual(messages[0]['role'], 'user')
  95. self.assertEqual(messages[0]['content'], '测试问题')
  96. self.assertEqual(messages[1]['role'], 'assistant')
  97. self.assertEqual(messages[1]['content'], '测试回答')
  98. def test_context_generation(self):
  99. """测试上下文生成"""
  100. if not self.manager.is_available():
  101. self.skipTest("Redis不可用")
  102. # 创建对话并添加多条消息
  103. conv_id = self.manager.create_conversation(self.test_user_id)
  104. self.manager.save_message(conv_id, "user", "问题1")
  105. self.manager.save_message(conv_id, "assistant", "回答1")
  106. self.manager.save_message(conv_id, "user", "问题2")
  107. self.manager.save_message(conv_id, "assistant", "回答2")
  108. # 获取上下文
  109. context = self.manager.get_context(conv_id, count=2)
  110. print(f"[TEST] 生成的上下文:\n{context}")
  111. # 验证上下文格式
  112. self.assertIn("用户: 问题1", context)
  113. self.assertIn("助手: 回答1", context)
  114. self.assertIn("用户: 问题2", context)
  115. self.assertIn("助手: 回答2", context)
  116. def test_conversation_list(self):
  117. """测试用户对话列表"""
  118. if not self.manager.is_available():
  119. self.skipTest("Redis不可用")
  120. # 创建多个对话
  121. conv_ids = []
  122. for i in range(3):
  123. conv_id = self.manager.create_conversation(self.test_user_id)
  124. conv_ids.append(conv_id)
  125. time.sleep(0.1) # 确保时间戳不同
  126. # 获取对话列表
  127. conversations = self.manager.get_conversations(self.test_user_id)
  128. self.assertEqual(len(conversations), 3)
  129. # 验证顺序(最新的在前)
  130. self.assertEqual(conversations[0]['conversation_id'], conv_ids[2])
  131. self.assertEqual(conversations[1]['conversation_id'], conv_ids[1])
  132. self.assertEqual(conversations[2]['conversation_id'], conv_ids[0])
  133. def test_cache_functionality(self):
  134. """测试缓存功能"""
  135. if not self.manager.is_available():
  136. self.skipTest("Redis不可用")
  137. question = "测试缓存问题"
  138. context = "用户: 之前的问题\n助手: 之前的回答"
  139. # 测试缓存未命中
  140. cached = self.manager.get_cached_answer(question, context)
  141. self.assertIsNone(cached)
  142. # 缓存答案
  143. answer = {
  144. "success": True,
  145. "data": {
  146. "response": "测试答案",
  147. "type": "CHAT"
  148. }
  149. }
  150. self.manager.cache_answer(question, answer, context)
  151. # 测试缓存命中
  152. cached = self.manager.get_cached_answer(question, context)
  153. self.assertIsNotNone(cached)
  154. self.assertEqual(cached['data']['response'], '测试答案')
  155. # 测试不同上下文的缓存
  156. different_context = "用户: 不同的问题\n助手: 不同的回答"
  157. cached = self.manager.get_cached_answer(question, different_context)
  158. self.assertIsNone(cached) # 不同上下文应该缓存未命中
  159. def test_conversation_id_resolution(self):
  160. """测试对话ID解析"""
  161. if not self.manager.is_available():
  162. self.skipTest("Redis不可用")
  163. # 测试创建新对话
  164. conv_id, status = self.manager.resolve_conversation_id(
  165. self.test_user_id, None, False
  166. )
  167. self.assertTrue(conv_id.startswith("conv_"))
  168. self.assertEqual(status['status'], 'new')
  169. # 测试使用已存在的对话
  170. conv_id2, status2 = self.manager.resolve_conversation_id(
  171. self.test_user_id, conv_id, False
  172. )
  173. self.assertEqual(conv_id2, conv_id)
  174. self.assertEqual(status2['status'], 'existing')
  175. # 测试无效的对话ID
  176. conv_id3, status3 = self.manager.resolve_conversation_id(
  177. self.test_user_id, "invalid_conv_id", False
  178. )
  179. self.assertNotEqual(conv_id3, "invalid_conv_id")
  180. self.assertEqual(status3['status'], 'invalid_id_new')
  181. self.assertEqual(status3['requested_id'], 'invalid_conv_id')
  182. def test_statistics(self):
  183. """测试统计功能"""
  184. if not self.manager.is_available():
  185. self.skipTest("Redis不可用")
  186. # 创建测试数据
  187. conv_id = self.manager.create_conversation(self.test_user_id)
  188. self.manager.save_message(conv_id, "user", "统计测试")
  189. # 获取统计信息
  190. stats = self.manager.get_stats()
  191. print(f"[TEST] 统计信息: {stats}")
  192. self.assertTrue(stats['available'])
  193. self.assertIn('total_users', stats)
  194. self.assertIn('total_conversations', stats)
  195. self.assertIn('cached_qa_count', stats)
  196. def test_guest_user_limit(self):
  197. """测试guest用户对话数量限制"""
  198. if not self.manager.is_available():
  199. self.skipTest("Redis不可用")
  200. # 创建多个对话,超过guest用户限制
  201. from app_config import MAX_GUEST_CONVERSATIONS
  202. conv_ids = []
  203. for i in range(MAX_GUEST_CONVERSATIONS + 2):
  204. conv_id = self.manager.create_conversation(self.test_guest_id)
  205. conv_ids.append(conv_id)
  206. time.sleep(0.05)
  207. # 验证只保留了限制数量的对话
  208. conversations = self.manager.get_conversations(self.test_guest_id)
  209. self.assertEqual(len(conversations), MAX_GUEST_CONVERSATIONS)
  210. # 验证保留的是最新的对话
  211. retained_ids = [conv['conversation_id'] for conv in conversations]
  212. for i in range(MAX_GUEST_CONVERSATIONS):
  213. self.assertIn(conv_ids[-(i+1)], retained_ids)
  214. def test_cleanup_functionality(self):
  215. """测试清理功能"""
  216. if not self.manager.is_available():
  217. self.skipTest("Redis不可用")
  218. # 创建对话
  219. conv_id = self.manager.create_conversation(self.test_user_id)
  220. # 手动删除对话元信息,模拟过期
  221. self.manager.redis_client.delete(f"conversation:{conv_id}:meta")
  222. # 执行清理
  223. self.manager.cleanup_expired_conversations()
  224. # 验证对话已从用户列表中移除
  225. conversations = self.manager.get_conversations(self.test_user_id)
  226. conv_ids = [conv['conversation_id'] for conv in conversations]
  227. self.assertNotIn(conv_id, conv_ids)
  228. if __name__ == '__main__':
  229. unittest.main()