# citu_app.py
  1. # 给dataops 对话助手返回结果
  2. from vanna.flask import VannaFlaskApp
  3. from core.vanna_llm_factory import create_vanna_instance
  4. from flask import request, jsonify
  5. import pandas as pd
  6. import common.result as result
  7. from datetime import datetime, timedelta
  8. from common.session_aware_cache import WebSessionAwareMemoryCache
  9. from app_config import API_MAX_RETURN_ROWS, ENABLE_RESULT_SUMMARY
  10. import re
  11. import chainlit as cl
  12. import json
  13. from flask import session # 添加session导入
  14. from common.redis_conversation_manager import RedisConversationManager # 添加Redis对话管理器导入
  15. from common.qa_feedback_manager import QAFeedbackManager
  16. from common.result import success_response, bad_request_response, not_found_response, internal_error_response
  17. from common.result import ( # 统一导入所有需要的响应函数
  18. bad_request_response, service_unavailable_response,
  19. agent_success_response, agent_error_response,
  20. internal_error_response, success_response,
  21. validation_failed_response
  22. )
  23. from app_config import ( # 添加Redis相关配置导入
  24. USER_MAX_CONVERSATIONS,
  25. CONVERSATION_CONTEXT_COUNT,
  26. DEFAULT_ANONYMOUS_USER,
  27. ENABLE_QUESTION_ANSWER_CACHE
  28. )
  29. # 设置默认的最大返回行数
  30. DEFAULT_MAX_RETURN_ROWS = 200
  31. MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
  32. vn = create_vanna_instance()
  33. # 创建带时间戳的缓存
  34. timestamped_cache = WebSessionAwareMemoryCache()
  35. # 实例化 VannaFlaskApp,使用自定义缓存
  36. app = VannaFlaskApp(
  37. vn,
  38. cache=timestamped_cache, # 使用带时间戳的缓存
  39. title="辞图智能数据问答平台",
  40. logo = "https://www.citupro.com/img/logo-black-2.png",
  41. subtitle="让 AI 为你写 SQL",
  42. chart=False,
  43. allow_llm_to_see_data=True,
  44. ask_results_correct=True,
  45. followup_questions=True,
  46. debug=True
  47. )
  48. # 创建Redis对话管理器实例
  49. redis_conversation_manager = RedisConversationManager()
  50. # 修改ask接口,支持前端传递session_id
  51. @app.flask_app.route('/api/v0/ask', methods=['POST'])
  52. def ask_full():
  53. req = request.get_json(force=True)
  54. question = req.get("question", None)
  55. browser_session_id = req.get("session_id", None) # 前端传递的会话ID
  56. if not question:
  57. from common.result import bad_request_response
  58. return jsonify(bad_request_response(
  59. response_text="缺少必需参数:question",
  60. missing_params=["question"]
  61. )), 400
  62. # 如果使用WebSessionAwareMemoryCache
  63. if hasattr(app.cache, 'generate_id_with_browser_session') and browser_session_id:
  64. # 这里需要修改vanna的ask方法来支持传递session_id
  65. # 或者预先调用generate_id来建立会话关联
  66. conversation_id = app.cache.generate_id_with_browser_session(
  67. question=question,
  68. browser_session_id=browser_session_id
  69. )
  70. try:
  71. sql, df, _ = vn.ask(
  72. question=question,
  73. print_results=False,
  74. visualize=False,
  75. allow_llm_to_see_data=True
  76. )
  77. # 关键:检查是否有LLM解释性文本(无法生成SQL的情况)
  78. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  79. # 在解释性文本末尾添加提示语
  80. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  81. # 使用标准化错误响应
  82. from common.result import validation_failed_response
  83. return jsonify(validation_failed_response(
  84. response_text=explanation_message
  85. )), 422 # 修改HTTP状态码为422
  86. # 如果sql为None但没有解释性文本,返回通用错误
  87. if sql is None:
  88. from common.result import validation_failed_response
  89. return jsonify(validation_failed_response(
  90. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  91. )), 422
  92. # 处理返回数据 - 使用新的query_result结构
  93. query_result = {
  94. "rows": [],
  95. "columns": [],
  96. "row_count": 0,
  97. "is_limited": False,
  98. "total_row_count": 0
  99. }
  100. summary = None
  101. if isinstance(df, pd.DataFrame):
  102. query_result["columns"] = list(df.columns)
  103. if not df.empty:
  104. total_rows = len(df)
  105. limited_df = df.head(MAX_RETURN_ROWS)
  106. query_result["rows"] = limited_df.to_dict(orient="records")
  107. query_result["row_count"] = len(limited_df)
  108. query_result["total_row_count"] = total_rows
  109. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  110. # 生成数据摘要(可通过配置控制,仅在有数据时生成)
  111. if ENABLE_RESULT_SUMMARY:
  112. try:
  113. summary = vn.generate_summary(question=question, df=df)
  114. print(f"[INFO] 成功生成摘要: {summary}")
  115. except Exception as e:
  116. print(f"[WARNING] 生成摘要失败: {str(e)}")
  117. summary = None
  118. # 构建返回数据
  119. response_data = {
  120. "sql": sql,
  121. "query_result": query_result,
  122. "conversation_id": conversation_id if 'conversation_id' in locals() else None,
  123. "session_id": browser_session_id
  124. }
  125. # 添加摘要(如果启用且生成成功)
  126. if ENABLE_RESULT_SUMMARY and summary is not None:
  127. response_data["summary"] = summary
  128. response_data["response"] = summary # 同时添加response字段
  129. from common.result import success_response
  130. return jsonify(success_response(
  131. response_text="查询执行完成" if summary is None else None,
  132. data=response_data
  133. ))
  134. except Exception as e:
  135. print(f"[ERROR] ask_full执行失败: {str(e)}")
  136. # 即使发生异常,也检查是否有业务层面的解释
  137. if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  138. # 在解释性文本末尾添加提示语
  139. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  140. from common.result import validation_failed_response
  141. return jsonify(validation_failed_response(
  142. response_text=explanation_message
  143. )), 422
  144. else:
  145. # 技术错误,使用500错误码
  146. from common.result import internal_error_response
  147. return jsonify(internal_error_response(
  148. response_text="查询处理失败,请稍后重试"
  149. )), 500
  150. @app.flask_app.route('/api/v0/citu_run_sql', methods=['POST'])
  151. def citu_run_sql():
  152. req = request.get_json(force=True)
  153. sql = req.get('sql')
  154. if not sql:
  155. from common.result import bad_request_response
  156. return jsonify(bad_request_response(
  157. response_text="缺少必需参数:sql",
  158. missing_params=["sql"]
  159. )), 400
  160. try:
  161. df = vn.run_sql(sql)
  162. # 处理返回数据 - 使用新的query_result结构
  163. query_result = {
  164. "rows": [],
  165. "columns": [],
  166. "row_count": 0,
  167. "is_limited": False,
  168. "total_row_count": 0
  169. }
  170. if isinstance(df, pd.DataFrame):
  171. query_result["columns"] = list(df.columns)
  172. if not df.empty:
  173. total_rows = len(df)
  174. limited_df = df.head(MAX_RETURN_ROWS)
  175. query_result["rows"] = limited_df.to_dict(orient="records")
  176. query_result["row_count"] = len(limited_df)
  177. query_result["total_row_count"] = total_rows
  178. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  179. from common.result import success_response
  180. return jsonify(success_response(
  181. response_text=f"SQL执行完成,共返回 {query_result['total_row_count']} 条记录" +
  182. (f",已限制显示前 {MAX_RETURN_ROWS} 条" if query_result["is_limited"] else ""),
  183. data={
  184. "sql": sql,
  185. "query_result": query_result
  186. }
  187. ))
  188. except Exception as e:
  189. print(f"[ERROR] citu_run_sql执行失败: {str(e)}")
  190. from common.result import internal_error_response
  191. return jsonify(internal_error_response(
  192. response_text=f"SQL执行失败,请检查SQL语句是否正确"
  193. )), 500
  194. @app.flask_app.route('/api/v0/ask_cached', methods=['POST'])
  195. def ask_cached():
  196. """
  197. 带缓存功能的智能查询接口
  198. 支持会话管理和结果缓存,提高查询效率
  199. """
  200. req = request.get_json(force=True)
  201. question = req.get("question", None)
  202. browser_session_id = req.get("session_id", None)
  203. if not question:
  204. from common.result import bad_request_response
  205. return jsonify(bad_request_response(
  206. response_text="缺少必需参数:question",
  207. missing_params=["question"]
  208. )), 400
  209. try:
  210. # 生成conversation_id
  211. # 调试:查看generate_id的实际行为
  212. print(f"[DEBUG] 输入问题: '{question}'")
  213. conversation_id = app.cache.generate_id(question=question)
  214. print(f"[DEBUG] 生成的conversation_id: {conversation_id}")
  215. # 再次用相同问题测试
  216. conversation_id2 = app.cache.generate_id(question=question)
  217. print(f"[DEBUG] 再次生成的conversation_id: {conversation_id2}")
  218. print(f"[DEBUG] 两次ID是否相同: {conversation_id == conversation_id2}")
  219. # 检查缓存
  220. cached_sql = app.cache.get(id=conversation_id, field="sql")
  221. if cached_sql is not None:
  222. # 缓存命中
  223. print(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
  224. sql = cached_sql
  225. df = app.cache.get(id=conversation_id, field="df")
  226. summary = app.cache.get(id=conversation_id, field="summary")
  227. else:
  228. # 缓存未命中,执行新查询
  229. print(f"[CACHE MISS] 执行新查询: {conversation_id}")
  230. sql, df, _ = vn.ask(
  231. question=question,
  232. print_results=False,
  233. visualize=False,
  234. allow_llm_to_see_data=True
  235. )
  236. # 检查是否有LLM解释性文本(无法生成SQL的情况)
  237. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  238. # 在解释性文本末尾添加提示语
  239. explanation_message = vn.last_llm_explanation + "请尝试用其它方式提问。"
  240. from common.result import validation_failed_response
  241. return jsonify(validation_failed_response(
  242. response_text=explanation_message
  243. )), 422
  244. # 如果sql为None但没有解释性文本,返回通用错误
  245. if sql is None:
  246. from common.result import validation_failed_response
  247. return jsonify(validation_failed_response(
  248. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  249. )), 422
  250. # 缓存结果
  251. app.cache.set(id=conversation_id, field="question", value=question)
  252. app.cache.set(id=conversation_id, field="sql", value=sql)
  253. app.cache.set(id=conversation_id, field="df", value=df)
  254. # 生成并缓存摘要(可通过配置控制,仅在有数据时生成)
  255. summary = None
  256. if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
  257. try:
  258. summary = vn.generate_summary(question=question, df=df)
  259. print(f"[INFO] 成功生成摘要: {summary}")
  260. except Exception as e:
  261. print(f"[WARNING] 生成摘要失败: {str(e)}")
  262. summary = None
  263. app.cache.set(id=conversation_id, field="summary", value=summary)
  264. # 处理返回数据 - 使用新的query_result结构
  265. query_result = {
  266. "rows": [],
  267. "columns": [],
  268. "row_count": 0,
  269. "is_limited": False,
  270. "total_row_count": 0
  271. }
  272. if isinstance(df, pd.DataFrame):
  273. query_result["columns"] = list(df.columns)
  274. if not df.empty:
  275. total_rows = len(df)
  276. limited_df = df.head(MAX_RETURN_ROWS)
  277. query_result["rows"] = limited_df.to_dict(orient="records")
  278. query_result["row_count"] = len(limited_df)
  279. query_result["total_row_count"] = total_rows
  280. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  281. # 构建返回数据
  282. response_data = {
  283. "sql": sql,
  284. "query_result": query_result,
  285. "conversation_id": conversation_id,
  286. "session_id": browser_session_id,
  287. "cached": cached_sql is not None # 标识是否来自缓存
  288. }
  289. # 添加摘要(如果启用且生成成功)
  290. if ENABLE_RESULT_SUMMARY and summary is not None:
  291. response_data["summary"] = summary
  292. response_data["response"] = summary # 同时添加response字段
  293. from common.result import success_response
  294. return jsonify(success_response(
  295. response_text="查询执行完成" if summary is None else None,
  296. data=response_data
  297. ))
  298. except Exception as e:
  299. print(f"[ERROR] ask_cached执行失败: {str(e)}")
  300. from common.result import internal_error_response
  301. return jsonify(internal_error_response(
  302. response_text="查询处理失败,请稍后重试"
  303. )), 500
  304. @app.flask_app.route('/api/v0/citu_train_question_sql', methods=['POST'])
  305. def citu_train_question_sql():
  306. """
  307. 训练问题-SQL对接口
  308. 此API将接收的question/sql pair写入到training库中,用于训练和改进AI模型。
  309. 支持仅传入SQL或同时传入问题和SQL进行训练。
  310. Args:
  311. question (str, optional): 用户问题
  312. sql (str, required): 对应的SQL查询语句
  313. Returns:
  314. JSON: 包含训练ID和成功消息的响应
  315. """
  316. try:
  317. req = request.get_json(force=True)
  318. question = req.get('question')
  319. sql = req.get('sql')
  320. if not sql:
  321. from common.result import bad_request_response
  322. return jsonify(bad_request_response(
  323. response_text="缺少必需参数:sql",
  324. missing_params=["sql"]
  325. )), 400
  326. # 正确的调用方式:同时传递question和sql
  327. if question:
  328. training_id = vn.train(question=question, sql=sql)
  329. print(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
  330. else:
  331. training_id = vn.train(sql=sql)
  332. print(f"训练成功,训练ID为:{training_id},SQL:{sql}")
  333. from common.result import success_response
  334. return jsonify(success_response(
  335. response_text="问题-SQL对训练成功",
  336. data={
  337. "training_id": training_id,
  338. "message": "Question-SQL pair trained successfully"
  339. }
  340. ))
  341. except Exception as e:
  342. from common.result import internal_error_response
  343. return jsonify(internal_error_response(
  344. response_text="训练失败,请稍后重试"
  345. )), 500
  346. # ============ LangGraph Agent 集成 ============
  347. # 全局Agent实例(单例模式)
  348. citu_langraph_agent = None
  349. def get_citu_langraph_agent():
  350. """获取LangGraph Agent实例(懒加载)"""
  351. global citu_langraph_agent
  352. if citu_langraph_agent is None:
  353. try:
  354. from agent.citu_agent import CituLangGraphAgent
  355. print("[CITU_APP] 开始创建LangGraph Agent实例...")
  356. citu_langraph_agent = CituLangGraphAgent()
  357. print("[CITU_APP] LangGraph Agent实例创建成功")
  358. except ImportError as e:
  359. print(f"[CRITICAL] Agent模块导入失败: {str(e)}")
  360. print("[CRITICAL] 请检查agent模块是否存在以及依赖是否正确安装")
  361. raise Exception(f"Agent模块导入失败: {str(e)}")
  362. except Exception as e:
  363. print(f"[CRITICAL] LangGraph Agent实例创建失败: {str(e)}")
  364. print(f"[CRITICAL] 错误类型: {type(e).__name__}")
  365. # 提供更有用的错误信息
  366. if "config" in str(e).lower():
  367. print("[CRITICAL] 可能是配置文件问题,请检查配置")
  368. elif "llm" in str(e).lower():
  369. print("[CRITICAL] 可能是LLM连接问题,请检查LLM配置")
  370. elif "tool" in str(e).lower():
  371. print("[CRITICAL] 可能是工具加载问题,请检查工具模块")
  372. raise Exception(f"Agent初始化失败: {str(e)}")
  373. return citu_langraph_agent
  374. @app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
  375. def ask_agent():
  376. """
  377. 支持对话上下文的ask_agent API - 修正版
  378. """
  379. req = request.get_json(force=True)
  380. question = req.get("question", None)
  381. browser_session_id = req.get("session_id", None)
  382. # 新增参数解析
  383. user_id_input = req.get("user_id", None)
  384. conversation_id_input = req.get("conversation_id", None)
  385. continue_conversation = req.get("continue_conversation", False)
  386. # 新增:路由模式参数解析和验证
  387. api_routing_mode = req.get("routing_mode", None)
  388. VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]
  389. if not question:
  390. return jsonify(bad_request_response(
  391. response_text="缺少必需参数:question",
  392. missing_params=["question"]
  393. )), 400
  394. # 验证routing_mode参数
  395. if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
  396. return jsonify(bad_request_response(
  397. response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
  398. invalid_params=["routing_mode"]
  399. )), 400
  400. try:
  401. # 1. 获取登录用户ID(修正:在函数中获取session信息)
  402. login_user_id = session.get('user_id') if 'user_id' in session else None
  403. # 2. 智能ID解析(修正:传入登录用户ID)
  404. user_id = redis_conversation_manager.resolve_user_id(
  405. user_id_input, browser_session_id, request.remote_addr, login_user_id
  406. )
  407. conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
  408. user_id, conversation_id_input, continue_conversation
  409. )
  410. # 3. 获取上下文和上下文类型(提前到缓存检查之前)
  411. context = redis_conversation_manager.get_context(conversation_id)
  412. # 获取上下文类型:从最后一条助手消息的metadata中获取类型
  413. context_type = None
  414. if context:
  415. try:
  416. # 获取最后一条助手消息的metadata
  417. messages = redis_conversation_manager.get_messages(conversation_id, limit=10)
  418. for message in reversed(messages): # 从最新的开始找
  419. if message.get("role") == "assistant":
  420. metadata = message.get("metadata", {})
  421. context_type = metadata.get("type")
  422. if context_type:
  423. print(f"[AGENT_API] 检测到上下文类型: {context_type}")
  424. break
  425. except Exception as e:
  426. print(f"[WARNING] 获取上下文类型失败: {str(e)}")
  427. # 4. 检查缓存(新逻辑:放宽使用条件,严控存储条件)
  428. cached_answer = redis_conversation_manager.get_cached_answer(question, context)
  429. if cached_answer:
  430. print(f"[AGENT_API] 使用缓存答案")
  431. # 确定缓存答案的助手回复内容(使用与非缓存相同的优先级逻辑)
  432. cached_response_type = cached_answer.get("type", "UNKNOWN")
  433. if cached_response_type == "DATABASE":
  434. # DATABASE类型:按优先级选择内容
  435. if cached_answer.get("response"):
  436. # 优先级1:错误或解释性回复(如SQL生成失败)
  437. assistant_response = cached_answer.get("response")
  438. elif cached_answer.get("summary"):
  439. # 优先级2:查询成功的摘要
  440. assistant_response = cached_answer.get("summary")
  441. elif cached_answer.get("query_result"):
  442. # 优先级3:构造简单描述
  443. query_result = cached_answer.get("query_result")
  444. row_count = query_result.get("row_count", 0)
  445. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  446. else:
  447. # 异常情况
  448. assistant_response = "数据库查询已处理。"
  449. else:
  450. # CHAT类型:直接使用response
  451. assistant_response = cached_answer.get("response", "")
  452. # 更新对话历史
  453. redis_conversation_manager.save_message(conversation_id, "user", question)
  454. redis_conversation_manager.save_message(
  455. conversation_id, "assistant",
  456. assistant_response,
  457. metadata={"from_cache": True}
  458. )
  459. # 添加对话信息到缓存结果
  460. cached_answer["conversation_id"] = conversation_id
  461. cached_answer["user_id"] = user_id
  462. cached_answer["from_cache"] = True
  463. cached_answer.update(conversation_status)
  464. # 使用agent_success_response返回标准格式
  465. return jsonify(agent_success_response(
  466. response_type=cached_answer.get("type", "UNKNOWN"),
  467. response=cached_answer.get("response", ""), # 修正:使用response而不是response_text
  468. sql=cached_answer.get("sql"),
  469. query_result=cached_answer.get("query_result"),
  470. summary=cached_answer.get("summary"),
  471. session_id=browser_session_id,
  472. execution_path=cached_answer.get("execution_path", []),
  473. classification_info=cached_answer.get("classification_info", {}),
  474. conversation_id=conversation_id,
  475. user_id=user_id,
  476. is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
  477. context_used=bool(context),
  478. from_cache=True,
  479. conversation_status=conversation_status["status"],
  480. conversation_message=conversation_status["message"],
  481. requested_conversation_id=conversation_status.get("requested_id")
  482. ))
  483. # 5. 保存用户消息
  484. redis_conversation_manager.save_message(conversation_id, "user", question)
  485. # 6. 构建带上下文的问题
  486. if context:
  487. enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
  488. print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
  489. else:
  490. enhanced_question = question
  491. print(f"[AGENT_API] 新对话,无上下文")
  492. # 7. 确定最终使用的路由模式(优先级逻辑)
  493. if api_routing_mode:
  494. # API传了参数,优先使用
  495. effective_routing_mode = api_routing_mode
  496. print(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
  497. else:
  498. # API没传参数,使用配置文件
  499. try:
  500. from app_config import QUESTION_ROUTING_MODE
  501. effective_routing_mode = QUESTION_ROUTING_MODE
  502. print(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
  503. except ImportError:
  504. effective_routing_mode = "hybrid"
  505. print(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")
  506. # 8. 现有Agent处理逻辑(修改为传递路由模式)
  507. try:
  508. agent = get_citu_langraph_agent()
  509. except Exception as e:
  510. print(f"[CRITICAL] Agent初始化失败: {str(e)}")
  511. return jsonify(service_unavailable_response(
  512. response_text="AI服务暂时不可用,请稍后重试",
  513. can_retry=True
  514. )), 503
  515. agent_result = agent.process_question(
  516. question=enhanced_question, # 使用增强后的问题
  517. session_id=browser_session_id,
  518. context_type=context_type, # 传递上下文类型
  519. routing_mode=effective_routing_mode # 新增:传递路由模式
  520. )
  521. # 8. 处理Agent结果
  522. if agent_result.get("success", False):
  523. # 修正:直接从agent_result获取字段,因为它就是final_response
  524. response_type = agent_result.get("type", "UNKNOWN")
  525. response_text = agent_result.get("response", "")
  526. sql = agent_result.get("sql")
  527. query_result = agent_result.get("query_result")
  528. summary = agent_result.get("summary")
  529. execution_path = agent_result.get("execution_path", [])
  530. classification_info = agent_result.get("classification_info", {})
  531. # 确定助手回复内容的优先级
  532. if response_type == "DATABASE":
  533. # DATABASE类型:按优先级选择内容
  534. if response_text:
  535. # 优先级1:错误或解释性回复(如SQL生成失败)
  536. assistant_response = response_text
  537. elif summary:
  538. # 优先级2:查询成功的摘要
  539. assistant_response = summary
  540. elif query_result:
  541. # 优先级3:构造简单描述
  542. row_count = query_result.get("row_count", 0)
  543. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  544. else:
  545. # 异常情况
  546. assistant_response = "数据库查询已处理。"
  547. else:
  548. # CHAT类型:直接使用response
  549. assistant_response = response_text
  550. # 保存助手回复
  551. redis_conversation_manager.save_message(
  552. conversation_id, "assistant", assistant_response,
  553. metadata={
  554. "type": response_type,
  555. "sql": sql,
  556. "execution_path": execution_path
  557. }
  558. )
  559. # 缓存成功的答案(新逻辑:只缓存无上下文的问答)
  560. # 直接缓存agent_result,它已经包含所有需要的字段
  561. redis_conversation_manager.cache_answer(question, agent_result, context)
  562. # 使用agent_success_response的正确方式
  563. return jsonify(agent_success_response(
  564. response_type=response_type,
  565. response=response_text, # 修正:使用response而不是response_text
  566. sql=sql,
  567. query_result=query_result,
  568. summary=summary,
  569. session_id=browser_session_id,
  570. execution_path=execution_path,
  571. classification_info=classification_info,
  572. conversation_id=conversation_id,
  573. user_id=user_id,
  574. is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
  575. context_used=bool(context),
  576. from_cache=False,
  577. conversation_status=conversation_status["status"],
  578. conversation_message=conversation_status["message"],
  579. requested_conversation_id=conversation_status.get("requested_id"),
  580. routing_mode_used=effective_routing_mode, # 新增:实际使用的路由模式
  581. routing_mode_source="api" if api_routing_mode else "config" # 新增:路由模式来源
  582. ))
  583. else:
  584. # 错误处理(修正:确保使用现有的错误响应格式)
  585. error_message = agent_result.get("error", "Agent处理失败")
  586. error_code = agent_result.get("error_code", 500)
  587. return jsonify(agent_error_response(
  588. response_text=error_message,
  589. error_type="agent_processing_failed",
  590. code=error_code,
  591. session_id=browser_session_id,
  592. conversation_id=conversation_id,
  593. user_id=user_id
  594. )), error_code
  595. except Exception as e:
  596. print(f"[ERROR] ask_agent执行失败: {str(e)}")
  597. return jsonify(internal_error_response(
  598. response_text="查询处理失败,请稍后重试"
  599. )), 500
  600. @app.flask_app.route('/api/v0/agent_health', methods=['GET'])
  601. def agent_health():
  602. """
  603. Agent健康检查接口
  604. 响应格式:
  605. {
  606. "success": true/false,
  607. "code": 200/503,
  608. "message": "healthy/degraded/unhealthy",
  609. "data": {
  610. "status": "healthy/degraded/unhealthy",
  611. "test_result": true/false,
  612. "workflow_compiled": true/false,
  613. "tools_count": 4,
  614. "message": "详细信息",
  615. "timestamp": "2024-01-01T12:00:00",
  616. "checks": {
  617. "agent_creation": true/false,
  618. "tools_import": true/false,
  619. "llm_connection": true/false,
  620. "classifier_ready": true/false
  621. }
  622. }
  623. }
  624. """
  625. try:
  626. # 基础健康检查
  627. health_data = {
  628. "status": "unknown",
  629. "test_result": False,
  630. "workflow_compiled": False,
  631. "tools_count": 0,
  632. "message": "",
  633. "timestamp": datetime.now().isoformat(),
  634. "checks": {
  635. "agent_creation": False,
  636. "tools_import": False,
  637. "llm_connection": False,
  638. "classifier_ready": False
  639. }
  640. }
  641. # 检查1: Agent创建
  642. try:
  643. agent = get_citu_langraph_agent()
  644. health_data["checks"]["agent_creation"] = True
  645. health_data["workflow_compiled"] = agent.workflow is not None
  646. health_data["tools_count"] = len(agent.tools) if hasattr(agent, 'tools') else 0
  647. except Exception as e:
  648. health_data["message"] = f"Agent创建失败: {str(e)}"
  649. from common.result import health_error_response
  650. return jsonify(health_error_response(
  651. status="unhealthy",
  652. **health_data
  653. )), 503
  654. # 检查2: 工具导入
  655. try:
  656. from agent.tools import TOOLS
  657. health_data["checks"]["tools_import"] = len(TOOLS) > 0
  658. except Exception as e:
  659. health_data["message"] = f"工具导入失败: {str(e)}"
  660. # 检查3: LLM连接(简单测试)
  661. try:
  662. from agent.utils import get_compatible_llm
  663. llm = get_compatible_llm()
  664. health_data["checks"]["llm_connection"] = llm is not None
  665. except Exception as e:
  666. health_data["message"] = f"LLM连接失败: {str(e)}"
  667. # 检查4: 分类器准备
  668. try:
  669. from agent.classifier import QuestionClassifier
  670. classifier = QuestionClassifier()
  671. health_data["checks"]["classifier_ready"] = True
  672. except Exception as e:
  673. health_data["message"] = f"分类器失败: {str(e)}"
  674. # 检查5: 完整流程测试(可选)
  675. try:
  676. if all(health_data["checks"].values()):
  677. test_result = agent.health_check()
  678. health_data["test_result"] = test_result.get("status") == "healthy"
  679. health_data["status"] = test_result.get("status", "unknown")
  680. health_data["message"] = test_result.get("message", "健康检查完成")
  681. else:
  682. health_data["status"] = "degraded"
  683. health_data["message"] = "部分组件异常"
  684. except Exception as e:
  685. health_data["status"] = "degraded"
  686. health_data["message"] = f"完整测试失败: {str(e)}"
  687. # 根据状态返回相应的HTTP代码 - 使用标准化健康检查响应
  688. from common.result import health_success_response, health_error_response
  689. if health_data["status"] == "healthy":
  690. return jsonify(health_success_response(**health_data))
  691. elif health_data["status"] == "degraded":
  692. return jsonify(health_error_response(status="degraded", **health_data)), 503
  693. else:
  694. return jsonify(health_error_response(status="unhealthy", **health_data)), 503
  695. except Exception as e:
  696. print(f"[ERROR] 健康检查异常: {str(e)}")
  697. from common.result import internal_error_response
  698. return jsonify(internal_error_response(
  699. response_text="健康检查失败,请稍后重试"
  700. )), 500
  701. # ==================== 日常管理API ====================
  702. @app.flask_app.route('/api/v0/cache_overview', methods=['GET'])
  703. def cache_overview():
  704. """日常管理:轻量概览 - 合并原cache_inspect的核心功能"""
  705. try:
  706. cache = app.cache
  707. result_data = {
  708. 'overview_summary': {
  709. 'total_conversations': 0,
  710. 'total_sessions': 0,
  711. 'query_time': datetime.now().isoformat()
  712. },
  713. 'recent_conversations': [], # 最近的对话
  714. 'session_summary': [] # 会话摘要
  715. }
  716. if hasattr(cache, 'cache') and isinstance(cache.cache, dict):
  717. result_data['overview_summary']['total_conversations'] = len(cache.cache)
  718. # 获取会话信息
  719. if hasattr(cache, 'get_all_sessions'):
  720. all_sessions = cache.get_all_sessions()
  721. result_data['overview_summary']['total_sessions'] = len(all_sessions)
  722. # 会话摘要(按最近活动排序)
  723. session_list = []
  724. for session_id, session_data in all_sessions.items():
  725. session_summary = {
  726. 'session_id': session_id,
  727. 'start_time': session_data['start_time'].isoformat(),
  728. 'conversation_count': session_data.get('conversation_count', 0),
  729. 'duration_seconds': session_data.get('session_duration_seconds', 0),
  730. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  731. 'is_active': (datetime.now() - session_data.get('last_activity', session_data['start_time'])).total_seconds() < 1800 # 30分钟内活跃
  732. }
  733. session_list.append(session_summary)
  734. # 按最后活动时间排序
  735. session_list.sort(key=lambda x: x['last_activity'], reverse=True)
  736. result_data['session_summary'] = session_list
  737. # 最近的对话(最多显示10个)
  738. conversation_list = []
  739. for conversation_id, conversation_data in cache.cache.items():
  740. conversation_start_time = cache.conversation_start_times.get(conversation_id)
  741. conversation_info = {
  742. 'conversation_id': conversation_id,
  743. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  744. 'session_id': cache.conversation_to_session.get(conversation_id),
  745. 'has_question': 'question' in conversation_data,
  746. 'has_sql': 'sql' in conversation_data,
  747. 'has_data': 'df' in conversation_data and conversation_data['df'] is not None,
  748. 'question_preview': conversation_data.get('question', '')[:80] + '...' if len(conversation_data.get('question', '')) > 80 else conversation_data.get('question', ''),
  749. }
  750. # 计算对话持续时间
  751. if conversation_start_time:
  752. duration = datetime.now() - conversation_start_time
  753. conversation_info['conversation_duration_seconds'] = duration.total_seconds()
  754. conversation_list.append(conversation_info)
  755. # 按对话开始时间排序,显示最新的10个
  756. conversation_list.sort(key=lambda x: x['conversation_start_time'] or '', reverse=True)
  757. result_data['recent_conversations'] = conversation_list[:10]
  758. from common.result import success_response
  759. return jsonify(success_response(
  760. response_text="缓存概览查询完成",
  761. data=result_data
  762. ))
  763. except Exception as e:
  764. from common.result import internal_error_response
  765. return jsonify(internal_error_response(
  766. response_text="获取缓存概览失败,请稍后重试"
  767. )), 500
  768. @app.flask_app.route('/api/v0/cache_stats', methods=['GET'])
  769. def cache_stats():
  770. """日常管理:统计信息 - 合并原session_stats和cache_stats功能"""
  771. try:
  772. cache = app.cache
  773. current_time = datetime.now()
  774. stats = {
  775. 'basic_stats': {
  776. 'total_sessions': len(getattr(cache, 'session_info', {})),
  777. 'total_conversations': len(getattr(cache, 'cache', {})),
  778. 'active_sessions': 0, # 最近30分钟有活动
  779. 'average_conversations_per_session': 0
  780. },
  781. 'time_distribution': {
  782. 'sessions': {
  783. 'last_1_hour': 0,
  784. 'last_6_hours': 0,
  785. 'last_24_hours': 0,
  786. 'last_7_days': 0,
  787. 'older': 0
  788. },
  789. 'conversations': {
  790. 'last_1_hour': 0,
  791. 'last_6_hours': 0,
  792. 'last_24_hours': 0,
  793. 'last_7_days': 0,
  794. 'older': 0
  795. }
  796. },
  797. 'session_details': [],
  798. 'time_ranges': {
  799. 'oldest_session': None,
  800. 'newest_session': None,
  801. 'oldest_conversation': None,
  802. 'newest_conversation': None
  803. }
  804. }
  805. # 会话统计
  806. if hasattr(cache, 'session_info'):
  807. session_times = []
  808. total_conversations = 0
  809. for session_id, session_data in cache.session_info.items():
  810. start_time = session_data['start_time']
  811. session_times.append(start_time)
  812. conversation_count = len(session_data.get('conversations', []))
  813. total_conversations += conversation_count
  814. # 检查活跃状态
  815. last_activity = session_data.get('last_activity', session_data['start_time'])
  816. if (current_time - last_activity).total_seconds() < 1800:
  817. stats['basic_stats']['active_sessions'] += 1
  818. # 时间分布统计
  819. age_hours = (current_time - start_time).total_seconds() / 3600
  820. if age_hours <= 1:
  821. stats['time_distribution']['sessions']['last_1_hour'] += 1
  822. elif age_hours <= 6:
  823. stats['time_distribution']['sessions']['last_6_hours'] += 1
  824. elif age_hours <= 24:
  825. stats['time_distribution']['sessions']['last_24_hours'] += 1
  826. elif age_hours <= 168: # 7 days
  827. stats['time_distribution']['sessions']['last_7_days'] += 1
  828. else:
  829. stats['time_distribution']['sessions']['older'] += 1
  830. # 会话详细信息
  831. session_duration = current_time - start_time
  832. stats['session_details'].append({
  833. 'session_id': session_id,
  834. 'start_time': start_time.isoformat(),
  835. 'last_activity': last_activity.isoformat(),
  836. 'conversation_count': conversation_count,
  837. 'duration_seconds': session_duration.total_seconds(),
  838. 'duration_formatted': str(session_duration),
  839. 'is_active': (current_time - last_activity).total_seconds() < 1800,
  840. 'browser_session_id': session_data.get('browser_session_id')
  841. })
  842. # 计算平均值
  843. if len(cache.session_info) > 0:
  844. stats['basic_stats']['average_conversations_per_session'] = total_conversations / len(cache.session_info)
  845. # 时间范围
  846. if session_times:
  847. stats['time_ranges']['oldest_session'] = min(session_times).isoformat()
  848. stats['time_ranges']['newest_session'] = max(session_times).isoformat()
  849. # 对话统计
  850. if hasattr(cache, 'conversation_start_times'):
  851. conversation_times = []
  852. for conv_time in cache.conversation_start_times.values():
  853. conversation_times.append(conv_time)
  854. age_hours = (current_time - conv_time).total_seconds() / 3600
  855. if age_hours <= 1:
  856. stats['time_distribution']['conversations']['last_1_hour'] += 1
  857. elif age_hours <= 6:
  858. stats['time_distribution']['conversations']['last_6_hours'] += 1
  859. elif age_hours <= 24:
  860. stats['time_distribution']['conversations']['last_24_hours'] += 1
  861. elif age_hours <= 168:
  862. stats['time_distribution']['conversations']['last_7_days'] += 1
  863. else:
  864. stats['time_distribution']['conversations']['older'] += 1
  865. if conversation_times:
  866. stats['time_ranges']['oldest_conversation'] = min(conversation_times).isoformat()
  867. stats['time_ranges']['newest_conversation'] = max(conversation_times).isoformat()
  868. # 按最近活动排序会话详情
  869. stats['session_details'].sort(key=lambda x: x['last_activity'], reverse=True)
  870. from common.result import success_response
  871. return jsonify(success_response(
  872. response_text="缓存统计信息查询完成",
  873. data=stats
  874. ))
  875. except Exception as e:
  876. from common.result import internal_error_response
  877. return jsonify(internal_error_response(
  878. response_text="获取缓存统计失败,请稍后重试"
  879. )), 500
  880. # ==================== 高级功能API ====================
  881. @app.flask_app.route('/api/v0/cache_export', methods=['GET'])
  882. def cache_export():
  883. """高级功能:完整导出 - 保持原cache_raw_export的完整功能"""
  884. try:
  885. cache = app.cache
  886. # 验证缓存的实际结构
  887. if not hasattr(cache, 'cache'):
  888. from common.result import internal_error_response
  889. return jsonify(internal_error_response(
  890. response_text="缓存对象结构异常,请联系系统管理员"
  891. )), 500
  892. if not isinstance(cache.cache, dict):
  893. from common.result import internal_error_response
  894. return jsonify(internal_error_response(
  895. response_text="缓存数据类型异常,请联系系统管理员"
  896. )), 500
  897. # 定义JSON序列化辅助函数
  898. def make_json_serializable(obj):
  899. """将对象转换为JSON可序列化的格式"""
  900. if obj is None:
  901. return None
  902. elif isinstance(obj, (str, int, float, bool)):
  903. return obj
  904. elif isinstance(obj, (list, tuple)):
  905. return [make_json_serializable(item) for item in obj]
  906. elif isinstance(obj, dict):
  907. return {str(k): make_json_serializable(v) for k, v in obj.items()}
  908. elif hasattr(obj, 'isoformat'): # datetime objects
  909. return obj.isoformat()
  910. elif hasattr(obj, 'item'): # numpy scalars
  911. return obj.item()
  912. elif hasattr(obj, 'tolist'): # numpy arrays
  913. return obj.tolist()
  914. elif hasattr(obj, '__dict__'): # pandas dtypes and other objects
  915. return str(obj)
  916. else:
  917. return str(obj)
  918. # 获取完整的原始缓存数据
  919. raw_cache = cache.cache
  920. # 获取会话和对话时间信息
  921. conversation_times = getattr(cache, 'conversation_start_times', {})
  922. session_info = getattr(cache, 'session_info', {})
  923. conversation_to_session = getattr(cache, 'conversation_to_session', {})
  924. export_data = {
  925. 'export_metadata': {
  926. 'export_time': datetime.now().isoformat(),
  927. 'total_conversations': len(raw_cache),
  928. 'total_sessions': len(session_info),
  929. 'cache_type': type(cache).__name__,
  930. 'cache_object_info': str(cache),
  931. 'has_session_times': bool(session_info),
  932. 'has_conversation_times': bool(conversation_times)
  933. },
  934. 'session_info': {
  935. session_id: {
  936. 'start_time': session_data['start_time'].isoformat(),
  937. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  938. 'conversations': session_data['conversations'],
  939. 'conversation_count': len(session_data['conversations']),
  940. 'browser_session_id': session_data.get('browser_session_id'),
  941. 'user_info': session_data.get('user_info', {})
  942. }
  943. for session_id, session_data in session_info.items()
  944. },
  945. 'conversation_times': {
  946. conversation_id: start_time.isoformat()
  947. for conversation_id, start_time in conversation_times.items()
  948. },
  949. 'conversation_to_session_mapping': conversation_to_session,
  950. 'conversations': {}
  951. }
  952. # 处理每个对话的完整数据
  953. for conversation_id, conversation_data in raw_cache.items():
  954. # 获取时间信息
  955. conversation_start_time = conversation_times.get(conversation_id)
  956. session_id = conversation_to_session.get(conversation_id)
  957. session_start_time = None
  958. if session_id and session_id in session_info:
  959. session_start_time = session_info[session_id]['start_time']
  960. processed_conversation = {
  961. 'conversation_id': conversation_id,
  962. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  963. 'session_id': session_id,
  964. 'session_start_time': session_start_time.isoformat() if session_start_time else None,
  965. 'field_count': len(conversation_data),
  966. 'fields': {}
  967. }
  968. # 添加时间计算
  969. if conversation_start_time:
  970. conversation_duration = datetime.now() - conversation_start_time
  971. processed_conversation['conversation_duration_seconds'] = conversation_duration.total_seconds()
  972. processed_conversation['conversation_duration_formatted'] = str(conversation_duration)
  973. if session_start_time:
  974. session_duration = datetime.now() - session_start_time
  975. processed_conversation['session_duration_seconds'] = session_duration.total_seconds()
  976. processed_conversation['session_duration_formatted'] = str(session_duration)
  977. # 处理每个字段,确保JSON序列化安全
  978. for field_name, field_value in conversation_data.items():
  979. field_info = {
  980. 'field_name': field_name,
  981. 'data_type': type(field_value).__name__,
  982. 'is_none': field_value is None
  983. }
  984. try:
  985. if field_value is None:
  986. field_info['value'] = None
  987. elif field_name in ['conversation_start_time', 'session_start_time']:
  988. # 处理时间字段
  989. field_info['content'] = make_json_serializable(field_value)
  990. elif field_name == 'df' and field_value is not None:
  991. # DataFrame的安全处理
  992. if hasattr(field_value, 'to_dict'):
  993. # 安全地处理dtypes
  994. try:
  995. dtypes_dict = {}
  996. for col, dtype in field_value.dtypes.items():
  997. dtypes_dict[col] = str(dtype)
  998. except Exception:
  999. dtypes_dict = {"error": "无法序列化dtypes"}
  1000. # 安全地处理内存使用
  1001. try:
  1002. memory_usage = field_value.memory_usage(deep=True)
  1003. memory_dict = {}
  1004. for idx, usage in memory_usage.items():
  1005. memory_dict[str(idx)] = int(usage) if hasattr(usage, 'item') else int(usage)
  1006. except Exception:
  1007. memory_dict = {"error": "无法获取内存使用信息"}
  1008. field_info.update({
  1009. 'dataframe_info': {
  1010. 'shape': list(field_value.shape),
  1011. 'columns': list(field_value.columns),
  1012. 'dtypes': dtypes_dict,
  1013. 'index_info': {
  1014. 'type': type(field_value.index).__name__,
  1015. 'length': len(field_value.index)
  1016. }
  1017. },
  1018. 'data': make_json_serializable(field_value.to_dict('records')),
  1019. 'memory_usage': memory_dict
  1020. })
  1021. else:
  1022. field_info['value'] = str(field_value)
  1023. field_info['note'] = 'not_standard_dataframe'
  1024. elif field_name == 'fig_json':
  1025. # 图表JSON数据处理
  1026. if isinstance(field_value, str):
  1027. try:
  1028. import json
  1029. parsed_fig = json.loads(field_value)
  1030. field_info.update({
  1031. 'json_valid': True,
  1032. 'json_size_bytes': len(field_value),
  1033. 'plotly_structure': {
  1034. 'has_data': 'data' in parsed_fig,
  1035. 'has_layout': 'layout' in parsed_fig,
  1036. 'data_traces_count': len(parsed_fig.get('data', [])),
  1037. },
  1038. 'raw_json': field_value
  1039. })
  1040. except json.JSONDecodeError:
  1041. field_info.update({
  1042. 'json_valid': False,
  1043. 'raw_content': str(field_value)
  1044. })
  1045. else:
  1046. field_info['value'] = make_json_serializable(field_value)
  1047. elif field_name == 'followup_questions':
  1048. # 后续问题列表
  1049. field_info.update({
  1050. 'content': make_json_serializable(field_value)
  1051. })
  1052. elif field_name in ['question', 'sql', 'summary']:
  1053. # 文本字段
  1054. if isinstance(field_value, str):
  1055. field_info.update({
  1056. 'text_length': len(field_value),
  1057. 'content': field_value
  1058. })
  1059. else:
  1060. field_info['value'] = make_json_serializable(field_value)
  1061. else:
  1062. # 未知字段的安全处理
  1063. field_info['content'] = make_json_serializable(field_value)
  1064. except Exception as e:
  1065. field_info.update({
  1066. 'processing_error': str(e),
  1067. 'fallback_value': str(field_value)[:500] + '...' if len(str(field_value)) > 500 else str(field_value)
  1068. })
  1069. processed_conversation['fields'][field_name] = field_info
  1070. export_data['conversations'][conversation_id] = processed_conversation
  1071. # 添加缓存统计信息
  1072. field_frequency = {}
  1073. data_types_found = set()
  1074. total_dataframes = 0
  1075. total_questions = 0
  1076. for conv_data in export_data['conversations'].values():
  1077. for field_name, field_info in conv_data['fields'].items():
  1078. field_frequency[field_name] = field_frequency.get(field_name, 0) + 1
  1079. data_types_found.add(field_info['data_type'])
  1080. if field_name == 'df' and not field_info['is_none']:
  1081. total_dataframes += 1
  1082. if field_name == 'question' and not field_info['is_none']:
  1083. total_questions += 1
  1084. export_data['cache_statistics'] = {
  1085. 'field_frequency': field_frequency,
  1086. 'data_types_found': list(data_types_found),
  1087. 'total_dataframes': total_dataframes,
  1088. 'total_questions': total_questions,
  1089. 'has_session_timing': 'session_start_time' in field_frequency,
  1090. 'has_conversation_timing': 'conversation_start_time' in field_frequency
  1091. }
  1092. from common.result import success_response
  1093. return jsonify(success_response(
  1094. response_text="缓存数据导出完成",
  1095. data=export_data
  1096. ))
  1097. except Exception as e:
  1098. import traceback
  1099. error_details = {
  1100. 'error_message': str(e),
  1101. 'error_type': type(e).__name__,
  1102. 'traceback': traceback.format_exc()
  1103. }
  1104. from common.result import internal_error_response
  1105. return jsonify(internal_error_response(
  1106. response_text="导出缓存失败,请稍后重试"
  1107. )), 500
  1108. # ==================== 清理功能API ====================
  1109. @app.flask_app.route('/api/v0/cache_preview_cleanup', methods=['POST'])
  1110. def cache_preview_cleanup():
  1111. """清理功能:预览删除操作 - 保持原功能"""
  1112. try:
  1113. req = request.get_json(force=True)
  1114. # 时间条件 - 支持三种方式
  1115. older_than_hours = req.get('older_than_hours')
  1116. older_than_days = req.get('older_than_days')
  1117. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1118. cache = app.cache
  1119. # 计算截止时间
  1120. cutoff_time = None
  1121. time_condition = None
  1122. if older_than_hours:
  1123. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1124. time_condition = f"older_than_hours: {older_than_hours}"
  1125. elif older_than_days:
  1126. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1127. time_condition = f"older_than_days: {older_than_days}"
  1128. elif before_timestamp:
  1129. try:
  1130. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1131. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1132. time_condition = f"before_timestamp: {before_timestamp}"
  1133. except ValueError:
  1134. from common.result import validation_failed_response
  1135. return jsonify(validation_failed_response(
  1136. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1137. )), 422
  1138. else:
  1139. from common.result import bad_request_response
  1140. return jsonify(bad_request_response(
  1141. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1142. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1143. )), 400
  1144. preview = {
  1145. 'time_condition': time_condition,
  1146. 'cutoff_time': cutoff_time.isoformat(),
  1147. 'will_be_removed': {
  1148. 'sessions': []
  1149. },
  1150. 'will_be_kept': {
  1151. 'sessions_count': 0,
  1152. 'conversations_count': 0
  1153. },
  1154. 'summary': {
  1155. 'sessions_to_remove': 0,
  1156. 'conversations_to_remove': 0,
  1157. 'sessions_to_keep': 0,
  1158. 'conversations_to_keep': 0
  1159. }
  1160. }
  1161. # 预览按session删除
  1162. sessions_to_remove_count = 0
  1163. conversations_to_remove_count = 0
  1164. for session_id, session_data in cache.session_info.items():
  1165. session_preview = {
  1166. 'session_id': session_id,
  1167. 'start_time': session_data['start_time'].isoformat(),
  1168. 'conversation_count': len(session_data['conversations']),
  1169. 'conversations': []
  1170. }
  1171. # 添加conversation详情
  1172. for conv_id in session_data['conversations']:
  1173. if conv_id in cache.cache:
  1174. conv_data = cache.cache[conv_id]
  1175. session_preview['conversations'].append({
  1176. 'conversation_id': conv_id,
  1177. 'question': conv_data.get('question', '')[:50] + '...' if conv_data.get('question') else '',
  1178. 'start_time': cache.conversation_start_times.get(conv_id, '').isoformat() if cache.conversation_start_times.get(conv_id) else ''
  1179. })
  1180. if session_data['start_time'] < cutoff_time:
  1181. preview['will_be_removed']['sessions'].append(session_preview)
  1182. sessions_to_remove_count += 1
  1183. conversations_to_remove_count += len(session_data['conversations'])
  1184. else:
  1185. preview['will_be_kept']['sessions_count'] += 1
  1186. preview['will_be_kept']['conversations_count'] += len(session_data['conversations'])
  1187. # 更新摘要统计
  1188. preview['summary'] = {
  1189. 'sessions_to_remove': sessions_to_remove_count,
  1190. 'conversations_to_remove': conversations_to_remove_count,
  1191. 'sessions_to_keep': preview['will_be_kept']['sessions_count'],
  1192. 'conversations_to_keep': preview['will_be_kept']['conversations_count']
  1193. }
  1194. from common.result import success_response
  1195. return jsonify(success_response(
  1196. response_text=f"清理预览完成,将删除 {sessions_to_remove_count} 个会话和 {conversations_to_remove_count} 个对话",
  1197. data=preview
  1198. ))
  1199. except Exception as e:
  1200. from common.result import internal_error_response
  1201. return jsonify(internal_error_response(
  1202. response_text="预览清理操作失败,请稍后重试"
  1203. )), 500
  1204. @app.flask_app.route('/api/v0/cache_cleanup', methods=['POST'])
  1205. def cache_cleanup():
  1206. """清理功能:实际删除缓存 - 保持原功能"""
  1207. try:
  1208. req = request.get_json(force=True)
  1209. # 时间条件 - 支持三种方式
  1210. older_than_hours = req.get('older_than_hours')
  1211. older_than_days = req.get('older_than_days')
  1212. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1213. cache = app.cache
  1214. if not hasattr(cache, 'session_info'):
  1215. from common.result import service_unavailable_response
  1216. return jsonify(service_unavailable_response(
  1217. response_text="缓存不支持会话功能"
  1218. )), 503
  1219. # 计算截止时间
  1220. cutoff_time = None
  1221. time_condition = None
  1222. if older_than_hours:
  1223. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1224. time_condition = f"older_than_hours: {older_than_hours}"
  1225. elif older_than_days:
  1226. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1227. time_condition = f"older_than_days: {older_than_days}"
  1228. elif before_timestamp:
  1229. try:
  1230. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1231. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1232. time_condition = f"before_timestamp: {before_timestamp}"
  1233. except ValueError:
  1234. from common.result import validation_failed_response
  1235. return jsonify(validation_failed_response(
  1236. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1237. )), 422
  1238. else:
  1239. from common.result import bad_request_response
  1240. return jsonify(bad_request_response(
  1241. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1242. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1243. )), 400
  1244. cleanup_stats = {
  1245. 'time_condition': time_condition,
  1246. 'cutoff_time': cutoff_time.isoformat(),
  1247. 'sessions_removed': 0,
  1248. 'conversations_removed': 0,
  1249. 'sessions_kept': 0,
  1250. 'conversations_kept': 0,
  1251. 'removed_session_ids': [],
  1252. 'removed_conversation_ids': []
  1253. }
  1254. # 按session删除
  1255. sessions_to_remove = []
  1256. for session_id, session_data in cache.session_info.items():
  1257. if session_data['start_time'] < cutoff_time:
  1258. sessions_to_remove.append(session_id)
  1259. # 删除符合条件的sessions及其所有conversations
  1260. for session_id in sessions_to_remove:
  1261. session_data = cache.session_info[session_id]
  1262. conversations_in_session = session_data['conversations'].copy()
  1263. # 删除session中的所有conversations
  1264. for conv_id in conversations_in_session:
  1265. if conv_id in cache.cache:
  1266. del cache.cache[conv_id]
  1267. cleanup_stats['conversations_removed'] += 1
  1268. cleanup_stats['removed_conversation_ids'].append(conv_id)
  1269. # 清理conversation相关的时间记录
  1270. if hasattr(cache, 'conversation_start_times') and conv_id in cache.conversation_start_times:
  1271. del cache.conversation_start_times[conv_id]
  1272. if hasattr(cache, 'conversation_to_session') and conv_id in cache.conversation_to_session:
  1273. del cache.conversation_to_session[conv_id]
  1274. # 删除session记录
  1275. del cache.session_info[session_id]
  1276. cleanup_stats['sessions_removed'] += 1
  1277. cleanup_stats['removed_session_ids'].append(session_id)
  1278. # 统计保留的sessions和conversations
  1279. cleanup_stats['sessions_kept'] = len(cache.session_info)
  1280. cleanup_stats['conversations_kept'] = len(cache.cache)
  1281. from common.result import success_response
  1282. return jsonify(success_response(
  1283. response_text=f"缓存清理完成,删除了 {cleanup_stats['sessions_removed']} 个会话和 {cleanup_stats['conversations_removed']} 个对话",
  1284. data=cleanup_stats
  1285. ))
  1286. except Exception as e:
  1287. from common.result import internal_error_response
  1288. return jsonify(internal_error_response(
  1289. response_text="缓存清理失败,请稍后重试"
  1290. )), 500
  1291. @app.flask_app.route('/api/v0/training_error_question_sql', methods=['POST'])
  1292. def training_error_question_sql():
  1293. """
  1294. 存储错误的question-sql对到error_sql集合中
  1295. 此API将接收的错误question/sql pair写入到error_sql集合中,用于记录和分析错误的SQL查询。
  1296. Args:
  1297. question (str, required): 用户问题
  1298. sql (str, required): 对应的错误SQL查询语句
  1299. Returns:
  1300. JSON: 包含训练ID和成功消息的响应
  1301. """
  1302. try:
  1303. data = request.get_json()
  1304. question = data.get('question')
  1305. sql = data.get('sql')
  1306. print(f"[DEBUG] 接收到错误SQL训练请求: question={question}, sql={sql}")
  1307. if not question or not sql:
  1308. from common.result import bad_request_response
  1309. missing_params = []
  1310. if not question:
  1311. missing_params.append("question")
  1312. if not sql:
  1313. missing_params.append("sql")
  1314. return jsonify(bad_request_response(
  1315. response_text="question和sql参数都是必需的",
  1316. missing_params=missing_params
  1317. )), 400
  1318. # 使用vn实例的train_error_sql方法存储错误SQL
  1319. id = vn.train_error_sql(question=question, sql=sql)
  1320. print(f"[INFO] 成功存储错误SQL,ID: {id}")
  1321. from common.result import success_response
  1322. return jsonify(success_response(
  1323. response_text="错误SQL对已成功存储",
  1324. data={
  1325. "id": id,
  1326. "message": "错误SQL对已成功存储到error_sql集合"
  1327. }
  1328. ))
  1329. except Exception as e:
  1330. print(f"[ERROR] 存储错误SQL失败: {str(e)}")
  1331. from common.result import internal_error_response
  1332. return jsonify(internal_error_response(
  1333. response_text="存储错误SQL失败,请稍后重试"
  1334. )), 500
  1335. # ==================== Redis对话管理API ====================
  1336. @app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
  1337. def get_user_conversations(user_id: str):
  1338. """获取用户的对话列表(按时间倒序)"""
  1339. try:
  1340. limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
  1341. conversations = redis_conversation_manager.get_conversations(user_id, limit)
  1342. return jsonify(success_response(
  1343. response_text="获取用户对话列表成功",
  1344. data={
  1345. "user_id": user_id,
  1346. "conversations": conversations,
  1347. "total_count": len(conversations)
  1348. }
  1349. ))
  1350. except Exception as e:
  1351. return jsonify(internal_error_response(
  1352. response_text="获取对话列表失败,请稍后重试"
  1353. )), 500
  1354. @app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
  1355. def get_conversation_messages(conversation_id: str):
  1356. """获取特定对话的消息历史"""
  1357. try:
  1358. limit = request.args.get('limit', type=int) # 可选参数
  1359. messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
  1360. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1361. return jsonify(success_response(
  1362. response_text="获取对话消息成功",
  1363. data={
  1364. "conversation_id": conversation_id,
  1365. "conversation_meta": meta,
  1366. "messages": messages,
  1367. "message_count": len(messages)
  1368. }
  1369. ))
  1370. except Exception as e:
  1371. return jsonify(internal_error_response(
  1372. response_text="获取对话消息失败"
  1373. )), 500
  1374. @app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
  1375. def get_conversation_context(conversation_id: str):
  1376. """获取对话上下文(格式化用于LLM)"""
  1377. try:
  1378. count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
  1379. context = redis_conversation_manager.get_context(conversation_id, count)
  1380. return jsonify(success_response(
  1381. response_text="获取对话上下文成功",
  1382. data={
  1383. "conversation_id": conversation_id,
  1384. "context": context,
  1385. "context_message_count": count
  1386. }
  1387. ))
  1388. except Exception as e:
  1389. return jsonify(internal_error_response(
  1390. response_text="获取对话上下文失败"
  1391. )), 500
  1392. @app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
  1393. def conversation_stats():
  1394. """获取对话系统统计信息"""
  1395. try:
  1396. stats = redis_conversation_manager.get_stats()
  1397. return jsonify(success_response(
  1398. response_text="获取统计信息成功",
  1399. data=stats
  1400. ))
  1401. except Exception as e:
  1402. return jsonify(internal_error_response(
  1403. response_text="获取统计信息失败,请稍后重试"
  1404. )), 500
  1405. @app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
  1406. def conversation_cleanup():
  1407. """手动清理过期对话"""
  1408. try:
  1409. redis_conversation_manager.cleanup_expired_conversations()
  1410. return jsonify(success_response(
  1411. response_text="对话清理完成"
  1412. ))
  1413. except Exception as e:
  1414. return jsonify(internal_error_response(
  1415. response_text="对话清理失败,请稍后重试"
  1416. )), 500
  1417. @app.flask_app.route('/api/v0/user/<user_id>/conversations/full', methods=['GET'])
  1418. def get_user_conversations_with_messages(user_id: str):
  1419. """
  1420. 获取用户的完整对话数据(包含所有消息)
  1421. 一次性返回用户的所有对话和每个对话下的消息历史
  1422. Args:
  1423. user_id: 用户ID(路径参数)
  1424. conversation_limit: 对话数量限制(查询参数,可选,不传则返回所有对话)
  1425. message_limit: 每个对话的消息数限制(查询参数,可选,不传则返回所有消息)
  1426. Returns:
  1427. 包含用户所有对话和消息的完整数据
  1428. """
  1429. try:
  1430. # 获取可选参数,不传递时使用None(返回所有记录)
  1431. conversation_limit = request.args.get('conversation_limit', type=int)
  1432. message_limit = request.args.get('message_limit', type=int)
  1433. # 获取用户的对话列表
  1434. conversations = redis_conversation_manager.get_conversations(user_id, conversation_limit)
  1435. # 为每个对话获取消息历史
  1436. full_conversations = []
  1437. total_messages = 0
  1438. for conversation in conversations:
  1439. conversation_id = conversation['conversation_id']
  1440. # 获取对话消息
  1441. messages = redis_conversation_manager.get_conversation_messages(
  1442. conversation_id, message_limit
  1443. )
  1444. # 获取对话元数据
  1445. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1446. # 组合完整数据
  1447. full_conversation = {
  1448. **conversation, # 基础对话信息
  1449. 'meta': meta, # 对话元数据
  1450. 'messages': messages, # 消息列表
  1451. 'message_count': len(messages)
  1452. }
  1453. full_conversations.append(full_conversation)
  1454. total_messages += len(messages)
  1455. return jsonify(success_response(
  1456. response_text="获取用户完整对话数据成功",
  1457. data={
  1458. "user_id": user_id,
  1459. "conversations": full_conversations,
  1460. "total_conversations": len(full_conversations),
  1461. "total_messages": total_messages,
  1462. "conversation_limit_applied": conversation_limit,
  1463. "message_limit_applied": message_limit,
  1464. "query_time": datetime.now().isoformat()
  1465. }
  1466. ))
  1467. except Exception as e:
  1468. print(f"[ERROR] 获取用户完整对话数据失败: {str(e)}")
  1469. return jsonify(internal_error_response(
  1470. response_text="获取用户对话数据失败,请稍后重试"
  1471. )), 500
  1472. # ==================== Embedding缓存管理接口 ====================
  1473. @app.flask_app.route('/api/v0/embedding_cache_stats', methods=['GET'])
  1474. def embedding_cache_stats():
  1475. """获取embedding缓存统计信息"""
  1476. try:
  1477. from common.embedding_cache_manager import get_embedding_cache_manager
  1478. cache_manager = get_embedding_cache_manager()
  1479. stats = cache_manager.get_cache_stats()
  1480. return jsonify(success_response(
  1481. response_text="获取embedding缓存统计成功",
  1482. data=stats
  1483. ))
  1484. except Exception as e:
  1485. print(f"[ERROR] 获取embedding缓存统计失败: {str(e)}")
  1486. return jsonify(internal_error_response(
  1487. response_text="获取embedding缓存统计失败,请稍后重试"
  1488. )), 500
  1489. @app.flask_app.route('/api/v0/embedding_cache_cleanup', methods=['POST'])
  1490. def embedding_cache_cleanup():
  1491. """清空所有embedding缓存"""
  1492. try:
  1493. from common.embedding_cache_manager import get_embedding_cache_manager
  1494. cache_manager = get_embedding_cache_manager()
  1495. if not cache_manager.is_available():
  1496. return jsonify(internal_error_response(
  1497. response_text="Embedding缓存功能未启用或不可用"
  1498. )), 400
  1499. success = cache_manager.clear_all_cache()
  1500. if success:
  1501. return jsonify(success_response(
  1502. response_text="所有embedding缓存已清空",
  1503. data={"cleared": True}
  1504. ))
  1505. else:
  1506. return jsonify(internal_error_response(
  1507. response_text="清空embedding缓存失败"
  1508. )), 500
  1509. except Exception as e:
  1510. print(f"[ERROR] 清空embedding缓存失败: {str(e)}")
  1511. return jsonify(internal_error_response(
  1512. response_text="清空embedding缓存失败,请稍后重试"
  1513. )), 500
  1514. # ==================== QA反馈系统接口 ====================
  1515. # 全局反馈管理器实例
  1516. qa_feedback_manager = None
  1517. def get_qa_feedback_manager():
  1518. """获取QA反馈管理器实例(懒加载)- 复用Vanna连接版本"""
  1519. global qa_feedback_manager
  1520. if qa_feedback_manager is None:
  1521. try:
  1522. # 优先尝试复用vanna连接
  1523. vanna_instance = None
  1524. try:
  1525. # 尝试获取现有的vanna实例
  1526. if 'get_citu_langraph_agent' in globals():
  1527. agent = get_citu_langraph_agent()
  1528. if hasattr(agent, 'vn'):
  1529. vanna_instance = agent.vn
  1530. elif 'vn' in globals():
  1531. vanna_instance = vn
  1532. else:
  1533. print("[INFO] 未找到可用的vanna实例,将创建新的数据库连接")
  1534. except Exception as e:
  1535. print(f"[INFO] 获取vanna实例失败: {e},将创建新的数据库连接")
  1536. vanna_instance = None
  1537. qa_feedback_manager = QAFeedbackManager(vanna_instance=vanna_instance)
  1538. print("[CITU_APP] QA反馈管理器实例创建成功")
  1539. except Exception as e:
  1540. print(f"[CRITICAL] QA反馈管理器创建失败: {str(e)}")
  1541. raise Exception(f"QA反馈管理器初始化失败: {str(e)}")
  1542. return qa_feedback_manager
  1543. @app.flask_app.route('/api/v0/qa_feedback/query', methods=['POST'])
  1544. def qa_feedback_query():
  1545. """
  1546. 查询反馈记录API
  1547. 支持分页、筛选和排序功能
  1548. """
  1549. try:
  1550. req = request.get_json(force=True)
  1551. # 解析参数,设置默认值
  1552. page = req.get('page', 1)
  1553. page_size = req.get('page_size', 20)
  1554. is_thumb_up = req.get('is_thumb_up')
  1555. create_time_start = req.get('create_time_start')
  1556. create_time_end = req.get('create_time_end')
  1557. is_in_training_data = req.get('is_in_training_data')
  1558. sort_by = req.get('sort_by', 'create_time')
  1559. sort_order = req.get('sort_order', 'desc')
  1560. # 参数验证
  1561. if page < 1:
  1562. return jsonify(bad_request_response(
  1563. response_text="页码必须大于0",
  1564. invalid_params=["page"]
  1565. )), 400
  1566. if page_size < 1 or page_size > 100:
  1567. return jsonify(bad_request_response(
  1568. response_text="每页大小必须在1-100之间",
  1569. invalid_params=["page_size"]
  1570. )), 400
  1571. # 获取反馈管理器并查询
  1572. manager = get_qa_feedback_manager()
  1573. records, total = manager.query_feedback(
  1574. page=page,
  1575. page_size=page_size,
  1576. is_thumb_up=is_thumb_up,
  1577. create_time_start=create_time_start,
  1578. create_time_end=create_time_end,
  1579. is_in_training_data=is_in_training_data,
  1580. sort_by=sort_by,
  1581. sort_order=sort_order
  1582. )
  1583. # 计算分页信息
  1584. total_pages = (total + page_size - 1) // page_size
  1585. return jsonify(success_response(
  1586. response_text=f"查询成功,共找到 {total} 条记录",
  1587. data={
  1588. "records": records,
  1589. "pagination": {
  1590. "page": page,
  1591. "page_size": page_size,
  1592. "total": total,
  1593. "total_pages": total_pages,
  1594. "has_next": page < total_pages,
  1595. "has_prev": page > 1
  1596. }
  1597. }
  1598. ))
  1599. except Exception as e:
  1600. print(f"[ERROR] qa_feedback_query执行失败: {str(e)}")
  1601. return jsonify(internal_error_response(
  1602. response_text="查询反馈记录失败,请稍后重试"
  1603. )), 500
  1604. @app.flask_app.route('/api/v0/qa_feedback/delete/<int:feedback_id>', methods=['DELETE'])
  1605. def qa_feedback_delete(feedback_id):
  1606. """
  1607. 删除反馈记录API
  1608. """
  1609. try:
  1610. manager = get_qa_feedback_manager()
  1611. success = manager.delete_feedback(feedback_id)
  1612. if success:
  1613. return jsonify(success_response(
  1614. response_text=f"反馈记录删除成功",
  1615. data={"deleted_id": feedback_id}
  1616. ))
  1617. else:
  1618. return jsonify(not_found_response(
  1619. response_text=f"反馈记录不存在 (ID: {feedback_id})"
  1620. )), 404
  1621. except Exception as e:
  1622. print(f"[ERROR] qa_feedback_delete执行失败: {str(e)}")
  1623. return jsonify(internal_error_response(
  1624. response_text="删除反馈记录失败,请稍后重试"
  1625. )), 500
  1626. @app.flask_app.route('/api/v0/qa_feedback/update/<int:feedback_id>', methods=['PUT'])
  1627. def qa_feedback_update(feedback_id):
  1628. """
  1629. 更新反馈记录API
  1630. """
  1631. try:
  1632. req = request.get_json(force=True)
  1633. # 提取允许更新的字段
  1634. allowed_fields = ['question', 'sql', 'is_thumb_up', 'user_id', 'is_in_training_data']
  1635. update_data = {}
  1636. for field in allowed_fields:
  1637. if field in req:
  1638. update_data[field] = req[field]
  1639. if not update_data:
  1640. return jsonify(bad_request_response(
  1641. response_text="没有提供有效的更新字段",
  1642. missing_params=allowed_fields
  1643. )), 400
  1644. manager = get_qa_feedback_manager()
  1645. success = manager.update_feedback(feedback_id, **update_data)
  1646. if success:
  1647. return jsonify(success_response(
  1648. response_text="反馈记录更新成功",
  1649. data={
  1650. "updated_id": feedback_id,
  1651. "updated_fields": list(update_data.keys())
  1652. }
  1653. ))
  1654. else:
  1655. return jsonify(not_found_response(
  1656. response_text=f"反馈记录不存在或无变化 (ID: {feedback_id})"
  1657. )), 404
  1658. except Exception as e:
  1659. print(f"[ERROR] qa_feedback_update执行失败: {str(e)}")
  1660. return jsonify(internal_error_response(
  1661. response_text="更新反馈记录失败,请稍后重试"
  1662. )), 500
  1663. @app.flask_app.route('/api/v0/qa_feedback/add_to_training', methods=['POST'])
  1664. def qa_feedback_add_to_training():
  1665. """
  1666. 将反馈记录添加到训练数据集API
  1667. 支持混合批量处理:正向反馈加入SQL训练集,负向反馈加入error_sql训练集
  1668. """
  1669. try:
  1670. req = request.get_json(force=True)
  1671. feedback_ids = req.get('feedback_ids', [])
  1672. if not feedback_ids or not isinstance(feedback_ids, list):
  1673. return jsonify(bad_request_response(
  1674. response_text="缺少有效的反馈ID列表",
  1675. missing_params=["feedback_ids"]
  1676. )), 400
  1677. manager = get_qa_feedback_manager()
  1678. # 获取反馈记录
  1679. records = manager.get_feedback_by_ids(feedback_ids)
  1680. if not records:
  1681. return jsonify(not_found_response(
  1682. response_text="未找到任何有效的反馈记录"
  1683. )), 404
  1684. # 分别处理正向和负向反馈
  1685. positive_count = 0 # 正向训练计数
  1686. negative_count = 0 # 负向训练计数
  1687. already_trained_count = 0 # 已训练计数
  1688. error_count = 0 # 错误计数
  1689. successfully_trained_ids = [] # 成功训练的ID列表
  1690. for record in records:
  1691. try:
  1692. # 检查是否已经在训练数据中
  1693. if record['is_in_training_data']:
  1694. already_trained_count += 1
  1695. continue
  1696. if record['is_thumb_up']:
  1697. # 正向反馈 - 加入标准SQL训练集
  1698. training_id = vn.train(
  1699. question=record['question'],
  1700. sql=record['sql']
  1701. )
  1702. positive_count += 1
  1703. print(f"[TRAINING] 正向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1704. else:
  1705. # 负向反馈 - 加入错误SQL训练集
  1706. training_id = vn.train_error_sql(
  1707. question=record['question'],
  1708. sql=record['sql']
  1709. )
  1710. negative_count += 1
  1711. print(f"[TRAINING] 负向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1712. successfully_trained_ids.append(record['id'])
  1713. except Exception as e:
  1714. print(f"[ERROR] 训练失败 - 反馈ID: {record['id']}, 错误: {e}")
  1715. error_count += 1
  1716. # 更新训练状态
  1717. if successfully_trained_ids:
  1718. updated_count = manager.mark_training_status(successfully_trained_ids, True)
  1719. print(f"[TRAINING] 批量更新训练状态完成,影响 {updated_count} 条记录")
  1720. # 构建响应
  1721. total_processed = positive_count + negative_count + already_trained_count + error_count
  1722. return jsonify(success_response(
  1723. response_text=f"训练数据添加完成,成功处理 {positive_count + negative_count} 条记录",
  1724. data={
  1725. "summary": {
  1726. "total_requested": len(feedback_ids),
  1727. "total_processed": total_processed,
  1728. "positive_trained": positive_count,
  1729. "negative_trained": negative_count,
  1730. "already_trained": already_trained_count,
  1731. "errors": error_count
  1732. },
  1733. "successfully_trained_ids": successfully_trained_ids,
  1734. "training_details": {
  1735. "sql_training_count": positive_count,
  1736. "error_sql_training_count": negative_count
  1737. }
  1738. }
  1739. ))
  1740. except Exception as e:
  1741. print(f"[ERROR] qa_feedback_add_to_training执行失败: {str(e)}")
  1742. return jsonify(internal_error_response(
  1743. response_text="添加训练数据失败,请稍后重试"
  1744. )), 500
  1745. @app.flask_app.route('/api/v0/qa_feedback/add', methods=['POST'])
  1746. def qa_feedback_add():
  1747. """
  1748. 添加反馈记录API
  1749. 用于前端直接创建反馈记录
  1750. """
  1751. try:
  1752. req = request.get_json(force=True)
  1753. question = req.get('question')
  1754. sql = req.get('sql')
  1755. is_thumb_up = req.get('is_thumb_up')
  1756. user_id = req.get('user_id', 'guest')
  1757. # 参数验证
  1758. if not question:
  1759. return jsonify(bad_request_response(
  1760. response_text="缺少必需参数:question",
  1761. missing_params=["question"]
  1762. )), 400
  1763. if not sql:
  1764. return jsonify(bad_request_response(
  1765. response_text="缺少必需参数:sql",
  1766. missing_params=["sql"]
  1767. )), 400
  1768. if is_thumb_up is None:
  1769. return jsonify(bad_request_response(
  1770. response_text="缺少必需参数:is_thumb_up",
  1771. missing_params=["is_thumb_up"]
  1772. )), 400
  1773. manager = get_qa_feedback_manager()
  1774. feedback_id = manager.add_feedback(
  1775. question=question,
  1776. sql=sql,
  1777. is_thumb_up=bool(is_thumb_up),
  1778. user_id=user_id
  1779. )
  1780. return jsonify(success_response(
  1781. response_text="反馈记录创建成功",
  1782. data={
  1783. "feedback_id": feedback_id
  1784. }
  1785. ))
  1786. except Exception as e:
  1787. print(f"[ERROR] qa_feedback_add执行失败: {str(e)}")
  1788. return jsonify(internal_error_response(
  1789. response_text="创建反馈记录失败,请稍后重试"
  1790. )), 500
  1791. @app.flask_app.route('/api/v0/qa_feedback/stats', methods=['GET'])
  1792. def qa_feedback_stats():
  1793. """
  1794. 反馈统计API
  1795. 返回反馈数据的统计信息
  1796. """
  1797. try:
  1798. manager = get_qa_feedback_manager()
  1799. # 查询各种统计数据
  1800. all_records, total_count = manager.query_feedback(page=1, page_size=1)
  1801. positive_records, positive_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=True)
  1802. negative_records, negative_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=False)
  1803. trained_records, trained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=True)
  1804. untrained_records, untrained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=False)
  1805. return jsonify(success_response(
  1806. response_text="统计信息获取成功",
  1807. data={
  1808. "total_feedback": total_count,
  1809. "positive_feedback": positive_count,
  1810. "negative_feedback": negative_count,
  1811. "trained_feedback": trained_count,
  1812. "untrained_feedback": untrained_count,
  1813. "positive_rate": round(positive_count / max(total_count, 1) * 100, 2),
  1814. "training_rate": round(trained_count / max(total_count, 1) * 100, 2)
  1815. }
  1816. ))
  1817. except Exception as e:
  1818. print(f"[ERROR] qa_feedback_stats执行失败: {str(e)}")
  1819. return jsonify(internal_error_response(
  1820. response_text="获取统计信息失败,请稍后重试"
  1821. )), 500
  1822. # ==================== 问答缓存管理接口 ====================
  1823. @app.flask_app.route('/api/v0/qa_cache_stats', methods=['GET'])
  1824. def qa_cache_stats():
  1825. """获取问答缓存统计信息"""
  1826. try:
  1827. stats = redis_conversation_manager.get_qa_cache_stats()
  1828. return jsonify(success_response(
  1829. response_text="获取问答缓存统计成功",
  1830. data=stats
  1831. ))
  1832. except Exception as e:
  1833. print(f"[ERROR] 获取问答缓存统计失败: {str(e)}")
  1834. return jsonify(internal_error_response(
  1835. response_text="获取问答缓存统计失败,请稍后重试"
  1836. )), 500
  1837. @app.flask_app.route('/api/v0/qa_cache_list', methods=['GET'])
  1838. def qa_cache_list():
  1839. """获取问答缓存列表(支持分页)"""
  1840. try:
  1841. # 获取分页参数,默认限制50条
  1842. limit = request.args.get('limit', 50, type=int)
  1843. # 限制最大返回数量,防止一次性返回过多数据
  1844. if limit > 500:
  1845. limit = 500
  1846. elif limit <= 0:
  1847. limit = 50
  1848. cache_list = redis_conversation_manager.get_qa_cache_list(limit)
  1849. return jsonify(success_response(
  1850. response_text="获取问答缓存列表成功",
  1851. data={
  1852. "cache_list": cache_list,
  1853. "total_returned": len(cache_list),
  1854. "limit_applied": limit,
  1855. "note": "按缓存时间倒序排列,最新的在前面"
  1856. }
  1857. ))
  1858. except Exception as e:
  1859. print(f"[ERROR] 获取问答缓存列表失败: {str(e)}")
  1860. return jsonify(internal_error_response(
  1861. response_text="获取问答缓存列表失败,请稍后重试"
  1862. )), 500
  1863. @app.flask_app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
  1864. def qa_cache_cleanup():
  1865. """清空所有问答缓存"""
  1866. try:
  1867. if not redis_conversation_manager.is_available():
  1868. return jsonify(internal_error_response(
  1869. response_text="Redis连接不可用,无法执行清理操作"
  1870. )), 500
  1871. deleted_count = redis_conversation_manager.clear_all_qa_cache()
  1872. return jsonify(success_response(
  1873. response_text="问答缓存清理完成",
  1874. data={
  1875. "deleted_count": deleted_count,
  1876. "cleared": deleted_count > 0,
  1877. "cleanup_time": datetime.now().isoformat()
  1878. }
  1879. ))
  1880. except Exception as e:
  1881. print(f"[ERROR] 清空问答缓存失败: {str(e)}")
  1882. return jsonify(internal_error_response(
  1883. response_text="清空问答缓存失败,请稍后重试"
  1884. )), 500
  1885. @app.flask_app.route('/api/v0/cache_overview_full', methods=['GET'])
  1886. def cache_overview_full():
  1887. """获取所有缓存系统的综合概览"""
  1888. try:
  1889. from common.embedding_cache_manager import get_embedding_cache_manager
  1890. from common.vanna_instance import get_vanna_instance
  1891. # 获取现有的缓存统计
  1892. vanna_cache = get_vanna_instance()
  1893. # 直接使用应用中的缓存实例
  1894. cache = app.cache
  1895. cache_overview = {
  1896. "conversation_aware_cache": {
  1897. "enabled": True,
  1898. "total_items": len(cache.cache) if hasattr(cache, 'cache') else 0,
  1899. "sessions": list(cache.cache.keys()) if hasattr(cache, 'cache') else [],
  1900. "cache_type": type(cache).__name__
  1901. },
  1902. "question_answer_cache": redis_conversation_manager.get_qa_cache_stats() if redis_conversation_manager.is_available() else {"available": False},
  1903. "embedding_cache": get_embedding_cache_manager().get_cache_stats(),
  1904. "redis_conversation_stats": redis_conversation_manager.get_stats() if redis_conversation_manager.is_available() else None
  1905. }
  1906. return jsonify(success_response(
  1907. response_text="获取综合缓存概览成功",
  1908. data=cache_overview
  1909. ))
  1910. except Exception as e:
  1911. print(f"[ERROR] 获取综合缓存概览失败: {str(e)}")
  1912. return jsonify(internal_error_response(
  1913. response_text="获取缓存概览失败,请稍后重试"
  1914. )), 500
  1915. # 前端JavaScript示例 - 如何维持会话
  1916. """
  1917. // 前端需要维护一个会话ID
  1918. class ChatSession {
  1919. constructor() {
  1920. // 从localStorage获取或创建新的会话ID
  1921. this.sessionId = localStorage.getItem('chat_session_id') || this.generateSessionId();
  1922. localStorage.setItem('chat_session_id', this.sessionId);
  1923. }
  1924. generateSessionId() {
  1925. return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
  1926. }
  1927. async askQuestion(question) {
  1928. const response = await fetch('/api/v0/ask', {
  1929. method: 'POST',
  1930. headers: {
  1931. 'Content-Type': 'application/json',
  1932. },
  1933. body: JSON.stringify({
  1934. question: question,
  1935. session_id: this.sessionId // 关键:传递会话ID
  1936. })
  1937. });
  1938. return await response.json();
  1939. }
  1940. // 开始新会话
  1941. startNewSession() {
  1942. this.sessionId = this.generateSessionId();
  1943. localStorage.setItem('chat_session_id', this.sessionId);
  1944. }
  1945. }
  1946. // 使用示例
  1947. const chatSession = new ChatSession();
  1948. chatSession.askQuestion("各年龄段客户的流失率如何?");
  1949. """
  1950. print("正在启动Flask应用: http://localhost:8084")
  1951. app.run(host="0.0.0.0", port=8084, debug=True)