citu_app.py 179 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552
  1. # 给dataops 对话助手返回结果
  2. # 初始化日志系统 - 必须在最前面
  3. from core.logging import initialize_logging, get_app_logger, set_log_context, clear_log_context
  4. initialize_logging()
  5. from vanna.flask import VannaFlaskApp
  6. from core.vanna_llm_factory import create_vanna_instance
  7. from flask import request, jsonify
  8. import pandas as pd
  9. import common.result as result
  10. from datetime import datetime, timedelta
  11. from common.session_aware_cache import ConversationAwareMemoryCache
  12. from app_config import API_MAX_RETURN_ROWS, ENABLE_RESULT_SUMMARY
  13. import re
  14. import chainlit as cl
  15. import json
  16. from flask import session # 添加session导入
  17. import sqlparse # 用于SQL语法检查
  18. from common.redis_conversation_manager import RedisConversationManager # 添加Redis对话管理器导入
  19. from common.qa_feedback_manager import QAFeedbackManager
  20. from common.result import ( # 统一导入所有需要的响应函数
  21. success_response, bad_request_response, not_found_response, internal_error_response,
  22. error_response, service_unavailable_response,
  23. agent_success_response, agent_error_response,
  24. validation_failed_response
  25. )
  26. from app_config import ( # 添加Redis相关配置导入
  27. USER_MAX_CONVERSATIONS,
  28. CONVERSATION_CONTEXT_COUNT,
  29. DEFAULT_ANONYMOUS_USER,
  30. ENABLE_QUESTION_ANSWER_CACHE
  31. )
