test_ask_agent_redis_integration.py 11 KB


  1. import unittest
  2. import requests
  3. import json
  4. import sys
  5. import os
  6. import time
  7. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  8. from common.redis_conversation_manager import RedisConversationManager
  9. class TestAskAgentRedisIntegration(unittest.TestCase):
  10. """ask_agent API的Redis集成测试"""
  11. def setUp(self):
  12. """测试前准备"""
  13. self.base_url = "http://localhost:8084/api/v0"
  14. self.test_session_id = "test_session_" + str(int(time.time()))
  15. self.manager = RedisConversationManager()
  16. def tearDown(self):
  17. """测试后清理"""
  18. # 清理测试数据
  19. pass
  20. def test_api_availability(self):
  21. """测试API可用性"""
  22. try:
  23. response = requests.get(f"{self.base_url}/agent_health", timeout=5)
  24. print(f"[TEST] Agent健康检查响应码: {response.status_code}")
  25. except Exception as e:
  26. self.skipTest(f"API服务不可用: {str(e)}")
  27. def test_basic_ask_agent(self):
  28. """测试基本的ask_agent调用"""
  29. try:
  30. # 第一次调用 - 创建新对话
  31. payload = {
  32. "question": "测试问题:高速公路服务区有多少个?",
  33. "session_id": self.test_session_id
  34. }
  35. response = requests.post(
  36. f"{self.base_url}/ask_agent",
  37. json=payload,
  38. timeout=30
  39. )
  40. print(f"[TEST] 第一次调用响应码: {response.status_code}")
  41. if response.status_code == 200:
  42. data = response.json()
  43. print(f"[TEST] 响应数据: {json.dumps(data, indent=2, ensure_ascii=False)}")
  44. # 验证返回字段
  45. self.assertIn('data', data)
  46. self.assertIn('conversation_id', data['data'])
  47. self.assertIn('user_id', data['data'])
  48. self.assertIn('conversation_status', data['data'])
  49. conversation_id = data['data']['conversation_id']
  50. user_id = data['data']['user_id']
  51. print(f"[TEST] 创建的对话ID: {conversation_id}")
  52. print(f"[TEST] 用户ID: {user_id}")
  53. return conversation_id, user_id
  54. except Exception as e:
  55. self.skipTest(f"API调用失败: {str(e)}")
  56. def test_conversation_context(self):
  57. """测试对话上下文功能"""
  58. try:
  59. # 第一次调用
  60. payload1 = {
  61. "question": "高速公路服务区有多少个?",
  62. "session_id": self.test_session_id
  63. }
  64. response1 = requests.post(
  65. f"{self.base_url}/ask_agent",
  66. json=payload1,
  67. timeout=30
  68. )
  69. if response1.status_code != 200:
  70. self.skipTest("第一次API调用失败")
  71. data1 = response1.json()
  72. conversation_id = data1['data']['conversation_id']
  73. # 第二次调用 - 使用相同的对话ID
  74. payload2 = {
  75. "question": "这些服务区的经理都是谁?", # 这个问题依赖于前面的上下文
  76. "session_id": self.test_session_id,
  77. "conversation_id": conversation_id
  78. }
  79. response2 = requests.post(
  80. f"{self.base_url}/ask_agent",
  81. json=payload2,
  82. timeout=30
  83. )
  84. print(f"[TEST] 第二次调用响应码: {response2.status_code}")
  85. if response2.status_code == 200:
  86. data2 = response2.json()
  87. print(f"[TEST] 使用了上下文: {data2['data'].get('context_used', False)}")
  88. self.assertTrue(data2['data'].get('context_used', False))
  89. except Exception as e:
  90. self.skipTest(f"上下文测试失败: {str(e)}")
  91. def test_cache_hit(self):
  92. """测试缓存命中"""
  93. try:
  94. # 同样的问题问两次
  95. question = "高速公路服务区的数量是多少?"
  96. # 第一次调用
  97. payload = {
  98. "question": question,
  99. "session_id": self.test_session_id + "_cache_test"
  100. }
  101. response1 = requests.post(
  102. f"{self.base_url}/ask_agent",
  103. json=payload,
  104. timeout=30
  105. )
  106. if response1.status_code != 200:
  107. self.skipTest("第一次API调用失败")
  108. data1 = response1.json()
  109. from_cache1 = data1['data'].get('from_cache', False)
  110. print(f"[TEST] 第一次调用from_cache: {from_cache1}")
  111. self.assertFalse(from_cache1)
  112. # 立即第二次调用相同的问题
  113. response2 = requests.post(
  114. f"{self.base_url}/ask_agent",
  115. json=payload,
  116. timeout=30
  117. )
  118. if response2.status_code == 200:
  119. data2 = response2.json()
  120. from_cache2 = data2['data'].get('from_cache', False)
  121. print(f"[TEST] 第二次调用from_cache: {from_cache2}")
  122. # 注意:由于是新对话,可能不会命中缓存
  123. except Exception as e:
  124. self.skipTest(f"缓存测试失败: {str(e)}")
  125. def test_invalid_conversation_id(self):
  126. """测试无效的conversation_id处理"""
  127. try:
  128. payload = {
  129. "question": "测试无效对话ID",
  130. "session_id": self.test_session_id,
  131. "conversation_id": "invalid_conv_id_xyz"
  132. }
  133. response = requests.post(
  134. f"{self.base_url}/ask_agent",
  135. json=payload,
  136. timeout=30
  137. )
  138. if response.status_code == 200:
  139. data = response.json()
  140. status = data['data'].get('conversation_status')
  141. print(f"[TEST] 无效对话ID的状态: {status}")
  142. self.assertEqual(status, 'invalid_id_new')
  143. self.assertEqual(
  144. data['data'].get('requested_conversation_id'),
  145. 'invalid_conv_id_xyz'
  146. )
  147. except Exception as e:
  148. self.skipTest(f"无效ID测试失败: {str(e)}")
  149. def test_conversation_api_endpoints(self):
  150. """测试对话管理API端点"""
  151. try:
  152. # 先创建一个对话
  153. result = self.test_basic_ask_agent()
  154. if not result:
  155. self.skipTest("无法创建测试对话")
  156. conversation_id, user_id = result
  157. # 测试获取用户对话列表
  158. response = requests.get(
  159. f"{self.base_url}/user/{user_id}/conversations",
  160. timeout=10
  161. )
  162. print(f"[TEST] 获取对话列表响应码: {response.status_code}")
  163. if response.status_code == 200:
  164. data = response.json()
  165. self.assertIn('data', data)
  166. self.assertIn('conversations', data['data'])
  167. print(f"[TEST] 用户对话数: {len(data['data']['conversations'])}")
  168. # 测试获取对话消息
  169. response = requests.get(
  170. f"{self.base_url}/conversation/{conversation_id}/messages",
  171. timeout=10
  172. )
  173. print(f"[TEST] 获取对话消息响应码: {response.status_code}")
  174. if response.status_code == 200:
  175. data = response.json()
  176. self.assertIn('data', data)
  177. self.assertIn('messages', data['data'])
  178. print(f"[TEST] 对话消息数: {len(data['data']['messages'])}")
  179. # 测试获取统计信息
  180. response = requests.get(
  181. f"{self.base_url}/conversation_stats",
  182. timeout=10
  183. )
  184. print(f"[TEST] 获取统计信息响应码: {response.status_code}")
  185. if response.status_code == 200:
  186. data = response.json()
  187. self.assertIn('data', data)
  188. stats = data['data']
  189. print(f"[TEST] Redis可用: {stats.get('available')}")
  190. print(f"[TEST] 总用户数: {stats.get('total_users')}")
  191. print(f"[TEST] 总对话数: {stats.get('total_conversations')}")
  192. except Exception as e:
  193. print(f"[ERROR] 管理API测试失败: {str(e)}")
  194. def test_guest_user_generation(self):
  195. """测试guest用户生成"""
  196. try:
  197. # 不提供user_id,应该生成guest用户
  198. payload = {
  199. "question": "测试guest用户",
  200. "session_id": self.test_session_id + "_guest"
  201. }
  202. response = requests.post(
  203. f"{self.base_url}/ask_agent",
  204. json=payload,
  205. timeout=30
  206. )
  207. if response.status_code == 200:
  208. data = response.json()
  209. user_id = data['data']['user_id']
  210. is_guest = user_id == "guest" # 直接通过user_id判断
  211. print(f"[TEST] 生成的用户ID: {user_id}")
  212. print(f"[TEST] 是否为guest用户: {is_guest}")
  213. self.assertTrue(user_id.startswith('guest_'))
  214. self.assertTrue(is_guest)
  215. except Exception as e:
  216. self.skipTest(f"Guest用户测试失败: {str(e)}")
  217. def run_selected_tests():
  218. """运行选定的测试"""
  219. suite = unittest.TestSuite()
  220. # 添加要运行的测试
  221. suite.addTest(TestAskAgentRedisIntegration('test_api_availability'))
  222. suite.addTest(TestAskAgentRedisIntegration('test_basic_ask_agent'))
  223. suite.addTest(TestAskAgentRedisIntegration('test_conversation_context'))
  224. suite.addTest(TestAskAgentRedisIntegration('test_invalid_conversation_id'))
  225. suite.addTest(TestAskAgentRedisIntegration('test_conversation_api_endpoints'))
  226. runner = unittest.TextTestRunner(verbosity=2)
  227. runner.run(suite)
  228. if __name__ == '__main__':
  229. print("=" * 60)
  230. print("ask_agent Redis集成测试")
  231. print("注意: 需要先启动Flask应用 (python citu_app.py)")
  232. print("=" * 60)
  233. # 可以选择运行所有测试或选定的测试
  234. unittest.main()
  235. # 或者运行选定的测试
  236. # run_selected_tests()