api.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. """
  2. Custom React Agent API 服务
  3. 提供RESTful接口用于智能问答
  4. """
  5. import asyncio
  6. import atexit
  7. import os
  8. import sys
  9. from datetime import datetime
  10. from typing import Optional, Dict, Any
  11. # 🔧 修复模块路径问题:添加项目根目录到 sys.path
  12. CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
  13. PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, '..', '..'))
  14. sys.path.insert(0, CURRENT_DIR) # 当前目录优先
  15. sys.path.insert(1, PROJECT_ROOT) # 项目根目录
  16. from flask import Flask, request, jsonify
  17. import redis.asyncio as redis
  18. try:
  19. # 尝试相对导入(当作为模块导入时)
  20. from .agent import CustomReactAgent
  21. from . import config
  22. from core.logging import get_react_agent_logger
  23. except ImportError:
  24. # 如果相对导入失败,尝试绝对导入(直接运行时)
  25. from agent import CustomReactAgent
  26. import config
  27. from core.logging import get_react_agent_logger
  28. # 使用独立日志系统
  29. logger = get_react_agent_logger("ReactAgentAPI")
  30. # 全局Agent实例
  31. _agent_instance: Optional[CustomReactAgent] = None
  32. _redis_client: Optional[redis.Redis] = None
  33. def validate_request_data(data: Dict[str, Any]) -> Dict[str, Any]:
  34. """验证请求数据,并支持从thread_id中推断user_id"""
  35. errors = []
  36. # 验证 question(必填)
  37. question = data.get('question', '')
  38. if not question or not question.strip():
  39. errors.append('问题不能为空')
  40. elif len(question) > 2000:
  41. errors.append('问题长度不能超过2000字符')
  42. # 优先获取 thread_id
  43. thread_id = data.get('thread_id') or data.get('conversation_id')
  44. # 获取 user_id,但暂不设置默认值
  45. user_id = data.get('user_id')
  46. # 如果没有传递 user_id,则尝试从 thread_id 中推断
  47. if not user_id:
  48. if thread_id and ':' in thread_id:
  49. inferred_user_id = thread_id.split(':', 1)[0]
  50. if inferred_user_id:
  51. user_id = inferred_user_id
  52. logger.info(f"👤 未提供user_id,从 thread_id '{thread_id}' 中推断出: '{user_id}'")
  53. else:
  54. # 如果拆分结果为空,则使用默认值
  55. user_id = 'guest'
  56. else:
  57. # 如果 thread_id 不存在或格式不正确,则使用默认值
  58. user_id = 'guest'
  59. # 验证 user_id 长度
  60. if user_id and len(user_id) > 50:
  61. errors.append('用户ID长度不能超过50字符')
  62. # 用户ID与会话ID一致性校验
  63. if thread_id:
  64. if ':' not in thread_id:
  65. errors.append('会话ID格式无效,期望格式为 user_id:timestamp')
  66. else:
  67. thread_user_id = thread_id.split(':', 1)[0]
  68. if thread_user_id != user_id:
  69. errors.append(f'会话归属验证失败:会话ID [{thread_id}] 不属于当前用户 [{user_id}]')
  70. if errors:
  71. raise ValueError('; '.join(errors))
  72. return {
  73. 'question': question.strip(),
  74. 'user_id': user_id,
  75. 'thread_id': thread_id # 可选,不传则自动生成新会话
  76. }
  77. async def initialize_agent():
  78. """异步初始化Agent"""
  79. global _agent_instance, _redis_client
  80. if _agent_instance is None:
  81. logger.info("🚀 正在异步初始化 Custom React Agent...")
  82. try:
  83. # 设置环境变量(checkpointer内部需要)
  84. os.environ['REDIS_URL'] = config.REDIS_URL
  85. # 初始化共享的Redis客户端
  86. _redis_client = redis.from_url(config.REDIS_URL, decode_responses=True)
  87. await _redis_client.ping()
  88. logger.info("✅ Redis客户端连接成功")
  89. _agent_instance = await CustomReactAgent.create()
  90. logger.info("✅ Agent 异步初始化完成")
  91. except Exception as e:
  92. logger.error(f"❌ Agent 异步初始化失败: {e}")
  93. raise
  94. async def ensure_agent_ready():
  95. """异步确保Agent实例可用"""
  96. global _agent_instance
  97. if _agent_instance is None:
  98. await initialize_agent()
  99. # 测试Agent是否还可用
  100. try:
  101. # 简单测试 - 尝试获取一个不存在用户的对话(应该返回空列表)
  102. test_result = await _agent_instance.get_user_recent_conversations("__test__", 1)
  103. return True
  104. except Exception as e:
  105. logger.warning(f"⚠️ Agent实例不可用: {e}")
  106. # 重新创建Agent实例
  107. _agent_instance = None
  108. await initialize_agent()
  109. return True
  110. # 删除复杂的事件循环管理函数 - 不再需要
  111. async def cleanup_agent():
  112. """异步清理Agent资源"""
  113. global _agent_instance, _redis_client
  114. if _agent_instance:
  115. await _agent_instance.close()
  116. logger.info("✅ Agent 资源已异步清理")
  117. _agent_instance = None
  118. if _redis_client:
  119. await _redis_client.aclose()
  120. logger.info("✅ Redis客户端已异步关闭")
  121. _redis_client = None
  122. # 创建Flask应用
  123. app = Flask(__name__)
  124. # 简化的退出处理
  125. def cleanup_on_exit():
  126. """程序退出时的清理函数"""
  127. logger.info("程序退出,资源清理将在异步上下文中进行")
  128. atexit.register(cleanup_on_exit)
  129. @app.route("/")
  130. def root():
  131. """健康检查端点"""
  132. return jsonify({"message": "Custom React Agent API 服务正在运行"})
  133. @app.route('/health', methods=['GET'])
  134. def health_check():
  135. """健康检查端点"""
  136. try:
  137. health_status = {
  138. "status": "healthy",
  139. "agent_initialized": _agent_instance is not None,
  140. "timestamp": datetime.now().isoformat()
  141. }
  142. return jsonify(health_status), 200
  143. except Exception as e:
  144. logger.error(f"健康检查失败: {e}")
  145. return jsonify({"status": "unhealthy", "error": str(e)}), 500
  146. @app.route("/api/chat", methods=["POST"])
  147. async def chat_endpoint():
  148. """异步智能问答接口"""
  149. global _agent_instance
  150. # 确保Agent已初始化
  151. if not await ensure_agent_ready():
  152. return jsonify({
  153. "code": 503,
  154. "message": "服务未就绪",
  155. "success": False,
  156. "error": "Agent 初始化失败"
  157. }), 503
  158. try:
  159. # 获取请求数据,处理JSON解析错误
  160. try:
  161. data = request.get_json(force=True)
  162. except Exception as json_error:
  163. logger.warning(f"⚠️ JSON解析失败: {json_error}")
  164. return jsonify({
  165. "code": 400,
  166. "message": "请求格式错误",
  167. "success": False,
  168. "error": "无效的JSON格式,请检查请求体中是否存在语法错误(如多余的逗号、引号不匹配等)",
  169. "details": str(json_error)
  170. }), 400
  171. if not data:
  172. return jsonify({
  173. "code": 400,
  174. "message": "请求参数错误",
  175. "success": False,
  176. "error": "请求体不能为空"
  177. }), 400
  178. # 验证请求数据
  179. validated_data = validate_request_data(data)
  180. logger.info(f"📨 收到请求 - User: {validated_data['user_id']}, Question: {validated_data['question'][:50]}...")
  181. # 直接调用异步方法,不需要事件循环包装
  182. agent_result = await _agent_instance.chat(
  183. message=validated_data['question'],
  184. user_id=validated_data['user_id'],
  185. thread_id=validated_data['thread_id']
  186. )
  187. if not agent_result.get("success", False):
  188. # Agent处理失败
  189. error_msg = agent_result.get("error", "Agent处理失败")
  190. logger.error(f"❌ Agent处理失败: {error_msg}")
  191. return jsonify({
  192. "code": 500,
  193. "message": "处理失败",
  194. "success": False,
  195. "error": error_msg,
  196. "data": {
  197. "conversation_id": agent_result.get("thread_id"), # 新增:conversation_id等于thread_id
  198. "user_id": validated_data['user_id'], # 新增:返回用户ID
  199. "react_agent_meta": {
  200. "thread_id": agent_result.get("thread_id"),
  201. "agent_version": "custom_react_v1_async",
  202. "execution_path": ["error"]
  203. },
  204. "timestamp": datetime.now().isoformat()
  205. }
  206. }), 500
  207. # Agent处理成功,按照设计文档格式化响应
  208. api_data = agent_result.get("api_data", {})
  209. # 构建符合设计文档的响应数据
  210. response_data = {
  211. "response": api_data.get("response", ""),
  212. "conversation_id": agent_result.get("thread_id"), # 新增:conversation_id等于thread_id
  213. "user_id": validated_data['user_id'], # 新增:返回用户ID
  214. "react_agent_meta": api_data.get("react_agent_meta", {
  215. "thread_id": agent_result.get("thread_id"),
  216. "agent_version": "custom_react_v1"
  217. }),
  218. "timestamp": datetime.now().isoformat()
  219. }
  220. # 可选字段:SQL(仅当执行SQL时存在)
  221. if "sql" in api_data:
  222. response_data["sql"] = api_data["sql"]
  223. # 可选字段:records(仅当有查询结果时存在)
  224. if "records" in api_data:
  225. response_data["records"] = api_data["records"]
  226. logger.info(f"✅ 请求处理成功 - Thread: {response_data['react_agent_meta'].get('thread_id')}")
  227. return jsonify({
  228. "code": 200,
  229. "message": "操作成功",
  230. "success": True,
  231. "data": response_data
  232. })
  233. except ValueError as e:
  234. # 参数验证错误
  235. error_msg = str(e)
  236. logger.warning(f"⚠️ 参数验证失败: {error_msg}")
  237. # 根据错误类型提供更友好的消息
  238. if "会话归属验证失败" in error_msg:
  239. message = "会话归属验证失败"
  240. elif "会话ID格式无效" in error_msg:
  241. message = "会话ID格式无效"
  242. elif "JSON格式" in error_msg:
  243. message = "请求格式错误"
  244. else:
  245. message = "请求参数错误"
  246. return jsonify({
  247. "code": 400,
  248. "message": message,
  249. "success": False,
  250. "error": error_msg,
  251. "error_type": "validation_error"
  252. }), 400
  253. except Exception as e:
  254. # 其他未预期的错误
  255. logger.error(f"❌ 未预期的错误: {e}", exc_info=True)
  256. return jsonify({
  257. "code": 500,
  258. "message": "服务器内部错误",
  259. "success": False,
  260. "error": "系统异常,请稍后重试"
  261. }), 500
  262. @app.route('/api/v0/react/users/<user_id>/conversations', methods=['GET'])
  263. async def get_user_conversations(user_id: str):
  264. """异步获取用户的聊天记录列表"""
  265. global _agent_instance
  266. try:
  267. # 获取查询参数
  268. limit = request.args.get('limit', 10, type=int)
  269. # 限制limit的范围
  270. limit = max(1, min(limit, 50)) # 限制在1-50之间
  271. logger.info(f"📋 异步获取用户 {user_id} 的对话列表,限制 {limit} 条")
  272. # 确保Agent可用
  273. if not await ensure_agent_ready():
  274. return jsonify({
  275. "success": False,
  276. "error": "Agent 未就绪",
  277. "timestamp": datetime.now().isoformat()
  278. }), 503
  279. # 直接调用异步方法
  280. conversations = await _agent_instance.get_user_recent_conversations(user_id, limit)
  281. return jsonify({
  282. "success": True,
  283. "data": {
  284. "user_id": user_id,
  285. "conversations": conversations,
  286. "total_count": len(conversations),
  287. "limit": limit
  288. },
  289. "timestamp": datetime.now().isoformat()
  290. }), 200
  291. except Exception as e:
  292. logger.error(f"❌ 异步获取用户 {user_id} 对话列表失败: {e}")
  293. return jsonify({
  294. "success": False,
  295. "error": str(e),
  296. "timestamp": datetime.now().isoformat()
  297. }), 500
  298. @app.route('/api/v0/react/users/<user_id>/conversations/<thread_id>', methods=['GET'])
  299. async def get_user_conversation_detail(user_id: str, thread_id: str):
  300. """异步获取特定对话的详细历史"""
  301. global _agent_instance
  302. try:
  303. # 验证thread_id格式是否匹配user_id
  304. if not thread_id.startswith(f"{user_id}:"):
  305. return jsonify({
  306. "success": False,
  307. "error": f"Thread ID {thread_id} 不属于用户 {user_id}",
  308. "timestamp": datetime.now().isoformat()
  309. }), 400
  310. logger.info(f"📖 异步获取用户 {user_id} 的对话 {thread_id} 详情")
  311. # 确保Agent可用
  312. if not await ensure_agent_ready():
  313. return jsonify({
  314. "success": False,
  315. "error": "Agent 未就绪",
  316. "timestamp": datetime.now().isoformat()
  317. }), 503
  318. # 直接调用异步方法
  319. history = await _agent_instance.get_conversation_history(thread_id)
  320. logger.info(f"✅ 异步成功获取对话历史,消息数量: {len(history)}")
  321. if not history:
  322. return jsonify({
  323. "success": False,
  324. "error": f"未找到对话 {thread_id}",
  325. "timestamp": datetime.now().isoformat()
  326. }), 404
  327. return jsonify({
  328. "success": True,
  329. "data": {
  330. "user_id": user_id,
  331. "thread_id": thread_id,
  332. "message_count": len(history),
  333. "messages": history
  334. },
  335. "timestamp": datetime.now().isoformat()
  336. }), 200
  337. except Exception as e:
  338. import traceback
  339. logger.error(f"❌ 异步获取对话 {thread_id} 详情失败: {e}")
  340. logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
  341. return jsonify({
  342. "success": False,
  343. "error": str(e),
  344. "timestamp": datetime.now().isoformat()
  345. }), 500
  346. # 简单Redis查询函数(测试用)
  347. def get_user_conversations_simple_sync(user_id: str, limit: int = 10):
  348. """直接从Redis获取用户对话,测试版本"""
  349. import redis
  350. import json
  351. try:
  352. # 创建Redis连接
  353. redis_client = redis.Redis(
  354. host=config.REDIS_HOST,
  355. port=config.REDIS_PORT,
  356. db=config.REDIS_DB,
  357. password=config.REDIS_PASSWORD,
  358. decode_responses=True
  359. )
  360. redis_client.ping()
  361. # 扫描用户的checkpoint keys
  362. pattern = f"checkpoint:{user_id}:*"
  363. logger.info(f"🔍 扫描模式: {pattern}")
  364. keys = []
  365. cursor = 0
  366. while True:
  367. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  368. keys.extend(batch)
  369. if cursor == 0:
  370. break
  371. logger.info(f"📋 找到 {len(keys)} 个keys")
  372. # 解析thread信息
  373. thread_data = {}
  374. for key in keys:
  375. try:
  376. parts = key.split(':')
  377. if len(parts) >= 4:
  378. thread_id = f"{parts[1]}:{parts[2]}" # user_id:timestamp
  379. timestamp = parts[2]
  380. if thread_id not in thread_data:
  381. thread_data[thread_id] = {
  382. "thread_id": thread_id,
  383. "timestamp": timestamp,
  384. "keys": []
  385. }
  386. thread_data[thread_id]["keys"].append(key)
  387. except Exception as e:
  388. logger.warning(f"解析key失败 {key}: {e}")
  389. continue
  390. logger.info(f"📊 找到 {len(thread_data)} 个thread")
  391. # 按时间戳排序
  392. sorted_threads = sorted(
  393. thread_data.values(),
  394. key=lambda x: x["timestamp"],
  395. reverse=True
  396. )[:limit]
  397. # 获取每个thread的详细信息
  398. conversations = []
  399. for thread_info in sorted_threads:
  400. try:
  401. thread_id = thread_info["thread_id"]
  402. # 获取最新的checkpoint数据
  403. latest_key = max(thread_info["keys"])
  404. # 先检查key的数据类型
  405. key_type = redis_client.type(latest_key)
  406. logger.info(f"🔍 Key {latest_key} 的类型: {key_type}")
  407. data = None
  408. if key_type == 'string':
  409. data = redis_client.get(latest_key)
  410. elif key_type == 'hash':
  411. # 如果是hash类型,获取所有字段
  412. hash_data = redis_client.hgetall(latest_key)
  413. logger.info(f"🔍 Hash字段: {list(hash_data.keys())}")
  414. # 尝试获取可能的数据字段
  415. for field in ['data', 'state', 'value', 'checkpoint']:
  416. if field in hash_data:
  417. data = hash_data[field]
  418. break
  419. if not data and hash_data:
  420. # 如果没找到预期字段,取第一个值试试
  421. data = list(hash_data.values())[0]
  422. elif key_type == 'list':
  423. # 如果是list类型,获取最后一个元素
  424. data = redis_client.lindex(latest_key, -1)
  425. elif key_type == 'ReJSON-RL':
  426. # 这是RedisJSON类型,使用JSON.GET命令
  427. logger.info(f"🔍 使用JSON.GET获取RedisJSON数据")
  428. try:
  429. # 使用JSON.GET命令获取整个JSON对象
  430. json_data = redis_client.execute_command('JSON.GET', latest_key)
  431. if json_data:
  432. data = json_data # JSON.GET返回的就是JSON字符串
  433. logger.info(f"🔍 JSON数据长度: {len(data)} 字符")
  434. else:
  435. logger.warning(f"⚠️ JSON.GET 返回空数据")
  436. continue
  437. except Exception as json_error:
  438. logger.error(f"❌ JSON.GET 失败: {json_error}")
  439. continue
  440. else:
  441. logger.warning(f"⚠️ 未知的key类型: {key_type}")
  442. continue
  443. if data:
  444. try:
  445. checkpoint_data = json.loads(data)
  446. # 调试:查看JSON数据结构
  447. logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}")
  448. # 根据您提供的JSON结构,消息在 checkpoint.channel_values.messages
  449. messages = []
  450. # 首先检查是否有checkpoint字段
  451. if 'checkpoint' in checkpoint_data:
  452. checkpoint = checkpoint_data['checkpoint']
  453. if isinstance(checkpoint, dict) and 'channel_values' in checkpoint:
  454. channel_values = checkpoint['channel_values']
  455. if isinstance(channel_values, dict) and 'messages' in channel_values:
  456. messages = channel_values['messages']
  457. logger.info(f"🔍 找到messages: {len(messages)} 条消息")
  458. # 如果没有checkpoint字段,尝试直接在channel_values
  459. if not messages and 'channel_values' in checkpoint_data:
  460. channel_values = checkpoint_data['channel_values']
  461. if isinstance(channel_values, dict) and 'messages' in channel_values:
  462. messages = channel_values['messages']
  463. logger.info(f"🔍 找到messages(直接路径): {len(messages)} 条消息")
  464. # 生成对话预览
  465. preview = "空对话"
  466. if messages:
  467. for msg in messages:
  468. # 处理LangChain消息格式:{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "...", "type": "human"}}
  469. if isinstance(msg, dict):
  470. # 检查是否是LangChain格式的HumanMessage
  471. if (msg.get('lc') == 1 and
  472. msg.get('type') == 'constructor' and
  473. 'id' in msg and
  474. isinstance(msg['id'], list) and
  475. len(msg['id']) >= 4 and
  476. msg['id'][3] == 'HumanMessage' and
  477. 'kwargs' in msg):
  478. kwargs = msg['kwargs']
  479. if kwargs.get('type') == 'human' and 'content' in kwargs:
  480. content = str(kwargs['content'])
  481. preview = content[:50] + "..." if len(content) > 50 else content
  482. break
  483. # 兼容其他格式
  484. elif msg.get('type') == 'human' and 'content' in msg:
  485. content = str(msg['content'])
  486. preview = content[:50] + "..." if len(content) > 50 else content
  487. break
  488. conversations.append({
  489. "thread_id": thread_id,
  490. "user_id": user_id,
  491. "timestamp": thread_info["timestamp"],
  492. "message_count": len(messages),
  493. "conversation_preview": preview
  494. })
  495. except json.JSONDecodeError:
  496. logger.error(f"❌ JSON解析失败,数据类型: {type(data)}, 长度: {len(str(data))}")
  497. logger.error(f"❌ 数据开头: {str(data)[:200]}...")
  498. continue
  499. except Exception as e:
  500. logger.error(f"处理thread {thread_info['thread_id']} 失败: {e}")
  501. continue
  502. redis_client.close()
  503. logger.info(f"✅ 返回 {len(conversations)} 个对话")
  504. return conversations
  505. except Exception as e:
  506. logger.error(f"❌ Redis查询失败: {e}")
  507. return []
  508. @app.route('/api/test/redis', methods=['GET'])
  509. def test_redis_connection():
  510. """测试Redis连接和基本查询"""
  511. try:
  512. import redis
  513. # 创建Redis连接
  514. r = redis.Redis(
  515. host=config.REDIS_HOST,
  516. port=config.REDIS_PORT,
  517. db=config.REDIS_DB,
  518. password=config.REDIS_PASSWORD,
  519. decode_responses=True
  520. )
  521. r.ping()
  522. # 扫描checkpoint keys
  523. pattern = "checkpoint:*"
  524. keys = []
  525. cursor = 0
  526. count = 0
  527. while True:
  528. cursor, batch = r.scan(cursor=cursor, match=pattern, count=100)
  529. keys.extend(batch)
  530. count += len(batch)
  531. if cursor == 0 or count > 500: # 限制扫描数量
  532. break
  533. # 统计用户
  534. users = {}
  535. for key in keys:
  536. try:
  537. parts = key.split(':')
  538. if len(parts) >= 2:
  539. user_id = parts[1]
  540. users[user_id] = users.get(user_id, 0) + 1
  541. except:
  542. continue
  543. r.close()
  544. return jsonify({
  545. "success": True,
  546. "data": {
  547. "redis_connected": True,
  548. "total_checkpoint_keys": len(keys),
  549. "users_found": list(users.keys()),
  550. "user_key_counts": users,
  551. "sample_keys": keys[:5] if keys else []
  552. },
  553. "timestamp": datetime.now().isoformat()
  554. }), 200
  555. except Exception as e:
  556. logger.error(f"❌ Redis测试失败: {e}")
  557. return jsonify({
  558. "success": False,
  559. "error": str(e),
  560. "timestamp": datetime.now().isoformat()
  561. }), 500
  562. @app.route('/api/v0/react/direct/users/<user_id>/conversations', methods=['GET'])
  563. def test_get_user_conversations_simple(user_id: str):
  564. """测试简单Redis查询获取用户对话列表"""
  565. try:
  566. limit = request.args.get('limit', 10, type=int)
  567. limit = max(1, min(limit, 50))
  568. logger.info(f"🧪 测试获取用户 {user_id} 的对话列表(简单Redis方式)")
  569. # 使用简单Redis查询
  570. conversations = get_user_conversations_simple_sync(user_id, limit)
  571. return jsonify({
  572. "success": True,
  573. "method": "simple_redis_query",
  574. "data": {
  575. "user_id": user_id,
  576. "conversations": conversations,
  577. "total_count": len(conversations),
  578. "limit": limit
  579. },
  580. "timestamp": datetime.now().isoformat()
  581. }), 200
  582. except Exception as e:
  583. logger.error(f"❌ 测试简单Redis查询失败: {e}")
  584. return jsonify({
  585. "success": False,
  586. "error": str(e),
  587. "timestamp": datetime.now().isoformat()
  588. }), 500
  589. # 在 api.py 文件顶部的导入部分添加:
  590. try:
  591. from .enhanced_redis_api import get_conversation_detail_from_redis
  592. except ImportError:
  593. from enhanced_redis_api import get_conversation_detail_from_redis
  594. # 在 api.py 文件中添加以下新路由:
  595. @app.route('/api/v0/react/direct/conversations/<thread_id>', methods=['GET'])
  596. def get_conversation_detail_api(thread_id: str):
  597. """
  598. 获取特定对话的详细信息 - 支持include_tools开关参数
  599. Query Parameters:
  600. - include_tools: bool, 是否包含工具调用信息,默认false
  601. true: 返回完整对话(human/ai/tool/system)
  602. false: 只返回human/ai消息,清理工具调用信息
  603. - user_id: str, 可选的用户ID验证
  604. Examples:
  605. GET /api/conversations/wang:20250709195048728?include_tools=true # 完整模式
  606. GET /api/conversations/wang:20250709195048728?include_tools=false # 简化模式(默认)
  607. GET /api/conversations/wang:20250709195048728 # 简化模式(默认)
  608. """
  609. try:
  610. # 获取查询参数
  611. include_tools = request.args.get('include_tools', 'false').lower() == 'true'
  612. user_id = request.args.get('user_id')
  613. # 验证thread_id格式
  614. if ':' not in thread_id:
  615. return jsonify({
  616. "success": False,
  617. "error": "Invalid thread_id format. Expected format: user_id:timestamp",
  618. "timestamp": datetime.now().isoformat()
  619. }), 400
  620. # 如果提供了user_id,验证thread_id是否属于该用户
  621. thread_user_id = thread_id.split(':')[0]
  622. if user_id and thread_user_id != user_id:
  623. return jsonify({
  624. "success": False,
  625. "error": f"Thread ID {thread_id} does not belong to user {user_id}",
  626. "timestamp": datetime.now().isoformat()
  627. }), 400
  628. logger.info(f"📖 获取对话详情 - Thread: {thread_id}, Include Tools: {include_tools}")
  629. # 从Redis获取对话详情(使用我们的新函数)
  630. result = get_conversation_detail_from_redis(thread_id, include_tools)
  631. if not result['success']:
  632. logger.warning(f"⚠️ 获取对话详情失败: {result['error']}")
  633. return jsonify({
  634. "success": False,
  635. "error": result['error'],
  636. "timestamp": datetime.now().isoformat()
  637. }), 404
  638. # 添加API元数据
  639. result['data']['api_metadata'] = {
  640. "timestamp": datetime.now().isoformat(),
  641. "api_version": "v1",
  642. "endpoint": "get_conversation_detail",
  643. "query_params": {
  644. "include_tools": include_tools,
  645. "user_id": user_id
  646. }
  647. }
  648. mode_desc = "完整模式" if include_tools else "简化模式"
  649. logger.info(f"✅ 成功获取对话详情 - Messages: {result['data']['message_count']}, Mode: {mode_desc}")
  650. return jsonify({
  651. "success": True,
  652. "data": result['data'],
  653. "timestamp": datetime.now().isoformat()
  654. }), 200
  655. except Exception as e:
  656. import traceback
  657. logger.error(f"❌ 获取对话详情异常: {e}")
  658. logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
  659. return jsonify({
  660. "success": False,
  661. "error": str(e),
  662. "timestamp": datetime.now().isoformat()
  663. }), 500
  664. @app.route('/api/v0/react/direct/conversations/<thread_id>/compare', methods=['GET'])
  665. def compare_conversation_modes_api(thread_id: str):
  666. """
  667. 比较完整模式和简化模式的对话内容
  668. 用于调试和理解两种模式的差异
  669. Examples:
  670. GET /api/conversations/wang:20250709195048728/compare
  671. """
  672. try:
  673. logger.info(f"🔍 比较对话模式 - Thread: {thread_id}")
  674. # 获取完整模式
  675. full_result = get_conversation_detail_from_redis(thread_id, include_tools=True)
  676. # 获取简化模式
  677. simple_result = get_conversation_detail_from_redis(thread_id, include_tools=False)
  678. if not (full_result['success'] and simple_result['success']):
  679. return jsonify({
  680. "success": False,
  681. "error": "无法获取对话数据进行比较",
  682. "timestamp": datetime.now().isoformat()
  683. }), 404
  684. # 构建比较结果
  685. comparison = {
  686. "thread_id": thread_id,
  687. "full_mode": {
  688. "message_count": full_result['data']['message_count'],
  689. "stats": full_result['data']['stats'],
  690. "sample_messages": full_result['data']['messages'][:3] # 只显示前3条作为示例
  691. },
  692. "simple_mode": {
  693. "message_count": simple_result['data']['message_count'],
  694. "stats": simple_result['data']['stats'],
  695. "sample_messages": simple_result['data']['messages'][:3] # 只显示前3条作为示例
  696. },
  697. "comparison_summary": {
  698. "message_count_difference": full_result['data']['message_count'] - simple_result['data']['message_count'],
  699. "tools_filtered_out": full_result['data']['stats'].get('tool_messages', 0),
  700. "ai_messages_with_tools": full_result['data']['stats'].get('messages_with_tools', 0),
  701. "filtering_effectiveness": "有效" if (full_result['data']['message_count'] - simple_result['data']['message_count']) > 0 else "无差异"
  702. },
  703. "metadata": {
  704. "timestamp": datetime.now().isoformat(),
  705. "note": "sample_messages 只显示前3条消息作为示例,完整消息请使用相应的详情API"
  706. }
  707. }
  708. logger.info(f"✅ 模式比较完成 - 完整: {comparison['full_mode']['message_count']}, 简化: {comparison['simple_mode']['message_count']}")
  709. return jsonify({
  710. "success": True,
  711. "data": comparison,
  712. "timestamp": datetime.now().isoformat()
  713. }), 200
  714. except Exception as e:
  715. logger.error(f"❌ 对话模式比较失败: {e}")
  716. return jsonify({
  717. "success": False,
  718. "error": str(e),
  719. "timestamp": datetime.now().isoformat()
  720. }), 500
  721. @app.route('/api/v0/react/direct/conversations/<thread_id>/summary', methods=['GET'])
  722. def get_conversation_summary_api(thread_id: str):
  723. """
  724. 获取对话摘要信息(只包含基本统计,不返回具体消息)
  725. Query Parameters:
  726. - include_tools: bool, 影响统计信息的计算方式
  727. Examples:
  728. GET /api/conversations/wang:20250709195048728/summary?include_tools=true
  729. """
  730. try:
  731. include_tools = request.args.get('include_tools', 'false').lower() == 'true'
  732. # 验证thread_id格式
  733. if ':' not in thread_id:
  734. return jsonify({
  735. "success": False,
  736. "error": "Invalid thread_id format. Expected format: user_id:timestamp",
  737. "timestamp": datetime.now().isoformat()
  738. }), 400
  739. logger.info(f"📊 获取对话摘要 - Thread: {thread_id}, Include Tools: {include_tools}")
  740. # 获取完整对话信息
  741. result = get_conversation_detail_from_redis(thread_id, include_tools)
  742. if not result['success']:
  743. return jsonify({
  744. "success": False,
  745. "error": result['error'],
  746. "timestamp": datetime.now().isoformat()
  747. }), 404
  748. # 只返回摘要信息,不包含具体消息
  749. data = result['data']
  750. summary = {
  751. "thread_id": data['thread_id'],
  752. "user_id": data['user_id'],
  753. "include_tools": data['include_tools'],
  754. "message_count": data['message_count'],
  755. "stats": data['stats'],
  756. "metadata": data['metadata'],
  757. "first_message_preview": None,
  758. "last_message_preview": None,
  759. "conversation_preview": None
  760. }
  761. # 添加消息预览
  762. messages = data.get('messages', [])
  763. if messages:
  764. # 第一条human消息预览
  765. for msg in messages:
  766. if msg['type'] == 'human':
  767. content = str(msg['content'])
  768. summary['first_message_preview'] = content[:100] + "..." if len(content) > 100 else content
  769. break
  770. # 最后一条ai消息预览
  771. for msg in reversed(messages):
  772. if msg['type'] == 'ai' and msg.get('content', '').strip():
  773. content = str(msg['content'])
  774. summary['last_message_preview'] = content[:100] + "..." if len(content) > 100 else content
  775. break
  776. # 生成对话预览(第一条human消息)
  777. summary['conversation_preview'] = summary['first_message_preview']
  778. # 添加API元数据
  779. summary['api_metadata'] = {
  780. "timestamp": datetime.now().isoformat(),
  781. "api_version": "v1",
  782. "endpoint": "get_conversation_summary"
  783. }
  784. logger.info(f"✅ 成功获取对话摘要")
  785. return jsonify({
  786. "success": True,
  787. "data": summary,
  788. "timestamp": datetime.now().isoformat()
  789. }), 200
  790. except Exception as e:
  791. logger.error(f"❌ 获取对话摘要失败: {e}")
  792. return jsonify({
  793. "success": False,
  794. "error": str(e),
  795. "timestamp": datetime.now().isoformat()
  796. }), 500
  797. # 为了支持独立运行
  798. if __name__ == "__main__":
  799. try:
  800. # 尝试使用ASGI模式启动(推荐)
  801. import uvicorn
  802. from asgiref.wsgi import WsgiToAsgi
  803. logger.info("🚀 使用ASGI模式启动异步Flask应用...")
  804. logger.info(" 这将解决事件循环冲突问题,支持LangGraph异步checkpoint保存")
  805. # 将Flask WSGI应用转换为ASGI应用
  806. asgi_app = WsgiToAsgi(app)
  807. # 信号处理
  808. import signal
  809. def signal_handler(signum, frame):
  810. logger.info("🛑 收到关闭信号,开始清理...")
  811. logger.info("正在关闭服务...")
  812. exit(0)
  813. signal.signal(signal.SIGINT, signal_handler)
  814. signal.signal(signal.SIGTERM, signal_handler)
  815. # 使用uvicorn启动ASGI应用
  816. uvicorn.run(
  817. asgi_app,
  818. host="0.0.0.0",
  819. port=8000,
  820. log_level="info",
  821. access_log=True
  822. )
  823. except ImportError as e:
  824. # 如果缺少ASGI依赖,fallback到传统Flask模式
  825. logger.warning("⚠️ ASGI依赖缺失,使用传统Flask模式启动")
  826. logger.warning(" 建议安装: pip install uvicorn asgiref")
  827. logger.warning(" 传统模式可能存在异步事件循环冲突问题")
  828. # 信号处理
  829. import signal
  830. def signal_handler(signum, frame):
  831. logger.info("🛑 收到关闭信号,开始清理...")
  832. logger.info("正在关闭服务...")
  833. exit(0)
  834. signal.signal(signal.SIGINT, signal_handler)
  835. signal.signal(signal.SIGTERM, signal_handler)
  836. # 启动传统Flask应用
  837. app.run(host="0.0.0.0", port=8000, debug=False, threaded=True)