# unified_api.py (146 KB)
  1. """
  2. 统一 API 服务
  3. 集成 citu_app.py 指定API 和 react_agent/api.py 的所有功能
  4. 提供数据库问答、Redis对话管理、QA反馈、训练数据管理、React Agent等功能
  5. 使用普通 Flask 应用 + ASGI 包装实现异步支持
  6. """
  7. import asyncio
  8. import logging
  9. import atexit
  10. import os
  11. import sys
  12. from datetime import datetime, timedelta
  13. from typing import Optional, Dict, Any, TYPE_CHECKING, Union
  14. import signal
  15. from threading import Thread
  16. if TYPE_CHECKING:
  17. from react_agent.agent import CustomReactAgent
  18. # 初始化日志系统 - 必须在最前面
  19. from core.logging import initialize_logging, get_app_logger
  20. initialize_logging()
  21. # 标准 Flask 导入
  22. from flask import Flask, request, jsonify, session, send_file
  23. import redis.asyncio as redis
  24. # 基础依赖
  25. import pandas as pd
  26. import json
  27. import sqlparse
  28. # 项目模块导入
  29. from core.vanna_llm_factory import create_vanna_instance
  30. from common.redis_conversation_manager import RedisConversationManager
  31. from common.qa_feedback_manager import QAFeedbackManager
  32. # Data Pipeline 相关导入 - 从 citu_app.py 迁移
  33. from data_pipeline.api.simple_workflow import SimpleWorkflowManager, SimpleWorkflowExecutor
  34. from data_pipeline.api.simple_file_manager import SimpleFileManager
  35. from data_pipeline.api.table_inspector_api import TableInspectorAPI
  36. from common.result import (
  37. success_response, bad_request_response, not_found_response, internal_error_response,
  38. error_response, service_unavailable_response,
  39. agent_success_response, agent_error_response,
  40. validation_failed_response
  41. )
  42. from app_config import (
  43. USER_MAX_CONVERSATIONS, CONVERSATION_CONTEXT_COUNT,
  44. DEFAULT_ANONYMOUS_USER, ENABLE_QUESTION_ANSWER_CACHE
  45. )
# Create the standard Flask application.
app = Flask(__name__)
# Create the application logger used throughout this module.
logger = get_app_logger("UnifiedApp")
# React Agent imports: try the ``react_agent`` package first, then fall back
# to ``test.custom_react_agent``. If neither can be imported, a warning is
# logged and both names are bound to None so the module still loads.
try:
    from react_agent.agent import CustomReactAgent
    from react_agent.enhanced_redis_api import get_conversation_detail_from_redis
except ImportError:
    try:
        from test.custom_react_agent.agent import CustomReactAgent
        from test.custom_react_agent.enhanced_redis_api import get_conversation_detail_from_redis
    except ImportError:
        logger.warning("无法导入 CustomReactAgent,React Agent功能将不可用")
        CustomReactAgent = None
        get_conversation_detail_from_redis = None
