# citu_app.py
  1. # 给dataops 对话助手返回结果
  2. from vanna.flask import VannaFlaskApp
  3. from core.vanna_llm_factory import create_vanna_instance
  4. from flask import request, jsonify
  5. import pandas as pd
  6. import common.result as result
  7. from datetime import datetime, timedelta
  8. from common.session_aware_cache import WebSessionAwareMemoryCache
  9. from app_config import API_MAX_RETURN_ROWS, ENABLE_RESULT_SUMMARY
  10. import re
  11. import chainlit as cl
  12. import json
  13. # 设置默认的最大返回行数
  14. DEFAULT_MAX_RETURN_ROWS = 200
  15. MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
  16. vn = create_vanna_instance()
  17. # 创建带时间戳的缓存
  18. timestamped_cache = WebSessionAwareMemoryCache()
  19. # 实例化 VannaFlaskApp,使用自定义缓存
  20. app = VannaFlaskApp(
  21. vn,
  22. cache=timestamped_cache, # 使用带时间戳的缓存
  23. title="辞图智能数据问答平台",
  24. logo = "https://www.citupro.com/img/logo-black-2.png",
  25. subtitle="让 AI 为你写 SQL",
  26. chart=False,
  27. allow_llm_to_see_data=True,
  28. ask_results_correct=True,
  29. followup_questions=True,
  30. debug=True
  31. )
  32. # 修改ask接口,支持前端传递session_id
  33. @app.flask_app.route('/api/v0/ask', methods=['POST'])
  34. def ask_full():
  35. req = request.get_json(force=True)
  36. question = req.get("question", None)
  37. browser_session_id = req.get("session_id", None) # 前端传递的会话ID
  38. if not question:
  39. from common.result import bad_request_response
  40. return jsonify(bad_request_response(
  41. response_text="缺少必需参数:question",
  42. missing_params=["question"]
  43. )), 400
  44. # 如果使用WebSessionAwareMemoryCache
  45. if hasattr(app.cache, 'generate_id_with_browser_session') and browser_session_id:
  46. # 这里需要修改vanna的ask方法来支持传递session_id
  47. # 或者预先调用generate_id来建立会话关联
  48. conversation_id = app.cache.generate_id_with_browser_session(
  49. question=question,
  50. browser_session_id=browser_session_id
  51. )
  52. try:
  53. sql, df, _ = vn.ask(
  54. question=question,
  55. print_results=False,
  56. visualize=False,
  57. allow_llm_to_see_data=True
  58. )
  59. # 关键:检查是否有LLM解释性文本(无法生成SQL的情况)
  60. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  61. # 在解释性文本末尾添加提示语
  62. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  63. # 使用标准化错误响应
  64. from common.result import validation_failed_response
  65. return jsonify(validation_failed_response(
  66. response_text=explanation_message
  67. )), 422 # 修改HTTP状态码为422
  68. # 如果sql为None但没有解释性文本,返回通用错误
  69. if sql is None:
  70. from common.result import validation_failed_response
  71. return jsonify(validation_failed_response(
  72. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  73. )), 422
  74. # 处理返回数据 - 使用新的query_result结构
  75. query_result = {
  76. "rows": [],
  77. "columns": [],
  78. "row_count": 0,
  79. "is_limited": False,
  80. "total_row_count": 0
  81. }
  82. summary = None
  83. if isinstance(df, pd.DataFrame):
  84. query_result["columns"] = list(df.columns)
  85. if not df.empty:
  86. total_rows = len(df)
  87. limited_df = df.head(MAX_RETURN_ROWS)
  88. query_result["rows"] = limited_df.to_dict(orient="records")
  89. query_result["row_count"] = len(limited_df)
  90. query_result["total_row_count"] = total_rows
  91. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  92. # 生成数据摘要(可通过配置控制,仅在有数据时生成)
  93. if ENABLE_RESULT_SUMMARY:
  94. try:
  95. summary = vn.generate_summary(question=question, df=df)
  96. print(f"[INFO] 成功生成摘要: {summary}")
  97. except Exception as e:
  98. print(f"[WARNING] 生成摘要失败: {str(e)}")
  99. summary = None
  100. # 构建返回数据
  101. response_data = {
  102. "sql": sql,
  103. "query_result": query_result,
  104. "conversation_id": conversation_id if 'conversation_id' in locals() else None,
  105. "session_id": browser_session_id
  106. }
  107. # 添加摘要(如果启用且生成成功)
  108. if ENABLE_RESULT_SUMMARY and summary is not None:
  109. response_data["summary"] = summary
  110. response_data["response"] = summary # 同时添加response字段
  111. from common.result import success_response
  112. return jsonify(success_response(
  113. response_text="查询执行完成" if summary is None else None,
  114. data=response_data
  115. ))
  116. except Exception as e:
  117. print(f"[ERROR] ask_full执行失败: {str(e)}")
  118. # 即使发生异常,也检查是否有业务层面的解释
  119. if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  120. # 在解释性文本末尾添加提示语
  121. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  122. from common.result import validation_failed_response
  123. return jsonify(validation_failed_response(
  124. response_text=explanation_message
  125. )), 422
  126. else:
  127. # 技术错误,使用500错误码
  128. from common.result import internal_error_response
  129. return jsonify(internal_error_response(
  130. response_text="查询处理失败,请稍后重试"
  131. )), 500
  132. @app.flask_app.route('/api/v0/citu_run_sql', methods=['POST'])
  133. def citu_run_sql():
  134. req = request.get_json(force=True)
  135. sql = req.get('sql')
  136. if not sql:
  137. from common.result import bad_request_response
  138. return jsonify(bad_request_response(
  139. response_text="缺少必需参数:sql",
  140. missing_params=["sql"]
  141. )), 400
  142. try:
  143. df = vn.run_sql(sql)
  144. # 处理返回数据 - 使用新的query_result结构
  145. query_result = {
  146. "rows": [],
  147. "columns": [],
  148. "row_count": 0,
  149. "is_limited": False,
  150. "total_row_count": 0
  151. }
  152. if isinstance(df, pd.DataFrame):
  153. query_result["columns"] = list(df.columns)
  154. if not df.empty:
  155. total_rows = len(df)
  156. limited_df = df.head(MAX_RETURN_ROWS)
  157. query_result["rows"] = limited_df.to_dict(orient="records")
  158. query_result["row_count"] = len(limited_df)
  159. query_result["total_row_count"] = total_rows
  160. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  161. from common.result import success_response
  162. return jsonify(success_response(
  163. response_text=f"SQL执行完成,共返回 {query_result['total_row_count']} 条记录" +
  164. (f",已限制显示前 {MAX_RETURN_ROWS} 条" if query_result["is_limited"] else ""),
  165. data={
  166. "sql": sql,
  167. "query_result": query_result
  168. }
  169. ))
  170. except Exception as e:
  171. print(f"[ERROR] citu_run_sql执行失败: {str(e)}")
  172. from common.result import internal_error_response
  173. return jsonify(internal_error_response(
  174. response_text=f"SQL执行失败,请检查SQL语句是否正确"
  175. )), 500
  176. @app.flask_app.route('/api/v0/ask_cached', methods=['POST'])
  177. def ask_cached():
  178. """
  179. 带缓存功能的智能查询接口
  180. 支持会话管理和结果缓存,提高查询效率
  181. """
  182. req = request.get_json(force=True)
  183. question = req.get("question", None)
  184. browser_session_id = req.get("session_id", None)
  185. if not question:
  186. from common.result import bad_request_response
  187. return jsonify(bad_request_response(
  188. response_text="缺少必需参数:question",
  189. missing_params=["question"]
  190. )), 400
  191. try:
  192. # 生成conversation_id
  193. # 调试:查看generate_id的实际行为
  194. print(f"[DEBUG] 输入问题: '{question}'")
  195. conversation_id = app.cache.generate_id(question=question)
  196. print(f"[DEBUG] 生成的conversation_id: {conversation_id}")
  197. # 再次用相同问题测试
  198. conversation_id2 = app.cache.generate_id(question=question)
  199. print(f"[DEBUG] 再次生成的conversation_id: {conversation_id2}")
  200. print(f"[DEBUG] 两次ID是否相同: {conversation_id == conversation_id2}")
  201. # 检查缓存
  202. cached_sql = app.cache.get(id=conversation_id, field="sql")
  203. if cached_sql is not None:
  204. # 缓存命中
  205. print(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
  206. sql = cached_sql
  207. df = app.cache.get(id=conversation_id, field="df")
  208. summary = app.cache.get(id=conversation_id, field="summary")
  209. else:
  210. # 缓存未命中,执行新查询
  211. print(f"[CACHE MISS] 执行新查询: {conversation_id}")
  212. sql, df, _ = vn.ask(
  213. question=question,
  214. print_results=False,
  215. visualize=False,
  216. allow_llm_to_see_data=True
  217. )
  218. # 检查是否有LLM解释性文本(无法生成SQL的情况)
  219. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  220. # 在解释性文本末尾添加提示语
  221. explanation_message = vn.last_llm_explanation + "请尝试用其它方式提问。"
  222. from common.result import validation_failed_response
  223. return jsonify(validation_failed_response(
  224. response_text=explanation_message
  225. )), 422
  226. # 如果sql为None但没有解释性文本,返回通用错误
  227. if sql is None:
  228. from common.result import validation_failed_response
  229. return jsonify(validation_failed_response(
  230. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  231. )), 422
  232. # 缓存结果
  233. app.cache.set(id=conversation_id, field="question", value=question)
  234. app.cache.set(id=conversation_id, field="sql", value=sql)
  235. app.cache.set(id=conversation_id, field="df", value=df)
  236. # 生成并缓存摘要(可通过配置控制,仅在有数据时生成)
  237. summary = None
  238. if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
  239. try:
  240. summary = vn.generate_summary(question=question, df=df)
  241. print(f"[INFO] 成功生成摘要: {summary}")
  242. except Exception as e:
  243. print(f"[WARNING] 生成摘要失败: {str(e)}")
  244. summary = None
  245. app.cache.set(id=conversation_id, field="summary", value=summary)
  246. # 处理返回数据 - 使用新的query_result结构
  247. query_result = {
  248. "rows": [],
  249. "columns": [],
  250. "row_count": 0,
  251. "is_limited": False,
  252. "total_row_count": 0
  253. }
  254. if isinstance(df, pd.DataFrame):
  255. query_result["columns"] = list(df.columns)
  256. if not df.empty:
  257. total_rows = len(df)
  258. limited_df = df.head(MAX_RETURN_ROWS)
  259. query_result["rows"] = limited_df.to_dict(orient="records")
  260. query_result["row_count"] = len(limited_df)
  261. query_result["total_row_count"] = total_rows
  262. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  263. # 构建返回数据
  264. response_data = {
  265. "sql": sql,
  266. "query_result": query_result,
  267. "conversation_id": conversation_id,
  268. "session_id": browser_session_id,
  269. "cached": cached_sql is not None # 标识是否来自缓存
  270. }
  271. # 添加摘要(如果启用且生成成功)
  272. if ENABLE_RESULT_SUMMARY and summary is not None:
  273. response_data["summary"] = summary
  274. response_data["response"] = summary # 同时添加response字段
  275. from common.result import success_response
  276. return jsonify(success_response(
  277. response_text="查询执行完成" if summary is None else None,
  278. data=response_data
  279. ))
  280. except Exception as e:
  281. print(f"[ERROR] ask_cached执行失败: {str(e)}")
  282. from common.result import internal_error_response
  283. return jsonify(internal_error_response(
  284. response_text="查询处理失败,请稍后重试"
  285. )), 500
  286. @app.flask_app.route('/api/v0/citu_train_question_sql', methods=['POST'])
  287. def citu_train_question_sql():
  288. """
  289. 训练问题-SQL对接口
  290. 此API将接收的question/sql pair写入到training库中,用于训练和改进AI模型。
  291. 支持仅传入SQL或同时传入问题和SQL进行训练。
  292. Args:
  293. question (str, optional): 用户问题
  294. sql (str, required): 对应的SQL查询语句
  295. Returns:
  296. JSON: 包含训练ID和成功消息的响应
  297. """
  298. try:
  299. req = request.get_json(force=True)
  300. question = req.get('question')
  301. sql = req.get('sql')
  302. if not sql:
  303. from common.result import bad_request_response
  304. return jsonify(bad_request_response(
  305. response_text="缺少必需参数:sql",
  306. missing_params=["sql"]
  307. )), 400
  308. # 正确的调用方式:同时传递question和sql
  309. if question:
  310. training_id = vn.train(question=question, sql=sql)
  311. print(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
  312. else:
  313. training_id = vn.train(sql=sql)
  314. print(f"训练成功,训练ID为:{training_id},SQL:{sql}")
  315. from common.result import success_response
  316. return jsonify(success_response(
  317. response_text="问题-SQL对训练成功",
  318. data={
  319. "training_id": training_id,
  320. "message": "Question-SQL pair trained successfully"
  321. }
  322. ))
  323. except Exception as e:
  324. from common.result import internal_error_response
  325. return jsonify(internal_error_response(
  326. response_text="训练失败,请稍后重试"
  327. )), 500
  328. # ============ LangGraph Agent 集成 ============
  329. # 全局Agent实例(单例模式)
  330. citu_langraph_agent = None
  331. def get_citu_langraph_agent():
  332. """获取LangGraph Agent实例(懒加载)"""
  333. global citu_langraph_agent
  334. if citu_langraph_agent is None:
  335. try:
  336. from agent.citu_agent import CituLangGraphAgent
  337. print("[CITU_APP] 开始创建LangGraph Agent实例...")
  338. citu_langraph_agent = CituLangGraphAgent()
  339. print("[CITU_APP] LangGraph Agent实例创建成功")
  340. except ImportError as e:
  341. print(f"[CRITICAL] Agent模块导入失败: {str(e)}")
  342. print("[CRITICAL] 请检查agent模块是否存在以及依赖是否正确安装")
  343. raise Exception(f"Agent模块导入失败: {str(e)}")
  344. except Exception as e:
  345. print(f"[CRITICAL] LangGraph Agent实例创建失败: {str(e)}")
  346. print(f"[CRITICAL] 错误类型: {type(e).__name__}")
  347. # 提供更有用的错误信息
  348. if "config" in str(e).lower():
  349. print("[CRITICAL] 可能是配置文件问题,请检查配置")
  350. elif "llm" in str(e).lower():
  351. print("[CRITICAL] 可能是LLM连接问题,请检查LLM配置")
  352. elif "tool" in str(e).lower():
  353. print("[CRITICAL] 可能是工具加载问题,请检查工具模块")
  354. raise Exception(f"Agent初始化失败: {str(e)}")
  355. return citu_langraph_agent
  356. @app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
  357. def ask_agent():
  358. """
  359. 新的LangGraph Agent接口
  360. 请求格式:
  361. {
  362. "question": "用户问题",
  363. "session_id": "会话ID(可选)"
  364. }
  365. 响应格式:
  366. {
  367. "success": true/false,
  368. "code": 200,
  369. "message": "success" 或错误信息,
  370. "data": {
  371. "response": "最终回答",
  372. "type": "DATABASE/CHAT",
  373. "sql": "生成的SQL(如果是数据库查询)",
  374. "data_result": {
  375. "rows": [...],
  376. "columns": [...],
  377. "row_count": 数字
  378. },
  379. "summary": "数据摘要(如果是数据库查询)",
  380. "session_id": "会话ID",
  381. "execution_path": ["classify", "agent_database", "format_response"],
  382. "classification_info": {
  383. "confidence": 0.95,
  384. "reason": "分类原因",
  385. "method": "rule_based/llm_based"
  386. },
  387. "agent_version": "langgraph_v1"
  388. }
  389. }
  390. """
  391. req = request.get_json(force=True)
  392. question = req.get("question", None)
  393. browser_session_id = req.get("session_id", None)
  394. if not question:
  395. from common.result import bad_request_response
  396. return jsonify(bad_request_response(
  397. response_text="缺少必需参数:question",
  398. missing_params=["question"]
  399. )), 400
  400. try:
  401. # 专门处理Agent初始化异常
  402. try:
  403. agent = get_citu_langraph_agent()
  404. except Exception as e:
  405. print(f"[CRITICAL] Agent初始化失败: {str(e)}")
  406. from common.result import service_unavailable_response
  407. return jsonify(service_unavailable_response(
  408. response_text="AI服务暂时不可用,请稍后重试",
  409. can_retry=True
  410. )), 503
  411. # 调用Agent处理问题
  412. agent_result = agent.process_question(
  413. question=question,
  414. session_id=browser_session_id
  415. )
  416. # 统一返回格式 - 使用标准化的agent_success_response
  417. if agent_result.get("success", False):
  418. from common.result import agent_success_response
  419. return jsonify(agent_success_response(
  420. response_type=agent_result.get("type", "UNKNOWN"),
  421. session_id=browser_session_id,
  422. execution_path=agent_result.get("execution_path", []),
  423. classification_info=agent_result.get("classification_info", {}),
  424. response=agent_result.get("response", ""),
  425. sql=agent_result.get("sql"),
  426. query_result=agent_result.get("data_result"), # 字段重命名:data_result → query_result
  427. summary=agent_result.get("summary")
  428. ))
  429. else:
  430. # 使用标准化的agent_error_response
  431. from common.result import agent_error_response
  432. from common.messages import MessageTemplate, ErrorType
  433. # 根据错误代码选择合适的message
  434. error_code = agent_result.get("error_code", 500)
  435. if error_code == 400:
  436. message = MessageTemplate.BAD_REQUEST
  437. elif error_code == 422:
  438. message = MessageTemplate.VALIDATION_FAILED
  439. elif error_code == 503:
  440. message = MessageTemplate.SERVICE_UNAVAILABLE
  441. else:
  442. message = MessageTemplate.PROCESSING_FAILED
  443. return jsonify(agent_error_response(
  444. response_text=agent_result.get("error", "Agent处理失败"),
  445. error_type=ErrorType.REQUEST_PROCESSING_FAILED,
  446. message=message,
  447. code=error_code,
  448. session_id=browser_session_id,
  449. execution_path=agent_result.get("execution_path", []),
  450. classification_info=agent_result.get("classification_info", {})
  451. )), 200 # HTTP 200但业务失败
  452. except Exception as e:
  453. print(f"[ERROR] ask_agent执行失败: {str(e)}")
  454. from common.result import internal_error_response
  455. return jsonify(internal_error_response(
  456. response_text="请求处理异常,请稍后重试"
  457. )), 500
  458. @app.flask_app.route('/api/v0/agent_health', methods=['GET'])
  459. def agent_health():
  460. """
  461. Agent健康检查接口
  462. 响应格式:
  463. {
  464. "success": true/false,
  465. "code": 200/503,
  466. "message": "healthy/degraded/unhealthy",
  467. "data": {
  468. "status": "healthy/degraded/unhealthy",
  469. "test_result": true/false,
  470. "workflow_compiled": true/false,
  471. "tools_count": 4,
  472. "message": "详细信息",
  473. "timestamp": "2024-01-01T12:00:00",
  474. "checks": {
  475. "agent_creation": true/false,
  476. "tools_import": true/false,
  477. "llm_connection": true/false,
  478. "classifier_ready": true/false
  479. }
  480. }
  481. }
  482. """
  483. try:
  484. # 基础健康检查
  485. health_data = {
  486. "status": "unknown",
  487. "test_result": False,
  488. "workflow_compiled": False,
  489. "tools_count": 0,
  490. "message": "",
  491. "timestamp": datetime.now().isoformat(),
  492. "checks": {
  493. "agent_creation": False,
  494. "tools_import": False,
  495. "llm_connection": False,
  496. "classifier_ready": False
  497. }
  498. }
  499. # 检查1: Agent创建
  500. try:
  501. agent = get_citu_langraph_agent()
  502. health_data["checks"]["agent_creation"] = True
  503. health_data["workflow_compiled"] = agent.workflow is not None
  504. health_data["tools_count"] = len(agent.tools) if hasattr(agent, 'tools') else 0
  505. except Exception as e:
  506. health_data["message"] = f"Agent创建失败: {str(e)}"
  507. from common.result import health_error_response
  508. return jsonify(health_error_response(
  509. status="unhealthy",
  510. **health_data
  511. )), 503
  512. # 检查2: 工具导入
  513. try:
  514. from agent.tools import TOOLS
  515. health_data["checks"]["tools_import"] = len(TOOLS) > 0
  516. except Exception as e:
  517. health_data["message"] = f"工具导入失败: {str(e)}"
  518. # 检查3: LLM连接(简单测试)
  519. try:
  520. from agent.utils import get_compatible_llm
  521. llm = get_compatible_llm()
  522. health_data["checks"]["llm_connection"] = llm is not None
  523. except Exception as e:
  524. health_data["message"] = f"LLM连接失败: {str(e)}"
  525. # 检查4: 分类器准备
  526. try:
  527. from agent.classifier import QuestionClassifier
  528. classifier = QuestionClassifier()
  529. health_data["checks"]["classifier_ready"] = True
  530. except Exception as e:
  531. health_data["message"] = f"分类器失败: {str(e)}"
  532. # 检查5: 完整流程测试(可选)
  533. try:
  534. if all(health_data["checks"].values()):
  535. test_result = agent.health_check()
  536. health_data["test_result"] = test_result.get("status") == "healthy"
  537. health_data["status"] = test_result.get("status", "unknown")
  538. health_data["message"] = test_result.get("message", "健康检查完成")
  539. else:
  540. health_data["status"] = "degraded"
  541. health_data["message"] = "部分组件异常"
  542. except Exception as e:
  543. health_data["status"] = "degraded"
  544. health_data["message"] = f"完整测试失败: {str(e)}"
  545. # 根据状态返回相应的HTTP代码 - 使用标准化健康检查响应
  546. from common.result import health_success_response, health_error_response
  547. if health_data["status"] == "healthy":
  548. return jsonify(health_success_response(**health_data))
  549. elif health_data["status"] == "degraded":
  550. return jsonify(health_error_response(status="degraded", **health_data)), 503
  551. else:
  552. return jsonify(health_error_response(status="unhealthy", **health_data)), 503
  553. except Exception as e:
  554. print(f"[ERROR] 健康检查异常: {str(e)}")
  555. from common.result import internal_error_response
  556. return jsonify(internal_error_response(
  557. response_text="健康检查失败,请稍后重试"
  558. )), 500
  559. # ==================== 日常管理API ====================
  560. @app.flask_app.route('/api/v0/cache_overview', methods=['GET'])
  561. def cache_overview():
  562. """日常管理:轻量概览 - 合并原cache_inspect的核心功能"""
  563. try:
  564. cache = app.cache
  565. result_data = {
  566. 'overview_summary': {
  567. 'total_conversations': 0,
  568. 'total_sessions': 0,
  569. 'query_time': datetime.now().isoformat()
  570. },
  571. 'recent_conversations': [], # 最近的对话
  572. 'session_summary': [] # 会话摘要
  573. }
  574. if hasattr(cache, 'cache') and isinstance(cache.cache, dict):
  575. result_data['overview_summary']['total_conversations'] = len(cache.cache)
  576. # 获取会话信息
  577. if hasattr(cache, 'get_all_sessions'):
  578. all_sessions = cache.get_all_sessions()
  579. result_data['overview_summary']['total_sessions'] = len(all_sessions)
  580. # 会话摘要(按最近活动排序)
  581. session_list = []
  582. for session_id, session_data in all_sessions.items():
  583. session_summary = {
  584. 'session_id': session_id,
  585. 'start_time': session_data['start_time'].isoformat(),
  586. 'conversation_count': session_data.get('conversation_count', 0),
  587. 'duration_seconds': session_data.get('session_duration_seconds', 0),
  588. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  589. 'is_active': (datetime.now() - session_data.get('last_activity', session_data['start_time'])).total_seconds() < 1800 # 30分钟内活跃
  590. }
  591. session_list.append(session_summary)
  592. # 按最后活动时间排序
  593. session_list.sort(key=lambda x: x['last_activity'], reverse=True)
  594. result_data['session_summary'] = session_list
  595. # 最近的对话(最多显示10个)
  596. conversation_list = []
  597. for conversation_id, conversation_data in cache.cache.items():
  598. conversation_start_time = cache.conversation_start_times.get(conversation_id)
  599. conversation_info = {
  600. 'conversation_id': conversation_id,
  601. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  602. 'session_id': cache.conversation_to_session.get(conversation_id),
  603. 'has_question': 'question' in conversation_data,
  604. 'has_sql': 'sql' in conversation_data,
  605. 'has_data': 'df' in conversation_data and conversation_data['df'] is not None,
  606. 'question_preview': conversation_data.get('question', '')[:80] + '...' if len(conversation_data.get('question', '')) > 80 else conversation_data.get('question', ''),
  607. }
  608. # 计算对话持续时间
  609. if conversation_start_time:
  610. duration = datetime.now() - conversation_start_time
  611. conversation_info['conversation_duration_seconds'] = duration.total_seconds()
  612. conversation_list.append(conversation_info)
  613. # 按对话开始时间排序,显示最新的10个
  614. conversation_list.sort(key=lambda x: x['conversation_start_time'] or '', reverse=True)
  615. result_data['recent_conversations'] = conversation_list[:10]
  616. from common.result import success_response
  617. return jsonify(success_response(
  618. response_text="缓存概览查询完成",
  619. data=result_data
  620. ))
  621. except Exception as e:
  622. from common.result import internal_error_response
  623. return jsonify(internal_error_response(
  624. response_text="获取缓存概览失败,请稍后重试"
  625. )), 500
  626. @app.flask_app.route('/api/v0/cache_stats', methods=['GET'])
  627. def cache_stats():
  628. """日常管理:统计信息 - 合并原session_stats和cache_stats功能"""
  629. try:
  630. cache = app.cache
  631. current_time = datetime.now()
  632. stats = {
  633. 'basic_stats': {
  634. 'total_sessions': len(getattr(cache, 'session_info', {})),
  635. 'total_conversations': len(getattr(cache, 'cache', {})),
  636. 'active_sessions': 0, # 最近30分钟有活动
  637. 'average_conversations_per_session': 0
  638. },
  639. 'time_distribution': {
  640. 'sessions': {
  641. 'last_1_hour': 0,
  642. 'last_6_hours': 0,
  643. 'last_24_hours': 0,
  644. 'last_7_days': 0,
  645. 'older': 0
  646. },
  647. 'conversations': {
  648. 'last_1_hour': 0,
  649. 'last_6_hours': 0,
  650. 'last_24_hours': 0,
  651. 'last_7_days': 0,
  652. 'older': 0
  653. }
  654. },
  655. 'session_details': [],
  656. 'time_ranges': {
  657. 'oldest_session': None,
  658. 'newest_session': None,
  659. 'oldest_conversation': None,
  660. 'newest_conversation': None
  661. }
  662. }
  663. # 会话统计
  664. if hasattr(cache, 'session_info'):
  665. session_times = []
  666. total_conversations = 0
  667. for session_id, session_data in cache.session_info.items():
  668. start_time = session_data['start_time']
  669. session_times.append(start_time)
  670. conversation_count = len(session_data.get('conversations', []))
  671. total_conversations += conversation_count
  672. # 检查活跃状态
  673. last_activity = session_data.get('last_activity', session_data['start_time'])
  674. if (current_time - last_activity).total_seconds() < 1800:
  675. stats['basic_stats']['active_sessions'] += 1
  676. # 时间分布统计
  677. age_hours = (current_time - start_time).total_seconds() / 3600
  678. if age_hours <= 1:
  679. stats['time_distribution']['sessions']['last_1_hour'] += 1
  680. elif age_hours <= 6:
  681. stats['time_distribution']['sessions']['last_6_hours'] += 1
  682. elif age_hours <= 24:
  683. stats['time_distribution']['sessions']['last_24_hours'] += 1
  684. elif age_hours <= 168: # 7 days
  685. stats['time_distribution']['sessions']['last_7_days'] += 1
  686. else:
  687. stats['time_distribution']['sessions']['older'] += 1
  688. # 会话详细信息
  689. session_duration = current_time - start_time
  690. stats['session_details'].append({
  691. 'session_id': session_id,
  692. 'start_time': start_time.isoformat(),
  693. 'last_activity': last_activity.isoformat(),
  694. 'conversation_count': conversation_count,
  695. 'duration_seconds': session_duration.total_seconds(),
  696. 'duration_formatted': str(session_duration),
  697. 'is_active': (current_time - last_activity).total_seconds() < 1800,
  698. 'browser_session_id': session_data.get('browser_session_id')
  699. })
  700. # 计算平均值
  701. if len(cache.session_info) > 0:
  702. stats['basic_stats']['average_conversations_per_session'] = total_conversations / len(cache.session_info)
  703. # 时间范围
  704. if session_times:
  705. stats['time_ranges']['oldest_session'] = min(session_times).isoformat()
  706. stats['time_ranges']['newest_session'] = max(session_times).isoformat()
  707. # 对话统计
  708. if hasattr(cache, 'conversation_start_times'):
  709. conversation_times = []
  710. for conv_time in cache.conversation_start_times.values():
  711. conversation_times.append(conv_time)
  712. age_hours = (current_time - conv_time).total_seconds() / 3600
  713. if age_hours <= 1:
  714. stats['time_distribution']['conversations']['last_1_hour'] += 1
  715. elif age_hours <= 6:
  716. stats['time_distribution']['conversations']['last_6_hours'] += 1
  717. elif age_hours <= 24:
  718. stats['time_distribution']['conversations']['last_24_hours'] += 1
  719. elif age_hours <= 168:
  720. stats['time_distribution']['conversations']['last_7_days'] += 1
  721. else:
  722. stats['time_distribution']['conversations']['older'] += 1
  723. if conversation_times:
  724. stats['time_ranges']['oldest_conversation'] = min(conversation_times).isoformat()
  725. stats['time_ranges']['newest_conversation'] = max(conversation_times).isoformat()
  726. # 按最近活动排序会话详情
  727. stats['session_details'].sort(key=lambda x: x['last_activity'], reverse=True)
  728. from common.result import success_response
  729. return jsonify(success_response(
  730. response_text="缓存统计信息查询完成",
  731. data=stats
  732. ))
  733. except Exception as e:
  734. from common.result import internal_error_response
  735. return jsonify(internal_error_response(
  736. response_text="获取缓存统计失败,请稍后重试"
  737. )), 500
  738. # ==================== 高级功能API ====================
  @app.flask_app.route('/api/v0/cache_export', methods=['GET'])
  def cache_export():
      """Advanced feature: export the complete cache contents as JSON.

      Reads ``app.cache.cache`` (conversation id -> dict of fields) together
      with the optional ``session_info``, ``conversation_start_times`` and
      ``conversation_to_session`` attributes of the cache object, and builds
      one JSON-safe document describing every conversation field.

      Returns:
          A ``jsonify``-ed ``success_response`` whose ``data`` contains
          ``export_metadata``, ``session_info``, ``conversation_times``,
          ``conversation_to_session_mapping``, ``conversations`` and
          ``cache_statistics``. When the cache object has an unexpected
          structure, or anything fails during the export, a ``jsonify``-ed
          ``internal_error_response`` is returned with HTTP status 500.
      """
      import json
      import math
      import traceback
      from common.result import internal_error_response, success_response

      def make_json_serializable(obj):
          """Convert *obj* into a structure made only of JSON-safe values."""
          if obj is None or isinstance(obj, (str, bool, int)):
              return obj
          if isinstance(obj, float):
              # NaN / +-inf are not valid JSON tokens; export them as null.
              return obj if math.isfinite(obj) else None
          if isinstance(obj, (list, tuple)):
              return [make_json_serializable(item) for item in obj]
          if isinstance(obj, dict):
              return {str(k): make_json_serializable(v) for k, v in obj.items()}
          if hasattr(obj, 'isoformat'):  # datetime-like objects
              return obj.isoformat()
          # ``tolist`` must be tried before ``item``: numpy arrays expose both,
          # and ``item()`` raises for arrays holding more than one element.
          for converter_name in ('tolist', 'item'):
              converter = getattr(obj, converter_name, None)
              if callable(converter):
                  converted = converter()
                  if type(converted) is type(obj):
                      # The conversion made no progress; avoid endless recursion.
                      return str(obj)
                  return make_json_serializable(converted)
          # pandas dtypes and any other object: fall back to the text form.
          return str(obj)

      def describe_dataframe(frame):
          """Return the export entries for a DataFrame-like cache value."""
          try:
              # Column labels are stringified so they are valid JSON keys.
              dtypes_dict = {str(col): str(dtype) for col, dtype in frame.dtypes.items()}
          except Exception:
              dtypes_dict = {"error": "无法序列化dtypes"}
          try:
              memory_dict = {
                  str(idx): int(usage)
                  for idx, usage in frame.memory_usage(deep=True).items()
              }
          except Exception:
              memory_dict = {"error": "无法获取内存使用信息"}
          return {
              'dataframe_info': {
                  'shape': list(frame.shape),
                  'columns': make_json_serializable(list(frame.columns)),
                  'dtypes': dtypes_dict,
                  'index_info': {
                      'type': type(frame.index).__name__,
                      'length': len(frame.index)
                  }
              },
              'data': make_json_serializable(frame.to_dict('records')),
              'memory_usage': memory_dict
          }

      def describe_figure_json(raw_text):
          """Return the export entries for a figure stored as a JSON string."""
          try:
              parsed_fig = json.loads(raw_text)
          except json.JSONDecodeError:
              return {'json_valid': False, 'raw_content': str(raw_text)}
          # Valid JSON is not necessarily an object; only a dict can carry
          # the 'data' / 'layout' keys inspected below.
          figure = parsed_fig if isinstance(parsed_fig, dict) else {}
          traces = figure.get('data', [])
          return {
              'json_valid': True,
              'json_size_bytes': len(raw_text),
              'plotly_structure': {
                  'has_data': 'data' in figure,
                  'has_layout': 'layout' in figure,
                  'data_traces_count': len(traces) if isinstance(traces, (list, tuple)) else 0,
              },
              'raw_json': raw_text
          }

      def describe_field(field_name, field_value):
          """Build the JSON-safe description of a single conversation field."""
          field_info = {
              'field_name': field_name,
              'data_type': type(field_value).__name__,
              'is_none': field_value is None
          }
          try:
              if field_value is None:
                  field_info['value'] = None
              elif field_name in ('conversation_start_time', 'session_start_time'):
                  # Time fields.
                  field_info['content'] = make_json_serializable(field_value)
              elif field_name == 'df':
                  if hasattr(field_value, 'to_dict'):
                      field_info.update(describe_dataframe(field_value))
                  else:
                      field_info['value'] = str(field_value)
                      field_info['note'] = 'not_standard_dataframe'
              elif field_name == 'fig_json':
                  if isinstance(field_value, str):
                      field_info.update(describe_figure_json(field_value))
                  else:
                      field_info['value'] = make_json_serializable(field_value)
              elif field_name == 'followup_questions':
                  field_info['content'] = make_json_serializable(field_value)
              elif field_name in ('question', 'sql', 'summary'):
                  # Text fields.
                  if isinstance(field_value, str):
                      field_info.update({
                          'text_length': len(field_value),
                          'content': field_value
                      })
                  else:
                      field_info['value'] = make_json_serializable(field_value)
              else:
                  # Unknown fields get the generic conversion.
                  field_info['content'] = make_json_serializable(field_value)
          except Exception as e:
              # One bad field must not abort the whole export: record the
              # error together with a bounded text rendering of the value.
              fallback_text = str(field_value)
              if len(fallback_text) > 500:
                  fallback_text = fallback_text[:500] + '...'
              field_info.update({
                  'processing_error': str(e),
                  'fallback_value': fallback_text
              })
          return field_info

      try:
          cache = app.cache

          # Validate the actual structure of the cache object.
          if not hasattr(cache, 'cache'):
              return jsonify(internal_error_response(
                  response_text="缓存对象结构异常,请联系系统管理员"
              )), 500
          if not isinstance(cache.cache, dict):
              return jsonify(internal_error_response(
                  response_text="缓存数据类型异常,请联系系统管理员"
              )), 500

          raw_cache = cache.cache
          conversation_times = getattr(cache, 'conversation_start_times', {})
          session_info = getattr(cache, 'session_info', {})
          conversation_to_session = getattr(cache, 'conversation_to_session', {})

          # One timestamp for the whole export keeps all durations consistent.
          now = datetime.now()

          export_data = {
              'export_metadata': {
                  'export_time': now.isoformat(),
                  'total_conversations': len(raw_cache),
                  'total_sessions': len(session_info),
                  'cache_type': type(cache).__name__,
                  'cache_object_info': str(cache),
                  'has_session_times': bool(session_info),
                  'has_conversation_times': bool(conversation_times)
              },
              'session_info': {
                  session_id: {
                      'start_time': session_data['start_time'].isoformat(),
                      'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
                      'conversations': session_data['conversations'],
                      'conversation_count': len(session_data['conversations']),
                      'browser_session_id': session_data.get('browser_session_id'),
                      'user_info': session_data.get('user_info', {})
                  }
                  for session_id, session_data in session_info.items()
              },
              'conversation_times': {
                  conversation_id: start_time.isoformat()
                  for conversation_id, start_time in conversation_times.items()
              },
              'conversation_to_session_mapping': conversation_to_session,
              'conversations': {}
          }

          # Process the full data of every conversation.
          for conversation_id, conversation_data in raw_cache.items():
              conversation_start_time = conversation_times.get(conversation_id)
              session_id = conversation_to_session.get(conversation_id)
              session_start_time = None
              if session_id and session_id in session_info:
                  session_start_time = session_info[session_id]['start_time']

              processed_conversation = {
                  'conversation_id': conversation_id,
                  'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
                  'session_id': session_id,
                  'session_start_time': session_start_time.isoformat() if session_start_time else None,
                  'field_count': len(conversation_data),
                  'fields': {}
              }

              # Elapsed time since the conversation / session started.
              if conversation_start_time:
                  conversation_duration = now - conversation_start_time
                  processed_conversation['conversation_duration_seconds'] = conversation_duration.total_seconds()
                  processed_conversation['conversation_duration_formatted'] = str(conversation_duration)
              if session_start_time:
                  session_duration = now - session_start_time
                  processed_conversation['session_duration_seconds'] = session_duration.total_seconds()
                  processed_conversation['session_duration_formatted'] = str(session_duration)

              for field_name, field_value in conversation_data.items():
                  processed_conversation['fields'][field_name] = describe_field(field_name, field_value)

              export_data['conversations'][conversation_id] = processed_conversation

          # Aggregate statistics over the exported fields.
          field_frequency = {}
          data_types_found = set()
          total_dataframes = 0
          total_questions = 0
          for conv_data in export_data['conversations'].values():
              for field_name, field_info in conv_data['fields'].items():
                  field_frequency[field_name] = field_frequency.get(field_name, 0) + 1
                  data_types_found.add(field_info['data_type'])
                  if field_name == 'df' and not field_info['is_none']:
                      total_dataframes += 1
                  if field_name == 'question' and not field_info['is_none']:
                      total_questions += 1

          export_data['cache_statistics'] = {
              'field_frequency': field_frequency,
              'data_types_found': list(data_types_found),
              'total_dataframes': total_dataframes,
              'total_questions': total_questions,
              'has_session_timing': 'session_start_time' in field_frequency,
              'has_conversation_timing': 'conversation_start_time' in field_frequency
          }

          return jsonify(success_response(
              response_text="缓存数据导出完成",
              data=export_data
          ))
      except Exception as e:
          # Keep the details out of the HTTP response but log them, so a
          # failed export can actually be diagnosed.
          print(f"[ERROR] 导出缓存失败: {type(e).__name__}: {e}\n{traceback.format_exc()}")
          return jsonify(internal_error_response(
              response_text="导出缓存失败,请稍后重试"
          )), 500
  966. # ==================== 清理功能API ====================
  @app.flask_app.route('/api/v0/cache_preview_cleanup', methods=['POST'])
  def cache_preview_cleanup():
      """Cleanup feature: preview which sessions a cleanup would delete.

      The JSON request body must carry one time condition; the first truthy
      one in this order is used:

      * ``older_than_hours`` - positive number of hours
      * ``older_than_days`` - positive number of days
      * ``before_timestamp`` - string in ``YYYY-MM-DD HH:MM:SS`` format

      Nothing is modified. Sessions whose ``start_time`` lies before the
      cutoff are listed under ``will_be_removed``; the others are only counted.

      Returns:
          A ``jsonify``-ed ``success_response`` carrying the preview, or an
          error response: 400 when no time condition is given, 422 when the
          given condition is invalid, 503 when the cache has no
          ``session_info`` attribute, 500 on unexpected failures.
      """
      from common.result import (
          bad_request_response,
          internal_error_response,
          service_unavailable_response,
          success_response,
          validation_failed_response,
      )

      try:
          # silent=True: a malformed body is treated like an empty one and
          # ends in the regular "missing time condition" 400 response.
          req = request.get_json(force=True, silent=True)
          if not isinstance(req, dict):
              req = {}

          # Time condition - three alternatives are supported.
          older_than_hours = req.get('older_than_hours')
          older_than_days = req.get('older_than_days')
          before_timestamp = req.get('before_timestamp')  # YYYY-MM-DD HH:MM:SS

          cache = app.cache
          if not hasattr(cache, 'session_info'):
              return jsonify(service_unavailable_response(
                  response_text="缓存不支持会话功能"
              )), 503

          # Work out the cutoff time.
          cutoff_time = None
          time_condition = None
          relative = None
          if older_than_hours:
              relative = ('older_than_hours', 'hours', older_than_hours)
          elif older_than_days:
              relative = ('older_than_days', 'days', older_than_days)

          if relative is not None:
              param_name, unit, amount = relative
              is_number = isinstance(amount, (int, float)) and not isinstance(amount, bool)
              # A negative amount would put the cutoff in the future and
              # select every session, so it is rejected.
              if not is_number or amount <= 0:
                  return jsonify(validation_failed_response(
                      response_text=f"{param_name}必须是大于0的数字"
                  )), 422
              try:
                  cutoff_time = datetime.now() - timedelta(**{unit: amount})
              except (OverflowError, ValueError):
                  # NaN / infinite / absurdly large amounts.
                  return jsonify(validation_failed_response(
                      response_text=f"{param_name}数值超出可处理范围"
                  )), 422
              time_condition = f"{param_name}: {amount}"
          elif before_timestamp:
              try:
                  cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
              except (TypeError, ValueError):
                  return jsonify(validation_failed_response(
                      response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
                  )), 422
              time_condition = f"before_timestamp: {before_timestamp}"
          else:
              return jsonify(bad_request_response(
                  response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
                  missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
              )), 400

          conversation_start_times = getattr(cache, 'conversation_start_times', {})
          sessions_to_remove = []
          sessions_to_remove_count = 0
          conversations_to_remove_count = 0
          sessions_to_keep_count = 0
          conversations_to_keep_count = 0

          # Classify every session against the cutoff.
          for session_id, session_data in cache.session_info.items():
              conversation_ids = session_data['conversations']

              if not session_data['start_time'] < cutoff_time:
                  sessions_to_keep_count += 1
                  conversations_to_keep_count += len(conversation_ids)
                  continue

              conversation_previews = []
              for conv_id in conversation_ids:
                  if conv_id not in cache.cache:
                      continue
                  question = cache.cache[conv_id].get('question')
                  question_text = str(question) if question else ''
                  if len(question_text) > 50:
                      # Only mark the text as shortened when it really was.
                      question_text = question_text[:50] + '...'
                  started_at = conversation_start_times.get(conv_id)
                  conversation_previews.append({
                      'conversation_id': conv_id,
                      'question': question_text,
                      'start_time': started_at.isoformat() if started_at else ''
                  })

              sessions_to_remove.append({
                  'session_id': session_id,
                  'start_time': session_data['start_time'].isoformat(),
                  'conversation_count': len(conversation_ids),
                  'conversations': conversation_previews
              })
              sessions_to_remove_count += 1
              conversations_to_remove_count += len(conversation_ids)

          preview = {
              'time_condition': time_condition,
              'cutoff_time': cutoff_time.isoformat(),
              'will_be_removed': {
                  'sessions': sessions_to_remove
              },
              'will_be_kept': {
                  'sessions_count': sessions_to_keep_count,
                  'conversations_count': conversations_to_keep_count
              },
              'summary': {
                  'sessions_to_remove': sessions_to_remove_count,
                  'conversations_to_remove': conversations_to_remove_count,
                  'sessions_to_keep': sessions_to_keep_count,
                  'conversations_to_keep': conversations_to_keep_count
              }
          }

          return jsonify(success_response(
              response_text=f"清理预览完成,将删除 {sessions_to_remove_count} 个会话和 {conversations_to_remove_count} 个对话",
              data=preview
          ))
      except Exception as e:
          print(f"[ERROR] 预览清理操作失败: {type(e).__name__}: {e}")
          return jsonify(internal_error_response(
              response_text="预览清理操作失败,请稍后重试"
          )), 500
  @app.flask_app.route('/api/v0/cache_cleanup', methods=['POST'])
  def cache_cleanup():
      """Cleanup feature: delete cached sessions older than a cutoff.

      The JSON request body must carry one time condition; the first truthy
      one in this order is used:

      * ``older_than_hours`` - positive number of hours
      * ``older_than_days`` - positive number of days
      * ``before_timestamp`` - string in ``YYYY-MM-DD HH:MM:SS`` format

      Every session whose ``start_time`` lies before the cutoff is removed
      from ``cache.session_info`` together with its conversations in
      ``cache.cache`` and, when present, their entries in
      ``cache.conversation_start_times`` and ``cache.conversation_to_session``.

      Returns:
          A ``jsonify``-ed ``success_response`` carrying the cleanup
          statistics, or an error response: 400 when no time condition is
          given, 422 when the given condition is invalid, 503 when the cache
          has no ``session_info`` attribute, 500 on unexpected failures.
      """
      from common.result import (
          bad_request_response,
          internal_error_response,
          service_unavailable_response,
          success_response,
          validation_failed_response,
      )

      try:
          # silent=True: a malformed body is treated like an empty one and
          # ends in the regular "missing time condition" 400 response.
          req = request.get_json(force=True, silent=True)
          if not isinstance(req, dict):
              req = {}

          # Time condition - three alternatives are supported.
          older_than_hours = req.get('older_than_hours')
          older_than_days = req.get('older_than_days')
          before_timestamp = req.get('before_timestamp')  # YYYY-MM-DD HH:MM:SS

          cache = app.cache
          if not hasattr(cache, 'session_info'):
              return jsonify(service_unavailable_response(
                  response_text="缓存不支持会话功能"
              )), 503

          # Work out the cutoff time.
          cutoff_time = None
          time_condition = None
          relative = None
          if older_than_hours:
              relative = ('older_than_hours', 'hours', older_than_hours)
          elif older_than_days:
              relative = ('older_than_days', 'days', older_than_days)

          if relative is not None:
              param_name, unit, amount = relative
              is_number = isinstance(amount, (int, float)) and not isinstance(amount, bool)
              # A negative amount would put the cutoff in the future and wipe
              # the whole cache, so it is rejected instead of being applied.
              if not is_number or amount <= 0:
                  return jsonify(validation_failed_response(
                      response_text=f"{param_name}必须是大于0的数字"
                  )), 422
              try:
                  cutoff_time = datetime.now() - timedelta(**{unit: amount})
              except (OverflowError, ValueError):
                  # NaN / infinite / absurdly large amounts.
                  return jsonify(validation_failed_response(
                      response_text=f"{param_name}数值超出可处理范围"
                  )), 422
              time_condition = f"{param_name}: {amount}"
          elif before_timestamp:
              try:
                  cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
              except (TypeError, ValueError):
                  return jsonify(validation_failed_response(
                      response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
                  )), 422
              time_condition = f"before_timestamp: {before_timestamp}"
          else:
              return jsonify(bad_request_response(
                  response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
                  missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
              )), 400

          cleanup_stats = {
              'time_condition': time_condition,
              'cutoff_time': cutoff_time.isoformat(),
              'sessions_removed': 0,
              'conversations_removed': 0,
              'sessions_kept': 0,
              'conversations_kept': 0,
              'removed_session_ids': [],
              'removed_conversation_ids': []
          }

          # Collect the ids first so the dict is not mutated while iterating.
          sessions_to_remove = [
              session_id
              for session_id, session_data in cache.session_info.items()
              if session_data['start_time'] < cutoff_time
          ]

          conversation_start_times = getattr(cache, 'conversation_start_times', None)
          conversation_to_session = getattr(cache, 'conversation_to_session', None)

          # Delete the selected sessions together with all their conversations.
          for session_id in sessions_to_remove:
              session_data = cache.session_info[session_id]
              for conv_id in list(session_data['conversations']):
                  if conv_id in cache.cache:
                      del cache.cache[conv_id]
                      cleanup_stats['conversations_removed'] += 1
                      cleanup_stats['removed_conversation_ids'].append(conv_id)
                  # Drop the timing / mapping records of the conversation too.
                  if conversation_start_times is not None:
                      conversation_start_times.pop(conv_id, None)
                  if conversation_to_session is not None:
                      conversation_to_session.pop(conv_id, None)

              # Finally drop the session record itself.
              del cache.session_info[session_id]
              cleanup_stats['sessions_removed'] += 1
              cleanup_stats['removed_session_ids'].append(session_id)

          # Whatever is left in the cache was kept.
          cleanup_stats['sessions_kept'] = len(cache.session_info)
          cleanup_stats['conversations_kept'] = len(cache.cache)

          return jsonify(success_response(
              response_text=f"缓存清理完成,删除了 {cleanup_stats['sessions_removed']} 个会话和 {cleanup_stats['conversations_removed']} 个对话",
              data=cleanup_stats
          ))
      except Exception as e:
          print(f"[ERROR] 缓存清理失败: {type(e).__name__}: {e}")
          return jsonify(internal_error_response(
              response_text="缓存清理失败,请稍后重试"
          )), 500
  @app.flask_app.route('/api/v0/training_error_question_sql', methods=['POST'])
  def training_error_question_sql():
      """Store an incorrect question/SQL pair in the ``error_sql`` collection.

      The received pair is handed to ``vn.train_error_sql`` so that wrong SQL
      queries can be recorded and analysed later.

      Request JSON:
          question (str, required): the user's question.
          sql (str, required): the incorrect SQL statement for that question.

      Returns:
          A ``jsonify``-ed ``success_response`` containing the id returned by
          ``vn.train_error_sql`` and a confirmation message; a
          ``bad_request_response`` (HTTP 400) naming the missing parameters;
          or an ``internal_error_response`` (HTTP 500) when storing fails.
      """
      from common.result import (
          bad_request_response,
          internal_error_response,
          success_response,
      )

      try:
          # silent=True: a missing, non-JSON or non-object body is treated as
          # "no parameters supplied" and answered with 400 instead of 500.
          data = request.get_json(silent=True)
          if not isinstance(data, dict):
              data = {}

          question = data.get('question')
          sql = data.get('sql')
          print(f"[DEBUG] 接收到错误SQL训练请求: question={question}, sql={sql}")

          missing_params = [
              name
              for name, value in (("question", question), ("sql", sql))
              if not value
          ]
          if missing_params:
              return jsonify(bad_request_response(
                  response_text="question和sql参数都是必需的",
                  missing_params=missing_params
              )), 400

          # Persist the pair through the vn instance.
          training_id = vn.train_error_sql(question=question, sql=sql)
          print(f"[INFO] 成功存储错误SQL,ID: {training_id}")

          return jsonify(success_response(
              response_text="错误SQL对已成功存储",
              data={
                  "id": training_id,
                  "message": "错误SQL对已成功存储到error_sql集合"
              }
          ))
      except Exception as e:
          print(f"[ERROR] 存储错误SQL失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="存储错误SQL失败,请稍后重试"
          )), 500
  1193. # 前端JavaScript示例 - 如何维持会话
  1194. """
  1195. // 前端需要维护一个会话ID
  1196. class ChatSession {
  1197. constructor() {
  1198. // 从localStorage获取或创建新的会话ID
  1199. this.sessionId = localStorage.getItem('chat_session_id') || this.generateSessionId();
  1200. localStorage.setItem('chat_session_id', this.sessionId);
  1201. }
  1202. generateSessionId() {
  1203. return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
  1204. }
  1205. async askQuestion(question) {
  1206. const response = await fetch('/api/v0/ask', {
  1207. method: 'POST',
  1208. headers: {
  1209. 'Content-Type': 'application/json',
  1210. },
  1211. body: JSON.stringify({
  1212. question: question,
  1213. session_id: this.sessionId // 关键:传递会话ID
  1214. })
  1215. });
  1216. return await response.json();
  1217. }
  1218. // 开始新会话
  1219. startNewSession() {
  1220. this.sessionId = this.generateSessionId();
  1221. localStorage.setItem('chat_session_id', this.sessionId);
  1222. }
  1223. }
  1224. // 使用示例
  1225. const chatSession = new ChatSession();
  1226. chatSession.askQuestion("各年龄段客户的流失率如何?");
  1227. """
  # Start the server only when this file is executed as a script, so that
  # importing the module (tests, WSGI servers) does not block on app.run().
  if __name__ == "__main__":
      # NOTE(review): debug=True together with host 0.0.0.0 - if app.run
      # forwards these to Flask's development server, the interactive
      # debugger becomes reachable from the network. Confirm, and disable
      # debug outside local development.
      print("正在启动Flask应用: http://localhost:8084")
      app.run(host="0.0.0.0", port=8084, debug=True)