redis_conversation_demo.py 9.4 KB


  1. """
  2. Redis对话管理功能演示脚本
  3. 这个脚本演示了如何使用Redis对话管理系统的各种功能:
  4. 1. 创建对话
  5. 2. 多轮对话(带上下文)
  6. 3. 缓存命中
  7. 4. 对话历史查询
  8. 5. 统计信息查看
  9. """
  10. import requests
  11. import json
  12. import time
  13. import sys
  14. import os
  15. class ConversationDemo:
  16. def __init__(self, base_url="http://localhost:8084/api/v0"):
  17. self.base_url = base_url
  18. self.session_id = f"demo_session_{int(time.time())}"
  19. self.conversation_id = None
  20. self.user_id = None
  21. def print_section(self, title):
  22. """打印分隔线"""
  23. print("\n" + "="*60)
  24. print(f" {title} ")
  25. print("="*60)
  26. def demo_basic_conversation(self):
  27. """演示基本对话功能"""
  28. self.print_section("1. 基本对话功能")
  29. # 第一个问题
  30. print("\n[DEMO] 发送第一个问题...")
  31. response = requests.post(
  32. f"{self.base_url}/ask_agent",
  33. json={
  34. "question": "高速公路服务区有多少个?",
  35. "session_id": self.session_id
  36. }
  37. )
  38. if response.status_code == 200:
  39. data = response.json()
  40. self.conversation_id = data['data']['conversation_id']
  41. self.user_id = data['data']['user_id']
  42. print(f"[结果] 对话ID: {self.conversation_id}")
  43. print(f"[结果] 用户ID: {self.user_id}")
  44. print(f"[结果] 是否为Guest用户: {data['data']['user_id'] == 'guest'}")
  45. print(f"[结果] 回答: {data['data'].get('response', '')[:100]}...")
  46. else:
  47. print(f"[错误] 响应码: {response.status_code}")
  48. def demo_context_awareness(self):
  49. """演示上下文感知功能"""
  50. self.print_section("2. 上下文感知功能")
  51. if not self.conversation_id:
  52. print("[警告] 需要先运行基本对话演示")
  53. return
  54. # 第二个问题(依赖上下文)
  55. print("\n[DEMO] 发送依赖上下文的问题...")
  56. response = requests.post(
  57. f"{self.base_url}/ask_agent",
  58. json={
  59. "question": "这些服务区的经理都是谁?",
  60. "session_id": self.session_id,
  61. "conversation_id": self.conversation_id
  62. }
  63. )
  64. if response.status_code == 200:
  65. data = response.json()
  66. print(f"[结果] 使用了上下文: {data['data'].get('context_used')}")
  67. print(f"[结果] 对话状态: {data['data'].get('conversation_status')}")
  68. print(f"[结果] 回答: {data['data'].get('response', '')[:100]}...")
  69. else:
  70. print(f"[错误] 响应码: {response.status_code}")
  71. def demo_cache_functionality(self):
  72. """演示缓存功能"""
  73. self.print_section("3. 缓存功能")
  74. # 问相同的问题
  75. question = "高速公路服务区的总数是多少?"
  76. print(f"\n[DEMO] 第一次询问: {question}")
  77. response1 = requests.post(
  78. f"{self.base_url}/ask_agent",
  79. json={
  80. "question": question,
  81. "session_id": self.session_id + "_cache",
  82. }
  83. )
  84. if response1.status_code == 200:
  85. data1 = response1.json()
  86. print(f"[结果] 来自缓存: {data1['data'].get('from_cache')}")
  87. conv_id = data1['data']['conversation_id']
  88. # 立即再问一次
  89. print(f"\n[DEMO] 第二次询问相同问题...")
  90. response2 = requests.post(
  91. f"{self.base_url}/ask_agent",
  92. json={
  93. "question": question,
  94. "session_id": self.session_id + "_cache",
  95. "conversation_id": conv_id
  96. }
  97. )
  98. if response2.status_code == 200:
  99. data2 = response2.json()
  100. print(f"[结果] 来自缓存: {data2['data'].get('from_cache')}")
  101. def demo_conversation_history(self):
  102. """演示对话历史查询"""
  103. self.print_section("4. 对话历史查询")
  104. if not self.user_id:
  105. print("[警告] 需要先运行基本对话演示")
  106. return
  107. # 获取用户的对话列表
  108. print(f"\n[DEMO] 获取用户 {self.user_id} 的对话列表...")
  109. response = requests.get(
  110. f"{self.base_url}/user/{self.user_id}/conversations"
  111. )
  112. if response.status_code == 200:
  113. data = response.json()
  114. conversations = data['data']['conversations']
  115. print(f"[结果] 找到 {len(conversations)} 个对话")
  116. for i, conv in enumerate(conversations):
  117. print(f"\n 对话 {i+1}:")
  118. print(f" ID: {conv['conversation_id']}")
  119. print(f" 创建时间: {conv['created_at']}")
  120. print(f" 消息数: {conv['message_count']}")
  121. # 获取特定对话的消息
  122. if self.conversation_id:
  123. print(f"\n[DEMO] 获取对话 {self.conversation_id} 的消息...")
  124. response = requests.get(
  125. f"{self.base_url}/conversation/{self.conversation_id}/messages"
  126. )
  127. if response.status_code == 200:
  128. data = response.json()
  129. messages = data['data']['messages']
  130. print(f"[结果] 找到 {len(messages)} 条消息")
  131. for msg in messages:
  132. role = "用户" if msg['role'] == 'user' else "助手"
  133. content = msg['content'][:50] + "..." if len(msg['content']) > 50 else msg['content']
  134. print(f"\n [{role}]: {content}")
  135. def demo_statistics(self):
  136. """演示统计信息"""
  137. self.print_section("5. 统计信息")
  138. print("\n[DEMO] 获取对话系统统计信息...")
  139. response = requests.get(f"{self.base_url}/conversation_stats")
  140. if response.status_code == 200:
  141. data = response.json()
  142. stats = data['data']
  143. print(f"\n[统计信息]")
  144. print(f" Redis可用: {stats.get('available')}")
  145. print(f" 总用户数: {stats.get('total_users')}")
  146. print(f" 总对话数: {stats.get('total_conversations')}")
  147. print(f" 缓存的问答数: {stats.get('cached_qa_count')}")
  148. if stats.get('redis_info'):
  149. print(f"\n[Redis信息]")
  150. print(f" 内存使用: {stats['redis_info'].get('used_memory')}")
  151. print(f" 连接客户端数: {stats['redis_info'].get('connected_clients')}")
  152. def demo_invalid_conversation_id(self):
  153. """演示无效对话ID处理"""
  154. self.print_section("6. 无效对话ID处理")
  155. print("\n[DEMO] 使用无效的对话ID...")
  156. response = requests.post(
  157. f"{self.base_url}/ask_agent",
  158. json={
  159. "question": "测试无效ID",
  160. "session_id": self.session_id,
  161. "conversation_id": "invalid_conversation_xyz"
  162. }
  163. )
  164. if response.status_code == 200:
  165. data = response.json()
  166. print(f"[结果] 对话状态: {data['data'].get('conversation_status')}")
  167. # 根据状态显示对应消息(本地化处理)
  168. status = data['data'].get('conversation_status')
  169. status_messages = {
  170. 'new': '创建新对话',
  171. 'continue': '继续已有对话',
  172. 'invalid_id_new': '您请求的对话不存在或无权访问,已为您创建新对话'
  173. }
  174. print(f"[结果] 状态消息: {status_messages.get(status, '未知状态')}")
  175. print(f"[结果] 请求的ID: {data['data'].get('requested_conversation_id')}")
  176. print(f"[结果] 新创建的ID: {data['data'].get('conversation_id')}")
  177. def run_all_demos(self):
  178. """运行所有演示"""
  179. try:
  180. # 检查服务是否可用
  181. print("[DEMO] 检查服务可用性...")
  182. response = requests.get(f"{self.base_url}/agent_health", timeout=5)
  183. if response.status_code != 200:
  184. print("[错误] 服务不可用,请先启动Flask应用")
  185. return
  186. # 运行各个演示
  187. self.demo_basic_conversation()
  188. time.sleep(1)
  189. self.demo_context_awareness()
  190. time.sleep(1)
  191. self.demo_cache_functionality()
  192. time.sleep(1)
  193. self.demo_conversation_history()
  194. time.sleep(1)
  195. self.demo_statistics()
  196. time.sleep(1)
  197. self.demo_invalid_conversation_id()
  198. print("\n" + "="*60)
  199. print(" 演示完成 ")
  200. print("="*60)
  201. except Exception as e:
  202. print(f"\n[错误] 演示过程中出错: {str(e)}")
  203. print("请确保Flask应用正在运行 (python citu_app.py)")
  204. if __name__ == "__main__":
  205. print("Redis对话管理功能演示")
  206. print("确保已经启动了Flask应用和Redis服务")
  207. print("-" * 60)
  208. demo = ConversationDemo()
  209. demo.run_all_demos()