# Initialise the core components at import time.
vn = create_vanna_instance()
redis_conversation_manager = RedisConversationManager()
# ==================== React Agent global instance management ====================
# Both globals start as None; get_react_agent() assigns them on first use.
_react_agent_instance: Optional[Any] = None
_redis_client: Optional[redis.Redis] = None
  68. def validate_request_data(data: Dict[str, Any]) -> Dict[str, Any]:
  69. """验证请求数据,并支持从thread_id中推断user_id"""
  70. errors = []
  71. # 验证 question(必填)
  72. question = data.get('question', '')
  73. if not question or not question.strip():
  74. errors.append('问题不能为空')
  75. elif len(question) > 2000:
  76. errors.append('问题长度不能超过2000字符')
  77. # 优先获取 thread_id
  78. thread_id = data.get('thread_id') or data.get('conversation_id')
  79. # 获取 user_id,但暂不设置默认值
  80. user_id = data.get('user_id')
  81. # 如果没有传递 user_id,则尝试从 thread_id 中推断
  82. if not user_id:
  83. if thread_id and ':' in thread_id:
  84. inferred_user_id = thread_id.split(':', 1)[0]
  85. if inferred_user_id:
  86. user_id = inferred_user_id
  87. logger.info(f"👤 未提供user_id,从 thread_id '{thread_id}' 中推断出: '{user_id}'")
  88. else:
  89. user_id = 'guest'
  90. else:
  91. user_id = 'guest'
  92. # 验证 user_id 长度
  93. if user_id and len(user_id) > 50:
  94. errors.append('用户ID长度不能超过50字符')
  95. # 用户ID与会话ID一致性校验
  96. if thread_id:
  97. if ':' not in thread_id:
  98. errors.append('会话ID格式无效,期望格式为 user_id:timestamp')
  99. else:
  100. thread_user_id = thread_id.split(':', 1)[0]
  101. if thread_user_id != user_id:
  102. errors.append(f'会话归属验证失败:会话ID [{thread_id}] 不属于当前用户 [{user_id}]')
  103. if errors:
  104. raise ValueError('; '.join(errors))
  105. return {
  106. 'question': question.strip(),
  107. 'user_id': user_id,
  108. 'thread_id': thread_id # 可选,不传则自动生成新会话
  109. }
  110. async def get_react_agent() -> Any:
  111. """获取 React Agent 实例(懒加载)"""
  112. global _react_agent_instance, _redis_client
  113. if _react_agent_instance is None:
  114. if CustomReactAgent is None:
  115. logger.error("❌ CustomReactAgent 未能导入,无法初始化")
  116. raise ImportError("CustomReactAgent 未能导入")
  117. logger.info("🚀 正在异步初始化 Custom React Agent...")
  118. try:
  119. # 设置环境变量
  120. os.environ['REDIS_URL'] = 'redis://localhost:6379'
  121. # 初始化共享的Redis客户端
  122. _redis_client = redis.from_url('redis://localhost:6379', decode_responses=True)
  123. await _redis_client.ping()
  124. logger.info("✅ Redis客户端连接成功")
  125. _react_agent_instance = await CustomReactAgent.create()
  126. logger.info("✅ React Agent 异步初始化完成")
  127. except Exception as e:
  128. logger.error(f"❌ React Agent 异步初始化失败: {e}")
  129. raise
  130. return _react_agent_instance
  131. async def ensure_agent_ready() -> bool:
  132. """异步确保Agent实例可用"""
  133. global _react_agent_instance
  134. if _react_agent_instance is None:
  135. await get_react_agent()
  136. # 测试Agent是否还可用
  137. try:
  138. test_result = await _react_agent_instance.get_user_recent_conversations("__test__", 1)
  139. return True
  140. except Exception as e:
  141. logger.warning(f"⚠️ Agent实例不可用: {e}")
  142. _react_agent_instance = None
  143. await get_react_agent()
  144. return True
  145. def get_user_conversations_simple_sync(user_id: str, limit: int = 10):
  146. """直接从Redis获取用户对话,测试版本"""
  147. import redis
  148. import json
  149. try:
  150. # 创建Redis连接
  151. redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
  152. redis_client.ping()
  153. # 扫描用户的checkpoint keys
  154. pattern = f"checkpoint:{user_id}:*"
  155. logger.info(f"🔍 扫描模式: {pattern}")
  156. keys = []
  157. cursor = 0
  158. while True:
  159. cursor, batch = redis_client.scan(cursor=cursor, match=pattern, count=1000)
  160. keys.extend(batch)
  161. if cursor == 0:
  162. break
  163. logger.info(f"📋 找到 {len(keys)} 个keys")
  164. # 解析thread信息
  165. thread_data = {}
  166. for key in keys:
  167. try:
  168. parts = key.split(':')
  169. if len(parts) >= 4:
  170. thread_id = f"{parts[1]}:{parts[2]}" # user_id:timestamp
  171. timestamp = parts[2]
  172. if thread_id not in thread_data:
  173. thread_data[thread_id] = {
  174. "thread_id": thread_id,
  175. "timestamp": timestamp,
  176. "keys": []
  177. }
  178. thread_data[thread_id]["keys"].append(key)
  179. except Exception as e:
  180. logger.warning(f"解析key失败 {key}: {e}")
  181. continue
  182. logger.info(f"📊 找到 {len(thread_data)} 个thread")
  183. # 按时间戳排序
  184. sorted_threads = sorted(
  185. thread_data.values(),
  186. key=lambda x: x["timestamp"],
  187. reverse=True
  188. )[:limit]
  189. # 获取每个thread的详细信息
  190. conversations = []
  191. for thread_info in sorted_threads:
  192. try:
  193. thread_id = thread_info["thread_id"]
  194. # 获取最新的checkpoint数据
  195. latest_key = max(thread_info["keys"])
  196. # 先检查key的数据类型
  197. key_type = redis_client.type(latest_key)
  198. logger.info(f"🔍 Key {latest_key} 的类型: {key_type}")
  199. data = None
  200. if key_type == 'string':
  201. data = redis_client.get(latest_key)
  202. elif key_type == 'hash':
  203. # 如果是hash类型,获取所有字段
  204. hash_data = redis_client.hgetall(latest_key)
  205. logger.info(f"🔍 Hash字段: {list(hash_data.keys())}")
  206. # 尝试获取可能的数据字段
  207. for field in ['data', 'state', 'value', 'checkpoint']:
  208. if field in hash_data:
  209. data = hash_data[field]
  210. break
  211. if not data and hash_data:
  212. # 如果没找到预期字段,取第一个值试试
  213. data = list(hash_data.values())[0]
  214. elif key_type == 'list':
  215. # 如果是list类型,获取最后一个元素
  216. data = redis_client.lindex(latest_key, -1)
  217. elif key_type == 'ReJSON-RL':
  218. # 这是RedisJSON类型,使用JSON.GET命令
  219. logger.info(f"🔍 使用JSON.GET获取RedisJSON数据")
  220. try:
  221. # 使用JSON.GET命令获取整个JSON对象
  222. json_data = redis_client.execute_command('JSON.GET', latest_key)
  223. if json_data:
  224. data = json_data # JSON.GET返回的就是JSON字符串
  225. logger.info(f"🔍 JSON数据长度: {len(data)} 字符")
  226. else:
  227. logger.warning(f"⚠️ JSON.GET 返回空数据")
  228. continue
  229. except Exception as json_error:
  230. logger.error(f"❌ JSON.GET 失败: {json_error}")
  231. continue
  232. else:
  233. logger.warning(f"⚠️ 未知的key类型: {key_type}")
  234. continue
  235. if data:
  236. try:
  237. checkpoint_data = json.loads(data)
  238. # 调试:查看JSON数据结构
  239. logger.info(f"🔍 JSON顶级keys: {list(checkpoint_data.keys())}")
  240. # 根据您提供的JSON结构,消息在 checkpoint.channel_values.messages
  241. messages = []
  242. # 首先检查是否有checkpoint字段
  243. if 'checkpoint' in checkpoint_data:
  244. checkpoint = checkpoint_data['checkpoint']
  245. if isinstance(checkpoint, dict) and 'channel_values' in checkpoint:
  246. channel_values = checkpoint['channel_values']
  247. if isinstance(channel_values, dict) and 'messages' in channel_values:
  248. messages = channel_values['messages']
  249. logger.info(f"🔍 找到messages: {len(messages)} 条消息")
  250. # 如果没有checkpoint字段,尝试直接在channel_values
  251. if not messages and 'channel_values' in checkpoint_data:
  252. channel_values = checkpoint_data['channel_values']
  253. if isinstance(channel_values, dict) and 'messages' in channel_values:
  254. messages = channel_values['messages']
  255. logger.info(f"🔍 找到messages(直接路径): {len(messages)} 条消息")
  256. # 生成对话预览
  257. preview = "空对话"
  258. if messages:
  259. for msg in messages:
  260. # 处理LangChain消息格式:{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "...", "type": "human"}}
  261. if isinstance(msg, dict):
  262. # 检查是否是LangChain格式的HumanMessage
  263. if (msg.get('lc') == 1 and
  264. msg.get('type') == 'constructor' and
  265. 'id' in msg and
  266. isinstance(msg['id'], list) and
  267. len(msg['id']) >= 4 and
  268. msg['id'][3] == 'HumanMessage' and
  269. 'kwargs' in msg):
  270. kwargs = msg['kwargs']
  271. if kwargs.get('type') == 'human' and 'content' in kwargs:
  272. content = str(kwargs['content'])
  273. preview = content[:50] + "..." if len(content) > 50 else content
  274. break
  275. # 兼容其他格式
  276. elif msg.get('type') == 'human' and 'content' in msg:
  277. content = str(msg['content'])
  278. preview = content[:50] + "..." if len(content) > 50 else content
  279. break
  280. conversations.append({
  281. "thread_id": thread_id,
  282. "user_id": user_id,
  283. "timestamp": thread_info["timestamp"],
  284. "message_count": len(messages),
  285. "conversation_preview": preview
  286. })
  287. except json.JSONDecodeError:
  288. logger.error(f"❌ JSON解析失败,数据类型: {type(data)}, 长度: {len(str(data))}")
  289. logger.error(f"❌ 数据开头: {str(data)[:200]}...")
  290. continue
  291. except Exception as e:
  292. logger.error(f"处理thread {thread_info['thread_id']} 失败: {e}")
  293. continue
  294. redis_client.close()
  295. logger.info(f"✅ 返回 {len(conversations)} 个对话")
  296. return conversations
  297. except Exception as e:
  298. logger.error(f"❌ Redis查询失败: {e}")
  299. return []
  300. def cleanup_resources():
  301. """清理资源"""
  302. global _react_agent_instance, _redis_client
  303. async def async_cleanup():
  304. if _react_agent_instance:
  305. await _react_agent_instance.close()
  306. logger.info("✅ React Agent 资源已清理")
  307. if _redis_client:
  308. await _redis_client.aclose()
  309. logger.info("✅ Redis客户端已关闭")
  310. try:
  311. asyncio.run(async_cleanup())
  312. except Exception as e:
  313. logger.error(f"清理资源失败: {e}")
  314. atexit.register(cleanup_resources)
  315. # ==================== 基础路由 ====================
  316. @app.route("/")
  317. def index():
  318. """根路径健康检查"""
  319. return jsonify({"message": "统一API服务正在运行", "version": "1.0.0"})
  320. @app.route('/health', methods=['GET'])
  321. def health_check():
  322. """健康检查端点"""
  323. try:
  324. health_status = {
  325. "status": "healthy",
  326. "react_agent_initialized": _react_agent_instance is not None,
  327. "timestamp": datetime.now().isoformat(),
  328. "services": {
  329. "redis": redis_conversation_manager.is_available(),
  330. "vanna": vn is not None
  331. }
  332. }
  333. return jsonify(health_status), 200
  334. except Exception as e:
  335. logger.error(f"健康检查失败: {e}")
  336. return jsonify({"status": "unhealthy", "error": str(e)}), 500
  337. # ==================== React Agent API ====================
  338. @app.route("/api/v0/ask_react_agent", methods=["POST"])
  339. async def ask_react_agent():
  340. """异步React Agent智能问答接口(从 custom_react_agent 迁移,原路由:/api/chat)"""
  341. global _react_agent_instance
  342. # 确保Agent已初始化
  343. if not await ensure_agent_ready():
  344. return jsonify({
  345. "code": 503,
  346. "message": "服务未就绪",
  347. "success": False,
  348. "error": "React Agent 初始化失败"
  349. }), 503
  350. try:
  351. # 获取请求数据
  352. try:
  353. data = request.get_json(force=True)
  354. except Exception as json_error:
  355. logger.warning(f"⚠️ JSON解析失败: {json_error}")
  356. return jsonify({
  357. "code": 400,
  358. "message": "请求格式错误",
  359. "success": False,
  360. "error": "无效的JSON格式,请检查请求体中是否存在语法错误(如多余的逗号、引号不匹配等)",
  361. "details": str(json_error)
  362. }), 400
  363. if not data:
  364. return jsonify({
  365. "code": 400,
  366. "message": "请求参数错误",
  367. "success": False,
  368. "error": "请求体不能为空"
  369. }), 400
  370. # 验证请求数据
  371. validated_data = validate_request_data(data)
  372. logger.info(f"📨 收到React Agent请求 - User: {validated_data['user_id']}, Question: {validated_data['question'][:50]}...")
  373. # 异步调用处理
  374. agent_result = await _react_agent_instance.chat(
  375. message=validated_data['question'],
  376. user_id=validated_data['user_id'],
  377. thread_id=validated_data['thread_id']
  378. )
  379. if not agent_result.get("success", False):
  380. # Agent处理失败
  381. error_msg = agent_result.get("error", "React Agent处理失败")
  382. logger.error(f"❌ React Agent处理失败: {error_msg}")
  383. # 检查是否建议重试
  384. retry_suggested = agent_result.get("retry_suggested", False)
  385. error_code = 503 if retry_suggested else 500
  386. message = "服务暂时不可用,请稍后重试" if retry_suggested else "处理失败"
  387. return jsonify({
  388. "code": error_code,
  389. "message": message,
  390. "success": False,
  391. "error": error_msg,
  392. "retry_suggested": retry_suggested,
  393. "data": {
  394. "conversation_id": agent_result.get("thread_id"),
  395. "user_id": validated_data['user_id'],
  396. "timestamp": datetime.now().isoformat()
  397. }
  398. }), error_code
  399. # Agent处理成功
  400. api_data = agent_result.get("api_data", {})
  401. # 构建响应数据(按照 react_agent/api.py 的正确格式)
  402. response_data = {
  403. "response": api_data.get("response", ""),
  404. "conversation_id": agent_result.get("thread_id"),
  405. "user_id": validated_data['user_id'],
  406. "react_agent_meta": api_data.get("react_agent_meta", {
  407. "thread_id": agent_result.get("thread_id"),
  408. "agent_version": "custom_react_v1_async"
  409. }),
  410. "timestamp": datetime.now().isoformat()
  411. }
  412. # 可选字段:SQL(仅当执行SQL时存在)
  413. if "sql" in api_data:
  414. response_data["sql"] = api_data["sql"]
  415. # 可选字段:records(仅当有查询结果时存在)
  416. if "records" in api_data:
  417. response_data["records"] = api_data["records"]
  418. return jsonify({
  419. "code": 200,
  420. "message": "处理成功",
  421. "success": True,
  422. "data": response_data
  423. }), 200
  424. except ValueError as ve:
  425. # 参数验证错误
  426. logger.warning(f"⚠️ 参数验证失败: {ve}")
  427. return jsonify({
  428. "code": 400,
  429. "message": "参数验证失败",
  430. "success": False,
  431. "error": str(ve)
  432. }), 400
  433. except Exception as e:
  434. logger.error(f"❌ React Agent API 异常: {e}")
  435. return jsonify({
  436. "code": 500,
  437. "message": "内部服务错误",
  438. "success": False,
  439. "error": "服务暂时不可用,请稍后重试"
  440. }), 500
  441. # ==================== LangGraph Agent API ====================
  442. # 全局Agent实例(单例模式)
  443. citu_langraph_agent = None
  444. def get_citu_langraph_agent():
  445. """获取LangGraph Agent实例(懒加载)"""
  446. global citu_langraph_agent
  447. if citu_langraph_agent is None:
  448. try:
  449. from agent.citu_agent import CituLangGraphAgent
  450. logger.info("开始创建LangGraph Agent实例...")
  451. citu_langraph_agent = CituLangGraphAgent()
  452. logger.info("LangGraph Agent实例创建成功")
  453. except ImportError as e:
  454. logger.critical(f"Agent模块导入失败: {str(e)}")
  455. raise Exception(f"Agent模块导入失败: {str(e)}")
  456. except Exception as e:
  457. logger.critical(f"LangGraph Agent实例创建失败: {str(e)}")
  458. raise Exception(f"Agent初始化失败: {str(e)}")
  459. return citu_langraph_agent
  def _select_assistant_response(result):
      """Pick the text saved to conversation history for an agent result.

      For ``DATABASE`` results the first non-empty of ``response``,
      ``summary`` and a row-count sentence built from ``query_result`` is
      used, with a fixed fallback sentence. Every other type uses
      ``response`` (empty string when absent).
      """
      if result.get("type", "UNKNOWN") != "DATABASE":
          return result.get("response", "")
      if result.get("response"):
          # Priority 1: error or explanatory reply (e.g. SQL generation failed).
          return result.get("response")
      if result.get("summary"):
          # Priority 2: summary of a successful query.
          return result.get("summary")
      query_result = result.get("query_result")
      if query_result:
          # Priority 3: build a simple description from the row count.
          row_count = query_result.get("row_count", 0)
          return f"查询执行完成,共返回 {row_count} 条记录。"
      return "数据库查询已处理。"


  @app.route('/api/v0/ask_agent', methods=['POST'])
  def ask_agent():
      """Answer a question through the LangGraph agent, with conversation context.

      Reads ``question`` (required), ``session_id``, ``user_id``,
      ``conversation_id``, ``continue_conversation`` and ``routing_mode`` from
      the JSON body.

      Returns:
          A Flask response. 400 when ``question`` is missing or
          ``routing_mode`` is not one of the supported values, 503 when the
          agent cannot be created, the agent-reported ``error_code`` (default
          500) when the agent reports failure, 500 on any unexpected error,
          otherwise the standard agent success payload (served from the answer
          cache when a cached answer exists).
      """
      req = request.get_json(force=True)
      if not isinstance(req, dict):
          # A JSON array/scalar/null body has no fields; treat it as empty so
          # the caller receives the regular 400 for the missing question
          # instead of an unhandled AttributeError.
          req = {}
      question = req.get("question", None)
      browser_session_id = req.get("session_id", None)
      user_id_input = req.get("user_id", None)
      conversation_id_input = req.get("conversation_id", None)
      continue_conversation = req.get("continue_conversation", False)
      api_routing_mode = req.get("routing_mode", None)

      VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]

      if not question:
          return jsonify(bad_request_response(
              response_text="缺少必需参数:question",
              missing_params=["question"]
          )), 400

      if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
          return jsonify(bad_request_response(
              response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
              invalid_params=["routing_mode"]
          )), 400

      try:
          # ID of the logged-in user, if the session carries one.
          login_user_id = session.get('user_id')

          # Resolve the effective user and conversation IDs.
          user_id = redis_conversation_manager.resolve_user_id(
              user_id_input, browser_session_id, request.remote_addr, login_user_id
          )
          conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
              user_id, conversation_id_input, continue_conversation
          )

          # Context is fetched before the cache lookup because the lookup uses it.
          context = redis_conversation_manager.get_context(conversation_id)

          # Context type comes from the "type" metadata of a recent assistant message.
          context_type = None
          if context:
              try:
                  messages = redis_conversation_manager.get_messages(conversation_id, limit=10)
                  for message in reversed(messages):  # newest first
                      if message.get("role") == "assistant":
                          metadata = message.get("metadata", {})
                          context_type = metadata.get("type")
                          if context_type:
                              logger.info(f"[AGENT_API] 检测到上下文类型: {context_type}")
                              # NOTE(review): the break is assumed to belong to
                              # this branch (keep scanning older assistant
                              # messages until one carries a type) - confirm.
                              break
              except Exception as e:
                  logger.warning(f"获取上下文类型失败: {str(e)}")

          # Cache lookup.
          cached_answer = redis_conversation_manager.get_cached_answer(question, context)
          if cached_answer:
              logger.info(f"[AGENT_API] 使用缓存答案")

              # Same reply-selection priority as the non-cached path.
              assistant_response = _select_assistant_response(cached_answer)

              # Record the exchange in the conversation history.
              redis_conversation_manager.save_message(conversation_id, "user", question)
              redis_conversation_manager.save_message(
                  conversation_id, "assistant",
                  assistant_response,
                  metadata={"from_cache": True}
              )

              # Attach conversation info to the cached result.
              cached_answer["conversation_id"] = conversation_id
              cached_answer["user_id"] = user_id
              cached_answer["from_cache"] = True
              cached_answer.update(conversation_status)

              return jsonify(agent_success_response(
                  response_type=cached_answer.get("type", "UNKNOWN"),
                  response=cached_answer.get("response", ""),
                  sql=cached_answer.get("sql"),
                  records=cached_answer.get("query_result"),
                  summary=cached_answer.get("summary"),
                  session_id=browser_session_id,
                  execution_path=cached_answer.get("execution_path", []),
                  classification_info=cached_answer.get("classification_info", {}),
                  conversation_id=conversation_id,
                  user_id=user_id,
                  is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
                  context_used=bool(context),
                  from_cache=True,
                  conversation_status=conversation_status["status"],
                  conversation_message=conversation_status["message"],
                  requested_conversation_id=conversation_status.get("requested_id")
              ))

          # Save the user message.
          redis_conversation_manager.save_message(conversation_id, "user", question)

          # Build the question sent to the agent, prefixed with context if any.
          if context:
              enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
              logger.info(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
          else:
              enhanced_question = question
              logger.info(f"[AGENT_API] 新对话,无上下文")

          # Routing mode: the request value wins, then config, then "hybrid".
          if api_routing_mode:
              effective_routing_mode = api_routing_mode
              logger.info(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
          else:
              try:
                  from app_config import QUESTION_ROUTING_MODE
                  effective_routing_mode = QUESTION_ROUTING_MODE
                  logger.info(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
              except ImportError:
                  effective_routing_mode = "hybrid"
                  logger.info(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")

          # Obtain the agent; failure here is reported as 503 (retryable).
          try:
              agent = get_citu_langraph_agent()
          except Exception as e:
              logger.critical(f"Agent初始化失败: {str(e)}")
              return jsonify(service_unavailable_response(
                  response_text="AI服务暂时不可用,请稍后重试",
                  can_retry=True
              )), 503

          # The agent API is a coroutine; run it to completion for this request.
          import asyncio
          agent_result = asyncio.run(agent.process_question(
              question=enhanced_question,
              session_id=browser_session_id,
              context_type=context_type,
              routing_mode=effective_routing_mode
          ))

          if agent_result.get("success", False):
              response_type = agent_result.get("type", "UNKNOWN")
              response_text = agent_result.get("response", "")
              sql = agent_result.get("sql")
              query_result = agent_result.get("query_result")
              summary = agent_result.get("summary")
              execution_path = agent_result.get("execution_path", [])
              classification_info = agent_result.get("classification_info", {})

              assistant_response = _select_assistant_response(agent_result)

              # Save the assistant reply together with its type metadata.
              redis_conversation_manager.save_message(
                  conversation_id, "assistant", assistant_response,
                  metadata={
                      "type": response_type,
                      "sql": sql,
                      "execution_path": execution_path
                  }
              )

              # Hand the raw agent result to the cache layer.
              redis_conversation_manager.cache_answer(question, agent_result, context)

              return jsonify(agent_success_response(
                  response_type=response_type,
                  response=response_text,
                  sql=sql,
                  records=query_result,
                  summary=summary,
                  session_id=browser_session_id,
                  execution_path=execution_path,
                  classification_info=classification_info,
                  conversation_id=conversation_id,
                  user_id=user_id,
                  is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
                  context_used=bool(context),
                  from_cache=False,
                  conversation_status=conversation_status["status"],
                  conversation_message=conversation_status["message"],
                  requested_conversation_id=conversation_status.get("requested_id"),
                  routing_mode_used=effective_routing_mode,
                  routing_mode_source="api" if api_routing_mode else "config"
              ))
          else:
              # The agent reported a failure; relay its message and status code.
              error_message = agent_result.get("error", "Agent处理失败")
              error_code = agent_result.get("error_code", 500)
              return jsonify(agent_error_response(
                  response_text=error_message,
                  error_type="agent_processing_failed",
                  code=error_code,
                  session_id=browser_session_id,
                  conversation_id=conversation_id,
                  user_id=user_id
              )), error_code

      except Exception as e:
          logger.error(f"ask_agent执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="查询处理失败,请稍后重试"
          )), 500
  # ==================== QA feedback API ====================

  # Module-level feedback manager shared by all requests; created on first use.
  qa_feedback_manager = None


  def get_qa_feedback_manager():
      """Return the shared QA feedback manager, creating it on the first call.

      The manager is constructed with the module's ``vn`` object as its
      ``vanna_instance``.

      Raises:
          Exception: If the manager cannot be constructed. The underlying error
              is attached as ``__cause__`` so its traceback is preserved.
      """
      global qa_feedback_manager
      if qa_feedback_manager is None:
          try:
              qa_feedback_manager = QAFeedbackManager(vanna_instance=vn)
              logger.info("QA反馈管理器实例创建成功")
          except Exception as e:
              logger.critical(f"QA反馈管理器创建失败: {str(e)}")
              raise Exception(f"QA反馈管理器初始化失败: {str(e)}") from e
      return qa_feedback_manager
  @app.route('/api/v0/qa_feedback/query', methods=['POST'])
  def qa_feedback_query():
      """Query feedback records with paging, filtering and sorting.

      Reads ``page`` (default 1), ``page_size`` (default 20, 1-100),
      ``is_thumb_up``, ``create_time_start``, ``create_time_end``,
      ``is_in_training_data``, ``sort_by`` (default ``create_time``) and
      ``sort_order`` (default ``desc``) from the JSON body.

      Returns:
          400 when ``page`` or ``page_size`` is not an integer in range, 500 on
          unexpected errors, otherwise the records plus pagination metadata.
      """
      try:
          req = request.get_json(force=True)

          page = req.get('page', 1)
          page_size = req.get('page_size', 20)
          is_thumb_up = req.get('is_thumb_up')
          create_time_start = req.get('create_time_start')
          create_time_end = req.get('create_time_end')
          is_in_training_data = req.get('is_in_training_data')
          sort_by = req.get('sort_by', 'create_time')
          sort_order = req.get('sort_order', 'desc')

          # Reject non-integers up front: comparing e.g. a string with 1 would
          # raise TypeError and surface as a 500 instead of a 400. bool is a
          # subclass of int, so it is excluded explicitly.
          page_is_int = isinstance(page, int) and not isinstance(page, bool)
          if not page_is_int or page < 1:
              return jsonify(bad_request_response(
                  response_text="页码必须大于0",
                  invalid_params=["page"]
              )), 400

          size_is_int = isinstance(page_size, int) and not isinstance(page_size, bool)
          if not size_is_int or page_size < 1 or page_size > 100:
              return jsonify(bad_request_response(
                  response_text="每页大小必须在1-100之间",
                  invalid_params=["page_size"]
              )), 400

          manager = get_qa_feedback_manager()
          records, total = manager.query_feedback(
              page=page,
              page_size=page_size,
              is_thumb_up=is_thumb_up,
              create_time_start=create_time_start,
              create_time_end=create_time_end,
              is_in_training_data=is_in_training_data,
              sort_by=sort_by,
              sort_order=sort_order
          )

          # Ceiling division without floats.
          total_pages = (total + page_size - 1) // page_size

          return jsonify(success_response(
              response_text=f"查询成功,共找到 {total} 条记录",
              data={
                  "records": records,
                  "pagination": {
                      "page": page,
                      "page_size": page_size,
                      "total": total,
                      "total_pages": total_pages,
                      "has_next": page < total_pages,
                      "has_prev": page > 1
                  }
              }
          ))

      except Exception as e:
          logger.error(f"qa_feedback_query执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="查询反馈记录失败,请稍后重试"
          )), 500
  @app.route('/api/v0/qa_feedback/delete/<int:feedback_id>', methods=['DELETE'])
  def qa_feedback_delete(feedback_id):
      """Delete one feedback record by its integer ID.

      Returns:
          The success payload with ``deleted_id`` when the manager reports a
          deletion, 404 when it reports none, 500 on unexpected errors.
      """
      try:
          was_deleted = get_qa_feedback_manager().delete_feedback(feedback_id)

          if not was_deleted:
              return jsonify(not_found_response(
                  response_text=f"反馈记录不存在 (ID: {feedback_id})"
              )), 404

          return jsonify(success_response(
              response_text=f"反馈记录删除成功",
              data={"deleted_id": feedback_id}
          ))
      except Exception as e:
          logger.error(f"qa_feedback_delete执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="删除反馈记录失败,请稍后重试"
          )), 500
  @app.route('/api/v0/qa_feedback/update/<int:feedback_id>', methods=['PUT'])
  def qa_feedback_update(feedback_id):
      """Update selected fields of one feedback record.

      Only ``question``, ``sql``, ``is_thumb_up``, ``user_id`` and
      ``is_in_training_data`` are taken from the JSON body; other keys are
      ignored.

      Returns:
          400 when none of the allowed fields is present, 404 when the manager
          reports no update, 500 on unexpected errors, otherwise the success
          payload listing the updated fields.
      """
      try:
          req = request.get_json(force=True)

          allowed_fields = ['question', 'sql', 'is_thumb_up', 'user_id', 'is_in_training_data']
          update_data = {name: req[name] for name in allowed_fields if name in req}

          if not update_data:
              return jsonify(bad_request_response(
                  response_text="没有提供有效的更新字段"
              )), 400

          was_updated = get_qa_feedback_manager().update_feedback(feedback_id, **update_data)

          if not was_updated:
              return jsonify(not_found_response(
                  response_text=f"反馈记录不存在或无变化 (ID: {feedback_id})"
              )), 404

          return jsonify(success_response(
              response_text="反馈记录更新成功",
              data={
                  "updated_id": feedback_id,
                  "updated_fields": list(update_data.keys())
              }
          ))
      except Exception as e:
          logger.error(f"qa_feedback_update执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="更新反馈记录失败,请稍后重试"
          )), 500
  @app.route('/api/v0/qa_feedback/add_to_training', methods=['POST'])
  def qa_feedback_add_to_training():
      """Feed the selected feedback records into the training data.

      Reads ``feedback_ids`` (a non-empty list) from the JSON body. Records
      already flagged ``is_in_training_data`` are skipped. Thumbs-up records go
      through ``vn.train``, the others through ``vn.train_error_sql``. A failure
      on one record is logged and does not stop the rest. Records trained
      successfully are then marked via ``mark_training_status``.

      Returns:
          400 when ``feedback_ids`` is missing or not a list, 404 when no
          record is found, 500 on unexpected errors, otherwise the counts and
          IDs of the records that were trained.
      """
      try:
          req = request.get_json(force=True)
          feedback_ids = req.get('feedback_ids', [])

          if not (feedback_ids and isinstance(feedback_ids, list)):
              return jsonify(bad_request_response(
                  response_text="缺少有效的反馈ID列表"
              )), 400

          manager = get_qa_feedback_manager()
          records = manager.get_feedback_by_ids(feedback_ids)

          if not records:
              return jsonify(not_found_response(
                  response_text="未找到任何有效的反馈记录"
              )), 404

          positive_count, negative_count = 0, 0
          successfully_trained_ids = []

          for record in records:
              try:
                  if record['is_in_training_data']:
                      continue

                  if record['is_thumb_up']:
                      vn.train(
                          question=record['question'],
                          sql=record['sql']
                      )
                      positive_count += 1
                  else:
                      vn.train_error_sql(
                          question=record['question'],
                          sql=record['sql']
                      )
                      negative_count += 1

                  successfully_trained_ids.append(record['id'])
              except Exception as e:
                  # Best effort: one bad record must not abort the batch.
                  logger.error(f"训练失败 - 反馈ID: {record['id']}, 错误: {e}")

          if successfully_trained_ids:
              manager.mark_training_status(successfully_trained_ids, True)

          return jsonify(success_response(
              response_text=f"训练数据添加完成,成功处理 {positive_count + negative_count} 条记录",
              data={
                  "positive_trained": positive_count,
                  "negative_trained": negative_count,
                  "successfully_trained_ids": successfully_trained_ids
              }
          ))
      except Exception as e:
          logger.error(f"qa_feedback_add_to_training执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="添加训练数据失败,请稍后重试"
          )), 500
  @app.route('/api/v0/qa_feedback/add', methods=['POST'])
  def qa_feedback_add():
      """Create a feedback record.

      Reads ``question``, ``sql`` and ``is_thumb_up`` (all required) and
      ``user_id`` (default ``'guest'``) from the JSON body. ``is_thumb_up`` is
      converted with ``bool()`` before it is stored.

      Returns:
          400 when a required field is missing, 500 on unexpected errors,
          otherwise the success payload with the new ``feedback_id``.
      """
      try:
          req = request.get_json(force=True)
          question = req.get('question')
          sql = req.get('sql')
          is_thumb_up = req.get('is_thumb_up')
          user_id = req.get('user_id', 'guest')

          # is_thumb_up may legitimately be False, so only None counts as missing.
          has_required = bool(question) and bool(sql) and is_thumb_up is not None
          if not has_required:
              return jsonify(bad_request_response(
                  response_text="缺少必需参数"
              )), 400

          feedback_id = get_qa_feedback_manager().add_feedback(
              question=question,
              sql=sql,
              is_thumb_up=bool(is_thumb_up),
              user_id=user_id
          )

          return jsonify(success_response(
              response_text="反馈记录创建成功",
              data={"feedback_id": feedback_id}
          ))
      except Exception as e:
          logger.error(f"qa_feedback_add执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="创建反馈记录失败,请稍后重试"
          )), 500
  @app.route('/api/v0/qa_feedback/stats', methods=['GET'])
  def qa_feedback_stats():
      """Report total, positive and negative feedback counts.

      Each count is taken from the total returned by a one-row
      ``query_feedback`` call; the rows themselves are discarded.

      Returns:
          The counts plus ``positive_rate`` (percentage, two decimals), or 500
          on unexpected errors.
      """
      try:
          manager = get_qa_feedback_manager()

          _, total_count = manager.query_feedback(page=1, page_size=1)
          _, positive_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=True)
          _, negative_count = manager.query_feedback(page=1, page_size=1, is_thumb_up=False)

          # max(..., 1) avoids dividing by zero when there is no feedback yet.
          positive_rate = round(positive_count / max(total_count, 1) * 100, 2)

          return jsonify(success_response(
              response_text="统计信息获取成功",
              data={
                  "total_feedback": total_count,
                  "positive_feedback": positive_count,
                  "negative_feedback": negative_count,
                  "positive_rate": positive_rate
              }
          ))
      except Exception as e:
          logger.error(f"qa_feedback_stats执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取统计信息失败,请稍后重试"
          )), 500
  906. # ==================== Redis对话管理API ====================
  @app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
  def get_user_conversations_redis(user_id: str):
      """List the conversations stored for a user.

      The optional ``limit`` query parameter (integer) defaults to
      ``USER_MAX_CONVERSATIONS``.

      Returns:
          The success payload with the conversations and their count, or 500
          on unexpected errors (which are now logged rather than dropped).
      """
      try:
          limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
          conversations = redis_conversation_manager.get_conversations(user_id, limit)

          return jsonify(success_response(
              response_text="获取用户对话列表成功",
              data={
                  "user_id": user_id,
                  "conversations": conversations,
                  "total_count": len(conversations)
              }
          ))
      except Exception as e:
          # Log the failure like the other handlers in this module do.
          logger.error(f"get_user_conversations_redis执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取对话列表失败,请稍后重试"
          )), 500
  @app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
  def get_conversation_messages_redis(conversation_id: str):
      """Return the message history and metadata of one conversation.

      The optional ``limit`` query parameter (integer) is passed through to
      the conversation manager; it is ``None`` when absent.

      Returns:
          The success payload with metadata, messages and message count, or
          500 on unexpected errors (which are now logged rather than dropped).
      """
      try:
          limit = request.args.get('limit', type=int)
          messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
          meta = redis_conversation_manager.get_conversation_meta(conversation_id)

          return jsonify(success_response(
              response_text="获取对话消息成功",
              data={
                  "conversation_id": conversation_id,
                  "conversation_meta": meta,
                  "messages": messages,
                  "message_count": len(messages)
              }
          ))
      except Exception as e:
          # Log the failure like the other handlers in this module do.
          logger.error(f"get_conversation_messages_redis执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取对话消息失败"
          )), 500
  @app.route('/api/v0/conversation_stats', methods=['GET'])
  def conversation_stats():
      """Return the statistics reported by the conversation manager.

      Returns:
          The success payload wrapping ``get_stats()``, or 500 on unexpected
          errors (which are now logged rather than dropped).
      """
      try:
          stats = redis_conversation_manager.get_stats()

          return jsonify(success_response(
              response_text="获取统计信息成功",
              data=stats
          ))
      except Exception as e:
          # Log the failure like the other handlers in this module do.
          logger.error(f"conversation_stats执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取统计信息失败,请稍后重试"
          )), 500
  @app.route('/api/v0/conversation_cleanup', methods=['POST'])
  def conversation_cleanup():
      """Trigger a manual cleanup of expired conversations.

      Returns:
          The success payload once the manager's cleanup call returns, or 500
          on unexpected errors (which are now logged rather than dropped).
      """
      try:
          redis_conversation_manager.cleanup_expired_conversations()

          return jsonify(success_response(
              response_text="对话清理完成"
          ))
      except Exception as e:
          # Log the failure like the other handlers in this module do.
          logger.error(f"conversation_cleanup执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="对话清理失败,请稍后重试"
          )), 500
  @app.route('/api/v0/embedding_cache_stats', methods=['GET'])
  def embedding_cache_stats():
      """Return the statistics reported by the embedding cache manager.

      The cache manager module is imported inside the handler.

      Returns:
          The success payload wrapping ``get_cache_stats()``, or 500 on
          unexpected errors.
      """
      try:
          from common.embedding_cache_manager import get_embedding_cache_manager

          cache_stats = get_embedding_cache_manager().get_cache_stats()

          return jsonify(success_response(
              response_text="获取embedding缓存统计成功",
              data=cache_stats
          ))
      except Exception as e:
          logger.error(f"获取embedding缓存统计失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取embedding缓存统计失败,请稍后重试"
          )), 500
  @app.route('/api/v0/embedding_cache_cleanup', methods=['POST'])
  def embedding_cache_cleanup():
      """Clear every entry of the embedding cache.

      Returns:
          400 when the cache manager reports it is not available, 500 when
          clearing fails or raises, otherwise the success payload with
          ``{"cleared": True}``.
      """
      try:
          from common.embedding_cache_manager import get_embedding_cache_manager

          cache_manager = get_embedding_cache_manager()

          if not cache_manager.is_available():
              # NOTE(review): an internal-error body is returned with HTTP 400
              # here; confirm whether that pairing is intentional.
              return jsonify(internal_error_response(
                  response_text="Embedding缓存功能未启用或不可用"
              )), 400

          if not cache_manager.clear_all_cache():
              return jsonify(internal_error_response(
                  response_text="清空embedding缓存失败"
              )), 500

          return jsonify(success_response(
              response_text="所有embedding缓存已清空",
              data={"cleared": True}
          ))
      except Exception as e:
          logger.error(f"清空embedding缓存失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="清空embedding缓存失败,请稍后重试"
          )), 500
  1011. # ==================== 训练数据管理API ====================
  def validate_sql_syntax(sql: str) -> tuple[bool, str]:
      """Run a lightweight sanity check on a SQL string.

      The statement must parse to at least one non-empty ``sqlparse``
      statement and, once stripped and upper-cased, start with one of SELECT,
      INSERT, UPDATE, DELETE, CREATE, ALTER or DROP.

      Returns:
          ``(True, "")`` when the check passes, otherwise ``(False, message)``.
          Any exception raised while checking is reported the same way.
      """
      # NOTE(review): statements starting with WITH (CTEs) or a comment are
      # rejected by the prefix check - confirm that this is intended.
      accepted_prefixes = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP')
      try:
          statements = sqlparse.parse(sql.strip())
          if not statements or not statements[0].tokens:
              return False, "SQL语法错误:空语句"

          if sql.strip().upper().startswith(accepted_prefixes):
              return True, ""

          return False, "SQL语法错误:不是有效的SQL语句"
      except Exception as e:
          return False, f"SQL语法错误:{str(e)}"
  def paginate_data(data_list: list, page: int, page_size: int):
      """Slice one page out of a list and describe the paging state.

      Args:
          data_list: Full list of records.
          page: 1-based page number.
          page_size: Number of records per page.

      Returns:
          A dict with ``data`` (the slice for the page) and ``pagination``
          (page, page_size, total, total_pages, has_next, has_prev).
      """
      total = len(data_list)
      first = (page - 1) * page_size
      last = first + page_size

      pagination = {
          "page": page,
          "page_size": page_size,
          "total": total,
          # Ceiling division without floats.
          "total_pages": (total + page_size - 1) // page_size,
          "has_next": last < total,
          "has_prev": page > 1,
      }
      return {"data": data_list[first:last], "pagination": pagination}
  def filter_by_type(data_list: list, training_data_type: str):
      """Keep only the records whose ``training_data_type`` matches.

      Returns ``data_list`` itself (not a copy) when ``training_data_type`` is
      empty or ``None``; otherwise a new list with the matching records.
      """
      if not training_data_type:
          return data_list

      matching = []
      for record in data_list:
          if record.get('training_data_type') == training_data_type:
              matching.append(record)
      return matching
  def search_in_data(data_list: list, search_keyword: str):
      """Return the records whose question or content contains the keyword.

      Matching is case-insensitive and looks at the ``question`` and
      ``content`` fields. Field values that are not strings (``None``, NaN,
      numbers) are treated as non-matching instead of raising on ``.lower()``.

      Returns ``data_list`` itself (not a copy) when ``search_keyword`` is
      empty or ``None``; otherwise a new list with the matching records.
      """
      if not search_keyword:
          return data_list

      keyword_lower = search_keyword.lower()

      def _contains_keyword(value) -> bool:
          # Only real strings can be searched; anything else cannot match.
          return isinstance(value, str) and keyword_lower in value.lower()

      return [
          record for record in data_list
          if _contains_keyword(record.get('question'))
          or _contains_keyword(record.get('content'))
      ]
  def get_total_training_count():
      """Return how many training records ``vn.get_training_data()`` holds.

      Returns 0 when the data is ``None`` or empty, and also when fetching it
      raises (the error is logged as a warning).
      """
      try:
          frame = vn.get_training_data()
          if frame is None or frame.empty:
              return 0
          return len(frame)
      except Exception as e:
          logger.warning(f"获取训练数据总数失败: {e}")
          return 0
  def process_single_training_item(item: dict, index: int) -> dict:
      """Train on one item and describe the outcome.

      The item's ``training_data_type`` selects the path:

      * ``sql``: needs ``sql`` (sanity-checked first); ``question`` is optional.
      * ``error_sql``: needs ``question`` and ``sql``; no sanity check is run.
      * ``documentation``: needs ``content``.
      * ``ddl``: needs ``ddl``.

      Args:
          item: The training item taken from the request.
          index: Position of the item in the batch; echoed in the result.

      Returns:
          A dict with ``index``, ``success`` (True), ``training_id``, ``type``
          and ``message``.

      Raises:
          ValueError: If a required field is missing, the SQL check fails, or
              the type is not supported.
      """
      training_type = item.get('training_data_type')

      if training_type == 'sql':
          sql = item.get('sql')
          if not sql:
              raise ValueError("SQL字段是必需的")

          is_valid, error_msg = validate_sql_syntax(sql)
          if not is_valid:
              raise ValueError(error_msg)

          question = item.get('question')
          train_kwargs = {"question": question, "sql": sql} if question else {"sql": sql}
          training_id = vn.train(**train_kwargs)

      elif training_type == 'error_sql':
          question = item.get('question')
          sql = item.get('sql')
          if not (question and sql):
              raise ValueError("question和sql字段都是必需的")
          training_id = vn.train_error_sql(question=question, sql=sql)

      elif training_type == 'documentation':
          content = item.get('content')
          if not content:
              raise ValueError("content字段是必需的")
          training_id = vn.train(documentation=content)

      elif training_type == 'ddl':
          ddl = item.get('ddl')
          if not ddl:
              raise ValueError("ddl字段是必需的")
          training_id = vn.train(ddl=ddl)

      else:
          raise ValueError(f"不支持的训练数据类型: {training_type}")

      return {
          "index": index,
          "success": True,
          "training_id": training_id,
          "type": training_type,
          "message": f"{training_type}训练数据创建成功"
      }
  @app.route('/api/v0/training_data/stats', methods=['GET'])
  def training_data_stats():
      """Report how many training records exist, broken down by type.

      Counts come from the ``training_data_type`` column of
      ``vn.get_training_data()`` when that column exists; only the types
      ``sql``, ``documentation``, ``ddl`` and ``error_sql`` are counted. With
      no data, every count and percentage is zero.

      Returns:
          The success payload with ``total_count``, ``type_breakdown``,
          ``type_percentages`` and ``last_updated``, or 500 on unexpected
          errors.
      """
      try:
          training_data = vn.get_training_data()
          has_rows = training_data is not None and not training_data.empty

          total_count = len(training_data) if has_rows else 0
          type_breakdown = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}

          if has_rows and 'training_data_type' in training_data.columns:
              type_counts = training_data['training_data_type'].value_counts()
              for data_type, count in type_counts.items():
                  if data_type in type_breakdown:
                      type_breakdown[data_type] = int(count)

          # max(..., 1) keeps the empty case at 0.0 instead of dividing by zero.
          type_percentages = {
              data_type: round(count / max(total_count, 1) * 100, 2)
              for data_type, count in type_breakdown.items()
          }

          return jsonify(success_response(
              response_text="统计信息获取成功",
              data={
                  "total_count": total_count,
                  "type_breakdown": type_breakdown,
                  "type_percentages": type_percentages,
                  "last_updated": datetime.now().isoformat()
              }
          ))
      except Exception as e:
          logger.error(f"training_data_stats执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="获取统计信息失败,请稍后重试"
          )), 500
  @app.route('/api/v0/training_data/query', methods=['POST'])
  def training_data_query():
      """Query training data with type filter, keyword search, sort and paging.

      Reads ``page`` (default 1), ``page_size`` (default 20, 1-100),
      ``training_data_type``, ``sort_by`` (default ``id``; only ``id`` and
      ``training_data_type`` are sorted on), ``sort_order`` (default ``desc``)
      and ``search_keyword`` (at most 100 characters) from the JSON body.

      Returns:
          400 with ``invalid_params`` when a parameter is out of range or of
          the wrong type, 500 on unexpected errors, otherwise the page of
          records, pagination metadata and the filters that were applied.
      """
      try:
          req = request.get_json(force=True)

          page = req.get('page', 1)
          page_size = req.get('page_size', 20)
          training_data_type = req.get('training_data_type')
          sort_by = req.get('sort_by', 'id')
          sort_order = req.get('sort_order', 'desc')
          search_keyword = req.get('search_keyword')

          # Reject non-integers up front: comparing e.g. a string with 1 would
          # raise TypeError and surface as a 500. bool is a subclass of int, so
          # it is excluded explicitly. These are invalid (not missing) values,
          # so they are reported as invalid_params.
          page_is_int = isinstance(page, int) and not isinstance(page, bool)
          if not page_is_int or page < 1:
              return jsonify(bad_request_response(
                  response_text="页码必须大于0",
                  invalid_params=["page"]
              )), 400

          size_is_int = isinstance(page_size, int) and not isinstance(page_size, bool)
          if not size_is_int or page_size < 1 or page_size > 100:
              return jsonify(bad_request_response(
                  response_text="每页大小必须在1-100之间",
                  invalid_params=["page_size"]
              )), 400

          if search_keyword and len(search_keyword) > 100:
              return jsonify(bad_request_response(
                  response_text="搜索关键词最大长度为100字符",
                  invalid_params=["search_keyword"]
              )), 400

          training_data = vn.get_training_data()

          if training_data is None or training_data.empty:
              return jsonify(success_response(
                  response_text="查询成功,暂无训练数据",
                  data={
                      "records": [],
                      "pagination": {
                          "page": page,
                          "page_size": page_size,
                          "total": 0,
                          "total_pages": 0,
                          "has_next": False,
                          "has_prev": False
                      },
                      "filters_applied": {
                          "training_data_type": training_data_type,
                          "search_keyword": search_keyword
                      }
                  }
              ))

          # Work on plain dicts from here on.
          records = training_data.to_dict(orient="records")

          if training_data_type:
              records = filter_by_type(records, training_data_type)

          if search_keyword:
              records = search_in_data(records, search_keyword)

          if sort_by in ['id', 'training_data_type']:
              # str() keeps a non-string sort_order from raising on .lower();
              # anything other than "desc" sorts ascending.
              reverse = (str(sort_order).lower() == 'desc')
              records.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse)

          paginated_result = paginate_data(records, page, page_size)

          return jsonify(success_response(
              response_text=f"查询成功,共找到 {paginated_result['pagination']['total']} 条记录",
              data={
                  "records": paginated_result["data"],
                  "pagination": paginated_result["pagination"],
                  "filters_applied": {
                      "training_data_type": training_data_type,
                      "search_keyword": search_keyword
                  }
              }
          ))

      except Exception as e:
          logger.error(f"training_data_query执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="查询训练数据失败,请稍后重试"
          )), 500
  @app.route('/api/v0/training_data/create', methods=['POST'])
  def training_data_create():
      """Create training data from a single item or a batch of up to 50.

      Reads ``data`` from the JSON body: either one object or a list of
      objects. Each item is handled by ``process_single_training_item``; a
      failure on one item is recorded in the results and does not stop the
      others.

      Returns:
          400 when ``data`` is missing, of the wrong shape, longer than 50
          items, or when every item failed; 207 when only some items failed;
          500 on unexpected errors; otherwise the success payload. All result
          payloads carry per-item results, a per-type summary and the current
          total count.
      """
      try:
          req = request.get_json(force=True)
          data = req.get('data')

          if not data:
              return jsonify(bad_request_response(
                  response_text="缺少必需参数:data",
                  missing_params=["data"]
              )), 400

          # Normalise to a list so single and batch requests share one path.
          if isinstance(data, dict):
              data_list = [data]
          elif isinstance(data, list):
              data_list = data
          else:
              return jsonify(bad_request_response(
                  response_text="data字段格式错误,应为对象或数组"
              )), 400

          if len(data_list) > 50:
              return jsonify(bad_request_response(
                  response_text="批量操作最大支持50条记录"
              )), 400

          results = []
          successful_count = 0
          type_summary = {"sql": 0, "documentation": 0, "ddl": 0, "error_sql": 0}

          for index, item in enumerate(data_list):
              try:
                  result = process_single_training_item(item, index)
                  results.append(result)
                  if result['success']:
                      successful_count += 1
                      type_summary[result['type']] += 1
              except Exception as e:
                  # An item that is not a JSON object has no .get(); calling it
                  # here would raise inside the handler and fail the whole
                  # batch after earlier items were already trained.
                  if isinstance(item, dict):
                      item_type = item.get('training_data_type', 'unknown')
                  else:
                      item_type = 'unknown'
                  results.append({
                      "index": index,
                      "success": False,
                      "type": item_type,
                      "error": str(e),
                      "message": "创建失败"
                  })

          current_total = get_total_training_count()
          failed_count = len(data_list) - successful_count

          # The same payload is returned whatever the outcome.
          payload = {
              "total_requested": len(data_list),
              "successfully_created": successful_count,
              "failed_count": failed_count,
              "results": results,
              "summary": type_summary,
              "current_total_count": current_total
          }

          if failed_count == 0:
              # Everything succeeded.
              return jsonify(success_response(
                  response_text="训练数据创建完成",
                  data=payload
              ))
          if successful_count == 0:
              # Everything failed.
              return jsonify(error_response(
                  response_text="训练数据创建失败",
                  data=payload
              )), 400
          # Partial success.
          return jsonify(error_response(
              response_text=f"训练数据创建部分成功,成功{successful_count}条,失败{failed_count}条",
              data=payload
          )), 207

      except Exception as e:
          logger.error(f"training_data_create执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="创建训练数据失败,请稍后重试"
          )), 500
  @app.route('/api/v0/training_data/delete', methods=['POST'])
  def training_data_delete():
      """Delete up to 50 training records by ID.

      Reads ``ids`` (a non-empty list) and ``confirm`` (must be truthy) from
      the JSON body. Each ID is removed through ``vn.remove_training_data``; a
      failure on one ID is recorded and does not stop the others.

      Returns:
          400 when ``ids`` is missing or not a list, when ``confirm`` is not
          set, when more than 50 IDs are given, or when every deletion failed;
          207 when only some deletions failed; 500 on unexpected errors;
          otherwise the success payload. All result payloads carry the deleted
          and failed IDs, failure details and the current total count.
      """
      try:
          req = request.get_json(force=True)
          ids = req.get('ids', [])
          confirm = req.get('confirm', False)

          if not (ids and isinstance(ids, list)):
              return jsonify(bad_request_response(
                  response_text="缺少有效的ID列表",
                  missing_params=["ids"]
              )), 400

          if not confirm:
              return jsonify(bad_request_response(
                  response_text="删除操作需要确认,请设置confirm为true"
              )), 400

          if len(ids) > 50:
              return jsonify(bad_request_response(
                  response_text="批量删除最大支持50条记录"
              )), 400

          deleted_ids = []
          failed_ids = []
          failed_details = []

          for training_id in ids:
              try:
                  removed = vn.remove_training_data(training_id)
              except Exception as e:
                  failed_ids.append(training_id)
                  failed_details.append({
                      "id": training_id,
                      "error": str(e)
                  })
                  continue

              if removed:
                  deleted_ids.append(training_id)
              else:
                  failed_ids.append(training_id)
                  failed_details.append({
                      "id": training_id,
                      "error": "记录不存在或删除失败"
                  })

          current_total = get_total_training_count()
          failed_count = len(failed_ids)

          # The same payload is returned whatever the outcome.
          payload = {
              "total_requested": len(ids),
              "successfully_deleted": len(deleted_ids),
              "failed_count": failed_count,
              "deleted_ids": deleted_ids,
              "failed_ids": failed_ids,
              "failed_details": failed_details,
              "current_total_count": current_total
          }

          if failed_count == 0:
              # Everything was deleted.
              return jsonify(success_response(
                  response_text="训练数据删除完成",
                  data=payload
              ))
          if len(deleted_ids) == 0:
              # Nothing was deleted.
              return jsonify(error_response(
                  response_text="训练数据删除失败",
                  data=payload
              )), 400
          # Partial success.
          return jsonify(error_response(
              response_text=f"训练数据删除部分成功,成功{len(deleted_ids)}条,失败{failed_count}条",
              data=payload
          )), 207

      except Exception as e:
          logger.error(f"training_data_delete执行失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="删除训练数据失败,请稍后重试"
          )), 500
  1425. # ==================== React Agent 扩展API ====================
  1426. @app.route('/api/v0/react/users/<user_id>/conversations', methods=['GET'])
  1427. async def get_user_conversations_react(user_id: str):
  1428. """异步获取用户的聊天记录列表(从 custom_react_agent 迁移)"""
  1429. global _react_agent_instance
  1430. try:
  1431. # 获取查询参数
  1432. limit = request.args.get('limit', 10, type=int)
  1433. # 限制limit的范围
  1434. limit = max(1, min(limit, 50)) # 限制在1-50之间
  1435. logger.info(f"📋 异步获取用户 {user_id} 的对话列表,限制 {limit} 条")
  1436. # 确保Agent可用
  1437. if not await ensure_agent_ready():
  1438. return jsonify({
  1439. "success": False,
  1440. "error": "Agent 未就绪",
  1441. "timestamp": datetime.now().isoformat()
  1442. }), 503
  1443. # 直接调用异步方法
  1444. conversations = await _react_agent_instance.get_user_recent_conversations(user_id, limit)
  1445. return jsonify({
  1446. "success": True,
  1447. "data": {
  1448. "user_id": user_id,
  1449. "conversations": conversations,
  1450. "total_count": len(conversations),
  1451. "limit": limit
  1452. },
  1453. "timestamp": datetime.now().isoformat()
  1454. }), 200
  1455. except Exception as e:
  1456. logger.error(f"❌ 异步获取用户 {user_id} 对话列表失败: {e}")
  1457. return jsonify({
  1458. "success": False,
  1459. "error": str(e),
  1460. "timestamp": datetime.now().isoformat()
  1461. }), 500
  1462. @app.route('/api/v0/react/users/<user_id>/conversations/<thread_id>', methods=['GET'])
  1463. async def get_user_conversation_detail_react(user_id: str, thread_id: str):
  1464. """异步获取特定对话的详细历史(从 custom_react_agent 迁移)"""
  1465. global _react_agent_instance
  1466. try:
  1467. # 验证thread_id格式是否匹配user_id
  1468. if not thread_id.startswith(f"{user_id}:"):
  1469. return jsonify({
  1470. "success": False,
  1471. "error": f"Thread ID {thread_id} 不属于用户 {user_id}",
  1472. "timestamp": datetime.now().isoformat()
  1473. }), 400
  1474. logger.info(f"📖 异步获取用户 {user_id} 的对话 {thread_id} 详情")
  1475. # 确保Agent可用
  1476. if not await ensure_agent_ready():
  1477. return jsonify({
  1478. "success": False,
  1479. "error": "Agent 未就绪",
  1480. "timestamp": datetime.now().isoformat()
  1481. }), 503
  1482. # 直接调用异步方法
  1483. history = await _react_agent_instance.get_conversation_history(thread_id)
  1484. logger.info(f"✅ 异步成功获取对话历史,消息数量: {len(history)}")
  1485. if not history:
  1486. return jsonify({
  1487. "success": False,
  1488. "error": f"未找到对话 {thread_id}",
  1489. "timestamp": datetime.now().isoformat()
  1490. }), 404
  1491. return jsonify({
  1492. "success": True,
  1493. "data": {
  1494. "user_id": user_id,
  1495. "thread_id": thread_id,
  1496. "message_count": len(history),
  1497. "messages": history
  1498. },
  1499. "timestamp": datetime.now().isoformat()
  1500. }), 200
  1501. except Exception as e:
  1502. import traceback
  1503. logger.error(f"❌ 异步获取对话 {thread_id} 详情失败: {e}")
  1504. logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
  1505. return jsonify({
  1506. "success": False,
  1507. "error": str(e),
  1508. "timestamp": datetime.now().isoformat()
  1509. }), 500
  1510. @app.route('/api/test/redis', methods=['GET'])
  1511. def test_redis_connection():
  1512. """测试Redis连接和基本查询(从 custom_react_agent 迁移)"""
  1513. try:
  1514. import redis
  1515. # 创建Redis连接
  1516. r = redis.Redis(host='localhost', port=6379, decode_responses=True)
  1517. r.ping()
  1518. # 扫描checkpoint keys
  1519. pattern = "checkpoint:*"
  1520. keys = []
  1521. cursor = 0
  1522. count = 0
  1523. while True:
  1524. cursor, batch = r.scan(cursor=cursor, match=pattern, count=100)
  1525. keys.extend(batch)
  1526. count += len(batch)
  1527. if cursor == 0 or count > 500: # 限制扫描数量
  1528. break
  1529. # 统计用户
  1530. users = {}
  1531. for key in keys:
  1532. try:
  1533. parts = key.split(':')
  1534. if len(parts) >= 2:
  1535. user_id = parts[1]
  1536. users[user_id] = users.get(user_id, 0) + 1
  1537. except:
  1538. continue
  1539. r.close()
  1540. return jsonify({
  1541. "success": True,
  1542. "data": {
  1543. "redis_connected": True,
  1544. "total_checkpoint_keys": len(keys),
  1545. "users_found": list(users.keys()),
  1546. "user_key_counts": users,
  1547. "sample_keys": keys[:5] if keys else []
  1548. },
  1549. "timestamp": datetime.now().isoformat()
  1550. }), 200
  1551. except Exception as e:
  1552. logger.error(f"❌ Redis测试失败: {e}")
  1553. return jsonify({
  1554. "success": False,
  1555. "error": str(e),
  1556. "timestamp": datetime.now().isoformat()
  1557. }), 500
  1558. @app.route('/api/v0/react/direct/users/<user_id>/conversations', methods=['GET'])
  1559. def test_get_user_conversations_simple(user_id: str):
  1560. """测试简单Redis查询获取用户对话列表(从 custom_react_agent 迁移)"""
  1561. try:
  1562. limit = request.args.get('limit', 10, type=int)
  1563. limit = max(1, min(limit, 50))
  1564. logger.info(f"🧪 测试获取用户 {user_id} 的对话列表(简单Redis方式)")
  1565. # 使用简单Redis查询
  1566. conversations = get_user_conversations_simple_sync(user_id, limit)
  1567. return jsonify({
  1568. "success": True,
  1569. "method": "simple_redis_query",
  1570. "data": {
  1571. "user_id": user_id,
  1572. "conversations": conversations,
  1573. "total_count": len(conversations),
  1574. "limit": limit
  1575. },
  1576. "timestamp": datetime.now().isoformat()
  1577. }), 200
  1578. except Exception as e:
  1579. logger.error(f"❌ 测试简单Redis查询失败: {e}")
  1580. return jsonify({
  1581. "success": False,
  1582. "error": str(e),
  1583. "timestamp": datetime.now().isoformat()
  1584. }), 500
  1585. @app.route('/api/v0/react/direct/conversations/<thread_id>', methods=['GET'])
  1586. def get_conversation_detail_api(thread_id: str):
  1587. """
  1588. 获取特定对话的详细信息 - 支持include_tools开关参数(从 custom_react_agent 迁移)
  1589. Query Parameters:
  1590. - include_tools: bool, 是否包含工具调用信息,默认false
  1591. true: 返回完整对话(human/ai/tool/system)
  1592. false: 只返回human/ai消息,清理工具调用信息
  1593. - user_id: str, 可选的用户ID验证
  1594. Examples:
  1595. GET /api/conversations/wang:20250709195048728?include_tools=true # 完整模式
  1596. GET /api/conversations/wang:20250709195048728?include_tools=false # 简化模式(默认)
  1597. GET /api/conversations/wang:20250709195048728 # 简化模式(默认)
  1598. """
  1599. try:
  1600. # 获取查询参数
  1601. include_tools = request.args.get('include_tools', 'false').lower() == 'true'
  1602. user_id = request.args.get('user_id')
  1603. # 验证thread_id格式
  1604. if ':' not in thread_id:
  1605. return jsonify({
  1606. "success": False,
  1607. "error": "Invalid thread_id format. Expected format: user_id:timestamp",
  1608. "timestamp": datetime.now().isoformat()
  1609. }), 400
  1610. # 如果提供了user_id,验证thread_id是否属于该用户
  1611. thread_user_id = thread_id.split(':')[0]
  1612. if user_id and thread_user_id != user_id:
  1613. return jsonify({
  1614. "success": False,
  1615. "error": f"Thread ID {thread_id} does not belong to user {user_id}",
  1616. "timestamp": datetime.now().isoformat()
  1617. }), 400
  1618. logger.info(f"📖 获取对话详情 - Thread: {thread_id}, Include Tools: {include_tools}")
  1619. # 检查enhanced_redis_api是否可用
  1620. if get_conversation_detail_from_redis is None:
  1621. return jsonify({
  1622. "success": False,
  1623. "error": "enhanced_redis_api 模块不可用",
  1624. "timestamp": datetime.now().isoformat()
  1625. }), 503
  1626. # 从Redis获取对话详情(使用我们的新函数)
  1627. result = get_conversation_detail_from_redis(thread_id, include_tools)
  1628. if not result['success']:
  1629. logger.warning(f"⚠️ 获取对话详情失败: {result['error']}")
  1630. return jsonify({
  1631. "success": False,
  1632. "error": result['error'],
  1633. "timestamp": datetime.now().isoformat()
  1634. }), 404
  1635. # 添加API元数据
  1636. result['data']['api_metadata'] = {
  1637. "timestamp": datetime.now().isoformat(),
  1638. "api_version": "v1",
  1639. "endpoint": "get_conversation_detail",
  1640. "query_params": {
  1641. "include_tools": include_tools,
  1642. "user_id": user_id
  1643. }
  1644. }
  1645. mode_desc = "完整模式" if include_tools else "简化模式"
  1646. logger.info(f"✅ 成功获取对话详情 - Messages: {result['data']['message_count']}, Mode: {mode_desc}")
  1647. return jsonify({
  1648. "success": True,
  1649. "data": result['data'],
  1650. "timestamp": datetime.now().isoformat()
  1651. }), 200
  1652. except Exception as e:
  1653. import traceback
  1654. logger.error(f"❌ 获取对话详情异常: {e}")
  1655. logger.error(f"❌ 详细错误信息: {traceback.format_exc()}")
  1656. return jsonify({
  1657. "success": False,
  1658. "error": str(e),
  1659. "timestamp": datetime.now().isoformat()
  1660. }), 500
  1661. @app.route('/api/v0/react/direct/conversations/<thread_id>/compare', methods=['GET'])
  1662. def compare_conversation_modes_api(thread_id: str):
  1663. """
  1664. 比较完整模式和简化模式的对话内容
  1665. 用于调试和理解两种模式的差异(从 custom_react_agent 迁移)
  1666. Examples:
  1667. GET /api/conversations/wang:20250709195048728/compare
  1668. """
  1669. try:
  1670. logger.info(f"🔍 比较对话模式 - Thread: {thread_id}")
  1671. # 检查enhanced_redis_api是否可用
  1672. if get_conversation_detail_from_redis is None:
  1673. return jsonify({
  1674. "success": False,
  1675. "error": "enhanced_redis_api 模块不可用",
  1676. "timestamp": datetime.now().isoformat()
  1677. }), 503
  1678. # 获取完整模式
  1679. full_result = get_conversation_detail_from_redis(thread_id, include_tools=True)
  1680. # 获取简化模式
  1681. simple_result = get_conversation_detail_from_redis(thread_id, include_tools=False)
  1682. if not (full_result['success'] and simple_result['success']):
  1683. return jsonify({
  1684. "success": False,
  1685. "error": "无法获取对话数据进行比较",
  1686. "timestamp": datetime.now().isoformat()
  1687. }), 404
  1688. # 构建比较结果
  1689. comparison = {
  1690. "thread_id": thread_id,
  1691. "full_mode": {
  1692. "message_count": full_result['data']['message_count'],
  1693. "stats": full_result['data']['stats'],
  1694. "sample_messages": full_result['data']['messages'][:3] # 只显示前3条作为示例
  1695. },
  1696. "simple_mode": {
  1697. "message_count": simple_result['data']['message_count'],
  1698. "stats": simple_result['data']['stats'],
  1699. "sample_messages": simple_result['data']['messages'][:3] # 只显示前3条作为示例
  1700. },
  1701. "comparison_summary": {
  1702. "message_count_difference": full_result['data']['message_count'] - simple_result['data']['message_count'],
  1703. "tools_filtered_out": full_result['data']['stats'].get('tool_messages', 0),
  1704. "ai_messages_with_tools": full_result['data']['stats'].get('messages_with_tools', 0),
  1705. "filtering_effectiveness": "有效" if (full_result['data']['message_count'] - simple_result['data']['message_count']) > 0 else "无差异"
  1706. },
  1707. "metadata": {
  1708. "timestamp": datetime.now().isoformat(),
  1709. "note": "sample_messages 只显示前3条消息作为示例,完整消息请使用相应的详情API"
  1710. }
  1711. }
  1712. logger.info(f"✅ 模式比较完成 - 完整: {comparison['full_mode']['message_count']}, 简化: {comparison['simple_mode']['message_count']}")
  1713. return jsonify({
  1714. "success": True,
  1715. "data": comparison,
  1716. "timestamp": datetime.now().isoformat()
  1717. }), 200
  1718. except Exception as e:
  1719. logger.error(f"❌ 对话模式比较失败: {e}")
  1720. return jsonify({
  1721. "success": False,
  1722. "error": str(e),
  1723. "timestamp": datetime.now().isoformat()
  1724. }), 500
  1725. @app.route('/api/v0/react/direct/conversations/<thread_id>/summary', methods=['GET'])
  1726. def get_conversation_summary_api(thread_id: str):
  1727. """
  1728. 获取对话摘要信息(只包含基本统计,不返回具体消息)(从 custom_react_agent 迁移)
  1729. Query Parameters:
  1730. - include_tools: bool, 影响统计信息的计算方式
  1731. Examples:
  1732. GET /api/conversations/wang:20250709195048728/summary?include_tools=true
  1733. """
  1734. try:
  1735. include_tools = request.args.get('include_tools', 'false').lower() == 'true'
  1736. # 验证thread_id格式
  1737. if ':' not in thread_id:
  1738. return jsonify({
  1739. "success": False,
  1740. "error": "Invalid thread_id format. Expected format: user_id:timestamp",
  1741. "timestamp": datetime.now().isoformat()
  1742. }), 400
  1743. logger.info(f"📊 获取对话摘要 - Thread: {thread_id}, Include Tools: {include_tools}")
  1744. # 检查enhanced_redis_api是否可用
  1745. if get_conversation_detail_from_redis is None:
  1746. return jsonify({
  1747. "success": False,
  1748. "error": "enhanced_redis_api 模块不可用",
  1749. "timestamp": datetime.now().isoformat()
  1750. }), 503
  1751. # 获取完整对话信息
  1752. result = get_conversation_detail_from_redis(thread_id, include_tools)
  1753. if not result['success']:
  1754. return jsonify({
  1755. "success": False,
  1756. "error": result['error'],
  1757. "timestamp": datetime.now().isoformat()
  1758. }), 404
  1759. # 只返回摘要信息,不包含具体消息
  1760. data = result['data']
  1761. summary = {
  1762. "thread_id": data['thread_id'],
  1763. "user_id": data['user_id'],
  1764. "include_tools": data['include_tools'],
  1765. "message_count": data['message_count'],
  1766. "stats": data['stats'],
  1767. "metadata": data['metadata'],
  1768. "first_message_preview": None,
  1769. "last_message_preview": None,
  1770. "conversation_preview": None
  1771. }
  1772. # 添加消息预览
  1773. messages = data.get('messages', [])
  1774. if messages:
  1775. # 第一条human消息预览
  1776. for msg in messages:
  1777. if msg['type'] == 'human':
  1778. content = str(msg['content'])
  1779. summary['first_message_preview'] = content[:100] + "..." if len(content) > 100 else content
  1780. break
  1781. # 最后一条ai消息预览
  1782. for msg in reversed(messages):
  1783. if msg['type'] == 'ai' and msg.get('content', '').strip():
  1784. content = str(msg['content'])
  1785. summary['last_message_preview'] = content[:100] + "..." if len(content) > 100 else content
  1786. break
  1787. # 生成对话预览(第一条human消息)
  1788. summary['conversation_preview'] = summary['first_message_preview']
  1789. # 添加API元数据
  1790. summary['api_metadata'] = {
  1791. "timestamp": datetime.now().isoformat(),
  1792. "api_version": "v1",
  1793. "endpoint": "get_conversation_summary"
  1794. }
  1795. logger.info(f"✅ 成功获取对话摘要")
  1796. return jsonify({
  1797. "success": True,
  1798. "data": summary,
  1799. "timestamp": datetime.now().isoformat()
  1800. }), 200
  1801. except Exception as e:
  1802. logger.error(f"❌ 获取对话摘要失败: {e}")
  1803. return jsonify({
  1804. "success": False,
  1805. "error": str(e),
  1806. "timestamp": datetime.now().isoformat()
  1807. }), 500
  1808. # ==================== 启动逻辑 ====================
  1809. def signal_handler(signum, frame):
  1810. """信号处理器,优雅退出"""
  1811. logger.info(f"接收到信号 {signum},准备退出...")
  1812. cleanup_resources()
  1813. sys.exit(0)
  1814. if __name__ == '__main__':
  1815. # 注册信号处理器
  1816. signal.signal(signal.SIGINT, signal_handler)
  1817. signal.signal(signal.SIGTERM, signal_handler)
  1818. logger.info("🚀 启动统一API服务...")
  1819. logger.info("📍 服务地址: http://localhost:8084")
  1820. logger.info("🔗 健康检查: http://localhost:8084/health")
  1821. logger.info("📘 React Agent API: http://localhost:8084/api/v0/ask_react_agent")
  1822. logger.info("📘 LangGraph Agent API: http://localhost:8084/api/v0/ask_agent")
  1823. try:
  1824. # 尝试使用ASGI模式启动(推荐)
  1825. import uvicorn
  1826. from asgiref.wsgi import WsgiToAsgi
  1827. logger.info("🚀 使用ASGI模式启动异步Flask应用...")
  1828. logger.info(" 这将解决事件循环冲突问题,支持LangGraph异步checkpoint保存")
  1829. # 将Flask WSGI应用转换为ASGI应用
  1830. asgi_app = WsgiToAsgi(app)
  1831. # 使用uvicorn启动ASGI应用
  1832. uvicorn.run(
  1833. asgi_app,
  1834. host="0.0.0.0",
  1835. port=8084,
  1836. log_level="info",
  1837. access_log=True
  1838. )
  1839. except ImportError as e:
  1840. # 如果缺少ASGI依赖,fallback到传统Flask模式
  1841. logger.warning("⚠️ ASGI依赖缺失,使用传统Flask模式启动")
  1842. logger.warning(" 建议安装: pip install uvicorn asgiref")
  1843. logger.warning(" 传统模式可能存在异步事件循环冲突问题")
  1844. # 启动标准Flask应用(支持异步路由)
  1845. app.run(host="0.0.0.0", port=8084, debug=False, threaded=True)
  1846. # Data Pipeline 全局变量 - 从 citu_app.py 迁移
  1847. data_pipeline_manager = None
  1848. data_pipeline_file_manager = None
  1849. def get_data_pipeline_manager():
  1850. """获取Data Pipeline管理器单例(从 citu_app.py 迁移)"""
  1851. global data_pipeline_manager
  1852. if data_pipeline_manager is None:
  1853. data_pipeline_manager = SimpleWorkflowManager()
  1854. return data_pipeline_manager
  1855. def get_data_pipeline_file_manager():
  1856. """获取Data Pipeline文件管理器单例(从 citu_app.py 迁移)"""
  1857. global data_pipeline_file_manager
  1858. if data_pipeline_file_manager is None:
  1859. data_pipeline_file_manager = SimpleFileManager()
  1860. return data_pipeline_file_manager
  1861. # ==================== QA缓存管理API (从 citu_app.py 迁移) ====================
  1862. @app.route('/api/v0/qa_cache_stats', methods=['GET'])
  1863. def qa_cache_stats():
  1864. """获取问答缓存统计信息(从 citu_app.py 迁移)"""
  1865. try:
  1866. stats = redis_conversation_manager.get_qa_cache_stats()
  1867. return jsonify(success_response(
  1868. response_text="获取问答缓存统计成功",
  1869. data=stats
  1870. ))
  1871. except Exception as e:
  1872. logger.error(f"获取问答缓存统计失败: {str(e)}")
  1873. return jsonify(internal_error_response(
  1874. response_text="获取问答缓存统计失败,请稍后重试"
  1875. )), 500
  1876. @app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
  1877. def qa_cache_cleanup():
  1878. """清空所有问答缓存(从 citu_app.py 迁移)"""
  1879. try:
  1880. if not redis_conversation_manager.is_available():
  1881. return jsonify(internal_error_response(
  1882. response_text="Redis连接不可用,无法执行清理操作"
  1883. )), 500
  1884. deleted_count = redis_conversation_manager.clear_all_qa_cache()
  1885. return jsonify(success_response(
  1886. response_text="问答缓存清理完成",
  1887. data={
  1888. "deleted_count": deleted_count,
  1889. "cleared": deleted_count > 0,
  1890. "cleanup_time": datetime.now().isoformat()
  1891. }
  1892. ))
  1893. except Exception as e:
  1894. logger.error(f"清空问答缓存失败: {str(e)}")
  1895. return jsonify(internal_error_response(
  1896. response_text="清空问答缓存失败,请稍后重试"
  1897. )), 500
  1898. # ==================== Database API (从 citu_app.py 迁移) ====================
  1899. @app.route('/api/v0/database/tables', methods=['POST'])
  1900. def get_database_tables():
  1901. """
  1902. 获取数据库表列表(从 citu_app.py 迁移)
  1903. 请求体:
  1904. {
  1905. "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db", // 可选,不传则使用默认配置
  1906. "schema": "public,ods", // 可选,支持多个schema用逗号分隔,默认为public
  1907. "table_name_pattern": "ods_*" // 可选,表名模式匹配,支持通配符:ods_*、*_dim、*fact*、ods_%
  1908. }
  1909. 响应:
  1910. {
  1911. "success": true,
  1912. "code": 200,
  1913. "message": "获取表列表成功",
  1914. "data": {
  1915. "tables": ["public.table1", "public.table2", "ods.table3"],
  1916. "total": 3,
  1917. "schemas": ["public", "ods"],
  1918. "table_name_pattern": "ods_*"
  1919. }
  1920. }
  1921. """
  1922. try:
  1923. req = request.get_json(force=True)
  1924. # 处理数据库连接参数(可选)
  1925. db_connection = req.get('db_connection')
  1926. if not db_connection:
  1927. # 使用app_config的默认数据库配置
  1928. import app_config
  1929. db_params = app_config.APP_DB_CONFIG
  1930. db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
  1931. logger.info("使用默认数据库配置获取表列表")
  1932. else:
  1933. logger.info("使用用户指定的数据库配置获取表列表")
  1934. # 可选参数
  1935. schema = req.get('schema', '')
  1936. table_name_pattern = req.get('table_name_pattern')
  1937. # 创建表检查API实例
  1938. table_inspector = TableInspectorAPI()
  1939. # 使用asyncio运行异步方法
  1940. async def get_tables():
  1941. return await table_inspector.get_tables_list(db_connection, schema, table_name_pattern)
  1942. # 在新的事件循环中运行异步方法
  1943. try:
  1944. loop = asyncio.new_event_loop()
  1945. asyncio.set_event_loop(loop)
  1946. tables = loop.run_until_complete(get_tables())
  1947. finally:
  1948. loop.close()
  1949. # 解析schema信息
  1950. parsed_schemas = table_inspector._parse_schemas(schema)
  1951. response_data = {
  1952. "tables": tables,
  1953. "total": len(tables),
  1954. "schemas": parsed_schemas,
  1955. "db_connection_info": {
  1956. "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
  1957. }
  1958. }
  1959. # 如果使用了表名模式,添加到响应中
  1960. if table_name_pattern:
  1961. response_data["table_name_pattern"] = table_name_pattern
  1962. return jsonify(success_response(
  1963. response_text="获取表列表成功",
  1964. data=response_data
  1965. )), 200
  1966. except Exception as e:
  1967. logger.error(f"获取数据库表列表失败: {str(e)}")
  1968. return jsonify(internal_error_response(
  1969. response_text=f"获取表列表失败: {str(e)}"
  1970. )), 500
  1971. @app.route('/api/v0/database/table/ddl', methods=['POST'])
  1972. def get_table_ddl():
  1973. """
  1974. 获取表的DDL语句或MD文档(从 citu_app.py 迁移)
  1975. 请求体:
  1976. {
  1977. "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db", // 可选,不传则使用默认配置
  1978. "table": "public.test",
  1979. "business_context": "这是高速公路服务区的相关数据", // 可选
  1980. "type": "ddl" // 可选,支持ddl/md/both,默认为ddl
  1981. }
  1982. 响应:
  1983. {
  1984. "success": true,
  1985. "code": 200,
  1986. "message": "获取表DDL成功",
  1987. "data": {
  1988. "ddl": "create table public.test (...);",
  1989. "md": "## test表...", // 仅当type为md或both时返回
  1990. "table_info": {
  1991. "table_name": "test",
  1992. "schema_name": "public",
  1993. "full_name": "public.test",
  1994. "comment": "测试表",
  1995. "field_count": 10,
  1996. "row_count": 1000
  1997. },
  1998. "fields": [...]
  1999. }
  2000. }
  2001. """
  2002. try:
  2003. req = request.get_json(force=True)
  2004. # 处理参数(table仍为必需,db_connection可选)
  2005. table = req.get('table')
  2006. db_connection = req.get('db_connection')
  2007. if not table:
  2008. return jsonify(bad_request_response(
  2009. response_text="缺少必需参数:table",
  2010. missing_params=['table']
  2011. )), 400
  2012. if not db_connection:
  2013. # 使用app_config的默认数据库配置
  2014. import app_config
  2015. db_params = app_config.APP_DB_CONFIG
  2016. db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
  2017. logger.info("使用默认数据库配置获取表DDL")
  2018. else:
  2019. logger.info("使用用户指定的数据库配置获取表DDL")
  2020. # 可选参数
  2021. business_context = req.get('business_context', '')
  2022. output_type = req.get('type', 'ddl')
  2023. # 验证type参数
  2024. valid_types = ['ddl', 'md', 'both']
  2025. if output_type not in valid_types:
  2026. return jsonify(bad_request_response(
  2027. response_text=f"无效的type参数: {output_type},支持的值: {valid_types}",
  2028. invalid_params=['type']
  2029. )), 400
  2030. # 创建表检查API实例
  2031. table_inspector = TableInspectorAPI()
  2032. # 使用asyncio运行异步方法
  2033. async def get_ddl():
  2034. return await table_inspector.get_table_ddl(
  2035. db_connection=db_connection,
  2036. table=table,
  2037. business_context=business_context,
  2038. output_type=output_type
  2039. )
  2040. # 在新的事件循环中运行异步方法
  2041. try:
  2042. loop = asyncio.new_event_loop()
  2043. asyncio.set_event_loop(loop)
  2044. result = loop.run_until_complete(get_ddl())
  2045. finally:
  2046. loop.close()
  2047. response_data = {
  2048. **result,
  2049. "generation_info": {
  2050. "business_context": business_context,
  2051. "output_type": output_type,
  2052. "has_llm_comments": bool(business_context),
  2053. "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
  2054. }
  2055. }
  2056. return jsonify(success_response(
  2057. response_text=f"获取表{output_type.upper()}成功",
  2058. data=response_data
  2059. )), 200
  2060. except Exception as e:
  2061. logger.error(f"获取表DDL失败: {str(e)}")
  2062. return jsonify(internal_error_response(
  2063. response_text=f"获取表{output_type.upper() if 'output_type' in locals() else 'DDL'}失败: {str(e)}"
  2064. )), 500
  2065. # ==================== Data Pipeline API (从 citu_app.py 迁移) ====================
  2066. @app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
  2067. def create_data_pipeline_task():
  2068. """创建数据管道任务(从 citu_app.py 迁移)"""
  2069. try:
  2070. req = request.get_json(force=True)
  2071. # table_list_file和business_context现在都是可选参数
  2072. # 如果未提供table_list_file,将使用文件上传模式
  2073. # 创建任务(支持可选的db_connection参数)
  2074. manager = get_data_pipeline_manager()
  2075. task_id = manager.create_task(
  2076. table_list_file=req.get('table_list_file'),
  2077. business_context=req.get('business_context'),
  2078. db_name=req.get('db_name'), # 可选参数,用于指定特定数据库名称
  2079. db_connection=req.get('db_connection'), # 可选参数,用于指定数据库连接字符串
  2080. task_name=req.get('task_name'), # 可选参数,用于指定任务名称
  2081. enable_sql_validation=req.get('enable_sql_validation', True),
  2082. enable_llm_repair=req.get('enable_llm_repair', True),
  2083. modify_original_file=req.get('modify_original_file', True),
  2084. enable_training_data_load=req.get('enable_training_data_load', True)
  2085. )
  2086. # 获取任务信息
  2087. task_info = manager.get_task_status(task_id)
  2088. response_data = {
  2089. "task_id": task_id,
  2090. "task_name": task_info.get('task_name'),
  2091. "status": task_info.get('status'),
  2092. "created_at": task_info.get('created_at').isoformat() if task_info.get('created_at') else None
  2093. }
  2094. # 检查是否为文件上传模式
  2095. file_upload_mode = not req.get('table_list_file')
  2096. response_message = "任务创建成功"
  2097. if file_upload_mode:
  2098. response_data["file_upload_mode"] = True
  2099. response_data["next_step"] = f"POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list"
  2100. response_message += ",请上传表清单文件后再执行任务"
  2101. return jsonify(success_response(
  2102. response_text=response_message,
  2103. data=response_data
  2104. )), 201
  2105. except Exception as e:
  2106. logger.error(f"创建数据管道任务失败: {str(e)}")
  2107. return jsonify(internal_error_response(
  2108. response_text="创建任务失败,请稍后重试"
  2109. )), 500
  2110. @app.route('/api/v0/data_pipeline/tasks/<task_id>/execute', methods=['POST'])
  2111. def execute_data_pipeline_task(task_id):
  2112. """执行数据管道任务(从 citu_app.py 迁移)"""
  2113. try:
  2114. req = request.get_json(force=True) if request.is_json else {}
  2115. execution_mode = req.get('execution_mode', 'complete')
  2116. step_name = req.get('step_name')
  2117. # 验证执行模式
  2118. if execution_mode not in ['complete', 'step']:
  2119. return jsonify(bad_request_response(
  2120. response_text="无效的执行模式,必须是 'complete' 或 'step'",
  2121. invalid_params=['execution_mode']
  2122. )), 400
  2123. # 如果是步骤执行模式,验证步骤名称
  2124. if execution_mode == 'step':
  2125. if not step_name:
  2126. return jsonify(bad_request_response(
  2127. response_text="步骤执行模式需要指定step_name",
  2128. missing_params=['step_name']
  2129. )), 400
  2130. valid_steps = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']
  2131. if step_name not in valid_steps:
  2132. return jsonify(bad_request_response(
  2133. response_text=f"无效的步骤名称,支持的步骤: {', '.join(valid_steps)}",
  2134. invalid_params=['step_name']
  2135. )), 400
  2136. # 检查任务是否存在
  2137. manager = get_data_pipeline_manager()
  2138. task_info = manager.get_task_status(task_id)
  2139. if not task_info:
  2140. return jsonify(not_found_response(
  2141. response_text=f"任务不存在: {task_id}"
  2142. )), 404
  2143. # 使用subprocess启动独立进程执行任务
  2144. def run_task_subprocess():
  2145. try:
  2146. import subprocess
  2147. import sys
  2148. from pathlib import Path
  2149. # 构建执行命令
  2150. python_executable = sys.executable
  2151. script_path = Path(__file__).parent / "data_pipeline" / "task_executor.py"
  2152. cmd = [
  2153. python_executable,
  2154. str(script_path),
  2155. "--task-id", task_id,
  2156. "--execution-mode", execution_mode
  2157. ]
  2158. if step_name:
  2159. cmd.extend(["--step-name", step_name])
  2160. logger.info(f"启动任务进程: {' '.join(cmd)}")
  2161. # 启动后台进程(不等待完成)
  2162. process = subprocess.Popen(
  2163. cmd,
  2164. stdout=subprocess.PIPE,
  2165. stderr=subprocess.PIPE,
  2166. text=True,
  2167. cwd=Path(__file__).parent
  2168. )
  2169. logger.info(f"任务进程已启动: PID={process.pid}, task_id={task_id}")
  2170. except Exception as e:
  2171. logger.error(f"启动任务进程失败: {task_id}, 错误: {str(e)}")
  2172. # 在新线程中启动subprocess(避免阻塞API响应)
  2173. thread = Thread(target=run_task_subprocess, daemon=True)
  2174. thread.start()
  2175. response_data = {
  2176. "task_id": task_id,
  2177. "execution_mode": execution_mode,
  2178. "step_name": step_name if execution_mode == 'step' else None,
  2179. "message": "任务正在后台执行,请通过状态接口查询进度"
  2180. }
  2181. return jsonify(success_response(
  2182. response_text="任务执行已启动",
  2183. data=response_data
  2184. )), 202
  2185. except Exception as e:
  2186. logger.error(f"启动数据管道任务执行失败: {str(e)}")
  2187. return jsonify(internal_error_response(
  2188. response_text="启动任务执行失败,请稍后重试"
  2189. )), 500
  2190. @app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
  2191. def get_data_pipeline_task_status(task_id):
  2192. """
  2193. 获取数据管道任务状态(从 citu_app.py 迁移)
  2194. 响应:
  2195. {
  2196. "success": true,
  2197. "code": 200,
  2198. "message": "获取任务状态成功",
  2199. "data": {
  2200. "task_id": "task_20250627_143052",
  2201. "status": "in_progress",
  2202. "step_status": {
  2203. "ddl_generation": "completed",
  2204. "qa_generation": "running",
  2205. "sql_validation": "pending",
  2206. "training_load": "pending"
  2207. },
  2208. "created_at": "2025-06-27T14:30:52",
  2209. "started_at": "2025-06-27T14:31:00",
  2210. "parameters": {...},
  2211. "current_execution": {...},
  2212. "total_executions": 2
  2213. }
  2214. }
  2215. """
  2216. try:
  2217. manager = get_data_pipeline_manager()
  2218. task_info = manager.get_task_status(task_id)
  2219. if not task_info:
  2220. return jsonify(not_found_response(
  2221. response_text=f"任务不存在: {task_id}"
  2222. )), 404
  2223. # 获取步骤状态
  2224. steps = manager.get_task_steps(task_id)
  2225. current_step = None
  2226. for step in steps:
  2227. if step['step_status'] == 'running':
  2228. current_step = step
  2229. break
  2230. # 构建步骤状态摘要
  2231. step_status_summary = {}
  2232. for step in steps:
  2233. step_status_summary[step['step_name']] = step['step_status']
  2234. response_data = {
  2235. "task_id": task_info['task_id'],
  2236. "task_name": task_info.get('task_name'),
  2237. "status": task_info['status'],
  2238. "step_status": step_status_summary,
  2239. "created_at": task_info['created_at'].isoformat() if task_info.get('created_at') else None,
  2240. "started_at": task_info['started_at'].isoformat() if task_info.get('started_at') else None,
  2241. "completed_at": task_info['completed_at'].isoformat() if task_info.get('completed_at') else None,
  2242. "parameters": task_info.get('parameters', {}),
  2243. "result": task_info.get('result'),
  2244. "error_message": task_info.get('error_message'),
  2245. "current_step": {
  2246. "execution_id": current_step['execution_id'],
  2247. "step": current_step['step_name'],
  2248. "status": current_step['step_status'],
  2249. "started_at": current_step['started_at'].isoformat() if current_step and current_step.get('started_at') else None
  2250. } if current_step else None,
  2251. "total_steps": len(steps),
  2252. "steps": [{
  2253. "step_name": step['step_name'],
  2254. "step_status": step['step_status'],
  2255. "started_at": step['started_at'].isoformat() if step.get('started_at') else None,
  2256. "completed_at": step['completed_at'].isoformat() if step.get('completed_at') else None,
  2257. "error_message": step.get('error_message')
  2258. } for step in steps]
  2259. }
  2260. return jsonify(success_response(
  2261. response_text="获取任务状态成功",
  2262. data=response_data
  2263. ))
  2264. except Exception as e:
  2265. logger.error(f"获取数据管道任务状态失败: {str(e)}")
  2266. return jsonify(internal_error_response(
  2267. response_text="获取任务状态失败,请稍后重试"
  2268. )), 500
  2269. @app.route('/api/v0/data_pipeline/tasks/<task_id>/logs', methods=['GET'])
  2270. def get_data_pipeline_task_logs(task_id):
  2271. """
  2272. 获取数据管道任务日志(从任务目录文件读取)(从 citu_app.py 迁移)
  2273. 查询参数:
  2274. - limit: 日志行数限制,默认100
  2275. - level: 日志级别过滤,可选
  2276. 响应:
  2277. {
  2278. "success": true,
  2279. "code": 200,
  2280. "message": "获取任务日志成功",
  2281. "data": {
  2282. "task_id": "task_20250627_143052",
  2283. "logs": [
  2284. {
  2285. "timestamp": "2025-06-27 14:30:52",
  2286. "level": "INFO",
  2287. "message": "任务开始执行"
  2288. }
  2289. ],
  2290. "total": 15,
  2291. "source": "file"
  2292. }
  2293. }
  2294. """
  2295. try:
  2296. limit = request.args.get('limit', 100, type=int)
  2297. level = request.args.get('level')
  2298. # 限制最大查询数量
  2299. limit = min(limit, 1000)
  2300. manager = get_data_pipeline_manager()
  2301. # 验证任务是否存在
  2302. task_info = manager.get_task_status(task_id)
  2303. if not task_info:
  2304. return jsonify(not_found_response(
  2305. response_text=f"任务不存在: {task_id}"
  2306. )), 404
  2307. # 获取任务目录下的日志文件
  2308. import os
  2309. from pathlib import Path
  2310. # 获取项目根目录的绝对路径
  2311. project_root = Path(__file__).parent.absolute()
  2312. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  2313. log_file = task_dir / "data_pipeline.log"
  2314. logs = []
  2315. if log_file.exists():
  2316. try:
  2317. # 读取日志文件的最后N行
  2318. with open(log_file, 'r', encoding='utf-8') as f:
  2319. lines = f.readlines()
  2320. # 取最后limit行
  2321. recent_lines = lines[-limit:] if len(lines) > limit else lines
  2322. # 解析日志行
  2323. import re
  2324. log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
  2325. for line in recent_lines:
  2326. line = line.strip()
  2327. if not line:
  2328. continue
  2329. match = re.match(log_pattern, line)
  2330. if match:
  2331. timestamp, log_level, logger_name, message = match.groups()
  2332. # 级别过滤
  2333. if level and log_level != level.upper():
  2334. continue
  2335. logs.append({
  2336. "timestamp": timestamp,
  2337. "level": log_level,
  2338. "logger": logger_name,
  2339. "message": message
  2340. })
  2341. else:
  2342. # 处理多行日志(如异常堆栈)
  2343. if logs:
  2344. logs[-1]["message"] += f"\n{line}"
  2345. except Exception as e:
  2346. logger.error(f"读取日志文件失败: {e}")
  2347. response_data = {
  2348. "task_id": task_id,
  2349. "logs": logs,
  2350. "total": len(logs),
  2351. "source": "file",
  2352. "log_file": str(log_file) if log_file.exists() else None
  2353. }
  2354. return jsonify(success_response(
  2355. response_text="获取任务日志成功",
  2356. data=response_data
  2357. ))
  2358. except Exception as e:
  2359. logger.error(f"获取数据管道任务日志失败: {str(e)}")
  2360. return jsonify(internal_error_response(
  2361. response_text="获取任务日志失败,请稍后重试"
  2362. )), 500
  2363. @app.route('/api/v0/data_pipeline/tasks', methods=['GET'])
  2364. def list_data_pipeline_tasks():
  2365. """获取数据管道任务列表(从 citu_app.py 迁移)"""
  2366. try:
  2367. limit = request.args.get('limit', 50, type=int)
  2368. offset = request.args.get('offset', 0, type=int)
  2369. status_filter = request.args.get('status')
  2370. # 限制查询数量
  2371. limit = min(limit, 100)
  2372. manager = get_data_pipeline_manager()
  2373. tasks = manager.get_tasks_list(
  2374. limit=limit,
  2375. offset=offset,
  2376. status_filter=status_filter
  2377. )
  2378. # 格式化任务列表
  2379. formatted_tasks = []
  2380. for task in tasks:
  2381. formatted_tasks.append({
  2382. "task_id": task.get('task_id'),
  2383. "task_name": task.get('task_name'),
  2384. "status": task.get('status'),
  2385. "step_status": task.get('step_status'),
  2386. "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
  2387. "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
  2388. "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
  2389. "created_by": task.get('by_user'),
  2390. "db_name": task.get('db_name'),
  2391. "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
  2392. # 新增字段
  2393. "directory_exists": task.get('directory_exists', True), # 默认为True,兼容旧数据
  2394. "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
  2395. })
  2396. response_data = {
  2397. "tasks": formatted_tasks,
  2398. "total": len(formatted_tasks),
  2399. "limit": limit,
  2400. "offset": offset
  2401. }
  2402. return jsonify(success_response(
  2403. response_text="获取任务列表成功",
  2404. data=response_data
  2405. ))
  2406. except Exception as e:
  2407. logger.error(f"获取数据管道任务列表失败: {str(e)}")
  2408. return jsonify(internal_error_response(
  2409. response_text="获取任务列表失败,请稍后重试"
  2410. )), 500
  2411. @app.route('/api/v0/data_pipeline/tasks/query', methods=['POST'])
  2412. def query_data_pipeline_tasks():
  2413. """
  2414. 高级查询数据管道任务列表(从 citu_app.py 迁移)
  2415. 支持复杂筛选、排序、分页功能
  2416. 请求体:
  2417. {
  2418. "page": 1, // 页码,必须大于0,默认1
  2419. "page_size": 20, // 每页大小,1-100之间,默认20
  2420. "status": "completed", // 可选,任务状态筛选:"pending"|"running"|"completed"|"failed"|"cancelled"
  2421. "task_name": "highway", // 可选,任务名称模糊搜索,最大100字符
  2422. "created_by": "user123", // 可选,创建者精确匹配
  2423. "db_name": "highway_db", // 可选,数据库名称精确匹配
  2424. "created_time_start": "2025-01-01T00:00:00", // 可选,创建时间范围开始
  2425. "created_time_end": "2025-12-31T23:59:59", // 可选,创建时间范围结束
  2426. "started_time_start": "2025-01-01T00:00:00", // 可选,开始时间范围开始
  2427. "started_time_end": "2025-12-31T23:59:59", // 可选,开始时间范围结束
  2428. "completed_time_start": "2025-01-01T00:00:00", // 可选,完成时间范围开始
  2429. "completed_time_end": "2025-12-31T23:59:59", // 可选,完成时间范围结束
  2430. "sort_by": "created_at", // 可选,排序字段:"created_at"|"started_at"|"completed_at"|"task_name"|"status",默认"created_at"
  2431. "sort_order": "desc" // 可选,排序方向:"asc"|"desc",默认"desc"
  2432. }
  2433. """
  2434. try:
  2435. # 获取请求数据
  2436. req = request.get_json(force=True) if request.is_json else {}
  2437. # 解析参数,设置默认值
  2438. page = req.get('page', 1)
  2439. page_size = req.get('page_size', 20)
  2440. status = req.get('status')
  2441. task_name = req.get('task_name')
  2442. created_by = req.get('created_by')
  2443. db_name = req.get('db_name')
  2444. created_time_start = req.get('created_time_start')
  2445. created_time_end = req.get('created_time_end')
  2446. started_time_start = req.get('started_time_start')
  2447. started_time_end = req.get('started_time_end')
  2448. completed_time_start = req.get('completed_time_start')
  2449. completed_time_end = req.get('completed_time_end')
  2450. sort_by = req.get('sort_by', 'created_at')
  2451. sort_order = req.get('sort_order', 'desc')
  2452. # 参数验证
  2453. # 验证分页参数
  2454. if page < 1:
  2455. return jsonify(bad_request_response(
  2456. response_text="页码必须大于0",
  2457. invalid_params=['page']
  2458. )), 400
  2459. if page_size < 1 or page_size > 100:
  2460. return jsonify(bad_request_response(
  2461. response_text="每页大小必须在1-100之间",
  2462. invalid_params=['page_size']
  2463. )), 400
  2464. # 验证任务名称长度
  2465. if task_name and len(task_name) > 100:
  2466. return jsonify(bad_request_response(
  2467. response_text="任务名称搜索关键词最大长度为100字符",
  2468. invalid_params=['task_name']
  2469. )), 400
  2470. # 验证排序参数
  2471. allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
  2472. if sort_by not in allowed_sort_fields:
  2473. return jsonify(bad_request_response(
  2474. response_text=f"不支持的排序字段: {sort_by},支持的字段: {', '.join(allowed_sort_fields)}",
  2475. invalid_params=['sort_by']
  2476. )), 400
  2477. if sort_order.lower() not in ['asc', 'desc']:
  2478. return jsonify(bad_request_response(
  2479. response_text="排序方向必须是 'asc' 或 'desc'",
  2480. invalid_params=['sort_order']
  2481. )), 400
  2482. # 验证状态筛选
  2483. if status:
  2484. allowed_statuses = ['pending', 'running', 'completed', 'failed', 'cancelled']
  2485. if status not in allowed_statuses:
  2486. return jsonify(bad_request_response(
  2487. response_text=f"不支持的状态值: {status},支持的状态: {', '.join(allowed_statuses)}",
  2488. invalid_params=['status']
  2489. )), 400
  2490. # 调用管理器执行查询
  2491. manager = get_data_pipeline_manager()
  2492. result = manager.query_tasks_advanced(
  2493. page=page,
  2494. page_size=page_size,
  2495. status=status,
  2496. task_name=task_name,
  2497. created_by=created_by,
  2498. db_name=db_name,
  2499. created_time_start=created_time_start,
  2500. created_time_end=created_time_end,
  2501. started_time_start=started_time_start,
  2502. started_time_end=started_time_end,
  2503. completed_time_start=completed_time_start,
  2504. completed_time_end=completed_time_end,
  2505. sort_by=sort_by,
  2506. sort_order=sort_order
  2507. )
  2508. # 格式化任务列表
  2509. formatted_tasks = []
  2510. for task in result['tasks']:
  2511. formatted_tasks.append({
  2512. "task_id": task.get('task_id'),
  2513. "task_name": task.get('task_name'),
  2514. "status": task.get('status'),
  2515. "step_status": task.get('step_status'),
  2516. "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
  2517. "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
  2518. "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
  2519. "created_by": task.get('by_user'),
  2520. "db_name": task.get('db_name'),
  2521. "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
  2522. "directory_exists": task.get('directory_exists', True),
  2523. "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
  2524. })
  2525. # 构建响应数据
  2526. response_data = {
  2527. "tasks": formatted_tasks,
  2528. "pagination": result['pagination'],
  2529. "filters_applied": {
  2530. k: v for k, v in {
  2531. "status": status,
  2532. "task_name": task_name,
  2533. "created_by": created_by,
  2534. "db_name": db_name,
  2535. "created_time_start": created_time_start,
  2536. "created_time_end": created_time_end,
  2537. "started_time_start": started_time_start,
  2538. "started_time_end": started_time_end,
  2539. "completed_time_start": completed_time_start,
  2540. "completed_time_end": completed_time_end
  2541. }.items() if v
  2542. },
  2543. "sort_applied": {
  2544. "sort_by": sort_by,
  2545. "sort_order": sort_order
  2546. },
  2547. "query_time": result.get('query_time', '0.000s')
  2548. }
  2549. return jsonify(success_response(
  2550. response_text="查询任务列表成功",
  2551. data=response_data
  2552. ))
  2553. except Exception as e:
  2554. logger.error(f"查询数据管道任务列表失败: {str(e)}")
  2555. return jsonify(internal_error_response(
  2556. response_text="查询任务列表失败,请稍后重试"
  2557. )), 500
  2558. @app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['GET'])
  2559. def get_data_pipeline_task_files(task_id):
  2560. """获取任务文件列表(从 citu_app.py 迁移)"""
  2561. try:
  2562. file_manager = get_data_pipeline_file_manager()
  2563. # 获取任务文件
  2564. files = file_manager.get_task_files(task_id)
  2565. directory_info = file_manager.get_directory_info(task_id)
  2566. # 格式化文件信息
  2567. formatted_files = []
  2568. for file_info in files:
  2569. formatted_files.append({
  2570. "file_name": file_info['file_name'],
  2571. "file_type": file_info['file_type'],
  2572. "file_size": file_info['file_size'],
  2573. "file_size_formatted": file_info['file_size_formatted'],
  2574. "created_at": file_info['created_at'].isoformat() if file_info.get('created_at') else None,
  2575. "modified_at": file_info['modified_at'].isoformat() if file_info.get('modified_at') else None,
  2576. "is_readable": file_info['is_readable']
  2577. })
  2578. response_data = {
  2579. "task_id": task_id,
  2580. "files": formatted_files,
  2581. "directory_info": directory_info
  2582. }
  2583. return jsonify(success_response(
  2584. response_text="获取任务文件列表成功",
  2585. data=response_data
  2586. ))
  2587. except Exception as e:
  2588. logger.error(f"获取任务文件列表失败: {str(e)}")
  2589. return jsonify(internal_error_response(
  2590. response_text="获取任务文件列表失败,请稍后重试"
  2591. )), 500
  2592. @app.route('/api/v0/data_pipeline/tasks/<task_id>/files/<file_name>', methods=['GET'])
  2593. def download_data_pipeline_task_file(task_id, file_name):
  2594. """下载任务文件(从 citu_app.py 迁移)"""
  2595. try:
  2596. logger.info(f"开始下载文件: task_id={task_id}, file_name={file_name}")
  2597. # 直接构建文件路径,避免依赖数据库
  2598. from pathlib import Path
  2599. import os
  2600. # 获取项目根目录的绝对路径
  2601. project_root = Path(__file__).parent.absolute()
  2602. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  2603. file_path = task_dir / file_name
  2604. logger.info(f"文件路径: {file_path}")
  2605. # 检查文件是否存在
  2606. if not file_path.exists():
  2607. logger.warning(f"文件不存在: {file_path}")
  2608. return jsonify(not_found_response(
  2609. response_text=f"文件不存在: {file_name}"
  2610. )), 404
  2611. # 检查是否为文件(而不是目录)
  2612. if not file_path.is_file():
  2613. logger.warning(f"路径不是文件: {file_path}")
  2614. return jsonify(bad_request_response(
  2615. response_text=f"路径不是有效文件: {file_name}"
  2616. )), 400
  2617. # 安全检查:确保文件在允许的目录内
  2618. try:
  2619. file_path.resolve().relative_to(task_dir.resolve())
  2620. except ValueError:
  2621. logger.warning(f"文件路径不安全: {file_path}")
  2622. return jsonify(bad_request_response(
  2623. response_text="非法的文件路径"
  2624. )), 400
  2625. # 检查文件是否可读
  2626. if not os.access(file_path, os.R_OK):
  2627. logger.warning(f"文件不可读: {file_path}")
  2628. return jsonify(bad_request_response(
  2629. response_text="文件不可读"
  2630. )), 400
  2631. logger.info(f"开始发送文件: {file_path}")
  2632. return send_file(
  2633. file_path,
  2634. as_attachment=True,
  2635. download_name=file_name
  2636. )
  2637. except Exception as e:
  2638. logger.error(f"下载任务文件失败: task_id={task_id}, file_name={file_name}, 错误: {str(e)}", exc_info=True)
  2639. return jsonify(internal_error_response(
  2640. response_text="下载文件失败,请稍后重试"
  2641. )), 500
  2642. @app.route('/api/v0/data_pipeline/tasks/<task_id>/upload-table-list', methods=['POST'])
  2643. def upload_table_list_file(task_id):
  2644. """
  2645. 上传表清单文件(从 citu_app.py 迁移)
  2646. 表单参数:
  2647. - file: 要上传的表清单文件(multipart/form-data)
  2648. 响应:
  2649. {
  2650. "success": true,
  2651. "code": 200,
  2652. "message": "表清单文件上传成功",
  2653. "data": {
  2654. "task_id": "task_20250701_123456",
  2655. "filename": "table_list.txt",
  2656. "file_size": 1024,
  2657. "file_size_formatted": "1.0 KB"
  2658. }
  2659. }
  2660. """
  2661. try:
  2662. # 验证任务是否存在
  2663. manager = get_data_pipeline_manager()
  2664. task_info = manager.get_task_status(task_id)
  2665. if not task_info:
  2666. return jsonify(not_found_response(
  2667. response_text=f"任务不存在: {task_id}"
  2668. )), 404
  2669. # 检查是否有文件上传
  2670. if 'file' not in request.files:
  2671. return jsonify(bad_request_response(
  2672. response_text="请选择要上传的表清单文件",
  2673. missing_params=['file']
  2674. )), 400
  2675. file = request.files['file']
  2676. # 验证文件名
  2677. if file.filename == '':
  2678. return jsonify(bad_request_response(
  2679. response_text="请选择有效的文件"
  2680. )), 400
  2681. try:
  2682. # 使用文件管理器上传文件
  2683. file_manager = get_data_pipeline_file_manager()
  2684. result = file_manager.upload_table_list_file(task_id, file)
  2685. response_data = {
  2686. "task_id": task_id,
  2687. "filename": result["filename"],
  2688. "file_size": result["file_size"],
  2689. "file_size_formatted": result["file_size_formatted"],
  2690. "upload_time": result["upload_time"].isoformat() if result.get("upload_time") else None
  2691. }
  2692. return jsonify(success_response(
  2693. response_text="表清单文件上传成功",
  2694. data=response_data
  2695. )), 200
  2696. except ValueError as e:
  2697. # 文件验证错误(如文件太大、空文件等)
  2698. return jsonify(bad_request_response(
  2699. response_text=str(e)
  2700. )), 400
  2701. except Exception as e:
  2702. logger.error(f"上传表清单文件失败: {str(e)}")
  2703. return jsonify(internal_error_response(
  2704. response_text="文件上传失败,请稍后重试"
  2705. )), 500
  2706. except Exception as e:
  2707. logger.error(f"处理表清单文件上传请求失败: {str(e)}")
  2708. return jsonify(internal_error_response(
  2709. response_text="处理上传请求失败,请稍后重试"
  2710. )), 500
  2711. @app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list-info', methods=['GET'])
  2712. def get_table_list_info(task_id):
  2713. """
  2714. 获取任务的表清单文件信息(从 citu_app.py 迁移)
  2715. 响应:
  2716. {
  2717. "success": true,
  2718. "code": 200,
  2719. "message": "获取表清单文件信息成功",
  2720. "data": {
  2721. "task_id": "task_20250701_123456",
  2722. "has_file": true,
  2723. "filename": "table_list.txt",
  2724. "file_path": "./data_pipeline/training_data/task_20250701_123456/table_list.txt",
  2725. "file_size": 1024,
  2726. "file_size_formatted": "1.0 KB",
  2727. "uploaded_at": "2025-07-01T12:34:56",
  2728. "table_count": 5,
  2729. "is_readable": true
  2730. }
  2731. }
  2732. """
  2733. try:
  2734. file_manager = get_data_pipeline_file_manager()
  2735. # 获取表清单文件信息
  2736. table_list_info = file_manager.get_table_list_file_info(task_id)
  2737. response_data = {
  2738. "task_id": task_id,
  2739. "has_file": table_list_info.get("exists", False),
  2740. **table_list_info
  2741. }
  2742. return jsonify(success_response(
  2743. response_text="获取表清单文件信息成功",
  2744. data=response_data
  2745. ))
  2746. except Exception as e:
  2747. logger.error(f"获取表清单文件信息失败: {str(e)}")
  2748. return jsonify(internal_error_response(
  2749. response_text="获取表清单文件信息失败,请稍后重试"
  2750. )), 500
  2751. @app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list', methods=['POST'])
  2752. def create_table_list_from_names(task_id):
  2753. """
  2754. 通过POST方式提交表名列表并创建table_list.txt文件(从 citu_app.py 迁移)
  2755. 请求体:
  2756. {
  2757. "tables": ["table1", "schema.table2", "table3"]
  2758. }
  2759. 或者:
  2760. {
  2761. "tables": "table1,schema.table2,table3"
  2762. }
  2763. 响应:
  2764. {
  2765. "success": true,
  2766. "code": 200,
  2767. "message": "表清单已成功创建",
  2768. "data": {
  2769. "task_id": "task_20250701_123456",
  2770. "filename": "table_list.txt",
  2771. "table_count": 3,
  2772. "file_size": 45,
  2773. "file_size_formatted": "45 B",
  2774. "created_time": "2025-07-01T12:34:56"
  2775. }
  2776. }
  2777. """
  2778. try:
  2779. # 验证任务是否存在
  2780. manager = get_data_pipeline_manager()
  2781. task_info = manager.get_task_status(task_id)
  2782. if not task_info:
  2783. return jsonify(not_found_response(
  2784. response_text=f"任务不存在: {task_id}"
  2785. )), 404
  2786. # 获取请求数据
  2787. req = request.get_json(force=True)
  2788. tables_param = req.get('tables')
  2789. if not tables_param:
  2790. return jsonify(bad_request_response(
  2791. response_text="缺少必需参数:tables",
  2792. missing_params=['tables']
  2793. )), 400
  2794. # 处理不同格式的表名参数
  2795. try:
  2796. if isinstance(tables_param, str):
  2797. # 逗号分隔的字符串格式
  2798. table_names = [name.strip() for name in tables_param.split(',') if name.strip()]
  2799. elif isinstance(tables_param, list):
  2800. # 数组格式
  2801. table_names = [str(name).strip() for name in tables_param if str(name).strip()]
  2802. else:
  2803. return jsonify(bad_request_response(
  2804. response_text="tables参数格式错误,应为字符串(逗号分隔)或数组"
  2805. )), 400
  2806. if not table_names:
  2807. return jsonify(bad_request_response(
  2808. response_text="表名列表不能为空"
  2809. )), 400
  2810. except Exception as e:
  2811. return jsonify(bad_request_response(
  2812. response_text=f"解析tables参数失败: {str(e)}"
  2813. )), 400
  2814. try:
  2815. # 使用文件管理器创建表清单文件
  2816. file_manager = get_data_pipeline_file_manager()
  2817. result = file_manager.create_table_list_from_names(task_id, table_names)
  2818. response_data = {
  2819. "task_id": task_id,
  2820. "filename": result["filename"],
  2821. "table_count": result["table_count"],
  2822. "unique_table_count": result["unique_table_count"],
  2823. "file_size": result["file_size"],
  2824. "file_size_formatted": result["file_size_formatted"],
  2825. "created_time": result["created_time"].isoformat() if result.get("created_time") else None,
  2826. "original_count": len(table_names) if isinstance(table_names, list) else len(tables_param.split(','))
  2827. }
  2828. return jsonify(success_response(
  2829. response_text=f"表清单已成功创建,包含 {result['table_count']} 个表",
  2830. data=response_data
  2831. )), 200
  2832. except ValueError as e:
  2833. # 表名验证错误(如格式错误、数量限制等)
  2834. return jsonify(bad_request_response(
  2835. response_text=str(e)
  2836. )), 400
  2837. except Exception as e:
  2838. logger.error(f"创建表清单文件失败: {str(e)}")
  2839. return jsonify(internal_error_response(
  2840. response_text="创建表清单文件失败,请稍后重试"
  2841. )), 500
  2842. except Exception as e:
  2843. logger.error(f"处理表清单创建请求失败: {str(e)}")
  2844. return jsonify(internal_error_response(
  2845. response_text="处理请求失败,请稍后重试"
  2846. )), 500
  2847. @app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['POST'])
  2848. def upload_file_to_task(task_id):
  2849. """
  2850. 上传文件到指定任务目录(从 citu_app.py 迁移)
  2851. 表单参数:
  2852. - file: 要上传的文件(multipart/form-data)
  2853. - overwrite_mode: 重名处理模式 (backup, replace, skip),默认为backup
  2854. 支持的文件类型:
  2855. - .ddl: DDL文件
  2856. - .md: Markdown文档
  2857. - .txt: 文本文件
  2858. - .json: JSON文件
  2859. - .sql: SQL文件
  2860. - .csv: CSV文件
  2861. 重名处理模式:
  2862. - backup: 备份原文件(默认)
  2863. - replace: 直接覆盖
  2864. - skip: 跳过上传
  2865. """
  2866. try:
  2867. # 验证任务是否存在
  2868. manager = get_data_pipeline_manager()
  2869. task_info = manager.get_task_status(task_id)
  2870. if not task_info:
  2871. return jsonify(not_found_response(
  2872. response_text=f"任务不存在: {task_id}"
  2873. )), 404
  2874. # 检查是否有文件上传
  2875. if 'file' not in request.files:
  2876. return jsonify(bad_request_response(
  2877. response_text="请选择要上传的文件",
  2878. missing_params=['file']
  2879. )), 400
  2880. file = request.files['file']
  2881. # 验证文件名
  2882. if file.filename == '':
  2883. return jsonify(bad_request_response(
  2884. response_text="请选择有效的文件"
  2885. )), 400
  2886. # 获取重名处理模式
  2887. overwrite_mode = request.form.get('overwrite_mode', 'backup')
  2888. # 验证重名处理模式
  2889. valid_modes = ['backup', 'replace', 'skip']
  2890. if overwrite_mode not in valid_modes:
  2891. return jsonify(bad_request_response(
  2892. response_text=f"无效的overwrite_mode参数: {overwrite_mode},支持的值: {valid_modes}",
  2893. invalid_params=['overwrite_mode']
  2894. )), 400
  2895. try:
  2896. # 使用文件管理器上传文件
  2897. file_manager = get_data_pipeline_file_manager()
  2898. result = file_manager.upload_file_to_task(task_id, file, file.filename, overwrite_mode)
  2899. # 检查是否跳过上传
  2900. if result.get('skipped'):
  2901. return jsonify(success_response(
  2902. response_text=result.get('message', '文件已存在,跳过上传'),
  2903. data=result
  2904. )), 200
  2905. return jsonify(success_response(
  2906. response_text="文件上传成功",
  2907. data=result
  2908. )), 200
  2909. except ValueError as e:
  2910. # 文件验证错误(如文件太大、空文件、不支持的类型等)
  2911. return jsonify(bad_request_response(
  2912. response_text=str(e)
  2913. )), 400
  2914. except Exception as e:
  2915. logger.error(f"上传文件失败: {str(e)}")
  2916. return jsonify(internal_error_response(
  2917. response_text="文件上传失败,请稍后重试"
  2918. )), 500
  2919. except Exception as e:
  2920. logger.error(f"处理文件上传请求失败: {str(e)}")
  2921. return jsonify(internal_error_response(
  2922. response_text="处理上传请求失败,请稍后重试"
  2923. )), 500
  2924. # 任务目录删除功能(从 citu_app.py 迁移)
  2925. import shutil
  2926. from pathlib import Path
  2927. import psycopg2
  2928. from app_config import PGVECTOR_CONFIG
  2929. def delete_task_directory_simple(task_id, delete_database_records=False):
  2930. """
  2931. 简单的任务目录删除功能(从 citu_app.py 迁移)
  2932. - 删除 data_pipeline/training_data/{task_id} 目录
  2933. - 更新数据库中的 directory_exists 字段
  2934. - 可选:删除数据库记录
  2935. """
  2936. try:
  2937. # 1. 删除目录
  2938. project_root = Path(__file__).parent.absolute()
  2939. task_dir = project_root / "data_pipeline" / "training_data" / task_id
  2940. deleted_files_count = 0
  2941. deleted_size = 0
  2942. if task_dir.exists():
  2943. # 计算删除前的统计信息
  2944. for file_path in task_dir.rglob('*'):
  2945. if file_path.is_file():
  2946. deleted_files_count += 1
  2947. deleted_size += file_path.stat().st_size
  2948. # 删除目录
  2949. shutil.rmtree(task_dir)
  2950. directory_deleted = True
  2951. operation_message = "目录删除成功"
  2952. else:
  2953. directory_deleted = False
  2954. operation_message = "目录不存在,无需删除"
  2955. # 2. 更新数据库
  2956. database_records_deleted = False
  2957. try:
  2958. conn = psycopg2.connect(**PGVECTOR_CONFIG)
  2959. cur = conn.cursor()
  2960. if delete_database_records:
  2961. # 删除任务步骤记录
  2962. cur.execute("DELETE FROM data_pipeline_task_steps WHERE task_id = %s", (task_id,))
  2963. # 删除任务主记录
  2964. cur.execute("DELETE FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
  2965. database_records_deleted = True
  2966. else:
  2967. # 只更新目录状态
  2968. cur.execute("""
  2969. UPDATE data_pipeline_tasks
  2970. SET directory_exists = FALSE, updated_at = CURRENT_TIMESTAMP
  2971. WHERE task_id = %s
  2972. """, (task_id,))
  2973. conn.commit()
  2974. cur.close()
  2975. conn.close()
  2976. except Exception as db_error:
  2977. logger.error(f"数据库操作失败: {db_error}")
  2978. # 数据库失败不影响文件删除的结果
  2979. # 3. 格式化文件大小
  2980. def format_size(size_bytes):
  2981. if size_bytes < 1024:
  2982. return f"{size_bytes} B"
  2983. elif size_bytes < 1024**2:
  2984. return f"{size_bytes/1024:.1f} KB"
  2985. elif size_bytes < 1024**3:
  2986. return f"{size_bytes/(1024**2):.1f} MB"
  2987. else:
  2988. return f"{size_bytes/(1024**3):.1f} GB"
  2989. return {
  2990. "success": True,
  2991. "task_id": task_id,
  2992. "directory_deleted": directory_deleted,
  2993. "database_records_deleted": database_records_deleted,
  2994. "deleted_files_count": deleted_files_count,
  2995. "deleted_size": format_size(deleted_size),
  2996. "deleted_at": datetime.now().isoformat(),
  2997. "operation_message": operation_message # 新增:具体的操作消息
  2998. }
  2999. except Exception as e:
  3000. logger.error(f"删除任务目录失败: {task_id}, 错误: {str(e)}")
  3001. return {
  3002. "success": False,
  3003. "task_id": task_id,
  3004. "error": str(e),
  3005. "error_code": "DELETE_FAILED",
  3006. "operation_message": f"删除操作失败: {str(e)}" # 新增:失败消息
  3007. }
  3008. @app.route('/api/v0/data_pipeline/tasks', methods=['DELETE'])
  3009. def delete_tasks():
  3010. """删除任务目录(支持单个和批量)(从 citu_app.py 迁移)"""
  3011. try:
  3012. # 智能获取参数:支持JSON body和URL查询参数两种方式
  3013. def get_request_parameter(param_name, array_param_name=None):
  3014. """从JSON body或URL查询参数中获取参数值"""
  3015. # 1. 优先从JSON body获取
  3016. if request.is_json:
  3017. try:
  3018. json_data = request.get_json()
  3019. if json_data and param_name in json_data:
  3020. return json_data[param_name]
  3021. except:
  3022. pass
  3023. # 2. 从URL查询参数获取
  3024. if param_name in request.args:
  3025. value = request.args.get(param_name)
  3026. # 处理布尔值
  3027. if value.lower() in ('true', '1', 'yes'):
  3028. return True
  3029. elif value.lower() in ('false', '0', 'no'):
  3030. return False
  3031. return value
  3032. # 3. 处理数组参数(如 task_ids[])
  3033. if array_param_name and array_param_name in request.args:
  3034. return request.args.getlist(array_param_name)
  3035. return None
  3036. # 获取参数
  3037. task_ids = get_request_parameter('task_ids', 'task_ids[]')
  3038. confirm = get_request_parameter('confirm')
  3039. if not task_ids:
  3040. return jsonify(bad_request_response(
  3041. response_text="缺少必需参数: task_ids",
  3042. missing_params=['task_ids']
  3043. )), 400
  3044. if not confirm:
  3045. return jsonify(bad_request_response(
  3046. response_text="缺少必需参数: confirm",
  3047. missing_params=['confirm']
  3048. )), 400
  3049. if confirm != True:
  3050. return jsonify(bad_request_response(
  3051. response_text="confirm参数必须为true以确认删除操作"
  3052. )), 400
  3053. if not isinstance(task_ids, list) or len(task_ids) == 0:
  3054. return jsonify(bad_request_response(
  3055. response_text="task_ids必须是非空的任务ID列表"
  3056. )), 400
  3057. # 获取可选参数
  3058. delete_database_records = get_request_parameter('delete_database_records') or False
  3059. continue_on_error = get_request_parameter('continue_on_error')
  3060. if continue_on_error is None:
  3061. continue_on_error = True
  3062. # 执行批量删除操作
  3063. deleted_tasks = []
  3064. failed_tasks = []
  3065. total_size_freed = 0
  3066. for task_id in task_ids:
  3067. result = delete_task_directory_simple(task_id, delete_database_records)
  3068. if result["success"]:
  3069. deleted_tasks.append(result)
  3070. # 累计释放的空间大小(这里简化处理,实际应该解析size字符串)
  3071. else:
  3072. failed_tasks.append({
  3073. "task_id": task_id,
  3074. "error": result["error"],
  3075. "error_code": result.get("error_code", "UNKNOWN")
  3076. })
  3077. if not continue_on_error:
  3078. break
  3079. # 构建响应
  3080. summary = {
  3081. "total_requested": len(task_ids),
  3082. "successfully_deleted": len(deleted_tasks),
  3083. "failed": len(failed_tasks)
  3084. }
  3085. batch_result = {
  3086. "deleted_tasks": deleted_tasks,
  3087. "failed_tasks": failed_tasks,
  3088. "summary": summary,
  3089. "deleted_at": datetime.now().isoformat()
  3090. }
  3091. # 构建智能响应消息
  3092. if len(task_ids) == 1:
  3093. # 单个删除:使用具体的操作消息
  3094. if summary["failed"] == 0:
  3095. # 从deleted_tasks中获取具体的操作消息
  3096. operation_msg = deleted_tasks[0].get('operation_message', '任务处理完成')
  3097. message = operation_msg
  3098. else:
  3099. # 从failed_tasks中获取错误消息
  3100. error_msg = failed_tasks[0].get('error', '删除失败')
  3101. message = f"任务删除失败: {error_msg}"
  3102. else:
  3103. # 批量删除:统计各种操作结果
  3104. directory_deleted_count = sum(1 for task in deleted_tasks if task.get('directory_deleted', False))
  3105. directory_not_exist_count = sum(1 for task in deleted_tasks if not task.get('directory_deleted', False))
  3106. if summary["failed"] == 0:
  3107. # 全部成功
  3108. if directory_deleted_count > 0 and directory_not_exist_count > 0:
  3109. message = f"批量操作完成:{directory_deleted_count}个目录已删除,{directory_not_exist_count}个目录不存在"
  3110. elif directory_deleted_count > 0:
  3111. message = f"批量删除完成:成功删除{directory_deleted_count}个目录"
  3112. elif directory_not_exist_count > 0:
  3113. message = f"批量操作完成:{directory_not_exist_count}个目录不存在,无需删除"
  3114. else:
  3115. message = "批量操作完成"
  3116. elif summary["successfully_deleted"] == 0:
  3117. message = f"批量删除失败:{summary['failed']}个任务处理失败"
  3118. else:
  3119. message = f"批量删除部分完成:成功{summary['successfully_deleted']}个,失败{summary['failed']}个"
  3120. return jsonify(success_response(
  3121. response_text=message,
  3122. data=batch_result
  3123. )), 200
  3124. except Exception as e:
  3125. logger.error(f"删除任务失败: 错误: {str(e)}")
  3126. return jsonify(internal_error_response(
  3127. response_text="删除任务失败,请稍后重试"
  3128. )), 500
  @app.route('/api/v0/data_pipeline/tasks/<task_id>/logs/query', methods=['POST'])
  def query_data_pipeline_task_logs(task_id):
      """
      Advanced query over a data pipeline task's logs (migrated from citu_app.py).

      Supports filtering, sorting and pagination. All body fields are optional;
      a missing or empty body falls back to the defaults listed below.

      Request body (JSON object):
          page        int, must be >= 1, default 1
          page_size   int, 1-500 inclusive, default 50
          level       str, one of DEBUG / INFO / WARNING / ERROR / CRITICAL
                      (case-insensitive; forwarded in upper case)
          start_time  str, lower bound of the time range; accepted formats are
                      "YYYY-MM-DD HH:MM:SS", "YYYY-MM-DD",
                      "YYYY-MM-DDTHH:MM:SS" and "YYYY-MM-DDTHH:MM:SS.ffffff"
          end_time    str, upper bound of the time range, same formats
          keyword     str, at most 200 characters
          logger_name str, forwarded unchanged to the log query
          step_name   str, forwarded unchanged to the log query
          sort_by     one of "timestamp" | "level" | "logger" | "step" |
                      "line_number", default "timestamp"
          sort_order  "asc" | "desc" (case-insensitive; forwarded in lower
                      case), default "desc"

      Responses:
          success payload wrapping the result of ``query_logs_advanced``
          404 when ``get_task_status`` returns nothing for ``task_id``
          400 when any parameter fails validation
          500 on any unexpected error
      """
      try:
          # Make sure the task exists before doing any further work.
          manager = get_data_pipeline_manager()
          task_info = manager.get_task_status(task_id)
          if not task_info:
              return jsonify(not_found_response(
                  response_text=f"任务不存在: {task_id}"
              )), 404

          # silent=True: a missing / non-JSON body is treated as "no filters"
          # instead of raising inside the handler and surfacing as a 500.
          request_data = request.get_json(silent=True) or {}
          if not isinstance(request_data, dict):
              return jsonify(bad_request_response(
                  response_text="请求体必须是JSON对象"
              )), 400

          def _is_valid_time_format(time_str):
              """Return True when time_str is empty or matches a supported format."""
              from datetime import datetime

              if time_str is None or time_str == "":
                  return True
              # strptime raises TypeError on non-strings; reject them up front
              # so the caller can answer with a 400.
              if not isinstance(time_str, str):
                  return False
              time_formats = (
                  '%Y-%m-%d %H:%M:%S',     # 2025-01-01 00:00:00
                  '%Y-%m-%d',              # 2025-01-01
                  '%Y-%m-%dT%H:%M:%S',     # 2025-01-01T00:00:00
                  '%Y-%m-%dT%H:%M:%S.%f',  # 2025-01-01T00:00:00.123456
              )
              for fmt in time_formats:
                  try:
                      datetime.strptime(time_str, fmt)
                  except ValueError:
                      continue
                  return True
              return False

          # Extract parameters, applying defaults for absent keys.
          page = request_data.get('page', 1)
          page_size = request_data.get('page_size', 50)
          level = request_data.get('level')
          start_time = request_data.get('start_time')
          end_time = request_data.get('end_time')
          keyword = request_data.get('keyword')
          logger_name = request_data.get('logger_name')
          step_name = request_data.get('step_name')
          sort_by = request_data.get('sort_by', 'timestamp')
          sort_order = request_data.get('sort_order', 'desc')

          # bool is a subclass of int, so JSON true/false must be excluded
          # explicitly from the integer checks.
          if isinstance(page, bool) or not isinstance(page, int) or page < 1:
              return jsonify(bad_request_response(
                  response_text="页码必须是大于0的整数"
              )), 400
          if (isinstance(page_size, bool) or not isinstance(page_size, int)
                  or page_size < 1 or page_size > 500):
              return jsonify(bad_request_response(
                  response_text="每页大小必须是1-500之间的整数"
              )), 400

          # Log level: must be a string naming one of the standard levels.
          valid_levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
          if level is not None and not isinstance(level, str):
              return jsonify(bad_request_response(
                  response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
              )), 400
          if level:
              if level.upper() not in valid_levels:
                  return jsonify(bad_request_response(
                      response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
                  )), 400
              # Forward the canonical spelling that was actually validated.
              level = level.upper()

          # Time range bounds.
          if not _is_valid_time_format(start_time):
              return jsonify(bad_request_response(
                  response_text="开始时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
              )), 400
          if not _is_valid_time_format(end_time):
              return jsonify(bad_request_response(
                  response_text="结束时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
              )), 400

          # Keyword: string of bounded length.
          if keyword is not None and not isinstance(keyword, str):
              return jsonify(bad_request_response(
                  response_text="关键字必须是字符串"
              )), 400
          if keyword and len(keyword) > 200:
              return jsonify(bad_request_response(
                  response_text="关键字长度不能超过200个字符"
              )), 400

          # Sort field.
          allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
          if sort_by not in allowed_sort_fields:
              return jsonify(bad_request_response(
                  response_text=f"排序字段必须是以下之一: {', '.join(allowed_sort_fields)}"
              )), 400

          # Sort direction: an explicit JSON null or a non-string value is a
          # client error, not a server error.
          if not isinstance(sort_order, str) or sort_order.lower() not in ('asc', 'desc'):
              return jsonify(bad_request_response(
                  response_text="排序方向必须是asc或desc"
              )), 400
          sort_order = sort_order.lower()

          # Build the workflow executor and run the query; cleanup() is always
          # called once the executor has been constructed.
          from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
          executor = SimpleWorkflowExecutor(task_id)
          try:
              result = executor.query_logs_advanced(
                  page=page,
                  page_size=page_size,
                  level=level,
                  start_time=start_time,
                  end_time=end_time,
                  keyword=keyword,
                  logger_name=logger_name,
                  step_name=step_name,
                  sort_by=sort_by,
                  sort_order=sort_order
              )
              return jsonify(success_response(
                  response_text="查询任务日志成功",
                  data=result
              ))
          finally:
              executor.cleanup()
      except Exception as e:
          logger.error(f"查询数据管道任务日志失败: {str(e)}")
          return jsonify(internal_error_response(
              response_text="查询任务日志失败,请稍后重试"
          )), 500