citu_app.py 109 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720
  1. # 给dataops 对话助手返回结果
  2. from vanna.flask import VannaFlaskApp
  3. from core.vanna_llm_factory import create_vanna_instance
  4. from flask import request, jsonify
  5. import pandas as pd
  6. import common.result as result
  7. from datetime import datetime, timedelta
  8. from common.session_aware_cache import WebSessionAwareMemoryCache
  9. from app_config import API_MAX_RETURN_ROWS, ENABLE_RESULT_SUMMARY
  10. import re
  11. import chainlit as cl
  12. import json
  13. from flask import session # 添加session导入
  14. import sqlparse # 用于SQL语法检查
  15. from common.redis_conversation_manager import RedisConversationManager # 添加Redis对话管理器导入
  16. from common.qa_feedback_manager import QAFeedbackManager
  17. from common.result import success_response, bad_request_response, not_found_response, internal_error_response
  18. from common.result import ( # 统一导入所有需要的响应函数
  19. bad_request_response, service_unavailable_response,
  20. agent_success_response, agent_error_response,
  21. internal_error_response, success_response,
  22. validation_failed_response
  23. )
  24. from app_config import ( # 添加Redis相关配置导入
  25. USER_MAX_CONVERSATIONS,
  26. CONVERSATION_CONTEXT_COUNT,
  27. DEFAULT_ANONYMOUS_USER,
  28. ENABLE_QUESTION_ANSWER_CACHE
  29. )
  30. # 设置默认的最大返回行数
  31. DEFAULT_MAX_RETURN_ROWS = 200
  32. MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
  33. vn = create_vanna_instance()
  34. # 创建带时间戳的缓存
  35. timestamped_cache = WebSessionAwareMemoryCache()
  36. # 实例化 VannaFlaskApp,使用自定义缓存
  37. app = VannaFlaskApp(
  38. vn,
  39. cache=timestamped_cache, # 使用带时间戳的缓存
  40. title="辞图智能数据问答平台",
  41. logo = "https://www.citupro.com/img/logo-black-2.png",
  42. subtitle="让 AI 为你写 SQL",
  43. chart=False,
  44. allow_llm_to_see_data=True,
  45. ask_results_correct=True,
  46. followup_questions=True,
  47. debug=True
  48. )
  49. # 创建Redis对话管理器实例
  50. redis_conversation_manager = RedisConversationManager()
  51. # 修改ask接口,支持前端传递session_id
  52. @app.flask_app.route('/api/v0/ask', methods=['POST'])
  53. def ask_full():
  54. req = request.get_json(force=True)
  55. question = req.get("question", None)
  56. browser_session_id = req.get("session_id", None) # 前端传递的会话ID
  57. if not question:
  58. from common.result import bad_request_response
  59. return jsonify(bad_request_response(
  60. response_text="缺少必需参数:question",
  61. missing_params=["question"]
  62. )), 400
  63. # 如果使用WebSessionAwareMemoryCache
  64. if hasattr(app.cache, 'generate_id_with_browser_session') and browser_session_id:
  65. # 这里需要修改vanna的ask方法来支持传递session_id
  66. # 或者预先调用generate_id来建立会话关联
  67. conversation_id = app.cache.generate_id_with_browser_session(
  68. question=question,
  69. browser_session_id=browser_session_id
  70. )
  71. try:
  72. sql, df, _ = vn.ask(
  73. question=question,
  74. print_results=False,
  75. visualize=False,
  76. allow_llm_to_see_data=True
  77. )
  78. # 关键:检查是否有LLM解释性文本(无法生成SQL的情况)
  79. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  80. # 在解释性文本末尾添加提示语
  81. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  82. # 使用标准化错误响应
  83. from common.result import validation_failed_response
  84. return jsonify(validation_failed_response(
  85. response_text=explanation_message
  86. )), 422 # 修改HTTP状态码为422
  87. # 如果sql为None但没有解释性文本,返回通用错误
  88. if sql is None:
  89. from common.result import validation_failed_response
  90. return jsonify(validation_failed_response(
  91. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  92. )), 422
  93. # 处理返回数据 - 使用新的query_result结构
  94. query_result = {
  95. "rows": [],
  96. "columns": [],
  97. "row_count": 0,
  98. "is_limited": False,
  99. "total_row_count": 0
  100. }
  101. summary = None
  102. if isinstance(df, pd.DataFrame):
  103. query_result["columns"] = list(df.columns)
  104. if not df.empty:
  105. total_rows = len(df)
  106. limited_df = df.head(MAX_RETURN_ROWS)
  107. query_result["rows"] = limited_df.to_dict(orient="records")
  108. query_result["row_count"] = len(limited_df)
  109. query_result["total_row_count"] = total_rows
  110. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  111. # 生成数据摘要(可通过配置控制,仅在有数据时生成)
  112. if ENABLE_RESULT_SUMMARY:
  113. try:
  114. summary = vn.generate_summary(question=question, df=df)
  115. print(f"[INFO] 成功生成摘要: {summary}")
  116. except Exception as e:
  117. print(f"[WARNING] 生成摘要失败: {str(e)}")
  118. summary = None
  119. # 构建返回数据
  120. response_data = {
  121. "sql": sql,
  122. "query_result": query_result,
  123. "conversation_id": conversation_id if 'conversation_id' in locals() else None,
  124. "session_id": browser_session_id
  125. }
  126. # 添加摘要(如果启用且生成成功)
  127. if ENABLE_RESULT_SUMMARY and summary is not None:
  128. response_data["summary"] = summary
  129. response_data["response"] = summary # 同时添加response字段
  130. from common.result import success_response
  131. return jsonify(success_response(
  132. response_text="查询执行完成" if summary is None else None,
  133. data=response_data
  134. ))
  135. except Exception as e:
  136. print(f"[ERROR] ask_full执行失败: {str(e)}")
  137. # 即使发生异常,也检查是否有业务层面的解释
  138. if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  139. # 在解释性文本末尾添加提示语
  140. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  141. from common.result import validation_failed_response
  142. return jsonify(validation_failed_response(
  143. response_text=explanation_message
  144. )), 422
  145. else:
  146. # 技术错误,使用500错误码
  147. from common.result import internal_error_response
  148. return jsonify(internal_error_response(
  149. response_text="查询处理失败,请稍后重试"
  150. )), 500
  151. @app.flask_app.route('/api/v0/citu_run_sql', methods=['POST'])
  152. def citu_run_sql():
  153. req = request.get_json(force=True)
  154. sql = req.get('sql')
  155. if not sql:
  156. from common.result import bad_request_response
  157. return jsonify(bad_request_response(
  158. response_text="缺少必需参数:sql",
  159. missing_params=["sql"]
  160. )), 400
  161. try:
  162. df = vn.run_sql(sql)
  163. # 处理返回数据 - 使用新的query_result结构
  164. query_result = {
  165. "rows": [],
  166. "columns": [],
  167. "row_count": 0,
  168. "is_limited": False,
  169. "total_row_count": 0
  170. }
  171. if isinstance(df, pd.DataFrame):
  172. query_result["columns"] = list(df.columns)
  173. if not df.empty:
  174. total_rows = len(df)
  175. limited_df = df.head(MAX_RETURN_ROWS)
  176. query_result["rows"] = limited_df.to_dict(orient="records")
  177. query_result["row_count"] = len(limited_df)
  178. query_result["total_row_count"] = total_rows
  179. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  180. from common.result import success_response
  181. return jsonify(success_response(
  182. response_text=f"SQL执行完成,共返回 {query_result['total_row_count']} 条记录" +
  183. (f",已限制显示前 {MAX_RETURN_ROWS} 条" if query_result["is_limited"] else ""),
  184. data={
  185. "sql": sql,
  186. "query_result": query_result
  187. }
  188. ))
  189. except Exception as e:
  190. print(f"[ERROR] citu_run_sql执行失败: {str(e)}")
  191. from common.result import internal_error_response
  192. return jsonify(internal_error_response(
  193. response_text=f"SQL执行失败,请检查SQL语句是否正确"
  194. )), 500
  195. @app.flask_app.route('/api/v0/ask_cached', methods=['POST'])
  196. def ask_cached():
  197. """
  198. 带缓存功能的智能查询接口
  199. 支持会话管理和结果缓存,提高查询效率
  200. """
  201. req = request.get_json(force=True)
  202. question = req.get("question", None)
  203. browser_session_id = req.get("session_id", None)
  204. if not question:
  205. from common.result import bad_request_response
  206. return jsonify(bad_request_response(
  207. response_text="缺少必需参数:question",
  208. missing_params=["question"]
  209. )), 400
  210. try:
  211. # 生成conversation_id
  212. # 调试:查看generate_id的实际行为
  213. print(f"[DEBUG] 输入问题: '{question}'")
  214. conversation_id = app.cache.generate_id(question=question)
  215. print(f"[DEBUG] 生成的conversation_id: {conversation_id}")
  216. # 再次用相同问题测试
  217. conversation_id2 = app.cache.generate_id(question=question)
  218. print(f"[DEBUG] 再次生成的conversation_id: {conversation_id2}")
  219. print(f"[DEBUG] 两次ID是否相同: {conversation_id == conversation_id2}")
  220. # 检查缓存
  221. cached_sql = app.cache.get(id=conversation_id, field="sql")
  222. if cached_sql is not None:
  223. # 缓存命中
  224. print(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
  225. sql = cached_sql
  226. df = app.cache.get(id=conversation_id, field="df")
  227. summary = app.cache.get(id=conversation_id, field="summary")
  228. else:
  229. # 缓存未命中,执行新查询
  230. print(f"[CACHE MISS] 执行新查询: {conversation_id}")
  231. sql, df, _ = vn.ask(
  232. question=question,
  233. print_results=False,
  234. visualize=False,
  235. allow_llm_to_see_data=True
  236. )
  237. # 检查是否有LLM解释性文本(无法生成SQL的情况)
  238. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  239. # 在解释性文本末尾添加提示语
  240. explanation_message = vn.last_llm_explanation + "请尝试用其它方式提问。"
  241. from common.result import validation_failed_response
  242. return jsonify(validation_failed_response(
  243. response_text=explanation_message
  244. )), 422
  245. # 如果sql为None但没有解释性文本,返回通用错误
  246. if sql is None:
  247. from common.result import validation_failed_response
  248. return jsonify(validation_failed_response(
  249. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  250. )), 422
  251. # 缓存结果
  252. app.cache.set(id=conversation_id, field="question", value=question)
  253. app.cache.set(id=conversation_id, field="sql", value=sql)
  254. app.cache.set(id=conversation_id, field="df", value=df)
  255. # 生成并缓存摘要(可通过配置控制,仅在有数据时生成)
  256. summary = None
  257. if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
  258. try:
  259. summary = vn.generate_summary(question=question, df=df)
  260. print(f"[INFO] 成功生成摘要: {summary}")
  261. except Exception as e:
  262. print(f"[WARNING] 生成摘要失败: {str(e)}")
  263. summary = None
  264. app.cache.set(id=conversation_id, field="summary", value=summary)
  265. # 处理返回数据 - 使用新的query_result结构
  266. query_result = {
  267. "rows": [],
  268. "columns": [],
  269. "row_count": 0,
  270. "is_limited": False,
  271. "total_row_count": 0
  272. }
  273. if isinstance(df, pd.DataFrame):
  274. query_result["columns"] = list(df.columns)
  275. if not df.empty:
  276. total_rows = len(df)
  277. limited_df = df.head(MAX_RETURN_ROWS)
  278. query_result["rows"] = limited_df.to_dict(orient="records")
  279. query_result["row_count"] = len(limited_df)
  280. query_result["total_row_count"] = total_rows
  281. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  282. # 构建返回数据
  283. response_data = {
  284. "sql": sql,
  285. "query_result": query_result,
  286. "conversation_id": conversation_id,
  287. "session_id": browser_session_id,
  288. "cached": cached_sql is not None # 标识是否来自缓存
  289. }
  290. # 添加摘要(如果启用且生成成功)
  291. if ENABLE_RESULT_SUMMARY and summary is not None:
  292. response_data["summary"] = summary
  293. response_data["response"] = summary # 同时添加response字段
  294. from common.result import success_response
  295. return jsonify(success_response(
  296. response_text="查询执行完成" if summary is None else None,
  297. data=response_data
  298. ))
  299. except Exception as e:
  300. print(f"[ERROR] ask_cached执行失败: {str(e)}")
  301. from common.result import internal_error_response
  302. return jsonify(internal_error_response(
  303. response_text="查询处理失败,请稍后重试"
  304. )), 500
  305. @app.flask_app.route('/api/v0/citu_train_question_sql', methods=['POST'])
  306. def citu_train_question_sql():
  307. """
  308. 训练问题-SQL对接口
  309. 此API将接收的question/sql pair写入到training库中,用于训练和改进AI模型。
  310. 支持仅传入SQL或同时传入问题和SQL进行训练。
  311. Args:
  312. question (str, optional): 用户问题
  313. sql (str, required): 对应的SQL查询语句
  314. Returns:
  315. JSON: 包含训练ID和成功消息的响应
  316. """
  317. try:
  318. req = request.get_json(force=True)
  319. question = req.get('question')
  320. sql = req.get('sql')
  321. if not sql:
  322. from common.result import bad_request_response
  323. return jsonify(bad_request_response(
  324. response_text="缺少必需参数:sql",
  325. missing_params=["sql"]
  326. )), 400
  327. # 正确的调用方式:同时传递question和sql
  328. if question:
  329. training_id = vn.train(question=question, sql=sql)
  330. print(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
  331. else:
  332. training_id = vn.train(sql=sql)
  333. print(f"训练成功,训练ID为:{training_id},SQL:{sql}")
  334. from common.result import success_response
  335. return jsonify(success_response(
  336. response_text="问题-SQL对训练成功",
  337. data={
  338. "training_id": training_id,
  339. "message": "Question-SQL pair trained successfully"
  340. }
  341. ))
  342. except Exception as e:
  343. from common.result import internal_error_response
  344. return jsonify(internal_error_response(
  345. response_text="训练失败,请稍后重试"
  346. )), 500
  347. # ============ LangGraph Agent 集成 ============
  348. # 全局Agent实例(单例模式)
  349. citu_langraph_agent = None
  350. def get_citu_langraph_agent():
  351. """获取LangGraph Agent实例(懒加载)"""
  352. global citu_langraph_agent
  353. if citu_langraph_agent is None:
  354. try:
  355. from agent.citu_agent import CituLangGraphAgent
  356. print("[CITU_APP] 开始创建LangGraph Agent实例...")
  357. citu_langraph_agent = CituLangGraphAgent()
  358. print("[CITU_APP] LangGraph Agent实例创建成功")
  359. except ImportError as e:
  360. print(f"[CRITICAL] Agent模块导入失败: {str(e)}")
  361. print("[CRITICAL] 请检查agent模块是否存在以及依赖是否正确安装")
  362. raise Exception(f"Agent模块导入失败: {str(e)}")
  363. except Exception as e:
  364. print(f"[CRITICAL] LangGraph Agent实例创建失败: {str(e)}")
  365. print(f"[CRITICAL] 错误类型: {type(e).__name__}")
  366. # 提供更有用的错误信息
  367. if "config" in str(e).lower():
  368. print("[CRITICAL] 可能是配置文件问题,请检查配置")
  369. elif "llm" in str(e).lower():
  370. print("[CRITICAL] 可能是LLM连接问题,请检查LLM配置")
  371. elif "tool" in str(e).lower():
  372. print("[CRITICAL] 可能是工具加载问题,请检查工具模块")
  373. raise Exception(f"Agent初始化失败: {str(e)}")
  374. return citu_langraph_agent
  375. @app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
  376. def ask_agent():
  377. """
  378. 支持对话上下文的ask_agent API - 修正版
  379. """
  380. req = request.get_json(force=True)
  381. question = req.get("question", None)
  382. browser_session_id = req.get("session_id", None)
  383. # 新增参数解析
  384. user_id_input = req.get("user_id", None)
  385. conversation_id_input = req.get("conversation_id", None)
  386. continue_conversation = req.get("continue_conversation", False)
  387. # 新增:路由模式参数解析和验证
  388. api_routing_mode = req.get("routing_mode", None)
  389. VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]
  390. if not question:
  391. return jsonify(bad_request_response(
  392. response_text="缺少必需参数:question",
  393. missing_params=["question"]
  394. )), 400
  395. # 验证routing_mode参数
  396. if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
  397. return jsonify(bad_request_response(
  398. response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
  399. invalid_params=["routing_mode"]
  400. )), 400
  401. try:
  402. # 1. 获取登录用户ID(修正:在函数中获取session信息)
  403. login_user_id = session.get('user_id') if 'user_id' in session else None
  404. # 2. 智能ID解析(修正:传入登录用户ID)
  405. user_id = redis_conversation_manager.resolve_user_id(
  406. user_id_input, browser_session_id, request.remote_addr, login_user_id
  407. )
  408. conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
  409. user_id, conversation_id_input, continue_conversation
  410. )
  411. # 3. 获取上下文和上下文类型(提前到缓存检查之前)
  412. context = redis_conversation_manager.get_context(conversation_id)
  413. # 获取上下文类型:从最后一条助手消息的metadata中获取类型
  414. context_type = None
  415. if context:
  416. try:
  417. # 获取最后一条助手消息的metadata
  418. messages = redis_conversation_manager.get_messages(conversation_id, limit=10)
  419. for message in reversed(messages): # 从最新的开始找
  420. if message.get("role") == "assistant":
  421. metadata = message.get("metadata", {})
  422. context_type = metadata.get("type")
  423. if context_type:
  424. print(f"[AGENT_API] 检测到上下文类型: {context_type}")
  425. break
  426. except Exception as e:
  427. print(f"[WARNING] 获取上下文类型失败: {str(e)}")
  428. # 4. 检查缓存(新逻辑:放宽使用条件,严控存储条件)
  429. cached_answer = redis_conversation_manager.get_cached_answer(question, context)
  430. if cached_answer:
  431. print(f"[AGENT_API] 使用缓存答案")
  432. # 确定缓存答案的助手回复内容(使用与非缓存相同的优先级逻辑)
  433. cached_response_type = cached_answer.get("type", "UNKNOWN")
  434. if cached_response_type == "DATABASE":
  435. # DATABASE类型:按优先级选择内容
  436. if cached_answer.get("response"):
  437. # 优先级1:错误或解释性回复(如SQL生成失败)
  438. assistant_response = cached_answer.get("response")
  439. elif cached_answer.get("summary"):
  440. # 优先级2:查询成功的摘要
  441. assistant_response = cached_answer.get("summary")
  442. elif cached_answer.get("query_result"):
  443. # 优先级3:构造简单描述
  444. query_result = cached_answer.get("query_result")
  445. row_count = query_result.get("row_count", 0)
  446. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  447. else:
  448. # 异常情况
  449. assistant_response = "数据库查询已处理。"
  450. else:
  451. # CHAT类型:直接使用response
  452. assistant_response = cached_answer.get("response", "")
  453. # 更新对话历史
  454. redis_conversation_manager.save_message(conversation_id, "user", question)
  455. redis_conversation_manager.save_message(
  456. conversation_id, "assistant",
  457. assistant_response,
  458. metadata={"from_cache": True}
  459. )
  460. # 添加对话信息到缓存结果
  461. cached_answer["conversation_id"] = conversation_id
  462. cached_answer["user_id"] = user_id
  463. cached_answer["from_cache"] = True
  464. cached_answer.update(conversation_status)
  465. # 使用agent_success_response返回标准格式
  466. return jsonify(agent_success_response(
  467. response_type=cached_answer.get("type", "UNKNOWN"),
  468. response=cached_answer.get("response", ""), # 修正:使用response而不是response_text
  469. sql=cached_answer.get("sql"),
  470. records=cached_answer.get("query_result"), # 修改:query_result改为records
  471. summary=cached_answer.get("summary"),
  472. session_id=browser_session_id,
  473. execution_path=cached_answer.get("execution_path", []),
  474. classification_info=cached_answer.get("classification_info", {}),
  475. conversation_id=conversation_id,
  476. user_id=user_id,
  477. is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
  478. context_used=bool(context),
  479. from_cache=True,
  480. conversation_status=conversation_status["status"],
  481. conversation_message=conversation_status["message"],
  482. requested_conversation_id=conversation_status.get("requested_id")
  483. ))
  484. # 5. 保存用户消息
  485. redis_conversation_manager.save_message(conversation_id, "user", question)
  486. # 6. 构建带上下文的问题
  487. if context:
  488. enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
  489. print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
  490. else:
  491. enhanced_question = question
  492. print(f"[AGENT_API] 新对话,无上下文")
  493. # 7. 确定最终使用的路由模式(优先级逻辑)
  494. if api_routing_mode:
  495. # API传了参数,优先使用
  496. effective_routing_mode = api_routing_mode
  497. print(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
  498. else:
  499. # API没传参数,使用配置文件
  500. try:
  501. from app_config import QUESTION_ROUTING_MODE
  502. effective_routing_mode = QUESTION_ROUTING_MODE
  503. print(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
  504. except ImportError:
  505. effective_routing_mode = "hybrid"
  506. print(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")
  507. # 8. 现有Agent处理逻辑(修改为传递路由模式)
  508. try:
  509. agent = get_citu_langraph_agent()
  510. except Exception as e:
  511. print(f"[CRITICAL] Agent初始化失败: {str(e)}")
  512. return jsonify(service_unavailable_response(
  513. response_text="AI服务暂时不可用,请稍后重试",
  514. can_retry=True
  515. )), 503
  516. # 异步调用Agent处理问题
  517. import asyncio
  518. agent_result = asyncio.run(agent.process_question(
  519. question=enhanced_question, # 使用增强后的问题
  520. session_id=browser_session_id,
  521. context_type=context_type, # 传递上下文类型
  522. routing_mode=effective_routing_mode # 新增:传递路由模式
  523. ))
  524. # 8. 处理Agent结果
  525. if agent_result.get("success", False):
  526. # 修正:直接从agent_result获取字段,因为它就是final_response
  527. response_type = agent_result.get("type", "UNKNOWN")
  528. response_text = agent_result.get("response", "")
  529. sql = agent_result.get("sql")
  530. query_result = agent_result.get("query_result")
  531. summary = agent_result.get("summary")
  532. execution_path = agent_result.get("execution_path", [])
  533. classification_info = agent_result.get("classification_info", {})
  534. # 确定助手回复内容的优先级
  535. if response_type == "DATABASE":
  536. # DATABASE类型:按优先级选择内容
  537. if response_text:
  538. # 优先级1:错误或解释性回复(如SQL生成失败)
  539. assistant_response = response_text
  540. elif summary:
  541. # 优先级2:查询成功的摘要
  542. assistant_response = summary
  543. elif query_result:
  544. # 优先级3:构造简单描述
  545. row_count = query_result.get("row_count", 0)
  546. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  547. else:
  548. # 异常情况
  549. assistant_response = "数据库查询已处理。"
  550. else:
  551. # CHAT类型:直接使用response
  552. assistant_response = response_text
  553. # 保存助手回复
  554. redis_conversation_manager.save_message(
  555. conversation_id, "assistant", assistant_response,
  556. metadata={
  557. "type": response_type,
  558. "sql": sql,
  559. "execution_path": execution_path
  560. }
  561. )
  562. # 缓存成功的答案(新逻辑:只缓存无上下文的问答)
  563. # 直接缓存agent_result,它已经包含所有需要的字段
  564. redis_conversation_manager.cache_answer(question, agent_result, context)
  565. # 使用agent_success_response的正确方式
  566. return jsonify(agent_success_response(
  567. response_type=response_type,
  568. response=response_text, # 修正:使用response而不是response_text
  569. sql=sql,
  570. records=query_result, # 修改:query_result改为records
  571. summary=summary,
  572. session_id=browser_session_id,
  573. execution_path=execution_path,
  574. classification_info=classification_info,
  575. conversation_id=conversation_id,
  576. user_id=user_id,
  577. is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
  578. context_used=bool(context),
  579. from_cache=False,
  580. conversation_status=conversation_status["status"],
  581. conversation_message=conversation_status["message"],
  582. requested_conversation_id=conversation_status.get("requested_id"),
  583. routing_mode_used=effective_routing_mode, # 新增:实际使用的路由模式
  584. routing_mode_source="api" if api_routing_mode else "config" # 新增:路由模式来源
  585. ))
  586. else:
  587. # 错误处理(修正:确保使用现有的错误响应格式)
  588. error_message = agent_result.get("error", "Agent处理失败")
  589. error_code = agent_result.get("error_code", 500)
  590. return jsonify(agent_error_response(
  591. response_text=error_message,
  592. error_type="agent_processing_failed",
  593. code=error_code,
  594. session_id=browser_session_id,
  595. conversation_id=conversation_id,
  596. user_id=user_id
  597. )), error_code
  598. except Exception as e:
  599. print(f"[ERROR] ask_agent执行失败: {str(e)}")
  600. return jsonify(internal_error_response(
  601. response_text="查询处理失败,请稍后重试"
  602. )), 500
  603. @app.flask_app.route('/api/v0/agent_health', methods=['GET'])
  604. def agent_health():
  605. """
  606. Agent健康检查接口
  607. 响应格式:
  608. {
  609. "success": true/false,
  610. "code": 200/503,
  611. "message": "healthy/degraded/unhealthy",
  612. "data": {
  613. "status": "healthy/degraded/unhealthy",
  614. "test_result": true/false,
  615. "workflow_compiled": true/false,
  616. "tools_count": 4,
  617. "message": "详细信息",
  618. "timestamp": "2024-01-01T12:00:00",
  619. "checks": {
  620. "agent_creation": true/false,
  621. "tools_import": true/false,
  622. "llm_connection": true/false,
  623. "classifier_ready": true/false
  624. }
  625. }
  626. }
  627. """
  628. try:
  629. # 基础健康检查
  630. health_data = {
  631. "status": "unknown",
  632. "test_result": False,
  633. "workflow_compiled": False,
  634. "tools_count": 0,
  635. "message": "",
  636. "timestamp": datetime.now().isoformat(),
  637. "checks": {
  638. "agent_creation": False,
  639. "tools_import": False,
  640. "llm_connection": False,
  641. "classifier_ready": False
  642. }
  643. }
  644. # 检查1: Agent创建
  645. try:
  646. agent = get_citu_langraph_agent()
  647. health_data["checks"]["agent_creation"] = True
  648. # 修正:Agent现在是动态创建workflow的,不再有预创建的workflow属性
  649. health_data["workflow_compiled"] = True # 动态创建,始终可用
  650. health_data["tools_count"] = len(agent.tools) if hasattr(agent, 'tools') else 0
  651. except Exception as e:
  652. health_data["message"] = f"Agent创建失败: {str(e)}"
  653. health_data["status"] = "unhealthy" # 设置状态
  654. from common.result import health_error_response
  655. return jsonify(health_error_response(**health_data)), 503
  656. # 检查2: 工具导入
  657. try:
  658. from agent.tools import TOOLS
  659. health_data["checks"]["tools_import"] = len(TOOLS) > 0
  660. except Exception as e:
  661. health_data["message"] = f"工具导入失败: {str(e)}"
  662. # 检查3: LLM连接(简单测试)
  663. try:
  664. from agent.utils import get_compatible_llm
  665. llm = get_compatible_llm()
  666. health_data["checks"]["llm_connection"] = llm is not None
  667. except Exception as e:
  668. health_data["message"] = f"LLM连接失败: {str(e)}"
  669. # 检查4: 分类器准备
  670. try:
  671. from agent.classifier import QuestionClassifier
  672. classifier = QuestionClassifier()
  673. health_data["checks"]["classifier_ready"] = True
  674. except Exception as e:
  675. health_data["message"] = f"分类器失败: {str(e)}"
  676. # 检查5: 完整流程测试(可选)
  677. try:
  678. if all(health_data["checks"].values()):
  679. import asyncio
  680. # 异步调用健康检查
  681. test_result = asyncio.run(agent.health_check())
  682. health_data["test_result"] = test_result.get("status") == "healthy"
  683. health_data["status"] = test_result.get("status", "unknown")
  684. health_data["message"] = test_result.get("message", "健康检查完成")
  685. else:
  686. health_data["status"] = "degraded"
  687. health_data["message"] = "部分组件异常"
  688. except Exception as e:
  689. print(f"[ERROR] 健康检查异常: {str(e)}")
  690. import traceback
  691. print(f"[ERROR] 详细健康检查错误: {traceback.format_exc()}")
  692. health_data["status"] = "degraded"
  693. health_data["message"] = f"完整测试失败: {str(e)}"
  694. # 根据状态返回相应的HTTP代码 - 使用标准化健康检查响应
  695. from common.result import health_success_response, health_error_response
  696. if health_data["status"] == "healthy":
  697. return jsonify(health_success_response(**health_data))
  698. elif health_data["status"] == "degraded":
  699. return jsonify(health_error_response(**health_data)), 503
  700. else:
  701. # 确保状态设置为unhealthy
  702. health_data["status"] = "unhealthy"
  703. return jsonify(health_error_response(**health_data)), 503
  704. except Exception as e:
  705. print(f"[ERROR] 顶层健康检查异常: {str(e)}")
  706. import traceback
  707. print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
  708. from common.result import internal_error_response
  709. return jsonify(internal_error_response(
  710. response_text="健康检查失败,请稍后重试"
  711. )), 500
  712. # ==================== 日常管理API ====================
  713. @app.flask_app.route('/api/v0/cache_overview', methods=['GET'])
  714. def cache_overview():
  715. """日常管理:轻量概览 - 合并原cache_inspect的核心功能"""
  716. try:
  717. cache = app.cache
  718. result_data = {
  719. 'overview_summary': {
  720. 'total_conversations': 0,
  721. 'total_sessions': 0,
  722. 'query_time': datetime.now().isoformat()
  723. },
  724. 'recent_conversations': [], # 最近的对话
  725. 'session_summary': [] # 会话摘要
  726. }
  727. if hasattr(cache, 'cache') and isinstance(cache.cache, dict):
  728. result_data['overview_summary']['total_conversations'] = len(cache.cache)
  729. # 获取会话信息
  730. if hasattr(cache, 'get_all_sessions'):
  731. all_sessions = cache.get_all_sessions()
  732. result_data['overview_summary']['total_sessions'] = len(all_sessions)
  733. # 会话摘要(按最近活动排序)
  734. session_list = []
  735. for session_id, session_data in all_sessions.items():
  736. session_summary = {
  737. 'session_id': session_id,
  738. 'start_time': session_data['start_time'].isoformat(),
  739. 'conversation_count': session_data.get('conversation_count', 0),
  740. 'duration_seconds': session_data.get('session_duration_seconds', 0),
  741. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  742. 'is_active': (datetime.now() - session_data.get('last_activity', session_data['start_time'])).total_seconds() < 1800 # 30分钟内活跃
  743. }
  744. session_list.append(session_summary)
  745. # 按最后活动时间排序
  746. session_list.sort(key=lambda x: x['last_activity'], reverse=True)
  747. result_data['session_summary'] = session_list
  748. # 最近的对话(最多显示10个)
  749. conversation_list = []
  750. for conversation_id, conversation_data in cache.cache.items():
  751. conversation_start_time = cache.conversation_start_times.get(conversation_id)
  752. conversation_info = {
  753. 'conversation_id': conversation_id,
  754. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  755. 'session_id': cache.conversation_to_session.get(conversation_id),
  756. 'has_question': 'question' in conversation_data,
  757. 'has_sql': 'sql' in conversation_data,
  758. 'has_data': 'df' in conversation_data and conversation_data['df'] is not None,
  759. 'question_preview': conversation_data.get('question', '')[:80] + '...' if len(conversation_data.get('question', '')) > 80 else conversation_data.get('question', ''),
  760. }
  761. # 计算对话持续时间
  762. if conversation_start_time:
  763. duration = datetime.now() - conversation_start_time
  764. conversation_info['conversation_duration_seconds'] = duration.total_seconds()
  765. conversation_list.append(conversation_info)
  766. # 按对话开始时间排序,显示最新的10个
  767. conversation_list.sort(key=lambda x: x['conversation_start_time'] or '', reverse=True)
  768. result_data['recent_conversations'] = conversation_list[:10]
  769. from common.result import success_response
  770. return jsonify(success_response(
  771. response_text="缓存概览查询完成",
  772. data=result_data
  773. ))
  774. except Exception as e:
  775. from common.result import internal_error_response
  776. return jsonify(internal_error_response(
  777. response_text="获取缓存概览失败,请稍后重试"
  778. )), 500
  779. @app.flask_app.route('/api/v0/cache_stats', methods=['GET'])
  780. def cache_stats():
  781. """日常管理:统计信息 - 合并原session_stats和cache_stats功能"""
  782. try:
  783. cache = app.cache
  784. current_time = datetime.now()
  785. stats = {
  786. 'basic_stats': {
  787. 'total_sessions': len(getattr(cache, 'session_info', {})),
  788. 'total_conversations': len(getattr(cache, 'cache', {})),
  789. 'active_sessions': 0, # 最近30分钟有活动
  790. 'average_conversations_per_session': 0
  791. },
  792. 'time_distribution': {
  793. 'sessions': {
  794. 'last_1_hour': 0,
  795. 'last_6_hours': 0,
  796. 'last_24_hours': 0,
  797. 'last_7_days': 0,
  798. 'older': 0
  799. },
  800. 'conversations': {
  801. 'last_1_hour': 0,
  802. 'last_6_hours': 0,
  803. 'last_24_hours': 0,
  804. 'last_7_days': 0,
  805. 'older': 0
  806. }
  807. },
  808. 'session_details': [],
  809. 'time_ranges': {
  810. 'oldest_session': None,
  811. 'newest_session': None,
  812. 'oldest_conversation': None,
  813. 'newest_conversation': None
  814. }
  815. }
  816. # 会话统计
  817. if hasattr(cache, 'session_info'):
  818. session_times = []
  819. total_conversations = 0
  820. for session_id, session_data in cache.session_info.items():
  821. start_time = session_data['start_time']
  822. session_times.append(start_time)
  823. conversation_count = len(session_data.get('conversations', []))
  824. total_conversations += conversation_count
  825. # 检查活跃状态
  826. last_activity = session_data.get('last_activity', session_data['start_time'])
  827. if (current_time - last_activity).total_seconds() < 1800:
  828. stats['basic_stats']['active_sessions'] += 1
  829. # 时间分布统计
  830. age_hours = (current_time - start_time).total_seconds() / 3600
  831. if age_hours <= 1:
  832. stats['time_distribution']['sessions']['last_1_hour'] += 1
  833. elif age_hours <= 6:
  834. stats['time_distribution']['sessions']['last_6_hours'] += 1
  835. elif age_hours <= 24:
  836. stats['time_distribution']['sessions']['last_24_hours'] += 1
  837. elif age_hours <= 168: # 7 days
  838. stats['time_distribution']['sessions']['last_7_days'] += 1
  839. else:
  840. stats['time_distribution']['sessions']['older'] += 1
  841. # 会话详细信息
  842. session_duration = current_time - start_time
  843. stats['session_details'].append({
  844. 'session_id': session_id,
  845. 'start_time': start_time.isoformat(),
  846. 'last_activity': last_activity.isoformat(),
  847. 'conversation_count': conversation_count,
  848. 'duration_seconds': session_duration.total_seconds(),
  849. 'duration_formatted': str(session_duration),
  850. 'is_active': (current_time - last_activity).total_seconds() < 1800,
  851. 'browser_session_id': session_data.get('browser_session_id')
  852. })
  853. # 计算平均值
  854. if len(cache.session_info) > 0:
  855. stats['basic_stats']['average_conversations_per_session'] = total_conversations / len(cache.session_info)
  856. # 时间范围
  857. if session_times:
  858. stats['time_ranges']['oldest_session'] = min(session_times).isoformat()
  859. stats['time_ranges']['newest_session'] = max(session_times).isoformat()
  860. # 对话统计
  861. if hasattr(cache, 'conversation_start_times'):
  862. conversation_times = []
  863. for conv_time in cache.conversation_start_times.values():
  864. conversation_times.append(conv_time)
  865. age_hours = (current_time - conv_time).total_seconds() / 3600
  866. if age_hours <= 1:
  867. stats['time_distribution']['conversations']['last_1_hour'] += 1
  868. elif age_hours <= 6:
  869. stats['time_distribution']['conversations']['last_6_hours'] += 1
  870. elif age_hours <= 24:
  871. stats['time_distribution']['conversations']['last_24_hours'] += 1
  872. elif age_hours <= 168:
  873. stats['time_distribution']['conversations']['last_7_days'] += 1
  874. else:
  875. stats['time_distribution']['conversations']['older'] += 1
  876. if conversation_times:
  877. stats['time_ranges']['oldest_conversation'] = min(conversation_times).isoformat()
  878. stats['time_ranges']['newest_conversation'] = max(conversation_times).isoformat()
  879. # 按最近活动排序会话详情
  880. stats['session_details'].sort(key=lambda x: x['last_activity'], reverse=True)
  881. from common.result import success_response
  882. return jsonify(success_response(
  883. response_text="缓存统计信息查询完成",
  884. data=stats
  885. ))
  886. except Exception as e:
  887. from common.result import internal_error_response
  888. return jsonify(internal_error_response(
  889. response_text="获取缓存统计失败,请稍后重试"
  890. )), 500
  891. # ==================== 高级功能API ====================
  892. @app.flask_app.route('/api/v0/cache_export', methods=['GET'])
  893. def cache_export():
  894. """高级功能:完整导出 - 保持原cache_raw_export的完整功能"""
  895. try:
  896. cache = app.cache
  897. # 验证缓存的实际结构
  898. if not hasattr(cache, 'cache'):
  899. from common.result import internal_error_response
  900. return jsonify(internal_error_response(
  901. response_text="缓存对象结构异常,请联系系统管理员"
  902. )), 500
  903. if not isinstance(cache.cache, dict):
  904. from common.result import internal_error_response
  905. return jsonify(internal_error_response(
  906. response_text="缓存数据类型异常,请联系系统管理员"
  907. )), 500
  908. # 定义JSON序列化辅助函数
  909. def make_json_serializable(obj):
  910. """将对象转换为JSON可序列化的格式"""
  911. if obj is None:
  912. return None
  913. elif isinstance(obj, (str, int, float, bool)):
  914. return obj
  915. elif isinstance(obj, (list, tuple)):
  916. return [make_json_serializable(item) for item in obj]
  917. elif isinstance(obj, dict):
  918. return {str(k): make_json_serializable(v) for k, v in obj.items()}
  919. elif hasattr(obj, 'isoformat'): # datetime objects
  920. return obj.isoformat()
  921. elif hasattr(obj, 'item'): # numpy scalars
  922. return obj.item()
  923. elif hasattr(obj, 'tolist'): # numpy arrays
  924. return obj.tolist()
  925. elif hasattr(obj, '__dict__'): # pandas dtypes and other objects
  926. return str(obj)
  927. else:
  928. return str(obj)
  929. # 获取完整的原始缓存数据
  930. raw_cache = cache.cache
  931. # 获取会话和对话时间信息
  932. conversation_times = getattr(cache, 'conversation_start_times', {})
  933. session_info = getattr(cache, 'session_info', {})
  934. conversation_to_session = getattr(cache, 'conversation_to_session', {})
  935. export_data = {
  936. 'export_metadata': {
  937. 'export_time': datetime.now().isoformat(),
  938. 'total_conversations': len(raw_cache),
  939. 'total_sessions': len(session_info),
  940. 'cache_type': type(cache).__name__,
  941. 'cache_object_info': str(cache),
  942. 'has_session_times': bool(session_info),
  943. 'has_conversation_times': bool(conversation_times)
  944. },
  945. 'session_info': {
  946. session_id: {
  947. 'start_time': session_data['start_time'].isoformat(),
  948. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  949. 'conversations': session_data['conversations'],
  950. 'conversation_count': len(session_data['conversations']),
  951. 'browser_session_id': session_data.get('browser_session_id'),
  952. 'user_info': session_data.get('user_info', {})
  953. }
  954. for session_id, session_data in session_info.items()
  955. },
  956. 'conversation_times': {
  957. conversation_id: start_time.isoformat()
  958. for conversation_id, start_time in conversation_times.items()
  959. },
  960. 'conversation_to_session_mapping': conversation_to_session,
  961. 'conversations': {}
  962. }
  963. # 处理每个对话的完整数据
  964. for conversation_id, conversation_data in raw_cache.items():
  965. # 获取时间信息
  966. conversation_start_time = conversation_times.get(conversation_id)
  967. session_id = conversation_to_session.get(conversation_id)
  968. session_start_time = None
  969. if session_id and session_id in session_info:
  970. session_start_time = session_info[session_id]['start_time']
  971. processed_conversation = {
  972. 'conversation_id': conversation_id,
  973. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  974. 'session_id': session_id,
  975. 'session_start_time': session_start_time.isoformat() if session_start_time else None,
  976. 'field_count': len(conversation_data),
  977. 'fields': {}
  978. }
  979. # 添加时间计算
  980. if conversation_start_time:
  981. conversation_duration = datetime.now() - conversation_start_time
  982. processed_conversation['conversation_duration_seconds'] = conversation_duration.total_seconds()
  983. processed_conversation['conversation_duration_formatted'] = str(conversation_duration)
  984. if session_start_time:
  985. session_duration = datetime.now() - session_start_time
  986. processed_conversation['session_duration_seconds'] = session_duration.total_seconds()
  987. processed_conversation['session_duration_formatted'] = str(session_duration)
  988. # 处理每个字段,确保JSON序列化安全
  989. for field_name, field_value in conversation_data.items():
  990. field_info = {
  991. 'field_name': field_name,
  992. 'data_type': type(field_value).__name__,
  993. 'is_none': field_value is None
  994. }
  995. try:
  996. if field_value is None:
  997. field_info['value'] = None
  998. elif field_name in ['conversation_start_time', 'session_start_time']:
  999. # 处理时间字段
  1000. field_info['content'] = make_json_serializable(field_value)
  1001. elif field_name == 'df' and field_value is not None:
  1002. # DataFrame的安全处理
  1003. if hasattr(field_value, 'to_dict'):
  1004. # 安全地处理dtypes
  1005. try:
  1006. dtypes_dict = {}
  1007. for col, dtype in field_value.dtypes.items():
  1008. dtypes_dict[col] = str(dtype)
  1009. except Exception:
  1010. dtypes_dict = {"error": "无法序列化dtypes"}
  1011. # 安全地处理内存使用
  1012. try:
  1013. memory_usage = field_value.memory_usage(deep=True)
  1014. memory_dict = {}
  1015. for idx, usage in memory_usage.items():
  1016. memory_dict[str(idx)] = int(usage) if hasattr(usage, 'item') else int(usage)
  1017. except Exception:
  1018. memory_dict = {"error": "无法获取内存使用信息"}
  1019. field_info.update({
  1020. 'dataframe_info': {
  1021. 'shape': list(field_value.shape),
  1022. 'columns': list(field_value.columns),
  1023. 'dtypes': dtypes_dict,
  1024. 'index_info': {
  1025. 'type': type(field_value.index).__name__,
  1026. 'length': len(field_value.index)
  1027. }
  1028. },
  1029. 'data': make_json_serializable(field_value.to_dict('records')),
  1030. 'memory_usage': memory_dict
  1031. })
  1032. else:
  1033. field_info['value'] = str(field_value)
  1034. field_info['note'] = 'not_standard_dataframe'
  1035. elif field_name == 'fig_json':
  1036. # 图表JSON数据处理
  1037. if isinstance(field_value, str):
  1038. try:
  1039. import json
  1040. parsed_fig = json.loads(field_value)
  1041. field_info.update({
  1042. 'json_valid': True,
  1043. 'json_size_bytes': len(field_value),
  1044. 'plotly_structure': {
  1045. 'has_data': 'data' in parsed_fig,
  1046. 'has_layout': 'layout' in parsed_fig,
  1047. 'data_traces_count': len(parsed_fig.get('data', [])),
  1048. },
  1049. 'raw_json': field_value
  1050. })
  1051. except json.JSONDecodeError:
  1052. field_info.update({
  1053. 'json_valid': False,
  1054. 'raw_content': str(field_value)
  1055. })
  1056. else:
  1057. field_info['value'] = make_json_serializable(field_value)
  1058. elif field_name == 'followup_questions':
  1059. # 后续问题列表
  1060. field_info.update({
  1061. 'content': make_json_serializable(field_value)
  1062. })
  1063. elif field_name in ['question', 'sql', 'summary']:
  1064. # 文本字段
  1065. if isinstance(field_value, str):
  1066. field_info.update({
  1067. 'text_length': len(field_value),
  1068. 'content': field_value
  1069. })
  1070. else:
  1071. field_info['value'] = make_json_serializable(field_value)
  1072. else:
  1073. # 未知字段的安全处理
  1074. field_info['content'] = make_json_serializable(field_value)
  1075. except Exception as e:
  1076. field_info.update({
  1077. 'processing_error': str(e),
  1078. 'fallback_value': str(field_value)[:500] + '...' if len(str(field_value)) > 500 else str(field_value)
  1079. })
  1080. processed_conversation['fields'][field_name] = field_info
  1081. export_data['conversations'][conversation_id] = processed_conversation
  1082. # 添加缓存统计信息
  1083. field_frequency = {}
  1084. data_types_found = set()
  1085. total_dataframes = 0
  1086. total_questions = 0
  1087. for conv_data in export_data['conversations'].values():
  1088. for field_name, field_info in conv_data['fields'].items():
  1089. field_frequency[field_name] = field_frequency.get(field_name, 0) + 1
  1090. data_types_found.add(field_info['data_type'])
  1091. if field_name == 'df' and not field_info['is_none']:
  1092. total_dataframes += 1
  1093. if field_name == 'question' and not field_info['is_none']:
  1094. total_questions += 1
  1095. export_data['cache_statistics'] = {
  1096. 'field_frequency': field_frequency,
  1097. 'data_types_found': list(data_types_found),
  1098. 'total_dataframes': total_dataframes,
  1099. 'total_questions': total_questions,
  1100. 'has_session_timing': 'session_start_time' in field_frequency,
  1101. 'has_conversation_timing': 'conversation_start_time' in field_frequency
  1102. }
  1103. from common.result import success_response
  1104. return jsonify(success_response(
  1105. response_text="缓存数据导出完成",
  1106. data=export_data
  1107. ))
  1108. except Exception as e:
  1109. import traceback
  1110. error_details = {
  1111. 'error_message': str(e),
  1112. 'error_type': type(e).__name__,
  1113. 'traceback': traceback.format_exc()
  1114. }
  1115. from common.result import internal_error_response
  1116. return jsonify(internal_error_response(
  1117. response_text="导出缓存失败,请稍后重试"
  1118. )), 500
  1119. # ==================== 清理功能API ====================
  1120. @app.flask_app.route('/api/v0/cache_preview_cleanup', methods=['POST'])
  1121. def cache_preview_cleanup():
  1122. """清理功能:预览删除操作 - 保持原功能"""
  1123. try:
  1124. req = request.get_json(force=True)
  1125. # 时间条件 - 支持三种方式
  1126. older_than_hours = req.get('older_than_hours')
  1127. older_than_days = req.get('older_than_days')
  1128. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1129. cache = app.cache
  1130. # 计算截止时间
  1131. cutoff_time = None
  1132. time_condition = None
  1133. if older_than_hours:
  1134. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1135. time_condition = f"older_than_hours: {older_than_hours}"
  1136. elif older_than_days:
  1137. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1138. time_condition = f"older_than_days: {older_than_days}"
  1139. elif before_timestamp:
  1140. try:
  1141. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1142. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1143. time_condition = f"before_timestamp: {before_timestamp}"
  1144. except ValueError:
  1145. from common.result import validation_failed_response
  1146. return jsonify(validation_failed_response(
  1147. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1148. )), 422
  1149. else:
  1150. from common.result import bad_request_response
  1151. return jsonify(bad_request_response(
  1152. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1153. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1154. )), 400
  1155. preview = {
  1156. 'time_condition': time_condition,
  1157. 'cutoff_time': cutoff_time.isoformat(),
  1158. 'will_be_removed': {
  1159. 'sessions': []
  1160. },
  1161. 'will_be_kept': {
  1162. 'sessions_count': 0,
  1163. 'conversations_count': 0
  1164. },
  1165. 'summary': {
  1166. 'sessions_to_remove': 0,
  1167. 'conversations_to_remove': 0,
  1168. 'sessions_to_keep': 0,
  1169. 'conversations_to_keep': 0
  1170. }
  1171. }
  1172. # 预览按session删除
  1173. sessions_to_remove_count = 0
  1174. conversations_to_remove_count = 0
  1175. for session_id, session_data in cache.session_info.items():
  1176. session_preview = {
  1177. 'session_id': session_id,
  1178. 'start_time': session_data['start_time'].isoformat(),
  1179. 'conversation_count': len(session_data['conversations']),
  1180. 'conversations': []
  1181. }
  1182. # 添加conversation详情
  1183. for conv_id in session_data['conversations']:
  1184. if conv_id in cache.cache:
  1185. conv_data = cache.cache[conv_id]
  1186. session_preview['conversations'].append({
  1187. 'conversation_id': conv_id,
  1188. 'question': conv_data.get('question', '')[:50] + '...' if conv_data.get('question') else '',
  1189. 'start_time': cache.conversation_start_times.get(conv_id, '').isoformat() if cache.conversation_start_times.get(conv_id) else ''
  1190. })
  1191. if session_data['start_time'] < cutoff_time:
  1192. preview['will_be_removed']['sessions'].append(session_preview)
  1193. sessions_to_remove_count += 1
  1194. conversations_to_remove_count += len(session_data['conversations'])
  1195. else:
  1196. preview['will_be_kept']['sessions_count'] += 1
  1197. preview['will_be_kept']['conversations_count'] += len(session_data['conversations'])
  1198. # 更新摘要统计
  1199. preview['summary'] = {
  1200. 'sessions_to_remove': sessions_to_remove_count,
  1201. 'conversations_to_remove': conversations_to_remove_count,
  1202. 'sessions_to_keep': preview['will_be_kept']['sessions_count'],
  1203. 'conversations_to_keep': preview['will_be_kept']['conversations_count']
  1204. }
  1205. from common.result import success_response
  1206. return jsonify(success_response(
  1207. response_text=f"清理预览完成,将删除 {sessions_to_remove_count} 个会话和 {conversations_to_remove_count} 个对话",
  1208. data=preview
  1209. ))
  1210. except Exception as e:
  1211. from common.result import internal_error_response
  1212. return jsonify(internal_error_response(
  1213. response_text="预览清理操作失败,请稍后重试"
  1214. )), 500
  1215. @app.flask_app.route('/api/v0/cache_cleanup', methods=['POST'])
  1216. def cache_cleanup():
  1217. """清理功能:实际删除缓存 - 保持原功能"""
  1218. try:
  1219. req = request.get_json(force=True)
  1220. # 时间条件 - 支持三种方式
  1221. older_than_hours = req.get('older_than_hours')
  1222. older_than_days = req.get('older_than_days')
  1223. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1224. cache = app.cache
  1225. if not hasattr(cache, 'session_info'):
  1226. from common.result import service_unavailable_response
  1227. return jsonify(service_unavailable_response(
  1228. response_text="缓存不支持会话功能"
  1229. )), 503
  1230. # 计算截止时间
  1231. cutoff_time = None
  1232. time_condition = None
  1233. if older_than_hours:
  1234. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1235. time_condition = f"older_than_hours: {older_than_hours}"
  1236. elif older_than_days:
  1237. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1238. time_condition = f"older_than_days: {older_than_days}"
  1239. elif before_timestamp:
  1240. try:
  1241. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1242. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1243. time_condition = f"before_timestamp: {before_timestamp}"
  1244. except ValueError:
  1245. from common.result import validation_failed_response
  1246. return jsonify(validation_failed_response(
  1247. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1248. )), 422
  1249. else:
  1250. from common.result import bad_request_response
  1251. return jsonify(bad_request_response(
  1252. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1253. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1254. )), 400
  1255. cleanup_stats = {
  1256. 'time_condition': time_condition,
  1257. 'cutoff_time': cutoff_time.isoformat(),
  1258. 'sessions_removed': 0,
  1259. 'conversations_removed': 0,
  1260. 'sessions_kept': 0,
  1261. 'conversations_kept': 0,
  1262. 'removed_session_ids': [],
  1263. 'removed_conversation_ids': []
  1264. }
  1265. # 按session删除
  1266. sessions_to_remove = []
  1267. for session_id, session_data in cache.session_info.items():
  1268. if session_data['start_time'] < cutoff_time:
  1269. sessions_to_remove.append(session_id)
  1270. # 删除符合条件的sessions及其所有conversations
  1271. for session_id in sessions_to_remove:
  1272. session_data = cache.session_info[session_id]
  1273. conversations_in_session = session_data['conversations'].copy()
  1274. # 删除session中的所有conversations
  1275. for conv_id in conversations_in_session:
  1276. if conv_id in cache.cache:
  1277. del cache.cache[conv_id]
  1278. cleanup_stats['conversations_removed'] += 1
  1279. cleanup_stats['removed_conversation_ids'].append(conv_id)
  1280. # 清理conversation相关的时间记录
  1281. if hasattr(cache, 'conversation_start_times') and conv_id in cache.conversation_start_times:
  1282. del cache.conversation_start_times[conv_id]
  1283. if hasattr(cache, 'conversation_to_session') and conv_id in cache.conversation_to_session:
  1284. del cache.conversation_to_session[conv_id]
  1285. # 删除session记录
  1286. del cache.session_info[session_id]
  1287. cleanup_stats['sessions_removed'] += 1
  1288. cleanup_stats['removed_session_ids'].append(session_id)
  1289. # 统计保留的sessions和conversations
  1290. cleanup_stats['sessions_kept'] = len(cache.session_info)
  1291. cleanup_stats['conversations_kept'] = len(cache.cache)
  1292. from common.result import success_response
  1293. return jsonify(success_response(
  1294. response_text=f"缓存清理完成,删除了 {cleanup_stats['sessions_removed']} 个会话和 {cleanup_stats['conversations_removed']} 个对话",
  1295. data=cleanup_stats
  1296. ))
  1297. except Exception as e:
  1298. from common.result import internal_error_response
  1299. return jsonify(internal_error_response(
  1300. response_text="缓存清理失败,请稍后重试"
  1301. )), 500
  1302. @app.flask_app.route('/api/v0/training_error_question_sql', methods=['POST'])
  1303. def training_error_question_sql():
  1304. """
  1305. 存储错误的question-sql对到error_sql集合中
  1306. 此API将接收的错误question/sql pair写入到error_sql集合中,用于记录和分析错误的SQL查询。
  1307. Args:
  1308. question (str, required): 用户问题
  1309. sql (str, required): 对应的错误SQL查询语句
  1310. Returns:
  1311. JSON: 包含训练ID和成功消息的响应
  1312. """
  1313. try:
  1314. data = request.get_json()
  1315. question = data.get('question')
  1316. sql = data.get('sql')
  1317. print(f"[DEBUG] 接收到错误SQL训练请求: question={question}, sql={sql}")
  1318. if not question or not sql:
  1319. from common.result import bad_request_response
  1320. missing_params = []
  1321. if not question:
  1322. missing_params.append("question")
  1323. if not sql:
  1324. missing_params.append("sql")
  1325. return jsonify(bad_request_response(
  1326. response_text="question和sql参数都是必需的",
  1327. missing_params=missing_params
  1328. )), 400
  1329. # 使用vn实例的train_error_sql方法存储错误SQL
  1330. id = vn.train_error_sql(question=question, sql=sql)
  1331. print(f"[INFO] 成功存储错误SQL,ID: {id}")
  1332. from common.result import success_response
  1333. return jsonify(success_response(
  1334. response_text="错误SQL对已成功存储",
  1335. data={
  1336. "id": id,
  1337. "message": "错误SQL对已成功存储到error_sql集合"
  1338. }
  1339. ))
  1340. except Exception as e:
  1341. print(f"[ERROR] 存储错误SQL失败: {str(e)}")
  1342. from common.result import internal_error_response
  1343. return jsonify(internal_error_response(
  1344. response_text="存储错误SQL失败,请稍后重试"
  1345. )), 500
  1346. # ==================== Redis对话管理API ====================
  1347. @app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
  1348. def get_user_conversations(user_id: str):
  1349. """获取用户的对话列表(按时间倒序)"""
  1350. try:
  1351. limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
  1352. conversations = redis_conversation_manager.get_conversations(user_id, limit)
  1353. return jsonify(success_response(
  1354. response_text="获取用户对话列表成功",
  1355. data={
  1356. "user_id": user_id,
  1357. "conversations": conversations,
  1358. "total_count": len(conversations)
  1359. }
  1360. ))
  1361. except Exception as e:
  1362. return jsonify(internal_error_response(
  1363. response_text="获取对话列表失败,请稍后重试"
  1364. )), 500
  1365. @app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
  1366. def get_conversation_messages(conversation_id: str):
  1367. """获取特定对话的消息历史"""
  1368. try:
  1369. limit = request.args.get('limit', type=int) # 可选参数
  1370. messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
  1371. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1372. return jsonify(success_response(
  1373. response_text="获取对话消息成功",
  1374. data={
  1375. "conversation_id": conversation_id,
  1376. "conversation_meta": meta,
  1377. "messages": messages,
  1378. "message_count": len(messages)
  1379. }
  1380. ))
  1381. except Exception as e:
  1382. return jsonify(internal_error_response(
  1383. response_text="获取对话消息失败"
  1384. )), 500
  1385. @app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
  1386. def get_conversation_context(conversation_id: str):
  1387. """获取对话上下文(格式化用于LLM)"""
  1388. try:
  1389. count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
  1390. context = redis_conversation_manager.get_context(conversation_id, count)
  1391. return jsonify(success_response(
  1392. response_text="获取对话上下文成功",
  1393. data={
  1394. "conversation_id": conversation_id,
  1395. "context": context,
  1396. "context_message_count": count
  1397. }
  1398. ))
  1399. except Exception as e:
  1400. return jsonify(internal_error_response(
  1401. response_text="获取对话上下文失败"
  1402. )), 500
  1403. @app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
  1404. def conversation_stats():
  1405. """获取对话系统统计信息"""
  1406. try:
  1407. stats = redis_conversation_manager.get_stats()
  1408. return jsonify(success_response(
  1409. response_text="获取统计信息成功",
  1410. data=stats
  1411. ))
  1412. except Exception as e:
  1413. return jsonify(internal_error_response(
  1414. response_text="获取统计信息失败,请稍后重试"
  1415. )), 500
  1416. @app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
  1417. def conversation_cleanup():
  1418. """手动清理过期对话"""
  1419. try:
  1420. redis_conversation_manager.cleanup_expired_conversations()
  1421. return jsonify(success_response(
  1422. response_text="对话清理完成"
  1423. ))
  1424. except Exception as e:
  1425. return jsonify(internal_error_response(
  1426. response_text="对话清理失败,请稍后重试"
  1427. )), 500
  1428. @app.flask_app.route('/api/v0/user/<user_id>/conversations/full', methods=['GET'])
  1429. def get_user_conversations_with_messages(user_id: str):
  1430. """
  1431. 获取用户的完整对话数据(包含所有消息)
  1432. 一次性返回用户的所有对话和每个对话下的消息历史
  1433. Args:
  1434. user_id: 用户ID(路径参数)
  1435. conversation_limit: 对话数量限制(查询参数,可选,不传则返回所有对话)
  1436. message_limit: 每个对话的消息数限制(查询参数,可选,不传则返回所有消息)
  1437. Returns:
  1438. 包含用户所有对话和消息的完整数据
  1439. """
  1440. try:
  1441. # 获取可选参数,不传递时使用None(返回所有记录)
  1442. conversation_limit = request.args.get('conversation_limit', type=int)
  1443. message_limit = request.args.get('message_limit', type=int)
  1444. # 获取用户的对话列表
  1445. conversations = redis_conversation_manager.get_conversations(user_id, conversation_limit)
  1446. # 为每个对话获取消息历史
  1447. full_conversations = []
  1448. total_messages = 0
  1449. for conversation in conversations:
  1450. conversation_id = conversation['conversation_id']
  1451. # 获取对话消息
  1452. messages = redis_conversation_manager.get_conversation_messages(
  1453. conversation_id, message_limit
  1454. )
  1455. # 获取对话元数据
  1456. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1457. # 组合完整数据
  1458. full_conversation = {
  1459. **conversation, # 基础对话信息
  1460. 'meta': meta, # 对话元数据
  1461. 'messages': messages, # 消息列表
  1462. 'message_count': len(messages)
  1463. }
  1464. full_conversations.append(full_conversation)
  1465. total_messages += len(messages)
  1466. return jsonify(success_response(
  1467. response_text="获取用户完整对话数据成功",
  1468. data={
  1469. "user_id": user_id,
  1470. "conversations": full_conversations,
  1471. "total_conversations": len(full_conversations),
  1472. "total_messages": total_messages,
  1473. "conversation_limit_applied": conversation_limit,
  1474. "message_limit_applied": message_limit,
  1475. "query_time": datetime.now().isoformat()
  1476. }
  1477. ))
  1478. except Exception as e:
  1479. print(f"[ERROR] 获取用户完整对话数据失败: {str(e)}")
  1480. return jsonify(internal_error_response(
  1481. response_text="获取用户对话数据失败,请稍后重试"
  1482. )), 500
  1483. # ==================== Embedding缓存管理接口 ====================
  1484. @app.flask_app.route('/api/v0/embedding_cache_stats', methods=['GET'])
  1485. def embedding_cache_stats():
  1486. """获取embedding缓存统计信息"""
  1487. try:
  1488. from common.embedding_cache_manager import get_embedding_cache_manager
  1489. cache_manager = get_embedding_cache_manager()
  1490. stats = cache_manager.get_cache_stats()
  1491. return jsonify(success_response(
  1492. response_text="获取embedding缓存统计成功",
  1493. data=stats
  1494. ))
  1495. except Exception as e:
  1496. print(f"[ERROR] 获取embedding缓存统计失败: {str(e)}")
  1497. return jsonify(internal_error_response(
  1498. response_text="获取embedding缓存统计失败,请稍后重试"
  1499. )), 500
  1500. @app.flask_app.route('/api/v0/embedding_cache_cleanup', methods=['POST'])
  1501. def embedding_cache_cleanup():
  1502. """清空所有embedding缓存"""
  1503. try:
  1504. from common.embedding_cache_manager import get_embedding_cache_manager
  1505. cache_manager = get_embedding_cache_manager()
  1506. if not cache_manager.is_available():
  1507. return jsonify(internal_error_response(
  1508. response_text="Embedding缓存功能未启用或不可用"
  1509. )), 400
  1510. success = cache_manager.clear_all_cache()
  1511. if success:
  1512. return jsonify(success_response(
  1513. response_text="所有embedding缓存已清空",
  1514. data={"cleared": True}
  1515. ))
  1516. else:
  1517. return jsonify(internal_error_response(
  1518. response_text="清空embedding缓存失败"
  1519. )), 500
  1520. except Exception as e:
  1521. print(f"[ERROR] 清空embedding缓存失败: {str(e)}")
  1522. return jsonify(internal_error_response(
  1523. response_text="清空embedding缓存失败,请稍后重试"
  1524. )), 500
  1525. # ==================== QA反馈系统接口 ====================
  1526. # 全局反馈管理器实例
  1527. qa_feedback_manager = None
  1528. def get_qa_feedback_manager():
  1529. """获取QA反馈管理器实例(懒加载)- 复用Vanna连接版本"""
  1530. global qa_feedback_manager
  1531. if qa_feedback_manager is None:
  1532. try:
  1533. # 优先尝试复用vanna连接
  1534. vanna_instance = None
  1535. try:
  1536. # 尝试获取现有的vanna实例
  1537. if 'get_citu_langraph_agent' in globals():
  1538. agent = get_citu_langraph_agent()
  1539. if hasattr(agent, 'vn'):
  1540. vanna_instance = agent.vn
  1541. elif 'vn' in globals():
  1542. vanna_instance = vn
  1543. else:
  1544. print("[INFO] 未找到可用的vanna实例,将创建新的数据库连接")
  1545. except Exception as e:
  1546. print(f"[INFO] 获取vanna实例失败: {e},将创建新的数据库连接")
  1547. vanna_instance = None
  1548. qa_feedback_manager = QAFeedbackManager(vanna_instance=vanna_instance)
  1549. print("[CITU_APP] QA反馈管理器实例创建成功")
  1550. except Exception as e:
  1551. print(f"[CRITICAL] QA反馈管理器创建失败: {str(e)}")
  1552. raise Exception(f"QA反馈管理器初始化失败: {str(e)}")
  1553. return qa_feedback_manager
  1554. @app.flask_app.route('/api/v0/qa_feedback/query', methods=['POST'])
  1555. def qa_feedback_query():
  1556. """
  1557. 查询反馈记录API
  1558. 支持分页、筛选和排序功能
  1559. """
  1560. try:
  1561. req = request.get_json(force=True)
  1562. # 解析参数,设置默认值
  1563. page = req.get('page', 1)
  1564. page_size = req.get('page_size', 20)
  1565. is_thumb_up = req.get('is_thumb_up')
  1566. create_time_start = req.get('create_time_start')
  1567. create_time_end = req.get('create_time_end')
  1568. is_in_training_data = req.get('is_in_training_data')
  1569. sort_by = req.get('sort_by', 'create_time')
  1570. sort_order = req.get('sort_order', 'desc')
  1571. # 参数验证
  1572. if page < 1:
  1573. return jsonify(bad_request_response(
  1574. response_text="页码必须大于0",
  1575. invalid_params=["page"]
  1576. )), 400
  1577. if page_size < 1 or page_size > 100:
  1578. return jsonify(bad_request_response(
  1579. response_text="每页大小必须在1-100之间",
  1580. invalid_params=["page_size"]
  1581. )), 400
  1582. # 获取反馈管理器并查询
  1583. manager = get_qa_feedback_manager()
  1584. records, total = manager.query_feedback(
  1585. page=page,
  1586. page_size=page_size,
  1587. is_thumb_up=is_thumb_up,
  1588. create_time_start=create_time_start,
  1589. create_time_end=create_time_end,
  1590. is_in_training_data=is_in_training_data,
  1591. sort_by=sort_by,
  1592. sort_order=sort_order
  1593. )
  1594. # 计算分页信息
  1595. total_pages = (total + page_size - 1) // page_size
  1596. return jsonify(success_response(
  1597. response_text=f"查询成功,共找到 {total} 条记录",
  1598. data={
  1599. "records": records,
  1600. "pagination": {
  1601. "page": page,
  1602. "page_size": page_size,
  1603. "total": total,
  1604. "total_pages": total_pages,
  1605. "has_next": page < total_pages,
  1606. "has_prev": page > 1
  1607. }
  1608. }
  1609. ))
  1610. except Exception as e:
  1611. print(f"[ERROR] qa_feedback_query执行失败: {str(e)}")
  1612. return jsonify(internal_error_response(
  1613. response_text="查询反馈记录失败,请稍后重试"
  1614. )), 500
  1615. @app.flask_app.route('/api/v0/qa_feedback/delete/<int:feedback_id>', methods=['DELETE'])
  1616. def qa_feedback_delete(feedback_id):
  1617. """
  1618. 删除反馈记录API
  1619. """
  1620. try:
  1621. manager = get_qa_feedback_manager()
  1622. success = manager.delete_feedback(feedback_id)
  1623. if success:
  1624. return jsonify(success_response(
  1625. response_text=f"反馈记录删除成功",
  1626. data={"deleted_id": feedback_id}
  1627. ))
  1628. else:
  1629. return jsonify(not_found_response(
  1630. response_text=f"反馈记录不存在 (ID: {feedback_id})"
  1631. )), 404
  1632. except Exception as e:
  1633. print(f"[ERROR] qa_feedback_delete执行失败: {str(e)}")
  1634. return jsonify(internal_error_response(
  1635. response_text="删除反馈记录失败,请稍后重试"
  1636. )), 500
  1637. @app.flask_app.route('/api/v0/qa_feedback/update/<int:feedback_id>', methods=['PUT'])
  1638. def qa_feedback_update(feedback_id):
  1639. """
  1640. 更新反馈记录API
  1641. """
  1642. try:
  1643. req = request.get_json(force=True)
  1644. # 提取允许更新的字段
  1645. allowed_fields = ['question', 'sql', 'is_thumb_up', 'user_id', 'is_in_training_data']
  1646. update_data = {}
  1647. for field in allowed_fields:
  1648. if field in req:
  1649. update_data[field] = req[field]
  1650. if not update_data:
  1651. return jsonify(bad_request_response(
  1652. response_text="没有提供有效的更新字段",
  1653. missing_params=allowed_fields
  1654. )), 400
  1655. manager = get_qa_feedback_manager()
  1656. success = manager.update_feedback(feedback_id, **update_data)
  1657. if success:
  1658. return jsonify(success_response(
  1659. response_text="反馈记录更新成功",
  1660. data={
  1661. "updated_id": feedback_id,
  1662. "updated_fields": list(update_data.keys())
  1663. }
  1664. ))
  1665. else:
  1666. return jsonify(not_found_response(
  1667. response_text=f"反馈记录不存在或无变化 (ID: {feedback_id})"
  1668. )), 404
  1669. except Exception as e:
  1670. print(f"[ERROR] qa_feedback_update执行失败: {str(e)}")
  1671. return jsonify(internal_error_response(
  1672. response_text="更新反馈记录失败,请稍后重试"
  1673. )), 500
  1674. @app.flask_app.route('/api/v0/qa_feedback/add_to_training', methods=['POST'])
  1675. def qa_feedback_add_to_training():
  1676. """
  1677. 将反馈记录添加到训练数据集API
  1678. 支持混合批量处理:正向反馈加入SQL训练集,负向反馈加入error_sql训练集
  1679. """
  1680. try:
  1681. req = request.get_json(force=True)
  1682. feedback_ids = req.get('feedback_ids', [])
  1683. if not feedback_ids or not isinstance(feedback_ids, list):
  1684. return jsonify(bad_request_response(
  1685. response_text="缺少有效的反馈ID列表",
  1686. missing_params=["feedback_ids"]
  1687. )), 400
  1688. manager = get_qa_feedback_manager()
  1689. # 获取反馈记录
  1690. records = manager.get_feedback_by_ids(feedback_ids)
  1691. if not records:
  1692. return jsonify(not_found_response(
  1693. response_text="未找到任何有效的反馈记录"
  1694. )), 404
  1695. # 分别处理正向和负向反馈
  1696. positive_count = 0 # 正向训练计数
  1697. negative_count = 0 # 负向训练计数
  1698. already_trained_count = 0 # 已训练计数
  1699. error_count = 0 # 错误计数
  1700. successfully_trained_ids = [] # 成功训练的ID列表
  1701. for record in records:
  1702. try:
  1703. # 检查是否已经在训练数据中
  1704. if record['is_in_training_data']:
  1705. already_trained_count += 1
  1706. continue
  1707. if record['is_thumb_up']:
  1708. # 正向反馈 - 加入标准SQL训练集
  1709. training_id = vn.train(
  1710. question=record['question'],
  1711. sql=record['sql']
  1712. )
  1713. positive_count += 1
  1714. print(f"[TRAINING] 正向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1715. else:
  1716. # 负向反馈 - 加入错误SQL训练集
  1717. training_id = vn.train_error_sql(
  1718. question=record['question'],
  1719. sql=record['sql']
  1720. )
  1721. negative_count += 1
  1722. print(f"[TRAINING] 负向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1723. successfully_trained_ids.append(record['id'])
  1724. except Exception as e:
  1725. print(f"[ERROR] 训练失败 - 反馈ID: {record['id']}, 错误: {e}")
  1726. error_count += 1
  1727. # 更新训练状态
  1728. if successfully_trained_ids:
  1729. updated_count = manager.mark_training_status(successfully_trained_ids, True)
  1730. print(f"[TRAINING] 批量更新训练状态完成,影响 {updated_count} 条记录")
  1731. # 构建响应
  1732. total_processed = positive_count + negative_count + already_trained_count + error_count
  1733. return jsonify(success_response(
  1734. response_text=f"训练数据添加完成,成功处理 {positive_count + negative_count} 条记录",
  1735. data={
  1736. "summary": {
  1737. "total_requested": len(feedback_ids),
  1738. "total_processed": total_processed,
  1739. "positive_trained": positive_count,
  1740. "negative_trained": negative_count,
  1741. "already_trained": already_trained_count,
  1742. "errors": error_count
  1743. },
  1744. "successfully_trained_ids": successfully_trained_ids,
  1745. "training_details": {
  1746. "sql_training_count": positive_count,
  1747. "error_sql_training_count": negative_count
  1748. }
  1749. }
  1750. ))
  1751. except Exception as e:
  1752. print(f"[ERROR] qa_feedback_add_to_training执行失败: {str(e)}")
  1753. return jsonify(internal_error_response(
  1754. response_text="添加训练数据失败,请稍后重试"
  1755. )), 500
  1756. @app.flask_app.route('/api/v0/qa_feedback/add', methods=['POST'])
  1757. def qa_feedback_add():
  1758. """
  1759. 添加反馈记录API
  1760. 用于前端直接创建反馈记录
  1761. """
  1762. try:
  1763. req = request.get_json(force=True)
  1764. question = req.get('question')
  1765. sql = req.get('sql')
  1766. is_thumb_up = req.get('is_thumb_up')
  1767. user_id = req.get('user_id', 'guest')
  1768. # 参数验证
  1769. if not question:
  1770. return jsonify(bad_request_response(
  1771. response_text="缺少必需参数:question",
  1772. missing_params=["question"]
  1773. )), 400
  1774. if not sql:
  1775. return jsonify(bad_request_response(
  1776. response_text="缺少必需参数:sql",
  1777. missing_params=["sql"]
  1778. )), 400
  1779. if is_thumb_up is None:
  1780. return jsonify(bad_request_response(
  1781. response_text="缺少必需参数:is_thumb_up",
  1782. missing_params=["is_thumb_up"]
  1783. )), 400
  1784. manager = get_qa_feedback_manager()
  1785. feedback_id = manager.add_feedback(
  1786. question=question,
  1787. sql=sql,
  1788. is_thumb_up=bool(is_thumb_up),
  1789. user_id=user_id
  1790. )
  1791. return jsonify(success_response(
  1792. response_text="反馈记录创建成功",
  1793. data={
  1794. "feedback_id": feedback_id
  1795. }
  1796. ))
  1797. except Exception as e:
  1798. print(f"[ERROR] qa_feedback_add执行失败: {str(e)}")
  1799. return jsonify(internal_error_response(
  1800. response_text="创建反馈记录失败,请稍后重试"
  1801. )), 500
  1802. @app.flask_app.route('/api/v0/qa_feedback/stats', methods=['GET'])
  1803. def qa_feedback_stats():
  1804. """
  1805. 反馈统计API
  1806. 返回反馈数据的统计信息
  1807. """
  1808. try:
  1809. manager = get_qa_feedback_manager()
  1810. # 查询各种统计数据
  1811. all_records, total_count = manager.query_feedback(page=1, page_size=1)
  1812. positive_records, positive_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=True)
  1813. negative_records, negative_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=False)
  1814. trained_records, trained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=True)
  1815. untrained_records, untrained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=False)
  1816. return jsonify(success_response(
  1817. response_text="统计信息获取成功",
  1818. data={
  1819. "total_feedback": total_count,
  1820. "positive_feedback": positive_count,
  1821. "negative_feedback": negative_count,
  1822. "trained_feedback": trained_count,
  1823. "untrained_feedback": untrained_count,
  1824. "positive_rate": round(positive_count / max(total_count, 1) * 100, 2),
  1825. "training_rate": round(trained_count / max(total_count, 1) * 100, 2)
  1826. }
  1827. ))
  1828. except Exception as e:
  1829. print(f"[ERROR] qa_feedback_stats执行失败: {str(e)}")
  1830. return jsonify(internal_error_response(
  1831. response_text="获取统计信息失败,请稍后重试"
  1832. )), 500
  1833. # ==================== 问答缓存管理接口 ====================
  1834. @app.flask_app.route('/api/v0/qa_cache_stats', methods=['GET'])
  1835. def qa_cache_stats():
  1836. """获取问答缓存统计信息"""
  1837. try:
  1838. stats = redis_conversation_manager.get_qa_cache_stats()
  1839. return jsonify(success_response(
  1840. response_text="获取问答缓存统计成功",
  1841. data=stats
  1842. ))
  1843. except Exception as e:
  1844. print(f"[ERROR] 获取问答缓存统计失败: {str(e)}")
  1845. return jsonify(internal_error_response(
  1846. response_text="获取问答缓存统计失败,请稍后重试"
  1847. )), 500
  1848. @app.flask_app.route('/api/v0/qa_cache_list', methods=['GET'])
  1849. def qa_cache_list():
  1850. """获取问答缓存列表(支持分页)"""
  1851. try:
  1852. # 获取分页参数,默认限制50条
  1853. limit = request.args.get('limit', 50, type=int)
  1854. # 限制最大返回数量,防止一次性返回过多数据
  1855. if limit > 500:
  1856. limit = 500
  1857. elif limit <= 0:
  1858. limit = 50
  1859. cache_list = redis_conversation_manager.get_qa_cache_list(limit)
  1860. return jsonify(success_response(
  1861. response_text="获取问答缓存列表成功",
  1862. data={
  1863. "cache_list": cache_list,
  1864. "total_returned": len(cache_list),
  1865. "limit_applied": limit,
  1866. "note": "按缓存时间倒序排列,最新的在前面"
  1867. }
  1868. ))
  1869. except Exception as e:
  1870. print(f"[ERROR] 获取问答缓存列表失败: {str(e)}")
  1871. return jsonify(internal_error_response(
  1872. response_text="获取问答缓存列表失败,请稍后重试"
  1873. )), 500
  1874. @app.flask_app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
  1875. def qa_cache_cleanup():
  1876. """清空所有问答缓存"""
  1877. try:
  1878. if not redis_conversation_manager.is_available():
  1879. return jsonify(internal_error_response(
  1880. response_text="Redis连接不可用,无法执行清理操作"
  1881. )), 500
  1882. deleted_count = redis_conversation_manager.clear_all_qa_cache()
  1883. return jsonify(success_response(
  1884. response_text="问答缓存清理完成",
  1885. data={
  1886. "deleted_count": deleted_count,
  1887. "cleared": deleted_count > 0,
  1888. "cleanup_time": datetime.now().isoformat()
  1889. }
  1890. ))
  1891. except Exception as e:
  1892. print(f"[ERROR] 清空问答缓存失败: {str(e)}")
  1893. return jsonify(internal_error_response(
  1894. response_text="清空问答缓存失败,请稍后重试"
  1895. )), 500
  1896. # ==================== 训练数据管理接口 ====================
  1897. def validate_sql_syntax(sql: str) -> tuple[bool, str]:
  1898. """SQL语法检查(仅对sql类型)"""
  1899. try:
  1900. parsed = sqlparse.parse(sql.strip())
  1901. if not parsed or not parsed[0].tokens:
  1902. return False, "SQL语法错误:空语句"
  1903. # 基本语法检查
  1904. sql_upper = sql.strip().upper()
  1905. if not any(sql_upper.startswith(keyword) for keyword in
  1906. ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP']):
  1907. return False, "SQL语法错误:不是有效的SQL语句"
  1908. # 安全检查:禁止危险的SQL操作
  1909. dangerous_operations = ['UPDATE', 'DELETE', 'ALERT', 'DROP']
  1910. for operation in dangerous_operations:
  1911. if sql_upper.startswith(operation):
  1912. return False, f'在训练集中禁止使用"{",".join(dangerous_operations)}"'
  1913. return True, ""
  1914. except Exception as e:
  1915. return False, f"SQL语法错误:{str(e)}"
  1916. def paginate_data(data_list: list, page: int, page_size: int):
  1917. """分页处理算法"""
  1918. total = len(data_list)
  1919. start_idx = (page - 1) * page_size
  1920. end_idx = start_idx + page_size
  1921. page_data = data_list[start_idx:end_idx]
  1922. return {
  1923. "data": page_data,
  1924. "pagination": {
  1925. "page": page,
  1926. "page_size": page_size,
  1927. "total": total,
  1928. "total_pages": (total + page_size - 1) // page_size,
  1929. "has_next": end_idx < total,
  1930. "has_prev": page > 1
  1931. }
  1932. }
  1933. def filter_by_type(data_list: list, training_data_type: str):
  1934. """按类型筛选算法"""
  1935. if not training_data_type:
  1936. return data_list
  1937. return [
  1938. record for record in data_list
  1939. if record.get('training_data_type') == training_data_type
  1940. ]
  1941. def search_in_data(data_list: list, search_keyword: str):
  1942. """在数据中搜索关键词"""
  1943. if not search_keyword:
  1944. return data_list
  1945. keyword_lower = search_keyword.lower()
  1946. return [
  1947. record for record in data_list
  1948. if (record.get('question') and keyword_lower in record['question'].lower()) or
  1949. (record.get('content') and keyword_lower in record['content'].lower())
  1950. ]
  1951. def process_single_training_item(item: dict, index: int) -> dict:
  1952. """处理单个训练数据项"""
  1953. training_type = item.get('training_data_type')
  1954. if training_type == 'sql':
  1955. sql = item.get('sql')
  1956. if not sql:
  1957. raise ValueError("SQL字段是必需的")
  1958. # SQL语法检查
  1959. is_valid, error_msg = validate_sql_syntax(sql)
  1960. if not is_valid:
  1961. raise ValueError(error_msg)
  1962. question = item.get('question')
  1963. if question:
  1964. training_id = vn.train(question=question, sql=sql)
  1965. else:
  1966. training_id = vn.train(sql=sql)
  1967. elif training_type == 'error_sql':
  1968. # error_sql不需要语法检查
  1969. question = item.get('question')
  1970. sql = item.get('sql')
  1971. if not question or not sql:
  1972. raise ValueError("question和sql字段都是必需的")
  1973. training_id = vn.train_error_sql(question=question, sql=sql)
  1974. elif training_type == 'documentation':
  1975. content = item.get('content')
  1976. if not content:
  1977. raise ValueError("content字段是必需的")
  1978. training_id = vn.train(documentation=content)
  1979. elif training_type == 'ddl':
  1980. ddl = item.get('ddl')
  1981. if not ddl:
  1982. raise ValueError("ddl字段是必需的")
  1983. training_id = vn.train(ddl=ddl)
  1984. else:
  1985. raise ValueError(f"不支持的训练数据类型: {training_type}")
  1986. return {
  1987. "index": index,
  1988. "success": True,
  1989. "training_id": training_id,
  1990. "type": training_type,
  1991. "message": f"{training_type}训练数据创建成功"
  1992. }
  1993. def get_total_training_count():
  1994. """获取当前训练数据总数"""
  1995. try:
  1996. training_data = vn.get_training_data()
  1997. if training_data is not None and not training_data.empty:
  1998. return len(training_data)
  1999. return 0
  2000. except Exception as e:
  2001. print(f"[WARNING] 获取训练数据总数失败: {e}")
  2002. return 0
  2003. @app.flask_app.route('/api/v0/training_data/query', methods=['POST'])
  2004. def training_data_query():
  2005. """
  2006. 分页查询训练数据API
  2007. 支持类型筛选、搜索和排序功能
  2008. """
  2009. try:
  2010. req = request.get_json(force=True)
  2011. # 解析参数,设置默认值
  2012. page = req.get('page', 1)
  2013. page_size = req.get('page_size', 20)
  2014. training_data_type = req.get('training_data_type')
  2015. sort_by = req.get('sort_by', 'id')
  2016. sort_order = req.get('sort_order', 'desc')
  2017. search_keyword = req.get('search_keyword')
  2018. # 参数验证
  2019. if page < 1:
  2020. return jsonify(bad_request_response(
  2021. response_text="页码必须大于0",
  2022. missing_params=["page"]
  2023. )), 400
  2024. if page_size < 1 or page_size > 100:
  2025. return jsonify(bad_request_response(
  2026. response_text="每页大小必须在1-100之间",
  2027. missing_params=["page_size"]
  2028. )), 400
  2029. if search_keyword and len(search_keyword) > 100:
  2030. return jsonify(bad_request_response(
  2031. response_text="搜索关键词最大长度为100字符",
  2032. missing_params=["search_keyword"]
  2033. )), 400
  2034. # 获取训练数据
  2035. training_data = vn.get_training_data()
  2036. if training_data is None or training_data.empty:
  2037. return jsonify(success_response(
  2038. response_text="查询成功,暂无训练数据",
  2039. data={
  2040. "records": [],
  2041. "pagination": {
  2042. "page": page,
  2043. "page_size": page_size,
  2044. "total": 0,
  2045. "total_pages": 0,
  2046. "has_next": False,
  2047. "has_prev": False
  2048. },
  2049. "filters_applied": {
  2050. "training_data_type": training_data_type,
  2051. "search_keyword": search_keyword
  2052. }
  2053. }
  2054. ))
  2055. # 转换为列表格式
  2056. records = training_data.to_dict(orient="records")
  2057. # 应用筛选条件
  2058. if training_data_type:
  2059. records = filter_by_type(records, training_data_type)
  2060. if search_keyword:
  2061. records = search_in_data(records, search_keyword)
  2062. # 排序
  2063. if sort_by in ['id', 'training_data_type']:
  2064. reverse = (sort_order.lower() == 'desc')
  2065. records.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse)
  2066. # 分页
  2067. paginated_result = paginate_data(records, page, page_size)
  2068. return jsonify(success_response(
  2069. response_text=f"查询成功,共找到 {paginated_result['pagination']['total']} 条记录",
  2070. data={
  2071. "records": paginated_result["data"],
  2072. "pagination": paginated_result["pagination"],
  2073. "filters_applied": {
  2074. "training_data_type": training_data_type,
  2075. "search_keyword": search_keyword
  2076. }
  2077. }
  2078. ))
  2079. except Exception as e:
  2080. print(f"[ERROR] training_data_query执行失败: {str(e)}")
  2081. return jsonify(internal_error_response(
  2082. response_text="查询训练数据失败,请稍后重试"
  2083. )), 500
  2084. @app.flask_app.route('/api/v0/training_data/create', methods=['POST'])
  2085. def training_data_create():
  2086. """
  2087. 创建训练数据API
  2088. 支持单条和批量创建,支持四种数据类型
  2089. """
  2090. try:
  2091. req = request.get_json(force=True)
  2092. data = req.get('data')
  2093. if not data:
  2094. return jsonify(bad_request_response(
  2095. response_text="缺少必需参数:data",
  2096. missing_params=["data"]
  2097. )), 400
  2098. # 统一处理为列表格式
  2099. if isinstance(data, dict):
  2100. data_list = [data]
  2101. elif isinstance(data, list):
  2102. data_list = data
  2103. else:
  2104. return jsonify(bad_request_response(
  2105. response_text="data字段格式错误,应为对象或数组"
  2106. )), 400
  2107. # 批量操作限制
  2108. if len(data_list) > 50:
  2109. return jsonify(bad_request_response(
  2110. response_text="批量操作最大支持50条记录"
  2111. )), 400
  2112. results = []
  2113. successful_count = 0
  2114. type_summary = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}
  2115. for index, item in enumerate(data_list):
  2116. try:
  2117. result = process_single_training_item(item, index)
  2118. results.append(result)
  2119. if result['success']:
  2120. successful_count += 1
  2121. type_summary[result['type']] += 1
  2122. except Exception as e:
  2123. results.append({
  2124. "index": index,
  2125. "success": False,
  2126. "type": item.get('training_data_type', 'unknown'),
  2127. "error": str(e),
  2128. "message": "创建失败"
  2129. })
  2130. # 获取创建后的总记录数
  2131. current_total = get_total_training_count()
  2132. return jsonify(success_response(
  2133. response_text="训练数据创建完成",
  2134. data={
  2135. "total_requested": len(data_list),
  2136. "successfully_created": successful_count,
  2137. "failed_count": len(data_list) - successful_count,
  2138. "results": results,
  2139. "summary": type_summary,
  2140. "current_total_count": current_total
  2141. }
  2142. ))
  2143. except Exception as e:
  2144. print(f"[ERROR] training_data_create执行失败: {str(e)}")
  2145. return jsonify(internal_error_response(
  2146. response_text="创建训练数据失败,请稍后重试"
  2147. )), 500
  2148. @app.flask_app.route('/api/v0/training_data/delete', methods=['POST'])
  2149. def training_data_delete():
  2150. """
  2151. 删除训练数据API
  2152. 支持批量删除
  2153. """
  2154. try:
  2155. req = request.get_json(force=True)
  2156. ids = req.get('ids', [])
  2157. confirm = req.get('confirm', False)
  2158. if not ids or not isinstance(ids, list):
  2159. return jsonify(bad_request_response(
  2160. response_text="缺少有效的ID列表",
  2161. missing_params=["ids"]
  2162. )), 400
  2163. if not confirm:
  2164. return jsonify(bad_request_response(
  2165. response_text="删除操作需要确认,请设置confirm为true"
  2166. )), 400
  2167. # 批量操作限制
  2168. if len(ids) > 50:
  2169. return jsonify(bad_request_response(
  2170. response_text="批量删除最大支持50条记录"
  2171. )), 400
  2172. deleted_ids = []
  2173. failed_ids = []
  2174. failed_details = []
  2175. for training_id in ids:
  2176. try:
  2177. success = vn.remove_training_data(training_id)
  2178. if success:
  2179. deleted_ids.append(training_id)
  2180. else:
  2181. failed_ids.append(training_id)
  2182. failed_details.append({
  2183. "id": training_id,
  2184. "error": "记录不存在或删除失败"
  2185. })
  2186. except Exception as e:
  2187. failed_ids.append(training_id)
  2188. failed_details.append({
  2189. "id": training_id,
  2190. "error": str(e)
  2191. })
  2192. # 获取删除后的总记录数
  2193. current_total = get_total_training_count()
  2194. return jsonify(success_response(
  2195. response_text="训练数据删除完成",
  2196. data={
  2197. "total_requested": len(ids),
  2198. "successfully_deleted": len(deleted_ids),
  2199. "failed_count": len(failed_ids),
  2200. "deleted_ids": deleted_ids,
  2201. "failed_ids": failed_ids,
  2202. "failed_details": failed_details,
  2203. "current_total_count": current_total
  2204. }
  2205. ))
  2206. except Exception as e:
  2207. print(f"[ERROR] training_data_delete执行失败: {str(e)}")
  2208. return jsonify(internal_error_response(
  2209. response_text="删除训练数据失败,请稍后重试"
  2210. )), 500
  2211. @app.flask_app.route('/api/v0/training_data/stats', methods=['GET'])
  2212. def training_data_stats():
  2213. """
  2214. 获取训练数据统计信息API
  2215. """
  2216. try:
  2217. training_data = vn.get_training_data()
  2218. if training_data is None or training_data.empty:
  2219. return jsonify(success_response(
  2220. response_text="统计信息获取成功",
  2221. data={
  2222. "total_count": 0,
  2223. "type_breakdown": {
  2224. "sql": 0,
  2225. "documentation": 0,
  2226. "ddl": 0,
  2227. "error_sql": 0
  2228. },
  2229. "type_percentages": {
  2230. "sql": 0.0,
  2231. "documentation": 0.0,
  2232. "ddl": 0.0,
  2233. "error_sql": 0.0
  2234. },
  2235. "last_updated": datetime.now().isoformat()
  2236. }
  2237. ))
  2238. total_count = len(training_data)
  2239. # 统计各类型数量
  2240. type_breakdown = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}
  2241. if 'training_data_type' in training_data.columns:
  2242. type_counts = training_data['training_data_type'].value_counts()
  2243. for data_type, count in type_counts.items():
  2244. if data_type in type_breakdown:
  2245. type_breakdown[data_type] = int(count)
  2246. # 计算百分比
  2247. type_percentages = {}
  2248. for data_type, count in type_breakdown.items():
  2249. type_percentages[data_type] = round(count / max(total_count, 1) * 100, 2)
  2250. return jsonify(success_response(
  2251. response_text="统计信息获取成功",
  2252. data={
  2253. "total_count": total_count,
  2254. "type_breakdown": type_breakdown,
  2255. "type_percentages": type_percentages,
  2256. "last_updated": datetime.now().isoformat()
  2257. }
  2258. ))
  2259. except Exception as e:
  2260. print(f"[ERROR] training_data_stats执行失败: {str(e)}")
  2261. return jsonify(internal_error_response(
  2262. response_text="获取统计信息失败,请稍后重试"
  2263. )), 500
  2264. @app.flask_app.route('/api/v0/cache_overview_full', methods=['GET'])
  2265. def cache_overview_full():
  2266. """获取所有缓存系统的综合概览"""
  2267. try:
  2268. from common.embedding_cache_manager import get_embedding_cache_manager
  2269. from common.vanna_instance import get_vanna_instance
  2270. # 获取现有的缓存统计
  2271. vanna_cache = get_vanna_instance()
  2272. # 直接使用应用中的缓存实例
  2273. cache = app.cache
  2274. cache_overview = {
  2275. "conversation_aware_cache": {
  2276. "enabled": True,
  2277. "total_items": len(cache.cache) if hasattr(cache, 'cache') else 0,
  2278. "sessions": list(cache.cache.keys()) if hasattr(cache, 'cache') else [],
  2279. "cache_type": type(cache).__name__
  2280. },
  2281. "question_answer_cache": redis_conversation_manager.get_qa_cache_stats() if redis_conversation_manager.is_available() else {"available": False},
  2282. "embedding_cache": get_embedding_cache_manager().get_cache_stats(),
  2283. "redis_conversation_stats": redis_conversation_manager.get_stats() if redis_conversation_manager.is_available() else None
  2284. }
  2285. return jsonify(success_response(
  2286. response_text="获取综合缓存概览成功",
  2287. data=cache_overview
  2288. ))
  2289. except Exception as e:
  2290. print(f"[ERROR] 获取综合缓存概览失败: {str(e)}")
  2291. return jsonify(internal_error_response(
  2292. response_text="获取缓存概览失败,请稍后重试"
  2293. )), 500
  2294. # 前端JavaScript示例 - 如何维持会话
  2295. """
  2296. // 前端需要维护一个会话ID
  2297. class ChatSession {
  2298. constructor() {
  2299. // 从localStorage获取或创建新的会话ID
  2300. this.sessionId = localStorage.getItem('chat_session_id') || this.generateSessionId();
  2301. localStorage.setItem('chat_session_id', this.sessionId);
  2302. }
  2303. generateSessionId() {
  2304. return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
  2305. }
  2306. async askQuestion(question) {
  2307. const response = await fetch('/api/v0/ask', {
  2308. method: 'POST',
  2309. headers: {
  2310. 'Content-Type': 'application/json',
  2311. },
  2312. body: JSON.stringify({
  2313. question: question,
  2314. session_id: this.sessionId // 关键:传递会话ID
  2315. })
  2316. });
  2317. return await response.json();
  2318. }
  2319. // 开始新会话
  2320. startNewSession() {
  2321. this.sessionId = this.generateSessionId();
  2322. localStorage.setItem('chat_session_id', this.sessionId);
  2323. }
  2324. }
  2325. // 使用示例
  2326. const chatSession = new ChatSession();
  2327. chatSession.askQuestion("各年龄段客户的流失率如何?");
  2328. """
  2329. print("正在启动Flask应用: http://localhost:8084")
  2330. app.run(host="0.0.0.0", port=8084, debug=True)