# Application logger for this module.
logger = get_app_logger("CituApp")
# Fallback row cap, used only when API_MAX_RETURN_ROWS is not configured (None).
DEFAULT_MAX_RETURN_ROWS = 200
# Upper bound on the number of rows the endpoints below return per result set
# (they apply it with df.head(MAX_RETURN_ROWS)).
MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
# Shared Vanna instance used by every endpoint in this file, built by the project factory.
vn = create_vanna_instance()
# Conversation-aware cache handed to VannaFlaskApp below; the endpoints access a
# cache through ``app.cache`` (presumably this same object — confirm in VannaFlaskApp).
conversation_cache = ConversationAwareMemoryCache()
# Instantiate VannaFlaskApp with the custom cache above.
app = VannaFlaskApp(
    vn,
    cache=conversation_cache,  # use the conversation-aware cache
    title="辞图智能数据问答平台",
    logo = "https://www.citupro.com/img/logo-black-2.png",
    subtitle="让 AI 为你写 SQL",
    chart=False,
    allow_llm_to_see_data=True,
    ask_results_correct=True,
    followup_questions=True,
    # NOTE(review): debug mode is hard-coded on; confirm this is intended for
    # non-development deployments.
    debug=True
)
# Redis-backed conversation manager (used by the /api/v0/ask_agent endpoint to
# resolve user and conversation ids).
redis_conversation_manager = RedisConversationManager()
  55. # 修改ask接口,支持前端传递conversation_id
  56. @app.flask_app.route('/api/v0/ask', methods=['POST'])
  57. def ask_full():
  58. req = request.get_json(force=True)
  59. question = req.get("question", None)
  60. conversation_id = req.get("conversation_id", None) # 前端传递的对话ID
  61. user_id = req.get("user_id", None) # 前端传递的用户ID
  62. if not question:
  63. from common.result import bad_request_response
  64. return jsonify(bad_request_response(
  65. response_text="缺少必需参数:question",
  66. missing_params=["question"]
  67. )), 400
  68. # 如果没有传递user_id,使用默认值guest
  69. if not user_id:
  70. user_id = "guest"
  71. # 如果前端没有传递conversation_id,则生成新的
  72. if not conversation_id:
  73. conversation_id = app.cache.generate_id(question=question, user_id=user_id)
  74. try:
  75. sql, df, _ = vn.ask(
  76. question=question,
  77. print_results=False,
  78. visualize=False,
  79. allow_llm_to_see_data=True
  80. )
  81. # 关键:检查是否有LLM解释性文本(无法生成SQL的情况)
  82. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  83. # 在解释性文本末尾添加提示语
  84. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  85. # 使用标准化错误响应
  86. from common.result import validation_failed_response
  87. return jsonify(validation_failed_response(
  88. response_text=explanation_message
  89. )), 422 # 修改HTTP状态码为422
  90. # 如果sql为None但没有解释性文本,返回通用错误
  91. if sql is None:
  92. from common.result import validation_failed_response
  93. return jsonify(validation_failed_response(
  94. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  95. )), 422
  96. # 处理返回数据 - 使用新的query_result结构
  97. query_result = {
  98. "rows": [],
  99. "columns": [],
  100. "row_count": 0,
  101. "is_limited": False,
  102. "total_row_count": 0
  103. }
  104. summary = None
  105. if isinstance(df, pd.DataFrame):
  106. query_result["columns"] = list(df.columns)
  107. if not df.empty:
  108. total_rows = len(df)
  109. limited_df = df.head(MAX_RETURN_ROWS)
  110. query_result["rows"] = limited_df.to_dict(orient="records")
  111. query_result["row_count"] = len(limited_df)
  112. query_result["total_row_count"] = total_rows
  113. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  114. # 生成数据摘要(可通过配置控制,仅在有数据时生成)
  115. if ENABLE_RESULT_SUMMARY:
  116. try:
  117. summary = vn.generate_summary(question=question, df=df)
  118. logger.info(f"成功生成摘要: {summary}")
  119. except Exception as e:
  120. logger.warning(f"生成摘要失败: {str(e)}")
  121. summary = None
  122. # 构建返回数据
  123. response_data = {
  124. "sql": sql,
  125. "query_result": query_result,
  126. "conversation_id": conversation_id
  127. }
  128. # 添加摘要(如果启用且生成成功)
  129. if ENABLE_RESULT_SUMMARY and summary is not None:
  130. response_data["summary"] = summary
  131. response_data["response"] = summary # 同时添加response字段
  132. from common.result import success_response
  133. return jsonify(success_response(
  134. response_text="查询执行完成" if summary is None else None,
  135. data=response_data
  136. ))
  137. except Exception as e:
  138. logger.error(f"ask_full执行失败: {str(e)}")
  139. # 即使发生异常,也检查是否有业务层面的解释
  140. if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  141. # 在解释性文本末尾添加提示语
  142. explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
  143. from common.result import validation_failed_response
  144. return jsonify(validation_failed_response(
  145. response_text=explanation_message
  146. )), 422
  147. else:
  148. # 技术错误,使用500错误码
  149. from common.result import internal_error_response
  150. return jsonify(internal_error_response(
  151. response_text="查询处理失败,请稍后重试"
  152. )), 500
  153. @app.flask_app.route('/api/v0/citu_run_sql', methods=['POST'])
  154. def citu_run_sql():
  155. req = request.get_json(force=True)
  156. sql = req.get('sql')
  157. if not sql:
  158. from common.result import bad_request_response
  159. return jsonify(bad_request_response(
  160. response_text="缺少必需参数:sql",
  161. missing_params=["sql"]
  162. )), 400
  163. try:
  164. df = vn.run_sql(sql)
  165. # 处理返回数据 - 使用新的query_result结构
  166. query_result = {
  167. "rows": [],
  168. "columns": [],
  169. "row_count": 0,
  170. "is_limited": False,
  171. "total_row_count": 0
  172. }
  173. if isinstance(df, pd.DataFrame):
  174. query_result["columns"] = list(df.columns)
  175. if not df.empty:
  176. total_rows = len(df)
  177. limited_df = df.head(MAX_RETURN_ROWS)
  178. query_result["rows"] = limited_df.to_dict(orient="records")
  179. query_result["row_count"] = len(limited_df)
  180. query_result["total_row_count"] = total_rows
  181. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  182. from common.result import success_response
  183. return jsonify(success_response(
  184. response_text=f"SQL执行完成,共返回 {query_result['total_row_count']} 条记录" +
  185. (f",已限制显示前 {MAX_RETURN_ROWS} 条" if query_result["is_limited"] else ""),
  186. data={
  187. "sql": sql,
  188. "query_result": query_result
  189. }
  190. ))
  191. except Exception as e:
  192. logger.error(f"citu_run_sql执行失败: {str(e)}")
  193. from common.result import internal_error_response
  194. return jsonify(internal_error_response(
  195. response_text=f"SQL执行失败,请检查SQL语句是否正确"
  196. )), 500
  197. @app.flask_app.route('/api/v0/ask_cached', methods=['POST'])
  198. def ask_cached():
  199. """
  200. 带缓存功能的智能查询接口
  201. 支持会话管理和结果缓存,提高查询效率
  202. """
  203. req = request.get_json(force=True)
  204. question = req.get("question", None)
  205. conversation_id = req.get("conversation_id", None)
  206. user_id = req.get("user_id", None)
  207. if not question:
  208. from common.result import bad_request_response
  209. return jsonify(bad_request_response(
  210. response_text="缺少必需参数:question",
  211. missing_params=["question"]
  212. )), 400
  213. # 如果没有传递user_id,使用默认值guest
  214. if not user_id:
  215. user_id = "guest"
  216. try:
  217. # 生成conversation_id
  218. # 调试:查看generate_id的实际行为
  219. logger.debug(f"输入问题: '{question}'")
  220. conversation_id = app.cache.generate_id(question=question, user_id=user_id)
  221. logger.debug(f"生成的conversation_id: {conversation_id}")
  222. # 再次用相同问题测试
  223. conversation_id2 = app.cache.generate_id(question=question, user_id=user_id)
  224. logger.debug(f"再次生成的conversation_id: {conversation_id2}")
  225. logger.debug(f"两次ID是否相同: {conversation_id == conversation_id2}")
  226. # 检查缓存
  227. cached_sql = app.cache.get(id=conversation_id, field="sql")
  228. if cached_sql is not None:
  229. # 缓存命中
  230. logger.info(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
  231. sql = cached_sql
  232. df = app.cache.get(id=conversation_id, field="df")
  233. summary = app.cache.get(id=conversation_id, field="summary")
  234. else:
  235. # 缓存未命中,执行新查询
  236. logger.info(f"[CACHE MISS] 执行新查询: {conversation_id}")
  237. sql, df, _ = vn.ask(
  238. question=question,
  239. print_results=False,
  240. visualize=False,
  241. allow_llm_to_see_data=True
  242. )
  243. # 检查是否有LLM解释性文本(无法生成SQL的情况)
  244. if sql is None and hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
  245. # 在解释性文本末尾添加提示语
  246. explanation_message = vn.last_llm_explanation + "请尝试用其它方式提问。"
  247. from common.result import validation_failed_response
  248. return jsonify(validation_failed_response(
  249. response_text=explanation_message
  250. )), 422
  251. # 如果sql为None但没有解释性文本,返回通用错误
  252. if sql is None:
  253. from common.result import validation_failed_response
  254. return jsonify(validation_failed_response(
  255. response_text="无法生成SQL查询,请检查问题描述或数据表结构"
  256. )), 422
  257. # 缓存结果
  258. app.cache.set(id=conversation_id, field="question", value=question)
  259. app.cache.set(id=conversation_id, field="sql", value=sql)
  260. app.cache.set(id=conversation_id, field="df", value=df)
  261. # 生成并缓存摘要(可通过配置控制,仅在有数据时生成)
  262. summary = None
  263. if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
  264. try:
  265. summary = vn.generate_summary(question=question, df=df)
  266. logger.info(f"成功生成摘要: {summary}")
  267. except Exception as e:
  268. logger.warning(f"生成摘要失败: {str(e)}")
  269. summary = None
  270. app.cache.set(id=conversation_id, field="summary", value=summary)
  271. # 处理返回数据 - 使用新的query_result结构
  272. query_result = {
  273. "rows": [],
  274. "columns": [],
  275. "row_count": 0,
  276. "is_limited": False,
  277. "total_row_count": 0
  278. }
  279. if isinstance(df, pd.DataFrame):
  280. query_result["columns"] = list(df.columns)
  281. if not df.empty:
  282. total_rows = len(df)
  283. limited_df = df.head(MAX_RETURN_ROWS)
  284. query_result["rows"] = limited_df.to_dict(orient="records")
  285. query_result["row_count"] = len(limited_df)
  286. query_result["total_row_count"] = total_rows
  287. query_result["is_limited"] = total_rows > MAX_RETURN_ROWS
  288. # 构建返回数据
  289. response_data = {
  290. "sql": sql,
  291. "query_result": query_result,
  292. "conversation_id": conversation_id,
  293. "cached": cached_sql is not None # 标识是否来自缓存
  294. }
  295. # 添加摘要(如果启用且生成成功)
  296. if ENABLE_RESULT_SUMMARY and summary is not None:
  297. response_data["summary"] = summary
  298. response_data["response"] = summary # 同时添加response字段
  299. from common.result import success_response
  300. return jsonify(success_response(
  301. response_text="查询执行完成" if summary is None else None,
  302. data=response_data
  303. ))
  304. except Exception as e:
  305. logger.error(f"ask_cached执行失败: {str(e)}")
  306. from common.result import internal_error_response
  307. return jsonify(internal_error_response(
  308. response_text="查询处理失败,请稍后重试"
  309. )), 500
  310. @app.flask_app.route('/api/v0/citu_train_question_sql', methods=['POST'])
  311. def citu_train_question_sql():
  312. """
  313. 训练问题-SQL对接口
  314. 此API将接收的question/sql pair写入到training库中,用于训练和改进AI模型。
  315. 支持仅传入SQL或同时传入问题和SQL进行训练。
  316. Args:
  317. question (str, optional): 用户问题
  318. sql (str, required): 对应的SQL查询语句
  319. Returns:
  320. JSON: 包含训练ID和成功消息的响应
  321. """
  322. try:
  323. req = request.get_json(force=True)
  324. question = req.get('question')
  325. sql = req.get('sql')
  326. if not sql:
  327. from common.result import bad_request_response
  328. return jsonify(bad_request_response(
  329. response_text="缺少必需参数:sql",
  330. missing_params=["sql"]
  331. )), 400
  332. # 正确的调用方式:同时传递question和sql
  333. if question:
  334. training_id = vn.train(question=question, sql=sql)
  335. logger.info(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
  336. else:
  337. training_id = vn.train(sql=sql)
  338. logger.info(f"训练成功,训练ID为:{training_id},SQL:{sql}")
  339. from common.result import success_response
  340. return jsonify(success_response(
  341. response_text="问题-SQL对训练成功",
  342. data={
  343. "training_id": training_id,
  344. "message": "Question-SQL pair trained successfully"
  345. }
  346. ))
  347. except Exception as e:
  348. from common.result import internal_error_response
  349. return jsonify(internal_error_response(
  350. response_text="训练失败,请稍后重试"
  351. )), 500
  352. # ============ LangGraph Agent 集成 ============
  353. # 全局Agent实例(单例模式)
  354. citu_langraph_agent = None
  355. def get_citu_langraph_agent():
  356. """获取LangGraph Agent实例(懒加载)"""
  357. global citu_langraph_agent
  358. if citu_langraph_agent is None:
  359. try:
  360. from agent.citu_agent import CituLangGraphAgent
  361. logger.info("开始创建LangGraph Agent实例...")
  362. citu_langraph_agent = CituLangGraphAgent()
  363. logger.info("LangGraph Agent实例创建成功")
  364. except ImportError as e:
  365. logger.critical(f"Agent模块导入失败: {str(e)}")
  366. logger.critical("请检查agent模块是否存在以及依赖是否正确安装")
  367. raise Exception(f"Agent模块导入失败: {str(e)}")
  368. except Exception as e:
  369. logger.critical(f"LangGraph Agent实例创建失败: {str(e)}")
  370. logger.critical(f"错误类型: {type(e).__name__}")
  371. # 提供更有用的错误信息
  372. if "config" in str(e).lower():
  373. logger.critical("可能是配置文件问题,请检查配置")
  374. elif "llm" in str(e).lower():
  375. logger.critical("可能是LLM连接问题,请检查LLM配置")
  376. elif "tool" in str(e).lower():
  377. logger.critical("可能是工具加载问题,请检查工具模块")
  378. raise Exception(f"Agent初始化失败: {str(e)}")
  379. return citu_langraph_agent
  380. @app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
  381. def ask_agent():
  382. """
  383. 支持对话上下文的ask_agent API - 修正版
  384. """
  385. req = request.get_json(force=True)
  386. question = req.get("question", None)
  387. conversation_id_input = req.get("conversation_id", None)
  388. # 新增参数解析
  389. user_id_input = req.get("user_id", None)
  390. continue_conversation = req.get("continue_conversation", False)
  391. # 新增:路由模式参数解析和验证
  392. api_routing_mode = req.get("routing_mode", None)
  393. VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]
  394. if not question:
  395. return jsonify(bad_request_response(
  396. response_text="缺少必需参数:question",
  397. missing_params=["question"]
  398. )), 400
  399. # 验证routing_mode参数
  400. if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
  401. return jsonify(bad_request_response(
  402. response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
  403. invalid_params=["routing_mode"]
  404. )), 400
  405. try:
  406. # 1. 获取登录用户ID(修正:在函数中获取session信息)
  407. login_user_id = session.get('user_id') if 'user_id' in session else None
  408. # 2. 用户ID和对话ID一致性校验
  409. from common.session_aware_cache import ConversationAwareMemoryCache
  410. # 2.1 如果传递了conversation_id,从中解析user_id
  411. extracted_user_id = None
  412. if conversation_id_input:
  413. extracted_user_id = ConversationAwareMemoryCache.extract_user_id(conversation_id_input)
  414. # 如果同时传递了user_id和conversation_id,进行一致性校验
  415. if user_id_input:
  416. is_valid, error_msg = ConversationAwareMemoryCache.validate_user_id_consistency(
  417. conversation_id_input, user_id_input
  418. )
  419. if not is_valid:
  420. return jsonify(bad_request_response(
  421. response_text=error_msg,
  422. invalid_params=["user_id", "conversation_id"]
  423. )), 400
  424. # 如果没有传递user_id,但有conversation_id,则从conversation_id中解析
  425. elif not user_id_input and extracted_user_id:
  426. user_id_input = extracted_user_id
  427. logger.info(f"从conversation_id解析出user_id: {user_id_input}")
  428. # 2.2 如果没有传递user_id,使用默认值guest
  429. if not user_id_input:
  430. user_id_input = "guest"
  431. logger.info("未传递user_id,使用默认值: guest")
  432. # 3. 智能ID解析(修正:传入登录用户ID)
  433. user_id = redis_conversation_manager.resolve_user_id(
  434. user_id_input, None, request.remote_addr, login_user_id
  435. )
  436. conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
  437. user_id, conversation_id_input, continue_conversation
  438. )
  439. # 3. 获取上下文和上下文类型(提前到缓存检查之前)
  440. context = redis_conversation_manager.get_context(conversation_id)
  441. # 获取上下文类型:从最后一条助手消息的metadata中获取类型
  442. context_type = None
  443. if context:
  444. try:
  445. # 获取最后一条助手消息的metadata
  446. messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit=10)
  447. for message in reversed(messages): # 从最新的开始找
  448. if message.get("role") == "assistant":
  449. metadata = message.get("metadata", {})
  450. context_type = metadata.get("type")
  451. if context_type:
  452. logger.info(f"[AGENT_API] 检测到上下文类型: {context_type}")
  453. break
  454. except Exception as e:
  455. logger.warning(f"获取上下文类型失败: {str(e)}")
  456. # 4. 检查缓存(新逻辑:放宽使用条件,严控存储条件)
  457. cached_answer = redis_conversation_manager.get_cached_answer(question, context)
  458. if cached_answer:
  459. logger.info(f"[AGENT_API] 使用缓存答案")
  460. # 确定缓存答案的助手回复内容(使用与非缓存相同的优先级逻辑)
  461. cached_response_type = cached_answer.get("type", "UNKNOWN")
  462. if cached_response_type == "DATABASE":
  463. # DATABASE类型:按优先级选择内容
  464. if cached_answer.get("response"):
  465. # 优先级1:错误或解释性回复(如SQL生成失败)
  466. assistant_response = cached_answer.get("response")
  467. elif cached_answer.get("summary"):
  468. # 优先级2:查询成功的摘要
  469. assistant_response = cached_answer.get("summary")
  470. elif cached_answer.get("query_result"):
  471. # 优先级3:构造简单描述
  472. query_result = cached_answer.get("query_result")
  473. row_count = query_result.get("row_count", 0)
  474. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  475. else:
  476. # 异常情况
  477. assistant_response = "数据库查询已处理。"
  478. else:
  479. # CHAT类型:直接使用response
  480. assistant_response = cached_answer.get("response", "")
  481. # 更新对话历史
  482. redis_conversation_manager.save_message(conversation_id, "user", question)
  483. redis_conversation_manager.save_message(
  484. conversation_id, "assistant",
  485. assistant_response,
  486. metadata={"from_cache": True}
  487. )
  488. # 添加对话信息到缓存结果
  489. cached_answer["conversation_id"] = conversation_id
  490. cached_answer["user_id"] = user_id
  491. cached_answer["from_cache"] = True
  492. cached_answer.update(conversation_status)
  493. # 使用agent_success_response返回标准格式
  494. return jsonify(agent_success_response(
  495. response_type=cached_answer.get("type", "UNKNOWN"),
  496. response=cached_answer.get("response", ""), # 修正:使用response而不是response_text
  497. sql=cached_answer.get("sql"),
  498. records=cached_answer.get("query_result"), # 修改:query_result改为records
  499. summary=cached_answer.get("summary"),
  500. conversation_id=conversation_id,
  501. execution_path=cached_answer.get("execution_path", []),
  502. classification_info=cached_answer.get("classification_info", {}),
  503. user_id=user_id,
  504. context_used=bool(context),
  505. from_cache=True,
  506. conversation_status=conversation_status["status"],
  507. requested_conversation_id=conversation_status.get("requested_id")
  508. ))
  509. # 5. 保存用户消息
  510. redis_conversation_manager.save_message(conversation_id, "user", question)
  511. # 6. 构建带上下文的问题
  512. if context:
  513. enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
  514. logger.info(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
  515. else:
  516. enhanced_question = question
  517. logger.info(f"[AGENT_API] 新对话,无上下文")
  518. # 7. 确定最终使用的路由模式(优先级逻辑)
  519. if api_routing_mode:
  520. # API传了参数,优先使用
  521. effective_routing_mode = api_routing_mode
  522. logger.info(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
  523. else:
  524. # API没传参数,使用配置文件
  525. try:
  526. from app_config import QUESTION_ROUTING_MODE
  527. effective_routing_mode = QUESTION_ROUTING_MODE
  528. logger.info(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
  529. except ImportError:
  530. effective_routing_mode = "hybrid"
  531. logger.info(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")
  532. # 8. 现有Agent处理逻辑(修改为传递路由模式)
  533. try:
  534. agent = get_citu_langraph_agent()
  535. except Exception as e:
  536. logger.critical(f"Agent初始化失败: {str(e)}")
  537. return jsonify(service_unavailable_response(
  538. response_text="AI服务暂时不可用,请稍后重试",
  539. can_retry=True
  540. )), 503
  541. # 异步调用Agent处理问题
  542. import asyncio
  543. agent_result = asyncio.run(agent.process_question(
  544. question=enhanced_question, # 使用增强后的问题
  545. conversation_id=conversation_id,
  546. context_type=context_type, # 传递上下文类型
  547. routing_mode=effective_routing_mode # 新增:传递路由模式
  548. ))
  549. # 8. 处理Agent结果
  550. if agent_result.get("success", False):
  551. # 修正:直接从agent_result获取字段,因为它就是final_response
  552. response_type = agent_result.get("type", "UNKNOWN")
  553. response_text = agent_result.get("response", "")
  554. sql = agent_result.get("sql")
  555. query_result = agent_result.get("query_result")
  556. summary = agent_result.get("summary")
  557. execution_path = agent_result.get("execution_path", [])
  558. classification_info = agent_result.get("classification_info", {})
  559. # 确定助手回复内容的优先级
  560. if response_type == "DATABASE":
  561. # DATABASE类型:按优先级选择内容
  562. if response_text:
  563. # 优先级1:错误或解释性回复(如SQL生成失败)
  564. assistant_response = response_text
  565. elif summary:
  566. # 优先级2:查询成功的摘要
  567. assistant_response = summary
  568. elif query_result:
  569. # 优先级3:构造简单描述
  570. row_count = query_result.get("row_count", 0)
  571. assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
  572. else:
  573. # 异常情况
  574. assistant_response = "数据库查询已处理。"
  575. else:
  576. # CHAT类型:直接使用response
  577. assistant_response = response_text
  578. # 保存助手回复
  579. redis_conversation_manager.save_message(
  580. conversation_id, "assistant", assistant_response,
  581. metadata={
  582. "type": response_type,
  583. "sql": sql,
  584. "execution_path": execution_path
  585. }
  586. )
  587. # 缓存成功的答案(新逻辑:只缓存无上下文的问答)
  588. # 直接缓存agent_result,它已经包含所有需要的字段
  589. redis_conversation_manager.cache_answer(question, agent_result, context)
  590. # 使用agent_success_response的正确方式
  591. return jsonify(agent_success_response(
  592. response_type=response_type,
  593. response=response_text, # 修正:使用response而不是response_text
  594. sql=sql,
  595. records=query_result, # 修改:query_result改为records
  596. summary=summary,
  597. conversation_id=conversation_id,
  598. execution_path=execution_path,
  599. classification_info=classification_info,
  600. user_id=user_id,
  601. context_used=bool(context),
  602. from_cache=False,
  603. conversation_status=conversation_status["status"],
  604. requested_conversation_id=conversation_status.get("requested_id"),
  605. routing_mode_used=effective_routing_mode, # 新增:实际使用的路由模式
  606. routing_mode_source="api" if api_routing_mode else "config" # 新增:路由模式来源
  607. ))
  608. else:
  609. # 错误处理(修正:确保使用现有的错误响应格式)
  610. error_message = agent_result.get("error", "Agent处理失败")
  611. error_code = agent_result.get("error_code", 500)
  612. return jsonify(agent_error_response(
  613. response_text=error_message,
  614. error_type="agent_processing_failed",
  615. code=error_code,
  616. conversation_id=conversation_id,
  617. user_id=user_id
  618. )), error_code
  619. except Exception as e:
  620. logger.error(f"ask_agent执行失败: {str(e)}")
  621. return jsonify(internal_error_response(
  622. response_text="查询处理失败,请稍后重试"
  623. )), 500
  624. @app.flask_app.route('/api/v0/agent_health', methods=['GET'])
  625. def agent_health():
  626. """
  627. Agent健康检查接口
  628. 响应格式:
  629. {
  630. "success": true/false,
  631. "code": 200/503,
  632. "message": "healthy/degraded/unhealthy",
  633. "data": {
  634. "status": "healthy/degraded/unhealthy",
  635. "test_result": true/false,
  636. "workflow_compiled": true/false,
  637. "tools_count": 4,
  638. "message": "详细信息",
  639. "timestamp": "2024-01-01T12:00:00",
  640. "checks": {
  641. "agent_creation": true/false,
  642. "tools_import": true/false,
  643. "llm_connection": true/false,
  644. "classifier_ready": true/false
  645. }
  646. }
  647. }
  648. """
  649. try:
  650. # 基础健康检查
  651. health_data = {
  652. "status": "unknown",
  653. "test_result": False,
  654. "workflow_compiled": False,
  655. "tools_count": 0,
  656. "message": "",
  657. "timestamp": datetime.now().isoformat(),
  658. "checks": {
  659. "agent_creation": False,
  660. "tools_import": False,
  661. "llm_connection": False,
  662. "classifier_ready": False
  663. }
  664. }
  665. # 检查1: Agent创建
  666. try:
  667. agent = get_citu_langraph_agent()
  668. health_data["checks"]["agent_creation"] = True
  669. # 修正:Agent现在是动态创建workflow的,不再有预创建的workflow属性
  670. health_data["workflow_compiled"] = True # 动态创建,始终可用
  671. health_data["tools_count"] = len(agent.tools) if hasattr(agent, 'tools') else 0
  672. except Exception as e:
  673. health_data["message"] = f"Agent创建失败: {str(e)}"
  674. health_data["status"] = "unhealthy" # 设置状态
  675. from common.result import health_error_response
  676. return jsonify(health_error_response(**health_data)), 503
  677. # 检查2: 工具导入
  678. try:
  679. from agent.tools import TOOLS
  680. health_data["checks"]["tools_import"] = len(TOOLS) > 0
  681. except Exception as e:
  682. health_data["message"] = f"工具导入失败: {str(e)}"
  683. # 检查3: LLM连接(简单测试)
  684. try:
  685. from agent.tools.utils import get_compatible_llm
  686. llm = get_compatible_llm()
  687. health_data["checks"]["llm_connection"] = llm is not None
  688. except Exception as e:
  689. health_data["message"] = f"LLM连接失败: {str(e)}"
  690. # 检查4: 分类器准备
  691. try:
  692. from agent.classifier import QuestionClassifier
  693. classifier = QuestionClassifier()
  694. health_data["checks"]["classifier_ready"] = True
  695. except Exception as e:
  696. health_data["message"] = f"分类器失败: {str(e)}"
  697. # 检查5: 完整流程测试(可选)
  698. try:
  699. if all(health_data["checks"].values()):
  700. import asyncio
  701. # 异步调用健康检查
  702. test_result = asyncio.run(agent.health_check())
  703. health_data["test_result"] = test_result.get("status") == "healthy"
  704. health_data["status"] = test_result.get("status", "unknown")
  705. health_data["message"] = test_result.get("message", "健康检查完成")
  706. else:
  707. health_data["status"] = "degraded"
  708. health_data["message"] = "部分组件异常"
  709. except Exception as e:
  710. logger.error(f"健康检查异常: {str(e)}")
  711. import traceback
  712. logger.error(f"详细健康检查错误: {traceback.format_exc()}")
  713. health_data["status"] = "degraded"
  714. health_data["message"] = f"完整测试失败: {str(e)}"
  715. # 根据状态返回相应的HTTP代码 - 使用标准化健康检查响应
  716. from common.result import health_success_response, health_error_response
  717. if health_data["status"] == "healthy":
  718. return jsonify(health_success_response(**health_data))
  719. elif health_data["status"] == "degraded":
  720. return jsonify(health_error_response(**health_data)), 503
  721. else:
  722. # 确保状态设置为unhealthy
  723. health_data["status"] = "unhealthy"
  724. return jsonify(health_error_response(**health_data)), 503
  725. except Exception as e:
  726. logger.error(f"顶层健康检查异常: {str(e)}")
  727. import traceback
  728. logger.error(f"详细错误信息: {traceback.format_exc()}")
  729. from common.result import internal_error_response
  730. return jsonify(internal_error_response(
  731. response_text="健康检查失败,请稍后重试"
  732. )), 500
  733. # ==================== 日常管理API ====================
  734. @app.flask_app.route('/api/v0/cache_overview', methods=['GET'])
  735. def cache_overview():
  736. """日常管理:轻量概览 - 合并原cache_inspect的核心功能"""
  737. try:
  738. cache = app.cache
  739. result_data = {
  740. 'overview_summary': {
  741. 'total_conversations': 0,
  742. 'total_sessions': 0,
  743. 'query_time': datetime.now().isoformat()
  744. },
  745. 'recent_conversations': [], # 最近的对话
  746. 'session_summary': [] # 会话摘要
  747. }
  748. if hasattr(cache, 'cache') and isinstance(cache.cache, dict):
  749. result_data['overview_summary']['total_conversations'] = len(cache.cache)
  750. # 获取会话信息
  751. if hasattr(cache, 'get_all_sessions'):
  752. all_sessions = cache.get_all_sessions()
  753. result_data['overview_summary']['total_sessions'] = len(all_sessions)
  754. # 会话摘要(按最近活动排序)
  755. session_list = []
  756. for session_id, session_data in all_sessions.items():
  757. session_summary = {
  758. 'session_id': session_id,
  759. 'start_time': session_data['start_time'].isoformat(),
  760. 'conversation_count': session_data.get('conversation_count', 0),
  761. 'duration_seconds': session_data.get('session_duration_seconds', 0),
  762. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  763. 'is_active': (datetime.now() - session_data.get('last_activity', session_data['start_time'])).total_seconds() < 1800 # 30分钟内活跃
  764. }
  765. session_list.append(session_summary)
  766. # 按最后活动时间排序
  767. session_list.sort(key=lambda x: x['last_activity'], reverse=True)
  768. result_data['session_summary'] = session_list
  769. # 最近的对话(最多显示10个)
  770. conversation_list = []
  771. for conversation_id, conversation_data in cache.cache.items():
  772. conversation_start_time = cache.conversation_start_times.get(conversation_id)
  773. conversation_info = {
  774. 'conversation_id': conversation_id,
  775. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  776. 'session_id': cache.conversation_to_session.get(conversation_id),
  777. 'has_question': 'question' in conversation_data,
  778. 'has_sql': 'sql' in conversation_data,
  779. 'has_data': 'df' in conversation_data and conversation_data['df'] is not None,
  780. 'question_preview': conversation_data.get('question', '')[:80] + '...' if len(conversation_data.get('question', '')) > 80 else conversation_data.get('question', ''),
  781. }
  782. # 计算对话持续时间
  783. if conversation_start_time:
  784. duration = datetime.now() - conversation_start_time
  785. conversation_info['conversation_duration_seconds'] = duration.total_seconds()
  786. conversation_list.append(conversation_info)
  787. # 按对话开始时间排序,显示最新的10个
  788. conversation_list.sort(key=lambda x: x['conversation_start_time'] or '', reverse=True)
  789. result_data['recent_conversations'] = conversation_list[:10]
  790. from common.result import success_response
  791. return jsonify(success_response(
  792. response_text="缓存概览查询完成",
  793. data=result_data
  794. ))
  795. except Exception as e:
  796. from common.result import internal_error_response
  797. return jsonify(internal_error_response(
  798. response_text="获取缓存概览失败,请稍后重试"
  799. )), 500
  800. @app.flask_app.route('/api/v0/cache_stats', methods=['GET'])
  801. def cache_stats():
  802. """日常管理:统计信息 - 合并原session_stats和cache_stats功能"""
  803. try:
  804. cache = app.cache
  805. current_time = datetime.now()
  806. stats = {
  807. 'basic_stats': {
  808. 'total_sessions': len(getattr(cache, 'session_info', {})),
  809. 'total_conversations': len(getattr(cache, 'cache', {})),
  810. 'active_sessions': 0, # 最近30分钟有活动
  811. 'average_conversations_per_session': 0
  812. },
  813. 'time_distribution': {
  814. 'sessions': {
  815. 'last_1_hour': 0,
  816. 'last_6_hours': 0,
  817. 'last_24_hours': 0,
  818. 'last_7_days': 0,
  819. 'older': 0
  820. },
  821. 'conversations': {
  822. 'last_1_hour': 0,
  823. 'last_6_hours': 0,
  824. 'last_24_hours': 0,
  825. 'last_7_days': 0,
  826. 'older': 0
  827. }
  828. },
  829. 'session_details': [],
  830. 'time_ranges': {
  831. 'oldest_session': None,
  832. 'newest_session': None,
  833. 'oldest_conversation': None,
  834. 'newest_conversation': None
  835. }
  836. }
  837. # 会话统计
  838. if hasattr(cache, 'session_info'):
  839. session_times = []
  840. total_conversations = 0
  841. for session_id, session_data in cache.session_info.items():
  842. start_time = session_data['start_time']
  843. session_times.append(start_time)
  844. conversation_count = len(session_data.get('conversations', []))
  845. total_conversations += conversation_count
  846. # 检查活跃状态
  847. last_activity = session_data.get('last_activity', session_data['start_time'])
  848. if (current_time - last_activity).total_seconds() < 1800:
  849. stats['basic_stats']['active_sessions'] += 1
  850. # 时间分布统计
  851. age_hours = (current_time - start_time).total_seconds() / 3600
  852. if age_hours <= 1:
  853. stats['time_distribution']['sessions']['last_1_hour'] += 1
  854. elif age_hours <= 6:
  855. stats['time_distribution']['sessions']['last_6_hours'] += 1
  856. elif age_hours <= 24:
  857. stats['time_distribution']['sessions']['last_24_hours'] += 1
  858. elif age_hours <= 168: # 7 days
  859. stats['time_distribution']['sessions']['last_7_days'] += 1
  860. else:
  861. stats['time_distribution']['sessions']['older'] += 1
  862. # 会话详细信息
  863. session_duration = current_time - start_time
  864. stats['session_details'].append({
  865. 'session_id': session_id,
  866. 'start_time': start_time.isoformat(),
  867. 'last_activity': last_activity.isoformat(),
  868. 'conversation_count': conversation_count,
  869. 'duration_seconds': session_duration.total_seconds(),
  870. 'duration_formatted': str(session_duration),
  871. 'is_active': (current_time - last_activity).total_seconds() < 1800,
  872. 'browser_session_id': session_data.get('browser_session_id')
  873. })
  874. # 计算平均值
  875. if len(cache.session_info) > 0:
  876. stats['basic_stats']['average_conversations_per_session'] = total_conversations / len(cache.session_info)
  877. # 时间范围
  878. if session_times:
  879. stats['time_ranges']['oldest_session'] = min(session_times).isoformat()
  880. stats['time_ranges']['newest_session'] = max(session_times).isoformat()
  881. # 对话统计
  882. if hasattr(cache, 'conversation_start_times'):
  883. conversation_times = []
  884. for conv_time in cache.conversation_start_times.values():
  885. conversation_times.append(conv_time)
  886. age_hours = (current_time - conv_time).total_seconds() / 3600
  887. if age_hours <= 1:
  888. stats['time_distribution']['conversations']['last_1_hour'] += 1
  889. elif age_hours <= 6:
  890. stats['time_distribution']['conversations']['last_6_hours'] += 1
  891. elif age_hours <= 24:
  892. stats['time_distribution']['conversations']['last_24_hours'] += 1
  893. elif age_hours <= 168:
  894. stats['time_distribution']['conversations']['last_7_days'] += 1
  895. else:
  896. stats['time_distribution']['conversations']['older'] += 1
  897. if conversation_times:
  898. stats['time_ranges']['oldest_conversation'] = min(conversation_times).isoformat()
  899. stats['time_ranges']['newest_conversation'] = max(conversation_times).isoformat()
  900. # 按最近活动排序会话详情
  901. stats['session_details'].sort(key=lambda x: x['last_activity'], reverse=True)
  902. from common.result import success_response
  903. return jsonify(success_response(
  904. response_text="缓存统计信息查询完成",
  905. data=stats
  906. ))
  907. except Exception as e:
  908. from common.result import internal_error_response
  909. return jsonify(internal_error_response(
  910. response_text="获取缓存统计失败,请稍后重试"
  911. )), 500
  912. # ==================== 高级功能API ====================
  913. @app.flask_app.route('/api/v0/cache_export', methods=['GET'])
  914. def cache_export():
  915. """高级功能:完整导出 - 保持原cache_raw_export的完整功能"""
  916. try:
  917. cache = app.cache
  918. # 验证缓存的实际结构
  919. if not hasattr(cache, 'cache'):
  920. from common.result import internal_error_response
  921. return jsonify(internal_error_response(
  922. response_text="缓存对象结构异常,请联系系统管理员"
  923. )), 500
  924. if not isinstance(cache.cache, dict):
  925. from common.result import internal_error_response
  926. return jsonify(internal_error_response(
  927. response_text="缓存数据类型异常,请联系系统管理员"
  928. )), 500
  929. # 定义JSON序列化辅助函数
  930. def make_json_serializable(obj):
  931. """将对象转换为JSON可序列化的格式"""
  932. if obj is None:
  933. return None
  934. elif isinstance(obj, (str, int, float, bool)):
  935. return obj
  936. elif isinstance(obj, (list, tuple)):
  937. return [make_json_serializable(item) for item in obj]
  938. elif isinstance(obj, dict):
  939. return {str(k): make_json_serializable(v) for k, v in obj.items()}
  940. elif hasattr(obj, 'isoformat'): # datetime objects
  941. return obj.isoformat()
  942. elif hasattr(obj, 'item'): # numpy scalars
  943. return obj.item()
  944. elif hasattr(obj, 'tolist'): # numpy arrays
  945. return obj.tolist()
  946. elif hasattr(obj, '__dict__'): # pandas dtypes and other objects
  947. return str(obj)
  948. else:
  949. return str(obj)
  950. # 获取完整的原始缓存数据
  951. raw_cache = cache.cache
  952. # 获取会话和对话时间信息
  953. conversation_times = getattr(cache, 'conversation_start_times', {})
  954. session_info = getattr(cache, 'session_info', {})
  955. conversation_to_session = getattr(cache, 'conversation_to_session', {})
  956. export_data = {
  957. 'export_metadata': {
  958. 'export_time': datetime.now().isoformat(),
  959. 'total_conversations': len(raw_cache),
  960. 'total_sessions': len(session_info),
  961. 'cache_type': type(cache).__name__,
  962. 'cache_object_info': str(cache),
  963. 'has_session_times': bool(session_info),
  964. 'has_conversation_times': bool(conversation_times)
  965. },
  966. 'session_info': {
  967. session_id: {
  968. 'start_time': session_data['start_time'].isoformat(),
  969. 'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
  970. 'conversations': session_data['conversations'],
  971. 'conversation_count': len(session_data['conversations']),
  972. 'browser_session_id': session_data.get('browser_session_id'),
  973. 'user_info': session_data.get('user_info', {})
  974. }
  975. for session_id, session_data in session_info.items()
  976. },
  977. 'conversation_times': {
  978. conversation_id: start_time.isoformat()
  979. for conversation_id, start_time in conversation_times.items()
  980. },
  981. 'conversation_to_session_mapping': conversation_to_session,
  982. 'conversations': {}
  983. }
  984. # 处理每个对话的完整数据
  985. for conversation_id, conversation_data in raw_cache.items():
  986. # 获取时间信息
  987. conversation_start_time = conversation_times.get(conversation_id)
  988. session_id = conversation_to_session.get(conversation_id)
  989. session_start_time = None
  990. if session_id and session_id in session_info:
  991. session_start_time = session_info[session_id]['start_time']
  992. processed_conversation = {
  993. 'conversation_id': conversation_id,
  994. 'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
  995. 'session_id': session_id,
  996. 'session_start_time': session_start_time.isoformat() if session_start_time else None,
  997. 'field_count': len(conversation_data),
  998. 'fields': {}
  999. }
  1000. # 添加时间计算
  1001. if conversation_start_time:
  1002. conversation_duration = datetime.now() - conversation_start_time
  1003. processed_conversation['conversation_duration_seconds'] = conversation_duration.total_seconds()
  1004. processed_conversation['conversation_duration_formatted'] = str(conversation_duration)
  1005. if session_start_time:
  1006. session_duration = datetime.now() - session_start_time
  1007. processed_conversation['session_duration_seconds'] = session_duration.total_seconds()
  1008. processed_conversation['session_duration_formatted'] = str(session_duration)
  1009. # 处理每个字段,确保JSON序列化安全
  1010. for field_name, field_value in conversation_data.items():
  1011. field_info = {
  1012. 'field_name': field_name,
  1013. 'data_type': type(field_value).__name__,
  1014. 'is_none': field_value is None
  1015. }
  1016. try:
  1017. if field_value is None:
  1018. field_info['value'] = None
  1019. elif field_name in ['conversation_start_time', 'session_start_time']:
  1020. # 处理时间字段
  1021. field_info['content'] = make_json_serializable(field_value)
  1022. elif field_name == 'df' and field_value is not None:
  1023. # DataFrame的安全处理
  1024. if hasattr(field_value, 'to_dict'):
  1025. # 安全地处理dtypes
  1026. try:
  1027. dtypes_dict = {}
  1028. for col, dtype in field_value.dtypes.items():
  1029. dtypes_dict[col] = str(dtype)
  1030. except Exception:
  1031. dtypes_dict = {"error": "无法序列化dtypes"}
  1032. # 安全地处理内存使用
  1033. try:
  1034. memory_usage = field_value.memory_usage(deep=True)
  1035. memory_dict = {}
  1036. for idx, usage in memory_usage.items():
  1037. memory_dict[str(idx)] = int(usage) if hasattr(usage, 'item') else int(usage)
  1038. except Exception:
  1039. memory_dict = {"error": "无法获取内存使用信息"}
  1040. field_info.update({
  1041. 'dataframe_info': {
  1042. 'shape': list(field_value.shape),
  1043. 'columns': list(field_value.columns),
  1044. 'dtypes': dtypes_dict,
  1045. 'index_info': {
  1046. 'type': type(field_value.index).__name__,
  1047. 'length': len(field_value.index)
  1048. }
  1049. },
  1050. 'data': make_json_serializable(field_value.to_dict('records')),
  1051. 'memory_usage': memory_dict
  1052. })
  1053. else:
  1054. field_info['value'] = str(field_value)
  1055. field_info['note'] = 'not_standard_dataframe'
  1056. elif field_name == 'fig_json':
  1057. # 图表JSON数据处理
  1058. if isinstance(field_value, str):
  1059. try:
  1060. import json
  1061. parsed_fig = json.loads(field_value)
  1062. field_info.update({
  1063. 'json_valid': True,
  1064. 'json_size_bytes': len(field_value),
  1065. 'plotly_structure': {
  1066. 'has_data': 'data' in parsed_fig,
  1067. 'has_layout': 'layout' in parsed_fig,
  1068. 'data_traces_count': len(parsed_fig.get('data', [])),
  1069. },
  1070. 'raw_json': field_value
  1071. })
  1072. except json.JSONDecodeError:
  1073. field_info.update({
  1074. 'json_valid': False,
  1075. 'raw_content': str(field_value)
  1076. })
  1077. else:
  1078. field_info['value'] = make_json_serializable(field_value)
  1079. elif field_name == 'followup_questions':
  1080. # 后续问题列表
  1081. field_info.update({
  1082. 'content': make_json_serializable(field_value)
  1083. })
  1084. elif field_name in ['question', 'sql', 'summary']:
  1085. # 文本字段
  1086. if isinstance(field_value, str):
  1087. field_info.update({
  1088. 'text_length': len(field_value),
  1089. 'content': field_value
  1090. })
  1091. else:
  1092. field_info['value'] = make_json_serializable(field_value)
  1093. else:
  1094. # 未知字段的安全处理
  1095. field_info['content'] = make_json_serializable(field_value)
  1096. except Exception as e:
  1097. field_info.update({
  1098. 'processing_error': str(e),
  1099. 'fallback_value': str(field_value)[:500] + '...' if len(str(field_value)) > 500 else str(field_value)
  1100. })
  1101. processed_conversation['fields'][field_name] = field_info
  1102. export_data['conversations'][conversation_id] = processed_conversation
  1103. # 添加缓存统计信息
  1104. field_frequency = {}
  1105. data_types_found = set()
  1106. total_dataframes = 0
  1107. total_questions = 0
  1108. for conv_data in export_data['conversations'].values():
  1109. for field_name, field_info in conv_data['fields'].items():
  1110. field_frequency[field_name] = field_frequency.get(field_name, 0) + 1
  1111. data_types_found.add(field_info['data_type'])
  1112. if field_name == 'df' and not field_info['is_none']:
  1113. total_dataframes += 1
  1114. if field_name == 'question' and not field_info['is_none']:
  1115. total_questions += 1
  1116. export_data['cache_statistics'] = {
  1117. 'field_frequency': field_frequency,
  1118. 'data_types_found': list(data_types_found),
  1119. 'total_dataframes': total_dataframes,
  1120. 'total_questions': total_questions,
  1121. 'has_session_timing': 'session_start_time' in field_frequency,
  1122. 'has_conversation_timing': 'conversation_start_time' in field_frequency
  1123. }
  1124. from common.result import success_response
  1125. return jsonify(success_response(
  1126. response_text="缓存数据导出完成",
  1127. data=export_data
  1128. ))
  1129. except Exception as e:
  1130. import traceback
  1131. error_details = {
  1132. 'error_message': str(e),
  1133. 'error_type': type(e).__name__,
  1134. 'traceback': traceback.format_exc()
  1135. }
  1136. from common.result import internal_error_response
  1137. return jsonify(internal_error_response(
  1138. response_text="导出缓存失败,请稍后重试"
  1139. )), 500
  1140. # ==================== 清理功能API ====================
  1141. @app.flask_app.route('/api/v0/cache_preview_cleanup', methods=['POST'])
  1142. def cache_preview_cleanup():
  1143. """清理功能:预览删除操作 - 保持原功能"""
  1144. try:
  1145. req = request.get_json(force=True)
  1146. # 时间条件 - 支持三种方式
  1147. older_than_hours = req.get('older_than_hours')
  1148. older_than_days = req.get('older_than_days')
  1149. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1150. cache = app.cache
  1151. # 计算截止时间
  1152. cutoff_time = None
  1153. time_condition = None
  1154. if older_than_hours:
  1155. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1156. time_condition = f"older_than_hours: {older_than_hours}"
  1157. elif older_than_days:
  1158. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1159. time_condition = f"older_than_days: {older_than_days}"
  1160. elif before_timestamp:
  1161. try:
  1162. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1163. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1164. time_condition = f"before_timestamp: {before_timestamp}"
  1165. except ValueError:
  1166. from common.result import validation_failed_response
  1167. return jsonify(validation_failed_response(
  1168. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1169. )), 422
  1170. else:
  1171. from common.result import bad_request_response
  1172. return jsonify(bad_request_response(
  1173. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1174. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1175. )), 400
  1176. preview = {
  1177. 'time_condition': time_condition,
  1178. 'cutoff_time': cutoff_time.isoformat(),
  1179. 'will_be_removed': {
  1180. 'sessions': []
  1181. },
  1182. 'will_be_kept': {
  1183. 'sessions_count': 0,
  1184. 'conversations_count': 0
  1185. },
  1186. 'summary': {
  1187. 'sessions_to_remove': 0,
  1188. 'conversations_to_remove': 0,
  1189. 'sessions_to_keep': 0,
  1190. 'conversations_to_keep': 0
  1191. }
  1192. }
  1193. # 预览按session删除
  1194. sessions_to_remove_count = 0
  1195. conversations_to_remove_count = 0
  1196. for session_id, session_data in cache.session_info.items():
  1197. session_preview = {
  1198. 'session_id': session_id,
  1199. 'start_time': session_data['start_time'].isoformat(),
  1200. 'conversation_count': len(session_data['conversations']),
  1201. 'conversations': []
  1202. }
  1203. # 添加conversation详情
  1204. for conv_id in session_data['conversations']:
  1205. if conv_id in cache.cache:
  1206. conv_data = cache.cache[conv_id]
  1207. session_preview['conversations'].append({
  1208. 'conversation_id': conv_id,
  1209. 'question': conv_data.get('question', '')[:50] + '...' if conv_data.get('question') else '',
  1210. 'start_time': cache.conversation_start_times.get(conv_id, '').isoformat() if cache.conversation_start_times.get(conv_id) else ''
  1211. })
  1212. if session_data['start_time'] < cutoff_time:
  1213. preview['will_be_removed']['sessions'].append(session_preview)
  1214. sessions_to_remove_count += 1
  1215. conversations_to_remove_count += len(session_data['conversations'])
  1216. else:
  1217. preview['will_be_kept']['sessions_count'] += 1
  1218. preview['will_be_kept']['conversations_count'] += len(session_data['conversations'])
  1219. # 更新摘要统计
  1220. preview['summary'] = {
  1221. 'sessions_to_remove': sessions_to_remove_count,
  1222. 'conversations_to_remove': conversations_to_remove_count,
  1223. 'sessions_to_keep': preview['will_be_kept']['sessions_count'],
  1224. 'conversations_to_keep': preview['will_be_kept']['conversations_count']
  1225. }
  1226. from common.result import success_response
  1227. return jsonify(success_response(
  1228. response_text=f"清理预览完成,将删除 {sessions_to_remove_count} 个会话和 {conversations_to_remove_count} 个对话",
  1229. data=preview
  1230. ))
  1231. except Exception as e:
  1232. from common.result import internal_error_response
  1233. return jsonify(internal_error_response(
  1234. response_text="预览清理操作失败,请稍后重试"
  1235. )), 500
  1236. @app.flask_app.route('/api/v0/cache_cleanup', methods=['POST'])
  1237. def cache_cleanup():
  1238. """清理功能:实际删除缓存 - 保持原功能"""
  1239. try:
  1240. req = request.get_json(force=True)
  1241. # 时间条件 - 支持三种方式
  1242. older_than_hours = req.get('older_than_hours')
  1243. older_than_days = req.get('older_than_days')
  1244. before_timestamp = req.get('before_timestamp') # YYYY-MM-DD HH:MM:SS 格式
  1245. cache = app.cache
  1246. if not hasattr(cache, 'session_info'):
  1247. from common.result import service_unavailable_response
  1248. return jsonify(service_unavailable_response(
  1249. response_text="缓存不支持会话功能"
  1250. )), 503
  1251. # 计算截止时间
  1252. cutoff_time = None
  1253. time_condition = None
  1254. if older_than_hours:
  1255. cutoff_time = datetime.now() - timedelta(hours=older_than_hours)
  1256. time_condition = f"older_than_hours: {older_than_hours}"
  1257. elif older_than_days:
  1258. cutoff_time = datetime.now() - timedelta(days=older_than_days)
  1259. time_condition = f"older_than_days: {older_than_days}"
  1260. elif before_timestamp:
  1261. try:
  1262. # 支持 YYYY-MM-DD HH:MM:SS 格式
  1263. cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
  1264. time_condition = f"before_timestamp: {before_timestamp}"
  1265. except ValueError:
  1266. from common.result import validation_failed_response
  1267. return jsonify(validation_failed_response(
  1268. response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
  1269. )), 422
  1270. else:
  1271. from common.result import bad_request_response
  1272. return jsonify(bad_request_response(
  1273. response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
  1274. missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
  1275. )), 400
  1276. cleanup_stats = {
  1277. 'time_condition': time_condition,
  1278. 'cutoff_time': cutoff_time.isoformat(),
  1279. 'sessions_removed': 0,
  1280. 'conversations_removed': 0,
  1281. 'sessions_kept': 0,
  1282. 'conversations_kept': 0,
  1283. 'removed_session_ids': [],
  1284. 'removed_conversation_ids': []
  1285. }
  1286. # 按session删除
  1287. sessions_to_remove = []
  1288. for session_id, session_data in cache.session_info.items():
  1289. if session_data['start_time'] < cutoff_time:
  1290. sessions_to_remove.append(session_id)
  1291. # 删除符合条件的sessions及其所有conversations
  1292. for session_id in sessions_to_remove:
  1293. session_data = cache.session_info[session_id]
  1294. conversations_in_session = session_data['conversations'].copy()
  1295. # 删除session中的所有conversations
  1296. for conv_id in conversations_in_session:
  1297. if conv_id in cache.cache:
  1298. del cache.cache[conv_id]
  1299. cleanup_stats['conversations_removed'] += 1
  1300. cleanup_stats['removed_conversation_ids'].append(conv_id)
  1301. # 清理conversation相关的时间记录
  1302. if hasattr(cache, 'conversation_start_times') and conv_id in cache.conversation_start_times:
  1303. del cache.conversation_start_times[conv_id]
  1304. if hasattr(cache, 'conversation_to_session') and conv_id in cache.conversation_to_session:
  1305. del cache.conversation_to_session[conv_id]
  1306. # 删除session记录
  1307. del cache.session_info[session_id]
  1308. cleanup_stats['sessions_removed'] += 1
  1309. cleanup_stats['removed_session_ids'].append(session_id)
  1310. # 统计保留的sessions和conversations
  1311. cleanup_stats['sessions_kept'] = len(cache.session_info)
  1312. cleanup_stats['conversations_kept'] = len(cache.cache)
  1313. from common.result import success_response
  1314. return jsonify(success_response(
  1315. response_text=f"缓存清理完成,删除了 {cleanup_stats['sessions_removed']} 个会话和 {cleanup_stats['conversations_removed']} 个对话",
  1316. data=cleanup_stats
  1317. ))
  1318. except Exception as e:
  1319. from common.result import internal_error_response
  1320. return jsonify(internal_error_response(
  1321. response_text="缓存清理失败,请稍后重试"
  1322. )), 500
  1323. @app.flask_app.route('/api/v0/training_error_question_sql', methods=['POST'])
  1324. def training_error_question_sql():
  1325. """
  1326. 存储错误的question-sql对到error_sql集合中
  1327. 此API将接收的错误question/sql pair写入到error_sql集合中,用于记录和分析错误的SQL查询。
  1328. Args:
  1329. question (str, required): 用户问题
  1330. sql (str, required): 对应的错误SQL查询语句
  1331. Returns:
  1332. JSON: 包含训练ID和成功消息的响应
  1333. """
  1334. try:
  1335. data = request.get_json()
  1336. question = data.get('question')
  1337. sql = data.get('sql')
  1338. logger.debug(f"接收到错误SQL训练请求: question={question}, sql={sql}")
  1339. if not question or not sql:
  1340. from common.result import bad_request_response
  1341. missing_params = []
  1342. if not question:
  1343. missing_params.append("question")
  1344. if not sql:
  1345. missing_params.append("sql")
  1346. return jsonify(bad_request_response(
  1347. response_text="question和sql参数都是必需的",
  1348. missing_params=missing_params
  1349. )), 400
  1350. # 使用vn实例的train_error_sql方法存储错误SQL
  1351. id = vn.train_error_sql(question=question, sql=sql)
  1352. logger.info(f"成功存储错误SQL,ID: {id}")
  1353. from common.result import success_response
  1354. return jsonify(success_response(
  1355. response_text="错误SQL对已成功存储",
  1356. data={
  1357. "id": id,
  1358. "message": "错误SQL对已成功存储到error_sql集合"
  1359. }
  1360. ))
  1361. except Exception as e:
  1362. logger.error(f"存储错误SQL失败: {str(e)}")
  1363. from common.result import internal_error_response
  1364. return jsonify(internal_error_response(
  1365. response_text="存储错误SQL失败,请稍后重试"
  1366. )), 500
  1367. # ==================== Redis对话管理API ====================
  1368. @app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
  1369. def get_user_conversations(user_id: str):
  1370. """获取用户的对话列表(按时间倒序)"""
  1371. try:
  1372. limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
  1373. conversations = redis_conversation_manager.get_conversations(user_id, limit)
  1374. # 为每个对话动态获取标题(第一条用户消息)
  1375. for conversation in conversations:
  1376. conversation_id = conversation['conversation_id']
  1377. try:
  1378. # 获取所有消息,然后取第一条用户消息作为标题
  1379. messages = redis_conversation_manager.get_conversation_messages(conversation_id)
  1380. if messages and len(messages) > 0:
  1381. # 找到第一条用户消息(按时间顺序)
  1382. first_user_message = None
  1383. for message in messages:
  1384. if message.get('role') == 'user':
  1385. first_user_message = message
  1386. break
  1387. if first_user_message:
  1388. title = first_user_message.get('content', '对话').strip()
  1389. # 限制标题长度,保持整洁
  1390. if len(title) > 50:
  1391. conversation['conversation_title'] = title[:47] + "..."
  1392. else:
  1393. conversation['conversation_title'] = title
  1394. else:
  1395. conversation['conversation_title'] = "对话"
  1396. else:
  1397. conversation['conversation_title'] = "空对话"
  1398. except Exception as e:
  1399. logger.warning(f"获取对话标题失败 {conversation_id}: {str(e)}")
  1400. conversation['conversation_title'] = "对话"
  1401. return jsonify(success_response(
  1402. response_text="获取用户对话列表成功",
  1403. data={
  1404. "user_id": user_id,
  1405. "conversations": conversations,
  1406. "total_count": len(conversations)
  1407. }
  1408. ))
  1409. except Exception as e:
  1410. return jsonify(internal_error_response(
  1411. response_text="获取对话列表失败,请稍后重试"
  1412. )), 500
  1413. @app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
  1414. def get_conversation_messages(conversation_id: str):
  1415. """获取特定对话的消息历史"""
  1416. try:
  1417. limit = request.args.get('limit', type=int) # 可选参数
  1418. messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
  1419. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1420. return jsonify(success_response(
  1421. response_text="获取对话消息成功",
  1422. data={
  1423. "conversation_id": conversation_id,
  1424. "conversation_meta": meta,
  1425. "messages": messages,
  1426. "message_count": len(messages)
  1427. }
  1428. ))
  1429. except Exception as e:
  1430. return jsonify(internal_error_response(
  1431. response_text="获取对话消息失败"
  1432. )), 500
  1433. @app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
  1434. def get_conversation_context(conversation_id: str):
  1435. """获取对话上下文(格式化用于LLM)"""
  1436. try:
  1437. count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
  1438. context = redis_conversation_manager.get_context_for_display(conversation_id, count)
  1439. return jsonify(success_response(
  1440. response_text="获取对话上下文成功",
  1441. data={
  1442. "conversation_id": conversation_id,
  1443. "context": context,
  1444. "context_message_count": count
  1445. }
  1446. ))
  1447. except Exception as e:
  1448. return jsonify(internal_error_response(
  1449. response_text="获取对话上下文失败"
  1450. )), 500
  1451. @app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
  1452. def conversation_stats():
  1453. """获取对话系统统计信息"""
  1454. try:
  1455. stats = redis_conversation_manager.get_stats()
  1456. return jsonify(success_response(
  1457. response_text="获取统计信息成功",
  1458. data=stats
  1459. ))
  1460. except Exception as e:
  1461. return jsonify(internal_error_response(
  1462. response_text="获取统计信息失败,请稍后重试"
  1463. )), 500
  1464. @app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
  1465. def conversation_cleanup():
  1466. """手动清理过期对话"""
  1467. try:
  1468. redis_conversation_manager.cleanup_expired_conversations()
  1469. return jsonify(success_response(
  1470. response_text="对话清理完成"
  1471. ))
  1472. except Exception as e:
  1473. return jsonify(internal_error_response(
  1474. response_text="对话清理失败,请稍后重试"
  1475. )), 500
  1476. @app.flask_app.route('/api/v0/user/<user_id>/conversations/full', methods=['GET'])
  1477. def get_user_conversations_with_messages(user_id: str):
  1478. """
  1479. 获取用户的完整对话数据(包含所有消息)
  1480. 一次性返回用户的所有对话和每个对话下的消息历史
  1481. Args:
  1482. user_id: 用户ID(路径参数)
  1483. conversation_limit: 对话数量限制(查询参数,可选,不传则返回所有对话)
  1484. message_limit: 每个对话的消息数限制(查询参数,可选,不传则返回所有消息)
  1485. Returns:
  1486. 包含用户所有对话和消息的完整数据
  1487. """
  1488. try:
  1489. # 获取可选参数,不传递时使用None(返回所有记录)
  1490. conversation_limit = request.args.get('conversation_limit', type=int)
  1491. message_limit = request.args.get('message_limit', type=int)
  1492. # 获取用户的对话列表
  1493. conversations = redis_conversation_manager.get_conversations(user_id, conversation_limit)
  1494. # 为每个对话获取消息历史
  1495. full_conversations = []
  1496. total_messages = 0
  1497. for conversation in conversations:
  1498. conversation_id = conversation['conversation_id']
  1499. # 获取对话消息
  1500. messages = redis_conversation_manager.get_conversation_messages(
  1501. conversation_id, message_limit
  1502. )
  1503. # 获取对话元数据
  1504. meta = redis_conversation_manager.get_conversation_meta(conversation_id)
  1505. # 组合完整数据
  1506. full_conversation = {
  1507. **conversation, # 基础对话信息
  1508. 'meta': meta, # 对话元数据
  1509. 'messages': messages, # 消息列表
  1510. 'message_count': len(messages)
  1511. }
  1512. full_conversations.append(full_conversation)
  1513. total_messages += len(messages)
  1514. return jsonify(success_response(
  1515. response_text="获取用户完整对话数据成功",
  1516. data={
  1517. "user_id": user_id,
  1518. "conversations": full_conversations,
  1519. "total_conversations": len(full_conversations),
  1520. "total_messages": total_messages,
  1521. "conversation_limit_applied": conversation_limit,
  1522. "message_limit_applied": message_limit,
  1523. "query_time": datetime.now().isoformat()
  1524. }
  1525. ))
  1526. except Exception as e:
  1527. logger.error(f"获取用户完整对话数据失败: {str(e)}")
  1528. return jsonify(internal_error_response(
  1529. response_text="获取用户对话数据失败,请稍后重试"
  1530. )), 500
  1531. # ==================== Embedding缓存管理接口 ====================
  1532. @app.flask_app.route('/api/v0/embedding_cache_stats', methods=['GET'])
  1533. def embedding_cache_stats():
  1534. """获取embedding缓存统计信息"""
  1535. try:
  1536. from common.embedding_cache_manager import get_embedding_cache_manager
  1537. cache_manager = get_embedding_cache_manager()
  1538. stats = cache_manager.get_cache_stats()
  1539. return jsonify(success_response(
  1540. response_text="获取embedding缓存统计成功",
  1541. data=stats
  1542. ))
  1543. except Exception as e:
  1544. logger.error(f"获取embedding缓存统计失败: {str(e)}")
  1545. return jsonify(internal_error_response(
  1546. response_text="获取embedding缓存统计失败,请稍后重试"
  1547. )), 500
  1548. @app.flask_app.route('/api/v0/embedding_cache_cleanup', methods=['POST'])
  1549. def embedding_cache_cleanup():
  1550. """清空所有embedding缓存"""
  1551. try:
  1552. from common.embedding_cache_manager import get_embedding_cache_manager
  1553. cache_manager = get_embedding_cache_manager()
  1554. if not cache_manager.is_available():
  1555. return jsonify(internal_error_response(
  1556. response_text="Embedding缓存功能未启用或不可用"
  1557. )), 400
  1558. success = cache_manager.clear_all_cache()
  1559. if success:
  1560. return jsonify(success_response(
  1561. response_text="所有embedding缓存已清空",
  1562. data={"cleared": True}
  1563. ))
  1564. else:
  1565. return jsonify(internal_error_response(
  1566. response_text="清空embedding缓存失败"
  1567. )), 500
  1568. except Exception as e:
  1569. logger.error(f"清空embedding缓存失败: {str(e)}")
  1570. return jsonify(internal_error_response(
  1571. response_text="清空embedding缓存失败,请稍后重试"
  1572. )), 500
  1573. # ==================== QA反馈系统接口 ====================
  1574. # 全局反馈管理器实例
  1575. qa_feedback_manager = None
  1576. def get_qa_feedback_manager():
  1577. """获取QA反馈管理器实例(懒加载)- 复用Vanna连接版本"""
  1578. global qa_feedback_manager
  1579. if qa_feedback_manager is None:
  1580. try:
  1581. # 优先尝试复用vanna连接
  1582. vanna_instance = None
  1583. try:
  1584. # 尝试获取现有的vanna实例
  1585. if 'get_citu_langraph_agent' in globals():
  1586. agent = get_citu_langraph_agent()
  1587. if hasattr(agent, 'vn'):
  1588. vanna_instance = agent.vn
  1589. elif 'vn' in globals():
  1590. vanna_instance = vn
  1591. else:
  1592. logger.info("未找到可用的vanna实例,将创建新的数据库连接")
  1593. except Exception as e:
  1594. logger.info(f"获取vanna实例失败: {e},将创建新的数据库连接")
  1595. vanna_instance = None
  1596. qa_feedback_manager = QAFeedbackManager(vanna_instance=vanna_instance)
  1597. logger.info("QA反馈管理器实例创建成功")
  1598. except Exception as e:
  1599. logger.critical(f"QA反馈管理器创建失败: {str(e)}")
  1600. raise Exception(f"QA反馈管理器初始化失败: {str(e)}")
  1601. return qa_feedback_manager
  1602. @app.flask_app.route('/api/v0/qa_feedback/query', methods=['POST'])
  1603. def qa_feedback_query():
  1604. """
  1605. 查询反馈记录API
  1606. 支持分页、筛选和排序功能
  1607. """
  1608. try:
  1609. req = request.get_json(force=True)
  1610. # 解析参数,设置默认值
  1611. page = req.get('page', 1)
  1612. page_size = req.get('page_size', 20)
  1613. is_thumb_up = req.get('is_thumb_up')
  1614. create_time_start = req.get('create_time_start')
  1615. create_time_end = req.get('create_time_end')
  1616. is_in_training_data = req.get('is_in_training_data')
  1617. sort_by = req.get('sort_by', 'create_time')
  1618. sort_order = req.get('sort_order', 'desc')
  1619. # 参数验证
  1620. if page < 1:
  1621. return jsonify(bad_request_response(
  1622. response_text="页码必须大于0",
  1623. invalid_params=["page"]
  1624. )), 400
  1625. if page_size < 1 or page_size > 100:
  1626. return jsonify(bad_request_response(
  1627. response_text="每页大小必须在1-100之间",
  1628. invalid_params=["page_size"]
  1629. )), 400
  1630. # 获取反馈管理器并查询
  1631. manager = get_qa_feedback_manager()
  1632. records, total = manager.query_feedback(
  1633. page=page,
  1634. page_size=page_size,
  1635. is_thumb_up=is_thumb_up,
  1636. create_time_start=create_time_start,
  1637. create_time_end=create_time_end,
  1638. is_in_training_data=is_in_training_data,
  1639. sort_by=sort_by,
  1640. sort_order=sort_order
  1641. )
  1642. # 计算分页信息
  1643. total_pages = (total + page_size - 1) // page_size
  1644. return jsonify(success_response(
  1645. response_text=f"查询成功,共找到 {total} 条记录",
  1646. data={
  1647. "records": records,
  1648. "pagination": {
  1649. "page": page,
  1650. "page_size": page_size,
  1651. "total": total,
  1652. "total_pages": total_pages,
  1653. "has_next": page < total_pages,
  1654. "has_prev": page > 1
  1655. }
  1656. }
  1657. ))
  1658. except Exception as e:
  1659. logger.error(f"qa_feedback_query执行失败: {str(e)}")
  1660. return jsonify(internal_error_response(
  1661. response_text="查询反馈记录失败,请稍后重试"
  1662. )), 500
  1663. @app.flask_app.route('/api/v0/qa_feedback/delete/<int:feedback_id>', methods=['DELETE'])
  1664. def qa_feedback_delete(feedback_id):
  1665. """
  1666. 删除反馈记录API
  1667. """
  1668. try:
  1669. manager = get_qa_feedback_manager()
  1670. success = manager.delete_feedback(feedback_id)
  1671. if success:
  1672. return jsonify(success_response(
  1673. response_text=f"反馈记录删除成功",
  1674. data={"deleted_id": feedback_id}
  1675. ))
  1676. else:
  1677. return jsonify(not_found_response(
  1678. response_text=f"反馈记录不存在 (ID: {feedback_id})"
  1679. )), 404
  1680. except Exception as e:
  1681. logger.error(f"qa_feedback_delete执行失败: {str(e)}")
  1682. return jsonify(internal_error_response(
  1683. response_text="删除反馈记录失败,请稍后重试"
  1684. )), 500
  1685. @app.flask_app.route('/api/v0/qa_feedback/update/<int:feedback_id>', methods=['PUT'])
  1686. def qa_feedback_update(feedback_id):
  1687. """
  1688. 更新反馈记录API
  1689. """
  1690. try:
  1691. req = request.get_json(force=True)
  1692. # 提取允许更新的字段
  1693. allowed_fields = ['question', 'sql', 'is_thumb_up', 'user_id', 'is_in_training_data']
  1694. update_data = {}
  1695. for field in allowed_fields:
  1696. if field in req:
  1697. update_data[field] = req[field]
  1698. if not update_data:
  1699. return jsonify(bad_request_response(
  1700. response_text="没有提供有效的更新字段",
  1701. missing_params=allowed_fields
  1702. )), 400
  1703. manager = get_qa_feedback_manager()
  1704. success = manager.update_feedback(feedback_id, **update_data)
  1705. if success:
  1706. return jsonify(success_response(
  1707. response_text="反馈记录更新成功",
  1708. data={
  1709. "updated_id": feedback_id,
  1710. "updated_fields": list(update_data.keys())
  1711. }
  1712. ))
  1713. else:
  1714. return jsonify(not_found_response(
  1715. response_text=f"反馈记录不存在或无变化 (ID: {feedback_id})"
  1716. )), 404
  1717. except Exception as e:
  1718. logger.error(f"qa_feedback_update执行失败: {str(e)}")
  1719. return jsonify(internal_error_response(
  1720. response_text="更新反馈记录失败,请稍后重试"
  1721. )), 500
  1722. @app.flask_app.route('/api/v0/qa_feedback/add_to_training', methods=['POST'])
  1723. def qa_feedback_add_to_training():
  1724. """
  1725. 将反馈记录添加到训练数据集API
  1726. 支持混合批量处理:正向反馈加入SQL训练集,负向反馈加入error_sql训练集
  1727. """
  1728. try:
  1729. req = request.get_json(force=True)
  1730. feedback_ids = req.get('feedback_ids', [])
  1731. if not feedback_ids or not isinstance(feedback_ids, list):
  1732. return jsonify(bad_request_response(
  1733. response_text="缺少有效的反馈ID列表",
  1734. missing_params=["feedback_ids"]
  1735. )), 400
  1736. manager = get_qa_feedback_manager()
  1737. # 获取反馈记录
  1738. records = manager.get_feedback_by_ids(feedback_ids)
  1739. if not records:
  1740. return jsonify(not_found_response(
  1741. response_text="未找到任何有效的反馈记录"
  1742. )), 404
  1743. # 分别处理正向和负向反馈
  1744. positive_count = 0 # 正向训练计数
  1745. negative_count = 0 # 负向训练计数
  1746. already_trained_count = 0 # 已训练计数
  1747. error_count = 0 # 错误计数
  1748. successfully_trained_ids = [] # 成功训练的ID列表
  1749. for record in records:
  1750. try:
  1751. # 检查是否已经在训练数据中
  1752. if record['is_in_training_data']:
  1753. already_trained_count += 1
  1754. continue
  1755. if record['is_thumb_up']:
  1756. # 正向反馈 - 加入标准SQL训练集
  1757. training_id = vn.train(
  1758. question=record['question'],
  1759. sql=record['sql']
  1760. )
  1761. positive_count += 1
  1762. logger.info(f"正向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1763. else:
  1764. # 负向反馈 - 加入错误SQL训练集
  1765. training_id = vn.train_error_sql(
  1766. question=record['question'],
  1767. sql=record['sql']
  1768. )
  1769. negative_count += 1
  1770. logger.info(f"负向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
  1771. successfully_trained_ids.append(record['id'])
  1772. except Exception as e:
  1773. logger.error(f"训练失败 - 反馈ID: {record['id']}, 错误: {e}")
  1774. error_count += 1
  1775. # 更新训练状态
  1776. if successfully_trained_ids:
  1777. updated_count = manager.mark_training_status(successfully_trained_ids, True)
  1778. logger.info(f"批量更新训练状态完成,影响 {updated_count} 条记录")
  1779. # 构建响应
  1780. total_processed = positive_count + negative_count + already_trained_count + error_count
  1781. return jsonify(success_response(
  1782. response_text=f"训练数据添加完成,成功处理 {positive_count + negative_count} 条记录",
  1783. data={
  1784. "summary": {
  1785. "total_requested": len(feedback_ids),
  1786. "total_processed": total_processed,
  1787. "positive_trained": positive_count,
  1788. "negative_trained": negative_count,
  1789. "already_trained": already_trained_count,
  1790. "errors": error_count
  1791. },
  1792. "successfully_trained_ids": successfully_trained_ids,
  1793. "training_details": {
  1794. "sql_training_count": positive_count,
  1795. "error_sql_training_count": negative_count
  1796. }
  1797. }
  1798. ))
  1799. except Exception as e:
  1800. logger.error(f"qa_feedback_add_to_training执行失败: {str(e)}")
  1801. return jsonify(internal_error_response(
  1802. response_text="添加训练数据失败,请稍后重试"
  1803. )), 500
  1804. @app.flask_app.route('/api/v0/qa_feedback/add', methods=['POST'])
  1805. def qa_feedback_add():
  1806. """
  1807. 添加反馈记录API
  1808. 用于前端直接创建反馈记录
  1809. """
  1810. try:
  1811. req = request.get_json(force=True)
  1812. question = req.get('question')
  1813. sql = req.get('sql')
  1814. is_thumb_up = req.get('is_thumb_up')
  1815. user_id = req.get('user_id', 'guest')
  1816. # 参数验证
  1817. if not question:
  1818. return jsonify(bad_request_response(
  1819. response_text="缺少必需参数:question",
  1820. missing_params=["question"]
  1821. )), 400
  1822. if not sql:
  1823. return jsonify(bad_request_response(
  1824. response_text="缺少必需参数:sql",
  1825. missing_params=["sql"]
  1826. )), 400
  1827. if is_thumb_up is None:
  1828. return jsonify(bad_request_response(
  1829. response_text="缺少必需参数:is_thumb_up",
  1830. missing_params=["is_thumb_up"]
  1831. )), 400
  1832. manager = get_qa_feedback_manager()
  1833. feedback_id = manager.add_feedback(
  1834. question=question,
  1835. sql=sql,
  1836. is_thumb_up=bool(is_thumb_up),
  1837. user_id=user_id
  1838. )
  1839. return jsonify(success_response(
  1840. response_text="反馈记录创建成功",
  1841. data={
  1842. "feedback_id": feedback_id
  1843. }
  1844. ))
  1845. except Exception as e:
  1846. logger.error(f"qa_feedback_add执行失败: {str(e)}")
  1847. return jsonify(internal_error_response(
  1848. response_text="创建反馈记录失败,请稍后重试"
  1849. )), 500
  1850. @app.flask_app.route('/api/v0/qa_feedback/stats', methods=['GET'])
  1851. def qa_feedback_stats():
  1852. """
  1853. 反馈统计API
  1854. 返回反馈数据的统计信息
  1855. """
  1856. try:
  1857. manager = get_qa_feedback_manager()
  1858. # 查询各种统计数据
  1859. all_records, total_count = manager.query_feedback(page=1, page_size=1)
  1860. positive_records, positive_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=True)
  1861. negative_records, negative_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=False)
  1862. trained_records, trained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=True)
  1863. untrained_records, untrained_count = manager.query_feedback(page=1, page_size=1, is_in_training_data=False)
  1864. return jsonify(success_response(
  1865. response_text="统计信息获取成功",
  1866. data={
  1867. "total_feedback": total_count,
  1868. "positive_feedback": positive_count,
  1869. "negative_feedback": negative_count,
  1870. "trained_feedback": trained_count,
  1871. "untrained_feedback": untrained_count,
  1872. "positive_rate": round(positive_count / max(total_count, 1) * 100, 2),
  1873. "training_rate": round(trained_count / max(total_count, 1) * 100, 2)
  1874. }
  1875. ))
  1876. except Exception as e:
  1877. logger.error(f"qa_feedback_stats执行失败: {str(e)}")
  1878. return jsonify(internal_error_response(
  1879. response_text="获取统计信息失败,请稍后重试"
  1880. )), 500
  1881. # ==================== 问答缓存管理接口 ====================
  1882. @app.flask_app.route('/api/v0/qa_cache_stats', methods=['GET'])
  1883. def qa_cache_stats():
  1884. """获取问答缓存统计信息"""
  1885. try:
  1886. stats = redis_conversation_manager.get_qa_cache_stats()
  1887. return jsonify(success_response(
  1888. response_text="获取问答缓存统计成功",
  1889. data=stats
  1890. ))
  1891. except Exception as e:
  1892. logger.error(f"获取问答缓存统计失败: {str(e)}")
  1893. return jsonify(internal_error_response(
  1894. response_text="获取问答缓存统计失败,请稍后重试"
  1895. )), 500
  1896. @app.flask_app.route('/api/v0/qa_cache_list', methods=['GET'])
  1897. def qa_cache_list():
  1898. """获取问答缓存列表(支持分页)"""
  1899. try:
  1900. # 获取分页参数,默认限制50条
  1901. limit = request.args.get('limit', 50, type=int)
  1902. # 限制最大返回数量,防止一次性返回过多数据
  1903. if limit > 500:
  1904. limit = 500
  1905. elif limit <= 0:
  1906. limit = 50
  1907. cache_list = redis_conversation_manager.get_qa_cache_list(limit)
  1908. return jsonify(success_response(
  1909. response_text="获取问答缓存列表成功",
  1910. data={
  1911. "cache_list": cache_list,
  1912. "total_returned": len(cache_list),
  1913. "limit_applied": limit,
  1914. "note": "按缓存时间倒序排列,最新的在前面"
  1915. }
  1916. ))
  1917. except Exception as e:
  1918. logger.error(f"获取问答缓存列表失败: {str(e)}")
  1919. return jsonify(internal_error_response(
  1920. response_text="获取问答缓存列表失败,请稍后重试"
  1921. )), 500
  1922. @app.flask_app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
  1923. def qa_cache_cleanup():
  1924. """清空所有问答缓存"""
  1925. try:
  1926. if not redis_conversation_manager.is_available():
  1927. return jsonify(internal_error_response(
  1928. response_text="Redis连接不可用,无法执行清理操作"
  1929. )), 500
  1930. deleted_count = redis_conversation_manager.clear_all_qa_cache()
  1931. return jsonify(success_response(
  1932. response_text="问答缓存清理完成",
  1933. data={
  1934. "deleted_count": deleted_count,
  1935. "cleared": deleted_count > 0,
  1936. "cleanup_time": datetime.now().isoformat()
  1937. }
  1938. ))
  1939. except Exception as e:
  1940. logger.error(f"清空问答缓存失败: {str(e)}")
  1941. return jsonify(internal_error_response(
  1942. response_text="清空问答缓存失败,请稍后重试"
  1943. )), 500
  1944. # ==================== 训练数据管理接口 ====================
  1945. def validate_sql_syntax(sql: str) -> tuple[bool, str]:
  1946. """SQL语法检查(仅对sql类型)"""
  1947. try:
  1948. parsed = sqlparse.parse(sql.strip())
  1949. if not parsed or not parsed[0].tokens:
  1950. return False, "SQL语法错误:空语句"
  1951. # 基本语法检查
  1952. sql_upper = sql.strip().upper()
  1953. if not any(sql_upper.startswith(keyword) for keyword in
  1954. ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP']):
  1955. return False, "SQL语法错误:不是有效的SQL语句"
  1956. # 安全检查:禁止危险的SQL操作
  1957. dangerous_operations = ['UPDATE', 'DELETE', 'ALERT', 'DROP']
  1958. for operation in dangerous_operations:
  1959. if sql_upper.startswith(operation):
  1960. return False, f'在训练集中禁止使用"{",".join(dangerous_operations)}"'
  1961. return True, ""
  1962. except Exception as e:
  1963. return False, f"SQL语法错误:{str(e)}"
  1964. def paginate_data(data_list: list, page: int, page_size: int):
  1965. """分页处理算法"""
  1966. total = len(data_list)
  1967. start_idx = (page - 1) * page_size
  1968. end_idx = start_idx + page_size
  1969. page_data = data_list[start_idx:end_idx]
  1970. return {
  1971. "data": page_data,
  1972. "pagination": {
  1973. "page": page,
  1974. "page_size": page_size,
  1975. "total": total,
  1976. "total_pages": (total + page_size - 1) // page_size,
  1977. "has_next": end_idx < total,
  1978. "has_prev": page > 1
  1979. }
  1980. }
  1981. def filter_by_type(data_list: list, training_data_type: str):
  1982. """按类型筛选算法"""
  1983. if not training_data_type:
  1984. return data_list
  1985. return [
  1986. record for record in data_list
  1987. if record.get('training_data_type') == training_data_type
  1988. ]
  1989. def search_in_data(data_list: list, search_keyword: str):
  1990. """在数据中搜索关键词"""
  1991. if not search_keyword:
  1992. return data_list
  1993. keyword_lower = search_keyword.lower()
  1994. return [
  1995. record for record in data_list
  1996. if (record.get('question') and keyword_lower in record['question'].lower()) or
  1997. (record.get('content') and keyword_lower in record['content'].lower())
  1998. ]
  1999. def process_single_training_item(item: dict, index: int) -> dict:
  2000. """处理单个训练数据项"""
  2001. training_type = item.get('training_data_type')
  2002. if training_type == 'sql':
  2003. sql = item.get('sql')
  2004. if not sql:
  2005. raise ValueError("SQL字段是必需的")
  2006. # SQL语法检查
  2007. is_valid, error_msg = validate_sql_syntax(sql)
  2008. if not is_valid:
  2009. raise ValueError(error_msg)
  2010. question = item.get('question')
  2011. if question:
  2012. training_id = vn.train(question=question, sql=sql)
  2013. else:
  2014. training_id = vn.train(sql=sql)
  2015. elif training_type == 'error_sql':
  2016. # error_sql不需要语法检查
  2017. question = item.get('question')
  2018. sql = item.get('sql')
  2019. if not question or not sql:
  2020. raise ValueError("question和sql字段都是必需的")
  2021. training_id = vn.train_error_sql(question=question, sql=sql)
  2022. elif training_type == 'documentation':
  2023. content = item.get('content')
  2024. if not content:
  2025. raise ValueError("content字段是必需的")
  2026. training_id = vn.train(documentation=content)
  2027. elif training_type == 'ddl':
  2028. ddl = item.get('ddl')
  2029. if not ddl:
  2030. raise ValueError("ddl字段是必需的")
  2031. training_id = vn.train(ddl=ddl)
  2032. else:
  2033. raise ValueError(f"不支持的训练数据类型: {training_type}")
  2034. return {
  2035. "index": index,
  2036. "success": True,
  2037. "training_id": training_id,
  2038. "type": training_type,
  2039. "message": f"{training_type}训练数据创建成功"
  2040. }
  2041. def get_total_training_count():
  2042. """获取当前训练数据总数"""
  2043. try:
  2044. training_data = vn.get_training_data()
  2045. if training_data is not None and not training_data.empty:
  2046. return len(training_data)
  2047. return 0
  2048. except Exception as e:
  2049. logger.warning(f"获取训练数据总数失败: {e}")
  2050. return 0
  2051. @app.flask_app.route('/api/v0/training_data/query', methods=['POST'])
  2052. def training_data_query():
  2053. """
  2054. 分页查询训练数据API
  2055. 支持类型筛选、搜索和排序功能
  2056. """
  2057. try:
  2058. req = request.get_json(force=True)
  2059. # 解析参数,设置默认值
  2060. page = req.get('page', 1)
  2061. page_size = req.get('page_size', 20)
  2062. training_data_type = req.get('training_data_type')
  2063. sort_by = req.get('sort_by', 'id')
  2064. sort_order = req.get('sort_order', 'desc')
  2065. search_keyword = req.get('search_keyword')
  2066. # 参数验证
  2067. if page < 1:
  2068. return jsonify(bad_request_response(
  2069. response_text="页码必须大于0",
  2070. missing_params=["page"]
  2071. )), 400
  2072. if page_size < 1 or page_size > 100:
  2073. return jsonify(bad_request_response(
  2074. response_text="每页大小必须在1-100之间",
  2075. missing_params=["page_size"]
  2076. )), 400
  2077. if search_keyword and len(search_keyword) > 100:
  2078. return jsonify(bad_request_response(
  2079. response_text="搜索关键词最大长度为100字符",
  2080. missing_params=["search_keyword"]
  2081. )), 400
  2082. # 获取训练数据
  2083. training_data = vn.get_training_data()
  2084. if training_data is None or training_data.empty:
  2085. return jsonify(success_response(
  2086. response_text="查询成功,暂无训练数据",
  2087. data={
  2088. "records": [],
  2089. "pagination": {
  2090. "page": page,
  2091. "page_size": page_size,
  2092. "total": 0,
  2093. "total_pages": 0,
  2094. "has_next": False,
  2095. "has_prev": False
  2096. },
  2097. "filters_applied": {
  2098. "training_data_type": training_data_type,
  2099. "search_keyword": search_keyword
  2100. }
  2101. }
  2102. ))
  2103. # 转换为列表格式
  2104. records = training_data.to_dict(orient="records")
  2105. # 应用筛选条件
  2106. if training_data_type:
  2107. records = filter_by_type(records, training_data_type)
  2108. if search_keyword:
  2109. records = search_in_data(records, search_keyword)
  2110. # 排序
  2111. if sort_by in ['id', 'training_data_type']:
  2112. reverse = (sort_order.lower() == 'desc')
  2113. records.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse)
  2114. # 分页
  2115. paginated_result = paginate_data(records, page, page_size)
  2116. return jsonify(success_response(
  2117. response_text=f"查询成功,共找到 {paginated_result['pagination']['total']} 条记录",
  2118. data={
  2119. "records": paginated_result["data"],
  2120. "pagination": paginated_result["pagination"],
  2121. "filters_applied": {
  2122. "training_data_type": training_data_type,
  2123. "search_keyword": search_keyword
  2124. }
  2125. }
  2126. ))
  2127. except Exception as e:
  2128. logger.error(f"training_data_query执行失败: {str(e)}")
  2129. return jsonify(internal_error_response(
  2130. response_text="查询训练数据失败,请稍后重试"
  2131. )), 500
  2132. @app.flask_app.route('/api/v0/training_data/create', methods=['POST'])
  2133. def training_data_create():
  2134. """
  2135. 创建训练数据API
  2136. 支持单条和批量创建,支持四种数据类型
  2137. """
  2138. try:
  2139. req = request.get_json(force=True)
  2140. data = req.get('data')
  2141. if not data:
  2142. return jsonify(bad_request_response(
  2143. response_text="缺少必需参数:data",
  2144. missing_params=["data"]
  2145. )), 400
  2146. # 统一处理为列表格式
  2147. if isinstance(data, dict):
  2148. data_list = [data]
  2149. elif isinstance(data, list):
  2150. data_list = data
  2151. else:
  2152. return jsonify(bad_request_response(
  2153. response_text="data字段格式错误,应为对象或数组"
  2154. )), 400
  2155. # 批量操作限制
  2156. if len(data_list) > 50:
  2157. return jsonify(bad_request_response(
  2158. response_text="批量操作最大支持50条记录"
  2159. )), 400
  2160. results = []
  2161. successful_count = 0
  2162. type_summary = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}
  2163. for index, item in enumerate(data_list):
  2164. try:
  2165. result = process_single_training_item(item, index)
  2166. results.append(result)
  2167. if result['success']:
  2168. successful_count += 1
  2169. type_summary[result['type']] += 1
  2170. except Exception as e:
  2171. results.append({
  2172. "index": index,
  2173. "success": False,
  2174. "type": item.get('training_data_type', 'unknown'),
  2175. "error": str(e),
  2176. "message": "创建失败"
  2177. })
  2178. # 获取创建后的总记录数
  2179. current_total = get_total_training_count()
  2180. # 根据实际执行结果决定响应状态
  2181. failed_count = len(data_list) - successful_count
  2182. if failed_count == 0:
  2183. # 全部成功
  2184. return jsonify(success_response(
  2185. response_text="训练数据创建完成",
  2186. data={
  2187. "total_requested": len(data_list),
  2188. "successfully_created": successful_count,
  2189. "failed_count": failed_count,
  2190. "results": results,
  2191. "summary": type_summary,
  2192. "current_total_count": current_total
  2193. }
  2194. ))
  2195. elif successful_count == 0:
  2196. # 全部失败
  2197. return jsonify(error_response(
  2198. response_text="训练数据创建失败",
  2199. data={
  2200. "total_requested": len(data_list),
  2201. "successfully_created": successful_count,
  2202. "failed_count": failed_count,
  2203. "results": results,
  2204. "summary": type_summary,
  2205. "current_total_count": current_total
  2206. }
  2207. )), 400
  2208. else:
  2209. # 部分成功,部分失败
  2210. return jsonify(error_response(
  2211. response_text=f"训练数据创建部分成功,成功{successful_count}条,失败{failed_count}条",
  2212. data={
  2213. "total_requested": len(data_list),
  2214. "successfully_created": successful_count,
  2215. "failed_count": failed_count,
  2216. "results": results,
  2217. "summary": type_summary,
  2218. "current_total_count": current_total
  2219. }
  2220. )), 207
  2221. except Exception as e:
  2222. logger.error(f"training_data_create执行失败: {str(e)}")
  2223. return jsonify(internal_error_response(
  2224. response_text="创建训练数据失败,请稍后重试"
  2225. )), 500
  2226. @app.flask_app.route('/api/v0/training_data/delete', methods=['POST'])
  2227. def training_data_delete():
  2228. """
  2229. 删除训练数据API
  2230. 支持批量删除
  2231. """
  2232. try:
  2233. req = request.get_json(force=True)
  2234. ids = req.get('ids', [])
  2235. confirm = req.get('confirm', False)
  2236. if not ids or not isinstance(ids, list):
  2237. return jsonify(bad_request_response(
  2238. response_text="缺少有效的ID列表",
  2239. missing_params=["ids"]
  2240. )), 400
  2241. if not confirm:
  2242. return jsonify(bad_request_response(
  2243. response_text="删除操作需要确认,请设置confirm为true"
  2244. )), 400
  2245. # 批量操作限制
  2246. if len(ids) > 50:
  2247. return jsonify(bad_request_response(
  2248. response_text="批量删除最大支持50条记录"
  2249. )), 400
  2250. deleted_ids = []
  2251. failed_ids = []
  2252. failed_details = []
  2253. for training_id in ids:
  2254. try:
  2255. success = vn.remove_training_data(training_id)
  2256. if success:
  2257. deleted_ids.append(training_id)
  2258. else:
  2259. failed_ids.append(training_id)
  2260. failed_details.append({
  2261. "id": training_id,
  2262. "error": "记录不存在或删除失败"
  2263. })
  2264. except Exception as e:
  2265. failed_ids.append(training_id)
  2266. failed_details.append({
  2267. "id": training_id,
  2268. "error": str(e)
  2269. })
  2270. # 获取删除后的总记录数
  2271. current_total = get_total_training_count()
  2272. # 根据实际执行结果决定响应状态
  2273. failed_count = len(failed_ids)
  2274. if failed_count == 0:
  2275. # 全部成功
  2276. return jsonify(success_response(
  2277. response_text="训练数据删除完成",
  2278. data={
  2279. "total_requested": len(ids),
  2280. "successfully_deleted": len(deleted_ids),
  2281. "failed_count": failed_count,
  2282. "deleted_ids": deleted_ids,
  2283. "failed_ids": failed_ids,
  2284. "failed_details": failed_details,
  2285. "current_total_count": current_total
  2286. }
  2287. ))
  2288. elif len(deleted_ids) == 0:
  2289. # 全部失败
  2290. return jsonify(error_response(
  2291. response_text="训练数据删除失败",
  2292. data={
  2293. "total_requested": len(ids),
  2294. "successfully_deleted": len(deleted_ids),
  2295. "failed_count": failed_count,
  2296. "deleted_ids": deleted_ids,
  2297. "failed_ids": failed_ids,
  2298. "failed_details": failed_details,
  2299. "current_total_count": current_total
  2300. }
  2301. )), 400
  2302. else:
  2303. # 部分成功,部分失败
  2304. return jsonify(error_response(
  2305. response_text=f"训练数据删除部分成功,成功{len(deleted_ids)}条,失败{failed_count}条",
  2306. data={
  2307. "total_requested": len(ids),
  2308. "successfully_deleted": len(deleted_ids),
  2309. "failed_count": failed_count,
  2310. "deleted_ids": deleted_ids,
  2311. "failed_ids": failed_ids,
  2312. "failed_details": failed_details,
  2313. "current_total_count": current_total
  2314. }
  2315. )), 207
  2316. except Exception as e:
  2317. logger.error(f"training_data_delete执行失败: {str(e)}")
  2318. return jsonify(internal_error_response(
  2319. response_text="删除训练数据失败,请稍后重试"
  2320. )), 500
  2321. @app.flask_app.route('/api/v0/training_data/stats', methods=['GET'])
  2322. def training_data_stats():
  2323. """
  2324. 获取训练数据统计信息API
  2325. """
  2326. try:
  2327. training_data = vn.get_training_data()
  2328. if training_data is None or training_data.empty:
  2329. return jsonify(success_response(
  2330. response_text="统计信息获取成功",
  2331. data={
  2332. "total_count": 0,
  2333. "type_breakdown": {
  2334. "sql": 0,
  2335. "documentation": 0,
  2336. "ddl": 0,
  2337. "error_sql": 0
  2338. },
  2339. "type_percentages": {
  2340. "sql": 0.0,
  2341. "documentation": 0.0,
  2342. "ddl": 0.0,
  2343. "error_sql": 0.0
  2344. },
  2345. "last_updated": datetime.now().isoformat()
  2346. }
  2347. ))
  2348. total_count = len(training_data)
  2349. # 统计各类型数量
  2350. type_breakdown = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}
  2351. if 'training_data_type' in training_data.columns:
  2352. type_counts = training_data['training_data_type'].value_counts()
  2353. for data_type, count in type_counts.items():
  2354. if data_type in type_breakdown:
  2355. type_breakdown[data_type] = int(count)
  2356. # 计算百分比
  2357. type_percentages = {}
  2358. for data_type, count in type_breakdown.items():
  2359. type_percentages[data_type] = round(count / max(total_count, 1) * 100, 2)
  2360. return jsonify(success_response(
  2361. response_text="统计信息获取成功",
  2362. data={
  2363. "total_count": total_count,
  2364. "type_breakdown": type_breakdown,
  2365. "type_percentages": type_percentages,
  2366. "last_updated": datetime.now().isoformat()
  2367. }
  2368. ))
  2369. except Exception as e:
  2370. logger.error(f"training_data_stats执行失败: {str(e)}")
  2371. return jsonify(internal_error_response(
  2372. response_text="获取统计信息失败,请稍后重试"
  2373. )), 500
  2374. @app.flask_app.route('/api/v0/cache_overview_full', methods=['GET'])
  2375. def cache_overview_full():
  2376. """获取所有缓存系统的综合概览"""
  2377. try:
  2378. from common.embedding_cache_manager import get_embedding_cache_manager
  2379. from common.vanna_instance import get_vanna_instance
  2380. # 获取现有的缓存统计
  2381. vanna_cache = get_vanna_instance()
  2382. # 直接使用应用中的缓存实例
  2383. cache = app.cache
  2384. cache_overview = {
  2385. "conversation_aware_cache": {
  2386. "enabled": True,
  2387. "total_items": len(cache.cache) if hasattr(cache, 'cache') else 0,
  2388. "sessions": list(cache.cache.keys()) if hasattr(cache, 'cache') else [],
  2389. "cache_type": type(cache).__name__
  2390. },
  2391. "question_answer_cache": redis_conversation_manager.get_qa_cache_stats() if redis_conversation_manager.is_available() else {"available": False},
  2392. "embedding_cache": get_embedding_cache_manager().get_cache_stats(),
  2393. "redis_conversation_stats": redis_conversation_manager.get_stats() if redis_conversation_manager.is_available() else None
  2394. }
  2395. return jsonify(success_response(
  2396. response_text="获取综合缓存概览成功",
  2397. data=cache_overview
  2398. ))
  2399. except Exception as e:
  2400. logger.error(f"获取综合缓存概览失败: {str(e)}")
  2401. return jsonify(internal_error_response(
  2402. response_text="获取缓存概览失败,请稍后重试"
  2403. )), 500
  2404. # 前端JavaScript示例 - 如何维持会话
  2405. """
  2406. // 前端需要维护一个会话ID
  2407. class ChatSession {
  2408. constructor() {
  2409. // 从localStorage获取或创建新的会话ID
  2410. this.sessionId = localStorage.getItem('chat_session_id') || this.generateSessionId();
  2411. localStorage.setItem('chat_session_id', this.sessionId);
  2412. }
  2413. generateSessionId() {
  2414. return 'session_' + Date.now() + '_' + Math.random().toString(36).substring(2, 11);
  2415. }
  2416. async askQuestion(question) {
  2417. const response = await fetch('/api/v0/ask', {
  2418. method: 'POST',
  2419. headers: {
  2420. 'Content-Type': 'application/json',
  2421. },
  2422. body: JSON.stringify({
  2423. question: question,
  2424. session_id: this.sessionId // 关键:传递会话ID
  2425. })
  2426. });
  2427. return await response.json();
  2428. }
  2429. // 开始新会话
  2430. startNewSession() {
  2431. this.sessionId = this.generateSessionId();
  2432. localStorage.setItem('chat_session_id', this.sessionId);
  2433. }
  2434. }
  2435. // 使用示例
  2436. const chatSession = new ChatSession();
  2437. chatSession.askQuestion("各年龄段客户的流失率如何?");
  2438. """
  2439. # ==================== Data Pipeline API ====================
  2440. # 导入简化的Data Pipeline模块
  2441. import asyncio
  2442. import os
  2443. from threading import Thread
  2444. from flask import send_file
  2445. from data_pipeline.api.simple_workflow import SimpleWorkflowManager
  2446. from data_pipeline.api.simple_file_manager import SimpleFileManager
  2447. # 创建简化的管理器
  2448. data_pipeline_manager = None
  2449. data_pipeline_file_manager = None
  2450. def get_data_pipeline_manager():
  2451. """获取Data Pipeline管理器单例"""
  2452. global data_pipeline_manager
  2453. if data_pipeline_manager is None:
  2454. data_pipeline_manager = SimpleWorkflowManager()
  2455. return data_pipeline_manager
  2456. def get_data_pipeline_file_manager():
  2457. """获取Data Pipeline文件管理器单例"""
  2458. global data_pipeline_file_manager
  2459. if data_pipeline_file_manager is None:
  2460. data_pipeline_file_manager = SimpleFileManager()
  2461. return data_pipeline_file_manager
  2462. # ==================== 简化的Data Pipeline API端点 ====================
  2463. @app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
  2464. def create_data_pipeline_task():
  2465. """创建数据管道任务"""
  2466. try:
  2467. req = request.get_json(force=True)
  2468. # table_list_file和business_context现在都是可选参数
  2469. # 如果未提供table_list_file,将使用文件上传模式
  2470. # 创建任务(支持可选的db_connection参数)
  2471. manager = get_data_pipeline_manager()
  2472. task_id = manager.create_task(
  2473. table_list_file=req.get('table_list_file'),
  2474. business_context=req.get('business_context'),
  2475. db_name=req.get('db_name'), # 可选参数,用于指定特定数据库名称
  2476. db_connection=req.get('db_connection'), # 可选参数,用于指定数据库连接字符串
  2477. task_name=req.get('task_name'), # 可选参数,用于指定任务名称
  2478. enable_sql_validation=req.get('enable_sql_validation', True),
  2479. enable_llm_repair=req.get('enable_llm_repair', True),
  2480. modify_original_file=req.get('modify_original_file', True),
  2481. enable_training_data_load=req.get('enable_training_data_load', True)
  2482. )
  2483. # 获取任务信息
  2484. task_info = manager.get_task_status(task_id)
  2485. response_data = {
  2486. "task_id": task_id,
  2487. "task_name": task_info.get('task_name'),
  2488. "status": task_info.get('status'),
  2489. "created_at": task_info.get('created_at').isoformat() if task_info.get('created_at') else None
  2490. }
  2491. # 检查是否为文件上传模式
  2492. file_upload_mode = not req.get('table_list_file')
  2493. response_message = "任务创建成功"
  2494. if file_upload_mode:
  2495. response_data["file_upload_mode"] = True
  2496. response_data["next_step"] = f"POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list"
  2497. response_message += ",请上传表清单文件后再执行任务"
  2498. return jsonify(success_response(
  2499. response_text=response_message,
  2500. data=response_data
  2501. )), 201
  2502. except Exception as e:
  2503. logger.error(f"创建数据管道任务失败: {str(e)}")
  2504. return jsonify(internal_error_response(
  2505. response_text="创建任务失败,请稍后重试"
  2506. )), 500
  2507. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/execute', methods=['POST'])
  2508. def execute_data_pipeline_task(task_id):
  2509. """执行数据管道任务"""
  2510. try:
  2511. req = request.get_json(force=True) if request.is_json else {}
  2512. execution_mode = req.get('execution_mode', 'complete')
  2513. step_name = req.get('step_name')
  2514. # 验证执行模式
  2515. if execution_mode not in ['complete', 'step']:
  2516. return jsonify(bad_request_response(
  2517. response_text="无效的执行模式,必须是 'complete' 或 'step'",
  2518. invalid_params=['execution_mode']
  2519. )), 400
  2520. # 如果是步骤执行模式,验证步骤名称
  2521. if execution_mode == 'step':
  2522. if not step_name:
  2523. return jsonify(bad_request_response(
  2524. response_text="步骤执行模式需要指定step_name",
  2525. missing_params=['step_name']
  2526. )), 400
  2527. valid_steps = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']
  2528. if step_name not in valid_steps:
  2529. return jsonify(bad_request_response(
  2530. response_text=f"无效的步骤名称,支持的步骤: {', '.join(valid_steps)}",
  2531. invalid_params=['step_name']
  2532. )), 400
  2533. # 检查任务是否存在
  2534. manager = get_data_pipeline_manager()
  2535. task_info = manager.get_task_status(task_id)
  2536. if not task_info:
  2537. return jsonify(not_found_response(
  2538. response_text=f"任务不存在: {task_id}"
  2539. )), 404
  2540. # 使用subprocess启动独立进程执行任务
  2541. def run_task_subprocess():
  2542. try:
  2543. import subprocess
  2544. import sys
  2545. from pathlib import Path
  2546. # 构建执行命令
  2547. python_executable = sys.executable
  2548. script_path = Path(__file__).parent / "data_pipeline" / "task_executor.py"
  2549. cmd = [
  2550. python_executable,
  2551. str(script_path),
  2552. "--task-id", task_id,
  2553. "--execution-mode", execution_mode
  2554. ]
  2555. if step_name:
  2556. cmd.extend(["--step-name", step_name])
  2557. logger.info(f"启动任务进程: {' '.join(cmd)}")
  2558. # 启动后台进程(不等待完成)
  2559. process = subprocess.Popen(
  2560. cmd,
  2561. stdout=subprocess.PIPE,
  2562. stderr=subprocess.PIPE,
  2563. text=True,
  2564. cwd=Path(__file__).parent
  2565. )
  2566. logger.info(f"任务进程已启动: PID={process.pid}, task_id={task_id}")
  2567. except Exception as e:
  2568. logger.error(f"启动任务进程失败: {task_id}, 错误: {str(e)}")
  2569. # 在新线程中启动subprocess(避免阻塞API响应)
  2570. thread = Thread(target=run_task_subprocess, daemon=True)
  2571. thread.start()
  2572. response_data = {
  2573. "task_id": task_id,
  2574. "execution_mode": execution_mode,
  2575. "step_name": step_name if execution_mode == 'step' else None,
  2576. "message": "任务正在后台执行,请通过状态接口查询进度"
  2577. }
  2578. return jsonify(success_response(
  2579. response_text="任务执行已启动",
  2580. data=response_data
  2581. )), 202
  2582. except Exception as e:
  2583. logger.error(f"启动数据管道任务执行失败: {str(e)}")
  2584. return jsonify(internal_error_response(
  2585. response_text="启动任务执行失败,请稍后重试"
  2586. )), 500
  2587. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
  2588. def get_data_pipeline_task_status(task_id):
  2589. """
  2590. 获取数据管道任务状态
  2591. 响应:
  2592. {
  2593. "success": true,
  2594. "code": 200,
  2595. "message": "获取任务状态成功",
  2596. "data": {
  2597. "task_id": "task_20250627_143052",
  2598. "status": "in_progress",
  2599. "step_status": {
  2600. "ddl_generation": "completed",
  2601. "qa_generation": "running",
  2602. "sql_validation": "pending",
  2603. "training_load": "pending"
  2604. },
  2605. "created_at": "2025-06-27T14:30:52",
  2606. "started_at": "2025-06-27T14:31:00",
  2607. "parameters": {...},
  2608. "current_execution": {...},
  2609. "total_executions": 2
  2610. }
  2611. }
  2612. """
  2613. try:
  2614. manager = get_data_pipeline_manager()
  2615. task_info = manager.get_task_status(task_id)
  2616. if not task_info:
  2617. return jsonify(not_found_response(
  2618. response_text=f"任务不存在: {task_id}"
  2619. )), 404
  2620. # 获取步骤状态
  2621. steps = manager.get_task_steps(task_id)
  2622. current_step = None
  2623. for step in steps:
  2624. if step['step_status'] == 'running':
  2625. current_step = step
  2626. break
  2627. # 构建步骤状态摘要
  2628. step_status_summary = {}
  2629. for step in steps:
  2630. step_status_summary[step['step_name']] = step['step_status']
  2631. response_data = {
  2632. "task_id": task_info['task_id'],
  2633. "task_name": task_info.get('task_name'),
  2634. "status": task_info['status'],
  2635. "step_status": step_status_summary,
  2636. "created_at": task_info['created_at'].isoformat() if task_info.get('created_at') else None,
  2637. "started_at": task_info['started_at'].isoformat() if task_info.get('started_at') else None,
  2638. "completed_at": task_info['completed_at'].isoformat() if task_info.get('completed_at') else None,
  2639. "parameters": task_info.get('parameters', {}),
  2640. "result": task_info.get('result'),
  2641. "error_message": task_info.get('error_message'),
  2642. "current_step": {
  2643. "execution_id": current_step['execution_id'],
  2644. "step": current_step['step_name'],
  2645. "status": current_step['step_status'],
  2646. "started_at": current_step['started_at'].isoformat() if current_step and current_step.get('started_at') else None
  2647. } if current_step else None,
  2648. "total_steps": len(steps),
  2649. "steps": [{
  2650. "step_name": step['step_name'],
  2651. "step_status": step['step_status'],
  2652. "started_at": step['started_at'].isoformat() if step.get('started_at') else None,
  2653. "completed_at": step['completed_at'].isoformat() if step.get('completed_at') else None,
  2654. "error_message": step.get('error_message')
  2655. } for step in steps]
  2656. }
  2657. return jsonify(success_response(
  2658. response_text="获取任务状态成功",
  2659. data=response_data
  2660. ))
  2661. except Exception as e:
  2662. logger.error(f"获取数据管道任务状态失败: {str(e)}")
  2663. return jsonify(internal_error_response(
  2664. response_text="获取任务状态失败,请稍后重试"
  2665. )), 500
  2666. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs', methods=['GET'])
  2667. def get_data_pipeline_task_logs(task_id):
  2668. """
  2669. 获取数据管道任务日志(从任务目录文件读取)
  2670. 查询参数:
  2671. - limit: 日志行数限制,默认100
  2672. - level: 日志级别过滤,可选
  2673. 响应:
  2674. {
  2675. "success": true,
  2676. "code": 200,
  2677. "message": "获取任务日志成功",
  2678. "data": {
  2679. "task_id": "task_20250627_143052",
  2680. "logs": [
  2681. {
  2682. "timestamp": "2025-06-27 14:30:52",
  2683. "level": "INFO",
  2684. "message": "任务开始执行"
  2685. }
  2686. ],
  2687. "total": 15,
  2688. "source": "file"
  2689. }
  2690. }
  2691. """
  2692. try:
  2693. limit = request.args.get('limit', 100, type=int)
  2694. level = request.args.get('level')
  2695. # 限制最大查询数量
  2696. limit = min(limit, 1000)
  2697. manager = get_data_pipeline_manager()
  2698. # 验证任务是否存在
  2699. task_info = manager.get_task_status(task_id)
  2700. if not task_info:
  2701. return jsonify(not_found_response(
  2702. response_text=f"任务不存在: {task_id}"
  2703. )), 404
  2704. # 获取任务目录下的日志文件
  2705. import os
  2706. from pathlib import Path
  2707. # 获取项目根目录的绝对路径
  2708. project_root = Path(__file__).parent.absolute()
  2709. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  2710. log_file = task_dir / "data_pipeline.log"
  2711. logs = []
  2712. if log_file.exists():
  2713. try:
  2714. # 读取日志文件的最后N行
  2715. with open(log_file, 'r', encoding='utf-8') as f:
  2716. lines = f.readlines()
  2717. # 取最后limit行
  2718. recent_lines = lines[-limit:] if len(lines) > limit else lines
  2719. # 解析日志行
  2720. import re
  2721. log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
  2722. for line in recent_lines:
  2723. line = line.strip()
  2724. if not line:
  2725. continue
  2726. match = re.match(log_pattern, line)
  2727. if match:
  2728. timestamp, log_level, logger_name, message = match.groups()
  2729. # 级别过滤
  2730. if level and log_level != level.upper():
  2731. continue
  2732. logs.append({
  2733. "timestamp": timestamp,
  2734. "level": log_level,
  2735. "logger": logger_name,
  2736. "message": message
  2737. })
  2738. else:
  2739. # 处理多行日志(如异常堆栈)
  2740. if logs:
  2741. logs[-1]["message"] += f"\n{line}"
  2742. except Exception as e:
  2743. logger.error(f"读取日志文件失败: {e}")
  2744. response_data = {
  2745. "task_id": task_id,
  2746. "logs": logs,
  2747. "total": len(logs),
  2748. "source": "file",
  2749. "log_file": str(log_file) if log_file.exists() else None
  2750. }
  2751. return jsonify(success_response(
  2752. response_text="获取任务日志成功",
  2753. data=response_data
  2754. ))
  2755. except Exception as e:
  2756. logger.error(f"获取数据管道任务日志失败: {str(e)}")
  2757. return jsonify(internal_error_response(
  2758. response_text="获取任务日志失败,请稍后重试"
  2759. )), 500
  2760. @app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['GET'])
  2761. def list_data_pipeline_tasks():
  2762. """获取数据管道任务列表"""
  2763. try:
  2764. limit = request.args.get('limit', 50, type=int)
  2765. offset = request.args.get('offset', 0, type=int)
  2766. status_filter = request.args.get('status')
  2767. # 限制查询数量
  2768. limit = min(limit, 100)
  2769. manager = get_data_pipeline_manager()
  2770. tasks = manager.get_tasks_list(
  2771. limit=limit,
  2772. offset=offset,
  2773. status_filter=status_filter
  2774. )
  2775. # 格式化任务列表
  2776. formatted_tasks = []
  2777. for task in tasks:
  2778. formatted_tasks.append({
  2779. "task_id": task.get('task_id'),
  2780. "task_name": task.get('task_name'),
  2781. "status": task.get('status'),
  2782. "step_status": task.get('step_status'),
  2783. "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
  2784. "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
  2785. "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
  2786. "created_by": task.get('by_user'),
  2787. "db_name": task.get('db_name'),
  2788. "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
  2789. # 新增字段
  2790. "directory_exists": task.get('directory_exists', True), # 默认为True,兼容旧数据
  2791. "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
  2792. })
  2793. response_data = {
  2794. "tasks": formatted_tasks,
  2795. "total": len(formatted_tasks),
  2796. "limit": limit,
  2797. "offset": offset
  2798. }
  2799. return jsonify(success_response(
  2800. response_text="获取任务列表成功",
  2801. data=response_data
  2802. ))
  2803. except Exception as e:
  2804. logger.error(f"获取数据管道任务列表失败: {str(e)}")
  2805. return jsonify(internal_error_response(
  2806. response_text="获取任务列表失败,请稍后重试"
  2807. )), 500
  2808. @app.flask_app.route('/api/v0/data_pipeline/tasks/query', methods=['POST'])
  2809. def query_data_pipeline_tasks():
  2810. """
  2811. 高级查询数据管道任务列表
  2812. 支持复杂筛选、排序、分页功能
  2813. 请求体:
  2814. {
  2815. "page": 1, // 页码,必须大于0,默认1
  2816. "page_size": 20, // 每页大小,1-100之间,默认20
  2817. "status": "completed", // 可选,任务状态筛选:"pending"|"running"|"completed"|"failed"|"cancelled"
  2818. "task_name": "highway", // 可选,任务名称模糊搜索,最大100字符
  2819. "created_by": "user123", // 可选,创建者精确匹配
  2820. "db_name": "highway_db", // 可选,数据库名称精确匹配
  2821. "created_time_start": "2025-01-01T00:00:00", // 可选,创建时间范围开始
  2822. "created_time_end": "2025-12-31T23:59:59", // 可选,创建时间范围结束
  2823. "started_time_start": "2025-01-01T00:00:00", // 可选,开始时间范围开始
  2824. "started_time_end": "2025-12-31T23:59:59", // 可选,开始时间范围结束
  2825. "completed_time_start": "2025-01-01T00:00:00", // 可选,完成时间范围开始
  2826. "completed_time_end": "2025-12-31T23:59:59", // 可选,完成时间范围结束
  2827. "sort_by": "created_at", // 可选,排序字段:"created_at"|"started_at"|"completed_at"|"task_name"|"status",默认"created_at"
  2828. "sort_order": "desc" // 可选,排序方向:"asc"|"desc",默认"desc"
  2829. }
  2830. 响应:
  2831. {
  2832. "success": true,
  2833. "code": 200,
  2834. "message": "查询任务列表成功",
  2835. "data": {
  2836. "tasks": [...],
  2837. "pagination": {
  2838. "page": 1,
  2839. "page_size": 20,
  2840. "total": 150,
  2841. "total_pages": 8,
  2842. "has_next": true,
  2843. "has_prev": false
  2844. },
  2845. "filters_applied": {...},
  2846. "sort_applied": {...},
  2847. "query_time": "0.045s"
  2848. }
  2849. }
  2850. """
  2851. try:
  2852. # 获取请求数据
  2853. req = request.get_json(force=True) if request.is_json else {}
  2854. # 解析参数,设置默认值
  2855. page = req.get('page', 1)
  2856. page_size = req.get('page_size', 20)
  2857. status = req.get('status')
  2858. task_name = req.get('task_name')
  2859. created_by = req.get('created_by')
  2860. db_name = req.get('db_name')
  2861. created_time_start = req.get('created_time_start')
  2862. created_time_end = req.get('created_time_end')
  2863. started_time_start = req.get('started_time_start')
  2864. started_time_end = req.get('started_time_end')
  2865. completed_time_start = req.get('completed_time_start')
  2866. completed_time_end = req.get('completed_time_end')
  2867. sort_by = req.get('sort_by', 'created_at')
  2868. sort_order = req.get('sort_order', 'desc')
  2869. # 参数验证
  2870. # 验证分页参数
  2871. if page < 1:
  2872. return jsonify(bad_request_response(
  2873. response_text="页码必须大于0",
  2874. invalid_params=['page']
  2875. )), 400
  2876. if page_size < 1 or page_size > 100:
  2877. return jsonify(bad_request_response(
  2878. response_text="每页大小必须在1-100之间",
  2879. invalid_params=['page_size']
  2880. )), 400
  2881. # 验证任务名称长度
  2882. if task_name and len(task_name) > 100:
  2883. return jsonify(bad_request_response(
  2884. response_text="任务名称搜索关键词最大长度为100字符",
  2885. invalid_params=['task_name']
  2886. )), 400
  2887. # 验证排序参数
  2888. allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
  2889. if sort_by not in allowed_sort_fields:
  2890. return jsonify(bad_request_response(
  2891. response_text=f"不支持的排序字段: {sort_by},支持的字段: {', '.join(allowed_sort_fields)}",
  2892. invalid_params=['sort_by']
  2893. )), 400
  2894. if sort_order.lower() not in ['asc', 'desc']:
  2895. return jsonify(bad_request_response(
  2896. response_text="排序方向必须是 'asc' 或 'desc'",
  2897. invalid_params=['sort_order']
  2898. )), 400
  2899. # 验证状态筛选
  2900. if status:
  2901. allowed_statuses = ['pending', 'running', 'completed', 'failed', 'cancelled']
  2902. if status not in allowed_statuses:
  2903. return jsonify(bad_request_response(
  2904. response_text=f"不支持的状态值: {status},支持的状态: {', '.join(allowed_statuses)}",
  2905. invalid_params=['status']
  2906. )), 400
  2907. # 调用管理器执行查询
  2908. manager = get_data_pipeline_manager()
  2909. result = manager.query_tasks_advanced(
  2910. page=page,
  2911. page_size=page_size,
  2912. status=status,
  2913. task_name=task_name,
  2914. created_by=created_by,
  2915. db_name=db_name,
  2916. created_time_start=created_time_start,
  2917. created_time_end=created_time_end,
  2918. started_time_start=started_time_start,
  2919. started_time_end=started_time_end,
  2920. completed_time_start=completed_time_start,
  2921. completed_time_end=completed_time_end,
  2922. sort_by=sort_by,
  2923. sort_order=sort_order
  2924. )
  2925. # 格式化任务列表
  2926. formatted_tasks = []
  2927. for task in result['tasks']:
  2928. formatted_tasks.append({
  2929. "task_id": task.get('task_id'),
  2930. "task_name": task.get('task_name'),
  2931. "status": task.get('status'),
  2932. "step_status": task.get('step_status'),
  2933. "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
  2934. "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
  2935. "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
  2936. "created_by": task.get('by_user'),
  2937. "db_name": task.get('db_name'),
  2938. "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
  2939. "directory_exists": task.get('directory_exists', True),
  2940. "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
  2941. })
  2942. # 构建响应数据
  2943. response_data = {
  2944. "tasks": formatted_tasks,
  2945. "pagination": result['pagination'],
  2946. "filters_applied": {
  2947. k: v for k, v in {
  2948. "status": status,
  2949. "task_name": task_name,
  2950. "created_by": created_by,
  2951. "db_name": db_name,
  2952. "created_time_start": created_time_start,
  2953. "created_time_end": created_time_end,
  2954. "started_time_start": started_time_start,
  2955. "started_time_end": started_time_end,
  2956. "completed_time_start": completed_time_start,
  2957. "completed_time_end": completed_time_end
  2958. }.items() if v
  2959. },
  2960. "sort_applied": {
  2961. "sort_by": sort_by,
  2962. "sort_order": sort_order
  2963. },
  2964. "query_time": result.get('query_time', '0.000s')
  2965. }
  2966. return jsonify(success_response(
  2967. response_text="查询任务列表成功",
  2968. data=response_data
  2969. ))
  2970. except Exception as e:
  2971. logger.error(f"查询数据管道任务列表失败: {str(e)}")
  2972. return jsonify(internal_error_response(
  2973. response_text="查询任务列表失败,请稍后重试"
  2974. )), 500
  2975. # ==================== 表检查API端点 ====================
  2976. import asyncio
  2977. from data_pipeline.api.table_inspector_api import TableInspectorAPI
  2978. @app.flask_app.route('/api/v0/database/tables', methods=['POST'])
  2979. def get_database_tables():
  2980. """
  2981. 获取数据库表列表
  2982. 请求体:
  2983. {
  2984. "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db", // 可选,不传则使用默认配置
  2985. "schema": "public,ods", // 可选,支持多个schema用逗号分隔,默认为public
  2986. "table_name_pattern": "ods_*" // 可选,表名模式匹配,支持通配符:ods_*、*_dim、*fact*、ods_%
  2987. }
  2988. 响应:
  2989. {
  2990. "success": true,
  2991. "code": 200,
  2992. "message": "获取表列表成功",
  2993. "data": {
  2994. "tables": ["public.table1", "public.table2", "ods.table3"],
  2995. "total": 3,
  2996. "schemas": ["public", "ods"],
  2997. "table_name_pattern": "ods_*"
  2998. }
  2999. }
  3000. """
  3001. try:
  3002. req = request.get_json(force=True)
  3003. # 处理数据库连接参数(可选)
  3004. db_connection = req.get('db_connection')
  3005. if not db_connection:
  3006. # 使用app_config的默认数据库配置
  3007. import app_config
  3008. db_params = app_config.APP_DB_CONFIG
  3009. db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
  3010. logger.info("使用默认数据库配置获取表列表")
  3011. else:
  3012. logger.info("使用用户指定的数据库配置获取表列表")
  3013. # 可选参数
  3014. schema = req.get('schema', '')
  3015. table_name_pattern = req.get('table_name_pattern')
  3016. # 创建表检查API实例
  3017. table_inspector = TableInspectorAPI()
  3018. # 使用asyncio运行异步方法
  3019. async def get_tables():
  3020. return await table_inspector.get_tables_list(db_connection, schema, table_name_pattern)
  3021. # 在新的事件循环中运行异步方法
  3022. try:
  3023. loop = asyncio.new_event_loop()
  3024. asyncio.set_event_loop(loop)
  3025. tables = loop.run_until_complete(get_tables())
  3026. finally:
  3027. loop.close()
  3028. # 解析schema信息
  3029. parsed_schemas = table_inspector._parse_schemas(schema)
  3030. response_data = {
  3031. "tables": tables,
  3032. "total": len(tables),
  3033. "schemas": parsed_schemas,
  3034. "db_connection_info": {
  3035. "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
  3036. }
  3037. }
  3038. # 如果使用了表名模式,添加到响应中
  3039. if table_name_pattern:
  3040. response_data["table_name_pattern"] = table_name_pattern
  3041. return jsonify(success_response(
  3042. response_text="获取表列表成功",
  3043. data=response_data
  3044. )), 200
  3045. except Exception as e:
  3046. logger.error(f"获取数据库表列表失败: {str(e)}")
  3047. return jsonify(internal_error_response(
  3048. response_text=f"获取表列表失败: {str(e)}"
  3049. )), 500
  3050. @app.flask_app.route('/api/v0/database/table/ddl', methods=['POST'])
  3051. def get_table_ddl():
  3052. """
  3053. 获取表的DDL语句或MD文档
  3054. 请求体:
  3055. {
  3056. "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db", // 可选,不传则使用默认配置
  3057. "table": "public.test",
  3058. "business_context": "这是高速公路服务区的相关数据", // 可选
  3059. "type": "ddl" // 可选,支持ddl/md/both,默认为ddl
  3060. }
  3061. 响应:
  3062. {
  3063. "success": true,
  3064. "code": 200,
  3065. "message": "获取表DDL成功",
  3066. "data": {
  3067. "ddl": "create table public.test (...);",
  3068. "md": "## test表...", // 仅当type为md或both时返回
  3069. "table_info": {
  3070. "table_name": "test",
  3071. "schema_name": "public",
  3072. "full_name": "public.test",
  3073. "comment": "测试表",
  3074. "field_count": 10,
  3075. "row_count": 1000
  3076. },
  3077. "fields": [...]
  3078. }
  3079. }
  3080. """
  3081. try:
  3082. req = request.get_json(force=True)
  3083. # 处理参数(table仍为必需,db_connection可选)
  3084. table = req.get('table')
  3085. db_connection = req.get('db_connection')
  3086. if not table:
  3087. return jsonify(bad_request_response(
  3088. response_text="缺少必需参数:table",
  3089. missing_params=['table']
  3090. )), 400
  3091. if not db_connection:
  3092. # 使用app_config的默认数据库配置
  3093. import app_config
  3094. db_params = app_config.APP_DB_CONFIG
  3095. db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
  3096. logger.info("使用默认数据库配置获取表DDL")
  3097. else:
  3098. logger.info("使用用户指定的数据库配置获取表DDL")
  3099. # 可选参数
  3100. business_context = req.get('business_context', '')
  3101. output_type = req.get('type', 'ddl')
  3102. # 验证type参数
  3103. valid_types = ['ddl', 'md', 'both']
  3104. if output_type not in valid_types:
  3105. return jsonify(bad_request_response(
  3106. response_text=f"无效的type参数: {output_type},支持的值: {valid_types}",
  3107. invalid_params=['type']
  3108. )), 400
  3109. # 创建表检查API实例
  3110. table_inspector = TableInspectorAPI()
  3111. # 使用asyncio运行异步方法
  3112. async def get_ddl():
  3113. return await table_inspector.get_table_ddl(
  3114. db_connection=db_connection,
  3115. table=table,
  3116. business_context=business_context,
  3117. output_type=output_type
  3118. )
  3119. # 在新的事件循环中运行异步方法
  3120. try:
  3121. loop = asyncio.new_event_loop()
  3122. asyncio.set_event_loop(loop)
  3123. result = loop.run_until_complete(get_ddl())
  3124. finally:
  3125. loop.close()
  3126. response_data = {
  3127. **result,
  3128. "generation_info": {
  3129. "business_context": business_context,
  3130. "output_type": output_type,
  3131. "has_llm_comments": bool(business_context),
  3132. "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
  3133. }
  3134. }
  3135. return jsonify(success_response(
  3136. response_text=f"获取表{output_type.upper()}成功",
  3137. data=response_data
  3138. )), 200
  3139. except Exception as e:
  3140. logger.error(f"获取表DDL失败: {str(e)}")
  3141. return jsonify(internal_error_response(
  3142. response_text=f"获取表{output_type.upper() if 'output_type' in locals() else 'DDL'}失败: {str(e)}"
  3143. )), 500
  3144. # ==================== Data Pipeline 文件管理 API ====================
  3145. from flask import send_file
  3146. # 创建文件管理器
  3147. data_pipeline_file_manager = None
  3148. def get_data_pipeline_file_manager():
  3149. """获取Data Pipeline文件管理器单例"""
  3150. global data_pipeline_file_manager
  3151. if data_pipeline_file_manager is None:
  3152. data_pipeline_file_manager = SimpleFileManager()
  3153. return data_pipeline_file_manager
  3154. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['GET'])
  3155. def get_data_pipeline_task_files(task_id):
  3156. """获取任务文件列表"""
  3157. try:
  3158. file_manager = get_data_pipeline_file_manager()
  3159. # 获取任务文件
  3160. files = file_manager.get_task_files(task_id)
  3161. directory_info = file_manager.get_directory_info(task_id)
  3162. # 格式化文件信息
  3163. formatted_files = []
  3164. for file_info in files:
  3165. formatted_files.append({
  3166. "file_name": file_info['file_name'],
  3167. "file_type": file_info['file_type'],
  3168. "file_size": file_info['file_size'],
  3169. "file_size_formatted": file_info['file_size_formatted'],
  3170. "created_at": file_info['created_at'].isoformat() if file_info.get('created_at') else None,
  3171. "modified_at": file_info['modified_at'].isoformat() if file_info.get('modified_at') else None,
  3172. "is_readable": file_info['is_readable']
  3173. })
  3174. response_data = {
  3175. "task_id": task_id,
  3176. "files": formatted_files,
  3177. "directory_info": directory_info
  3178. }
  3179. return jsonify(success_response(
  3180. response_text="获取任务文件列表成功",
  3181. data=response_data
  3182. ))
  3183. except Exception as e:
  3184. logger.error(f"获取任务文件列表失败: {str(e)}")
  3185. return jsonify(internal_error_response(
  3186. response_text="获取任务文件列表失败,请稍后重试"
  3187. )), 500
  3188. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files/<file_name>', methods=['GET'])
  3189. def download_data_pipeline_task_file(task_id, file_name):
  3190. """下载任务文件"""
  3191. try:
  3192. logger.info(f"开始下载文件: task_id={task_id}, file_name={file_name}")
  3193. # 直接构建文件路径,避免依赖数据库
  3194. from pathlib import Path
  3195. import os
  3196. # 获取项目根目录的绝对路径
  3197. project_root = Path(__file__).parent.absolute()
  3198. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  3199. file_path = task_dir / file_name
  3200. logger.info(f"文件路径: {file_path}")
  3201. # 检查文件是否存在
  3202. if not file_path.exists():
  3203. logger.warning(f"文件不存在: {file_path}")
  3204. return jsonify(not_found_response(
  3205. response_text=f"文件不存在: {file_name}"
  3206. )), 404
  3207. # 检查是否为文件(而不是目录)
  3208. if not file_path.is_file():
  3209. logger.warning(f"路径不是文件: {file_path}")
  3210. return jsonify(bad_request_response(
  3211. response_text=f"路径不是有效文件: {file_name}"
  3212. )), 400
  3213. # 安全检查:确保文件在允许的目录内
  3214. try:
  3215. file_path.resolve().relative_to(task_dir.resolve())
  3216. except ValueError:
  3217. logger.warning(f"文件路径不安全: {file_path}")
  3218. return jsonify(bad_request_response(
  3219. response_text="非法的文件路径"
  3220. )), 400
  3221. # 检查文件是否可读
  3222. if not os.access(file_path, os.R_OK):
  3223. logger.warning(f"文件不可读: {file_path}")
  3224. return jsonify(bad_request_response(
  3225. response_text="文件不可读"
  3226. )), 400
  3227. logger.info(f"开始发送文件: {file_path}")
  3228. return send_file(
  3229. file_path,
  3230. as_attachment=True,
  3231. download_name=file_name
  3232. )
  3233. except Exception as e:
  3234. logger.error(f"下载任务文件失败: task_id={task_id}, file_name={file_name}, 错误: {str(e)}", exc_info=True)
  3235. return jsonify(internal_error_response(
  3236. response_text="下载文件失败,请稍后重试"
  3237. )), 500
  3238. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/upload-table-list', methods=['POST'])
  3239. def upload_table_list_file(task_id):
  3240. """
  3241. 上传表清单文件
  3242. 表单参数:
  3243. - file: 要上传的表清单文件(multipart/form-data)
  3244. 响应:
  3245. {
  3246. "success": true,
  3247. "code": 200,
  3248. "message": "表清单文件上传成功",
  3249. "data": {
  3250. "task_id": "task_20250701_123456",
  3251. "filename": "table_list.txt",
  3252. "file_size": 1024,
  3253. "file_size_formatted": "1.0 KB"
  3254. }
  3255. }
  3256. """
  3257. try:
  3258. # 验证任务是否存在
  3259. manager = get_data_pipeline_manager()
  3260. task_info = manager.get_task_status(task_id)
  3261. if not task_info:
  3262. return jsonify(not_found_response(
  3263. response_text=f"任务不存在: {task_id}"
  3264. )), 404
  3265. # 检查是否有文件上传
  3266. if 'file' not in request.files:
  3267. return jsonify(bad_request_response(
  3268. response_text="请选择要上传的表清单文件",
  3269. missing_params=['file']
  3270. )), 400
  3271. file = request.files['file']
  3272. # 验证文件名
  3273. if file.filename == '':
  3274. return jsonify(bad_request_response(
  3275. response_text="请选择有效的文件"
  3276. )), 400
  3277. try:
  3278. # 使用文件管理器上传文件
  3279. file_manager = get_data_pipeline_file_manager()
  3280. result = file_manager.upload_table_list_file(task_id, file)
  3281. response_data = {
  3282. "task_id": task_id,
  3283. "filename": result["filename"],
  3284. "file_size": result["file_size"],
  3285. "file_size_formatted": result["file_size_formatted"],
  3286. "upload_time": result["upload_time"].isoformat() if result.get("upload_time") else None
  3287. }
  3288. return jsonify(success_response(
  3289. response_text="表清单文件上传成功",
  3290. data=response_data
  3291. )), 200
  3292. except ValueError as e:
  3293. # 文件验证错误(如文件太大、空文件等)
  3294. return jsonify(bad_request_response(
  3295. response_text=str(e)
  3296. )), 400
  3297. except Exception as e:
  3298. logger.error(f"上传表清单文件失败: {str(e)}")
  3299. return jsonify(internal_error_response(
  3300. response_text="文件上传失败,请稍后重试"
  3301. )), 500
  3302. except Exception as e:
  3303. logger.error(f"处理表清单文件上传请求失败: {str(e)}")
  3304. return jsonify(internal_error_response(
  3305. response_text="处理上传请求失败,请稍后重试"
  3306. )), 500
  3307. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list-info', methods=['GET'])
  3308. def get_table_list_info(task_id):
  3309. """
  3310. 获取任务的表清单文件信息
  3311. 响应:
  3312. {
  3313. "success": true,
  3314. "code": 200,
  3315. "message": "获取表清单文件信息成功",
  3316. "data": {
  3317. "task_id": "task_20250701_123456",
  3318. "has_file": true,
  3319. "filename": "table_list.txt",
  3320. "file_path": "./data_pipeline/training_data/task_20250701_123456/table_list.txt",
  3321. "file_size": 1024,
  3322. "file_size_formatted": "1.0 KB",
  3323. "uploaded_at": "2025-07-01T12:34:56",
  3324. "table_count": 5,
  3325. "table_names": ["table_name_1", "table_name_2", "table_name_3", "table_name_4", "table_name_5"],
  3326. "is_readable": true
  3327. }
  3328. }
  3329. """
  3330. try:
  3331. file_manager = get_data_pipeline_file_manager()
  3332. # 获取表清单文件信息
  3333. table_list_info = file_manager.get_table_list_file_info(task_id)
  3334. response_data = {
  3335. "task_id": task_id,
  3336. "has_file": table_list_info.get("exists", False),
  3337. **table_list_info
  3338. }
  3339. return jsonify(success_response(
  3340. response_text="获取表清单文件信息成功",
  3341. data=response_data
  3342. ))
  3343. except Exception as e:
  3344. logger.error(f"获取表清单文件信息失败: {str(e)}")
  3345. return jsonify(internal_error_response(
  3346. response_text="获取表清单文件信息失败,请稍后重试"
  3347. )), 500
  3348. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list', methods=['POST'])
  3349. def create_table_list_from_names(task_id):
  3350. """
  3351. 通过POST方式提交表名列表并创建table_list.txt文件
  3352. 请求体:
  3353. {
  3354. "tables": ["table1", "schema.table2", "table3"]
  3355. }
  3356. 或者:
  3357. {
  3358. "tables": "table1,schema.table2,table3"
  3359. }
  3360. 响应:
  3361. {
  3362. "success": true,
  3363. "code": 200,
  3364. "message": "表清单已成功创建",
  3365. "data": {
  3366. "task_id": "task_20250701_123456",
  3367. "filename": "table_list.txt",
  3368. "table_count": 3,
  3369. "file_size": 45,
  3370. "file_size_formatted": "45 B",
  3371. "created_time": "2025-07-01T12:34:56"
  3372. }
  3373. }
  3374. """
  3375. try:
  3376. # 验证任务是否存在
  3377. manager = get_data_pipeline_manager()
  3378. task_info = manager.get_task_status(task_id)
  3379. if not task_info:
  3380. return jsonify(not_found_response(
  3381. response_text=f"任务不存在: {task_id}"
  3382. )), 404
  3383. # 获取请求数据
  3384. req = request.get_json(force=True)
  3385. tables_param = req.get('tables')
  3386. if not tables_param:
  3387. return jsonify(bad_request_response(
  3388. response_text="缺少必需参数:tables",
  3389. missing_params=['tables']
  3390. )), 400
  3391. # 处理不同格式的表名参数
  3392. try:
  3393. if isinstance(tables_param, str):
  3394. # 逗号分隔的字符串格式
  3395. table_names = [name.strip() for name in tables_param.split(',') if name.strip()]
  3396. elif isinstance(tables_param, list):
  3397. # 数组格式
  3398. table_names = [str(name).strip() for name in tables_param if str(name).strip()]
  3399. else:
  3400. return jsonify(bad_request_response(
  3401. response_text="tables参数格式错误,应为字符串(逗号分隔)或数组"
  3402. )), 400
  3403. if not table_names:
  3404. return jsonify(bad_request_response(
  3405. response_text="表名列表不能为空"
  3406. )), 400
  3407. except Exception as e:
  3408. return jsonify(bad_request_response(
  3409. response_text=f"解析tables参数失败: {str(e)}"
  3410. )), 400
  3411. try:
  3412. # 使用文件管理器创建表清单文件
  3413. file_manager = get_data_pipeline_file_manager()
  3414. result = file_manager.create_table_list_from_names(task_id, table_names)
  3415. response_data = {
  3416. "task_id": task_id,
  3417. "filename": result["filename"],
  3418. "table_count": result["table_count"],
  3419. "unique_table_count": result["unique_table_count"],
  3420. "file_size": result["file_size"],
  3421. "file_size_formatted": result["file_size_formatted"],
  3422. "created_time": result["created_time"].isoformat() if result.get("created_time") else None,
  3423. "original_count": len(table_names) if isinstance(table_names, list) else len(tables_param.split(','))
  3424. }
  3425. return jsonify(success_response(
  3426. response_text=f"表清单已成功创建,包含 {result['table_count']} 个表",
  3427. data=response_data
  3428. )), 200
  3429. except ValueError as e:
  3430. # 表名验证错误(如格式错误、数量限制等)
  3431. return jsonify(bad_request_response(
  3432. response_text=str(e)
  3433. )), 400
  3434. except Exception as e:
  3435. logger.error(f"创建表清单文件失败: {str(e)}")
  3436. return jsonify(internal_error_response(
  3437. response_text="创建表清单文件失败,请稍后重试"
  3438. )), 500
  3439. except Exception as e:
  3440. logger.error(f"处理表清单创建请求失败: {str(e)}")
  3441. return jsonify(internal_error_response(
  3442. response_text="处理请求失败,请稍后重试"
  3443. )), 500
  3444. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['POST'])
  3445. def upload_file_to_task(task_id):
  3446. """
  3447. 上传文件到指定任务目录
  3448. 表单参数:
  3449. - file: 要上传的文件(multipart/form-data)
  3450. - overwrite_mode: 重名处理模式 (backup, replace, skip),默认为backup
  3451. 支持的文件类型:
  3452. - .ddl: DDL文件
  3453. - .md: Markdown文档
  3454. - .txt: 文本文件
  3455. - .json: JSON文件
  3456. - .sql: SQL文件
  3457. - .csv: CSV文件
  3458. 重名处理模式:
  3459. - backup: 备份原文件(默认)
  3460. - replace: 直接覆盖
  3461. - skip: 跳过上传
  3462. 响应:
  3463. {
  3464. "success": true,
  3465. "code": 200,
  3466. "message": "文件上传成功",
  3467. "data": {
  3468. "task_id": "task_20250701_123456",
  3469. "uploaded_file": {
  3470. "filename": "test.ddl",
  3471. "size": 1024,
  3472. "size_formatted": "1.0 KB",
  3473. "uploaded_at": "2025-07-01T12:34:56",
  3474. "overwrite_mode": "backup"
  3475. },
  3476. "backup_info": { // 仅当overwrite_mode为backup且文件已存在时返回
  3477. "had_existing_file": true,
  3478. "backup_filename": "test.ddl_bak1",
  3479. "backup_version": 1,
  3480. "backup_created_at": "2025-07-01T12:34:56"
  3481. }
  3482. }
  3483. }
  3484. """
  3485. try:
  3486. # 验证任务是否存在
  3487. manager = get_data_pipeline_manager()
  3488. task_info = manager.get_task_status(task_id)
  3489. if not task_info:
  3490. return jsonify(not_found_response(
  3491. response_text=f"任务不存在: {task_id}"
  3492. )), 404
  3493. # 检查是否有文件上传
  3494. if 'file' not in request.files:
  3495. return jsonify(bad_request_response(
  3496. response_text="请选择要上传的文件",
  3497. missing_params=['file']
  3498. )), 400
  3499. file = request.files['file']
  3500. # 验证文件名
  3501. if file.filename == '':
  3502. return jsonify(bad_request_response(
  3503. response_text="请选择有效的文件"
  3504. )), 400
  3505. # 获取重名处理模式
  3506. overwrite_mode = request.form.get('overwrite_mode', 'backup')
  3507. # 验证重名处理模式
  3508. valid_modes = ['backup', 'replace', 'skip']
  3509. if overwrite_mode not in valid_modes:
  3510. return jsonify(bad_request_response(
  3511. response_text=f"无效的overwrite_mode参数: {overwrite_mode},支持的值: {valid_modes}",
  3512. invalid_params=['overwrite_mode']
  3513. )), 400
  3514. try:
  3515. # 使用文件管理器上传文件
  3516. file_manager = get_data_pipeline_file_manager()
  3517. result = file_manager.upload_file_to_task(task_id, file, file.filename, overwrite_mode)
  3518. # 检查是否跳过上传
  3519. if result.get('skipped'):
  3520. return jsonify(success_response(
  3521. response_text=result.get('message', '文件已存在,跳过上传'),
  3522. data=result
  3523. )), 200
  3524. return jsonify(success_response(
  3525. response_text="文件上传成功",
  3526. data=result
  3527. )), 200
  3528. except ValueError as e:
  3529. # 文件验证错误(如文件太大、空文件、不支持的类型等)
  3530. return jsonify(bad_request_response(
  3531. response_text=str(e)
  3532. )), 400
  3533. except Exception as e:
  3534. logger.error(f"上传文件失败: {str(e)}")
  3535. return jsonify(internal_error_response(
  3536. response_text="文件上传失败,请稍后重试"
  3537. )), 500
  3538. except Exception as e:
  3539. logger.error(f"处理文件上传请求失败: {str(e)}")
  3540. return jsonify(internal_error_response(
  3541. response_text="处理上传请求失败,请稍后重试"
  3542. )), 500
  3543. # ==================== 任务目录删除API ====================
  3544. import shutil
  3545. from pathlib import Path
  3546. from datetime import datetime
  3547. import psycopg2
  3548. from app_config import PGVECTOR_CONFIG
  3549. def delete_task_directory_simple(task_id, delete_database_records=False):
  3550. """
  3551. 简单的任务目录删除功能
  3552. - 删除 data_pipeline/training_data/{task_id} 目录
  3553. - 更新数据库中的 directory_exists 字段
  3554. - 可选:删除数据库记录
  3555. """
  3556. try:
  3557. # 1. 删除目录
  3558. project_root = Path(__file__).parent.absolute()
  3559. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  3560. deleted_files_count = 0
  3561. deleted_size = 0
  3562. if task_dir.exists():
  3563. # 计算删除前的统计信息
  3564. for file_path in task_dir.rglob('*'):
  3565. if file_path.is_file():
  3566. deleted_files_count += 1
  3567. deleted_size += file_path.stat().st_size
  3568. # 删除目录
  3569. shutil.rmtree(task_dir)
  3570. directory_deleted = True
  3571. operation_message = "目录删除成功"
  3572. else:
  3573. directory_deleted = False
  3574. operation_message = "目录不存在,无需删除"
  3575. # 2. 更新数据库
  3576. database_records_deleted = False
  3577. try:
  3578. conn = psycopg2.connect(**PGVECTOR_CONFIG)
  3579. cur = conn.cursor()
  3580. if delete_database_records:
  3581. # 删除任务步骤记录
  3582. cur.execute("DELETE FROM data_pipeline_task_steps WHERE task_id = %s", (task_id,))
  3583. # 删除任务主记录
  3584. cur.execute("DELETE FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
  3585. database_records_deleted = True
  3586. else:
  3587. # 只更新目录状态
  3588. cur.execute("""
  3589. UPDATE data_pipeline_tasks
  3590. SET directory_exists = FALSE, updated_at = CURRENT_TIMESTAMP
  3591. WHERE task_id = %s
  3592. """, (task_id,))
  3593. conn.commit()
  3594. cur.close()
  3595. conn.close()
  3596. except Exception as db_error:
  3597. logger.error(f"数据库操作失败: {db_error}")
  3598. # 数据库失败不影响文件删除的结果
  3599. # 3. 格式化文件大小
  3600. def format_size(size_bytes):
  3601. if size_bytes < 1024:
  3602. return f"{size_bytes} B"
  3603. elif size_bytes < 1024**2:
  3604. return f"{size_bytes/1024:.1f} KB"
  3605. elif size_bytes < 1024**3:
  3606. return f"{size_bytes/(1024**2):.1f} MB"
  3607. else:
  3608. return f"{size_bytes/(1024**3):.1f} GB"
  3609. return {
  3610. "success": True,
  3611. "task_id": task_id,
  3612. "directory_deleted": directory_deleted,
  3613. "database_records_deleted": database_records_deleted,
  3614. "deleted_files_count": deleted_files_count,
  3615. "deleted_size": format_size(deleted_size),
  3616. "deleted_at": datetime.now().isoformat(),
  3617. "operation_message": operation_message # 新增:具体的操作消息
  3618. }
  3619. except Exception as e:
  3620. logger.error(f"删除任务目录失败: {task_id}, 错误: {str(e)}")
  3621. return {
  3622. "success": False,
  3623. "task_id": task_id,
  3624. "error": str(e),
  3625. "error_code": "DELETE_FAILED",
  3626. "operation_message": f"删除操作失败: {str(e)}" # 新增:失败消息
  3627. }
  3628. @app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['DELETE'])
  3629. def delete_tasks():
  3630. """删除任务目录(支持单个和批量)"""
  3631. try:
  3632. # 智能获取参数:支持JSON body和URL查询参数两种方式
  3633. def get_request_parameter(param_name, array_param_name=None):
  3634. """从JSON body或URL查询参数中获取参数值"""
  3635. # 1. 优先从JSON body获取
  3636. if request.is_json:
  3637. try:
  3638. json_data = request.get_json()
  3639. if json_data and param_name in json_data:
  3640. return json_data[param_name]
  3641. except:
  3642. pass
  3643. # 2. 从URL查询参数获取
  3644. if param_name in request.args:
  3645. value = request.args.get(param_name)
  3646. # 处理布尔值
  3647. if value.lower() in ('true', '1', 'yes'):
  3648. return True
  3649. elif value.lower() in ('false', '0', 'no'):
  3650. return False
  3651. return value
  3652. # 3. 处理数组参数(如 task_ids[])
  3653. if array_param_name and array_param_name in request.args:
  3654. return request.args.getlist(array_param_name)
  3655. return None
  3656. # 获取参数
  3657. task_ids = get_request_parameter('task_ids', 'task_ids[]')
  3658. confirm = get_request_parameter('confirm')
  3659. if not task_ids:
  3660. return jsonify(bad_request_response(
  3661. response_text="缺少必需参数: task_ids",
  3662. missing_params=['task_ids']
  3663. )), 400
  3664. if not confirm:
  3665. return jsonify(bad_request_response(
  3666. response_text="缺少必需参数: confirm",
  3667. missing_params=['confirm']
  3668. )), 400
  3669. if confirm != True:
  3670. return jsonify(bad_request_response(
  3671. response_text="confirm参数必须为true以确认删除操作"
  3672. )), 400
  3673. if not isinstance(task_ids, list) or len(task_ids) == 0:
  3674. return jsonify(bad_request_response(
  3675. response_text="task_ids必须是非空的任务ID列表"
  3676. )), 400
  3677. # 获取可选参数
  3678. delete_database_records = get_request_parameter('delete_database_records') or False
  3679. continue_on_error = get_request_parameter('continue_on_error')
  3680. if continue_on_error is None:
  3681. continue_on_error = True
  3682. # 执行批量删除操作
  3683. deleted_tasks = []
  3684. failed_tasks = []
  3685. total_size_freed = 0
  3686. for task_id in task_ids:
  3687. result = delete_task_directory_simple(task_id, delete_database_records)
  3688. if result["success"]:
  3689. deleted_tasks.append(result)
  3690. # 累计释放的空间大小(这里简化处理,实际应该解析size字符串)
  3691. else:
  3692. failed_tasks.append({
  3693. "task_id": task_id,
  3694. "error": result["error"],
  3695. "error_code": result.get("error_code", "UNKNOWN")
  3696. })
  3697. if not continue_on_error:
  3698. break
  3699. # 构建响应
  3700. summary = {
  3701. "total_requested": len(task_ids),
  3702. "successfully_deleted": len(deleted_tasks),
  3703. "failed": len(failed_tasks)
  3704. }
  3705. batch_result = {
  3706. "deleted_tasks": deleted_tasks,
  3707. "failed_tasks": failed_tasks,
  3708. "summary": summary,
  3709. "deleted_at": datetime.now().isoformat()
  3710. }
  3711. # 构建智能响应消息
  3712. if len(task_ids) == 1:
  3713. # 单个删除:使用具体的操作消息
  3714. if summary["failed"] == 0:
  3715. # 从deleted_tasks中获取具体的操作消息
  3716. operation_msg = deleted_tasks[0].get('operation_message', '任务处理完成')
  3717. message = operation_msg
  3718. else:
  3719. # 从failed_tasks中获取错误消息
  3720. error_msg = failed_tasks[0].get('error', '删除失败')
  3721. message = f"任务删除失败: {error_msg}"
  3722. else:
  3723. # 批量删除:统计各种操作结果
  3724. directory_deleted_count = sum(1 for task in deleted_tasks if task.get('directory_deleted', False))
  3725. directory_not_exist_count = sum(1 for task in deleted_tasks if not task.get('directory_deleted', False))
  3726. if summary["failed"] == 0:
  3727. # 全部成功
  3728. if directory_deleted_count > 0 and directory_not_exist_count > 0:
  3729. message = f"批量操作完成:{directory_deleted_count}个目录已删除,{directory_not_exist_count}个目录不存在"
  3730. elif directory_deleted_count > 0:
  3731. message = f"批量删除完成:成功删除{directory_deleted_count}个目录"
  3732. elif directory_not_exist_count > 0:
  3733. message = f"批量操作完成:{directory_not_exist_count}个目录不存在,无需删除"
  3734. else:
  3735. message = "批量操作完成"
  3736. elif summary["successfully_deleted"] == 0:
  3737. message = f"批量删除失败:{summary['failed']}个任务处理失败"
  3738. else:
  3739. message = f"批量删除部分完成:成功{summary['successfully_deleted']}个,失败{summary['failed']}个"
  3740. return jsonify(success_response(
  3741. response_text=message,
  3742. data=batch_result
  3743. )), 200
  3744. except Exception as e:
  3745. logger.error(f"删除任务失败: 错误: {str(e)}")
  3746. return jsonify(internal_error_response(
  3747. response_text="删除任务失败,请稍后重试"
  3748. )), 500
  3749. @app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs/query', methods=['POST'])
  3750. def query_data_pipeline_task_logs(task_id):
  3751. """
  3752. 高级查询数据管道任务日志
  3753. 支持复杂筛选、排序、分页功能
  3754. 请求体:
  3755. {
  3756. "page": 1, // 页码,必须大于0,默认1
  3757. "page_size": 50, // 每页大小,1-500之间,默认50
  3758. "level": "ERROR", // 可选,日志级别筛选:"DEBUG"|"INFO"|"WARNING"|"ERROR"|"CRITICAL"
  3759. "start_time": "2025-01-01 00:00:00", // 可选,开始时间范围 (YYYY-MM-DD HH:MM:SS)
  3760. "end_time": "2025-01-02 23:59:59", // 可选,结束时间范围 (YYYY-MM-DD HH:MM:SS)
  3761. "keyword": "failed", // 可选,关键字搜索(消息内容模糊匹配)
  3762. "logger_name": "DDLGenerator", // 可选,日志记录器名称精确匹配
  3763. "step_name": "ddl_generation", // 可选,执行步骤名称精确匹配
  3764. "sort_by": "timestamp", // 可选,排序字段:"timestamp"|"level"|"logger"|"step"|"line_number",默认"timestamp"
  3765. "sort_order": "desc" // 可选,排序方向:"asc"|"desc",默认"desc"
  3766. }
  3767. 响应:
  3768. {
  3769. "success": true,
  3770. "code": 200,
  3771. "message": "查询任务日志成功",
  3772. "data": {
  3773. "logs": [
  3774. {
  3775. "timestamp": "2025-07-01 14:30:52",
  3776. "level": "INFO",
  3777. "logger": "SimpleWorkflowExecutor",
  3778. "step": "ddl_generation",
  3779. "message": "开始DDL生成",
  3780. "line_number": 15
  3781. }
  3782. ],
  3783. "pagination": {
  3784. "page": 1,
  3785. "page_size": 50,
  3786. "total": 1000,
  3787. "total_pages": 20,
  3788. "has_next": true,
  3789. "has_prev": false
  3790. },
  3791. "log_file_info": {
  3792. "exists": true,
  3793. "file_path": "/path/to/log/file",
  3794. "file_size": 1024000,
  3795. "file_size_formatted": "1.0 MB",
  3796. "last_modified": "2025-07-01T14:30:52",
  3797. "total_lines": 5000
  3798. },
  3799. "query_time": "0.123s"
  3800. }
  3801. }
  3802. """
  3803. try:
  3804. # 验证任务是否存在
  3805. manager = get_data_pipeline_manager()
  3806. task_info = manager.get_task_status(task_id)
  3807. if not task_info:
  3808. return jsonify(not_found_response(
  3809. response_text=f"任务不存在: {task_id}"
  3810. )), 404
  3811. # 解析请求数据
  3812. request_data = request.get_json() or {}
  3813. # 参数验证
  3814. def _is_valid_time_format(time_str):
  3815. """验证时间格式是否有效"""
  3816. if not time_str:
  3817. return True
  3818. # 支持的时间格式
  3819. time_formats = [
  3820. '%Y-%m-%d %H:%M:%S', # 2025-01-01 00:00:00
  3821. '%Y-%m-%d', # 2025-01-01
  3822. '%Y-%m-%dT%H:%M:%S', # 2025-01-01T00:00:00
  3823. '%Y-%m-%dT%H:%M:%S.%f', # 2025-01-01T00:00:00.123456
  3824. ]
  3825. for fmt in time_formats:
  3826. try:
  3827. from datetime import datetime
  3828. datetime.strptime(time_str, fmt)
  3829. return True
  3830. except ValueError:
  3831. continue
  3832. return False
  3833. # 提取和验证参数
  3834. page = request_data.get('page', 1)
  3835. page_size = request_data.get('page_size', 50)
  3836. level = request_data.get('level')
  3837. start_time = request_data.get('start_time')
  3838. end_time = request_data.get('end_time')
  3839. keyword = request_data.get('keyword')
  3840. logger_name = request_data.get('logger_name')
  3841. step_name = request_data.get('step_name')
  3842. sort_by = request_data.get('sort_by', 'timestamp')
  3843. sort_order = request_data.get('sort_order', 'desc')
  3844. # 参数验证
  3845. if not isinstance(page, int) or page < 1:
  3846. return jsonify(bad_request_response(
  3847. response_text="页码必须是大于0的整数"
  3848. )), 400
  3849. if not isinstance(page_size, int) or page_size < 1 or page_size > 500:
  3850. return jsonify(bad_request_response(
  3851. response_text="每页大小必须是1-500之间的整数"
  3852. )), 400
  3853. # 验证日志级别
  3854. if level and level.upper() not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
  3855. return jsonify(bad_request_response(
  3856. response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
  3857. )), 400
  3858. # 验证时间格式
  3859. if not _is_valid_time_format(start_time):
  3860. return jsonify(bad_request_response(
  3861. response_text="开始时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
  3862. )), 400
  3863. if not _is_valid_time_format(end_time):
  3864. return jsonify(bad_request_response(
  3865. response_text="结束时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
  3866. )), 400
  3867. # 验证关键字长度
  3868. if keyword and len(keyword) > 200:
  3869. return jsonify(bad_request_response(
  3870. response_text="关键字长度不能超过200个字符"
  3871. )), 400
  3872. # 验证排序字段
  3873. allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
  3874. if sort_by not in allowed_sort_fields:
  3875. return jsonify(bad_request_response(
  3876. response_text=f"排序字段必须是以下之一: {', '.join(allowed_sort_fields)}"
  3877. )), 400
  3878. # 验证排序方向
  3879. if sort_order.lower() not in ['asc', 'desc']:
  3880. return jsonify(bad_request_response(
  3881. response_text="排序方向必须是asc或desc"
  3882. )), 400
  3883. # 创建工作流执行器并查询日志
  3884. from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
  3885. executor = SimpleWorkflowExecutor(task_id)
  3886. try:
  3887. result = executor.query_logs_advanced(
  3888. page=page,
  3889. page_size=page_size,
  3890. level=level,
  3891. start_time=start_time,
  3892. end_time=end_time,
  3893. keyword=keyword,
  3894. logger_name=logger_name,
  3895. step_name=step_name,
  3896. sort_by=sort_by,
  3897. sort_order=sort_order
  3898. )
  3899. return jsonify(success_response(
  3900. response_text="查询任务日志成功",
  3901. data=result
  3902. ))
  3903. finally:
  3904. executor.cleanup()
  3905. except Exception as e:
  3906. logger.error(f"查询数据管道任务日志失败: {str(e)}")
  3907. return jsonify(internal_error_response(
  3908. response_text="查询任务日志失败,请稍后重试"
  3909. )), 500
  3910. if __name__ == '__main__':
  3911. logger.info("启动Flask应用: http://localhost:8084")
  3912. app.run(host="0.0.0.0", port=8084, debug=True)