12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916 |
- # 给dataops 对话助手返回结果
- from vanna.flask import VannaFlaskApp
- from core.vanna_llm_factory import create_vanna_instance
- from flask import request, jsonify
- import pandas as pd
- import common.result as result
- from datetime import datetime, timedelta
- from common.session_aware_cache import WebSessionAwareMemoryCache
- from app_config import API_MAX_RETURN_ROWS, ENABLE_RESULT_SUMMARY
- import re
- import chainlit as cl
- import json
- from flask import session # 添加session导入
- from common.redis_conversation_manager import RedisConversationManager # 添加Redis对话管理器导入
- from common.result import ( # 统一导入所有需要的响应函数
- bad_request_response, service_unavailable_response,
- agent_success_response, agent_error_response,
- internal_error_response, success_response,
- validation_failed_response
- )
- from app_config import ( # 添加Redis相关配置导入
- USER_MAX_CONVERSATIONS,
- CONVERSATION_CONTEXT_COUNT,
- DEFAULT_ANONYMOUS_USER,
- ENABLE_QUESTION_ANSWER_CACHE
- )
# Fallback cap on returned rows, used when app_config.API_MAX_RETURN_ROWS is None.
DEFAULT_MAX_RETURN_ROWS = 200
# Effective row cap applied by the query endpoints in this module.
MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
# Module-level Vanna instance shared by every request handler in this module.
vn = create_vanna_instance()
# Session-aware in-memory cache; handed to VannaFlaskApp and read back as `app.cache`.
timestamped_cache = WebSessionAwareMemoryCache()
# Instantiate VannaFlaskApp with the custom cache above.
app = VannaFlaskApp(
    vn,
    cache=timestamped_cache,  # the session-aware cache created above
    title="辞图智能数据问答平台",
    logo = "https://www.citupro.com/img/logo-black-2.png",
    subtitle="让 AI 为你写 SQL",
    chart=False,
    allow_llm_to_see_data=True,
    ask_results_correct=True,
    followup_questions=True,
    # NOTE(review): confirm what VannaFlaskApp enables with debug=True before
    # running this configuration in production.
    debug=True
)
# Redis-backed conversation manager; ask_agent uses it for user/conversation
# id resolution, context retrieval, message history and answer caching.
redis_conversation_manager = RedisConversationManager()
# Replacement for the /api/v0/ask endpoint that also accepts a browser session_id from the frontend.
@app.flask_app.route('/api/v0/ask', methods=['POST'])
def ask_full():
    """Generate SQL for a natural-language question, run it and return the result.

    Request JSON:
        question (str, required): the user's question.
        session_id (str, optional): browser session id. When given and the
            cache supports ``generate_id_with_browser_session``, a conversation
            id bound to that session is created and returned.

    Returns:
        200 with ``sql``, ``query_result`` (rows capped at MAX_RETURN_ROWS),
        ``conversation_id`` and ``session_id``; when ENABLE_RESULT_SUMMARY is on
        and a summary was produced, also ``summary`` and ``response``.
        400 if ``question`` is missing, 422 if no SQL could be generated,
        500 on any other failure.
    """
    req = request.get_json(force=True)
    question = req.get("question", None)
    browser_session_id = req.get("session_id", None)  # session id supplied by the frontend

    if not question:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:question",
            missing_params=["question"]
        )), 400

    # Stays None unless the cache can associate this question with the browser session.
    conversation_id = None
    if hasattr(app.cache, 'generate_id_with_browser_session') and browser_session_id:
        # vn.ask does not receive the session id, so register the association up front.
        conversation_id = app.cache.generate_id_with_browser_session(
            question=question,
            browser_session_id=browser_session_id
        )

    try:
        sql, df, _ = vn.ask(
            question=question,
            print_results=False,
            visualize=False,
            allow_llm_to_see_data=True
        )

        if sql is None:
            # No SQL: surface the LLM's explanation when it produced one.
            # NOTE(review): `vn` is shared across requests; confirm last_llm_explanation
            # is reset per call, otherwise a stale explanation could be returned.
            if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
                explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
                return jsonify(validation_failed_response(
                    response_text=explanation_message
                )), 422
            return jsonify(validation_failed_response(
                response_text="无法生成SQL查询,请检查问题描述或数据表结构"
            )), 422

        query_result = {
            "rows": [],
            "columns": [],
            "row_count": 0,
            "is_limited": False,
            "total_row_count": 0
        }
        summary = None

        if isinstance(df, pd.DataFrame):
            query_result["columns"] = list(df.columns)
            if not df.empty:
                total_rows = len(df)
                limited_df = df.head(MAX_RETURN_ROWS)
                query_result["rows"] = limited_df.to_dict(orient="records")
                query_result["row_count"] = len(limited_df)
                query_result["total_row_count"] = total_rows
                query_result["is_limited"] = total_rows > MAX_RETURN_ROWS

                # Summaries are optional and only attempted when there is data;
                # a summary failure must not fail the whole request.
                if ENABLE_RESULT_SUMMARY:
                    try:
                        summary = vn.generate_summary(question=question, df=df)
                        print(f"[INFO] 成功生成摘要: {summary}")
                    except Exception as e:
                        print(f"[WARNING] 生成摘要失败: {str(e)}")
                        summary = None

        response_data = {
            "sql": sql,
            "query_result": query_result,
            "conversation_id": conversation_id,
            "session_id": browser_session_id
        }

        if ENABLE_RESULT_SUMMARY and summary is not None:
            response_data["summary"] = summary
            response_data["response"] = summary  # also exposed under "response"

        return jsonify(success_response(
            response_text="查询执行完成" if summary is None else None,
            data=response_data
        ))

    except Exception as e:
        print(f"[ERROR] ask_full执行失败: {str(e)}")

        # Even after an exception, prefer a business-level explanation if the LLM gave one.
        if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
            explanation_message = vn.last_llm_explanation + "请尝试提问其它问题。"
            return jsonify(validation_failed_response(
                response_text=explanation_message
            )), 422

        # Otherwise treat it as a technical failure.
        return jsonify(internal_error_response(
            response_text="查询处理失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/citu_run_sql', methods=['POST'])
def citu_run_sql():
    """Execute caller-supplied SQL and return the row-capped result.

    Request JSON:
        sql (str, required): statement passed as-is to ``vn.run_sql``.

    Returns:
        200 with ``sql`` and ``query_result`` (rows capped at MAX_RETURN_ROWS);
        400 if ``sql`` is missing; 500 if execution fails.

    NOTE(review): the SQL text comes straight from the request body and is
    executed unmodified. Confirm this endpoint is only reachable by trusted
    callers and that the database account behind ``vn`` is read-only.
    """
    req = request.get_json(force=True)
    sql = req.get('sql')

    if not sql:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:sql",
            missing_params=["sql"]
        )), 400

    try:
        df = vn.run_sql(sql)

        query_result = {
            "rows": [],
            "columns": [],
            "row_count": 0,
            "is_limited": False,
            "total_row_count": 0
        }

        if isinstance(df, pd.DataFrame):
            query_result["columns"] = list(df.columns)
            if not df.empty:
                total_rows = len(df)
                limited_df = df.head(MAX_RETURN_ROWS)
                query_result["rows"] = limited_df.to_dict(orient="records")
                query_result["row_count"] = len(limited_df)
                query_result["total_row_count"] = total_rows
                query_result["is_limited"] = total_rows > MAX_RETURN_ROWS

        message = f"SQL执行完成,共返回 {query_result['total_row_count']} 条记录"
        if query_result["is_limited"]:
            message += f",已限制显示前 {MAX_RETURN_ROWS} 条"

        return jsonify(success_response(
            response_text=message,
            data={
                "sql": sql,
                "query_result": query_result
            }
        ))

    except Exception as e:
        print(f"[ERROR] citu_run_sql执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="SQL执行失败,请检查SQL语句是否正确"
        )), 500
@app.flask_app.route('/api/v0/ask_cached', methods=['POST'])
def ask_cached():
    """Answer a question like /api/v0/ask, reusing cached results when possible.

    The question is mapped to a conversation id with ``app.cache.generate_id``.
    If a ``sql`` entry is already cached under that id, the cached ``sql``,
    ``df`` and ``summary`` are reused; otherwise the question goes through
    ``vn.ask`` and the results are stored under the id.

    NOTE(review): cache hits require ``generate_id`` to return the same id for
    the same question. Vanna's stock MemoryCache.generate_id returns a fresh
    UUID on every call; confirm WebSessionAwareMemoryCache overrides it
    deterministically, otherwise this endpoint never hits the cache.

    Request JSON:
        question (str, required): the user's question.
        session_id (str, optional): echoed back in the response only.

    Returns:
        200 with ``sql``, ``query_result``, ``conversation_id``, ``session_id``,
        ``cached`` (True on a cache hit) and optionally ``summary``/``response``;
        400 if ``question`` is missing; 422 if no SQL could be generated;
        500 on any other failure.
    """
    req = request.get_json(force=True)
    question = req.get("question", None)
    browser_session_id = req.get("session_id", None)

    if not question:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:question",
            missing_params=["question"]
        )), 400

    try:
        conversation_id = app.cache.generate_id(question=question)

        cached_sql = app.cache.get(id=conversation_id, field="sql")
        from_cache = cached_sql is not None

        if from_cache:
            print(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
            sql = cached_sql
            df = app.cache.get(id=conversation_id, field="df")
            summary = app.cache.get(id=conversation_id, field="summary")
        else:
            print(f"[CACHE MISS] 执行新查询: {conversation_id}")

            sql, df, _ = vn.ask(
                question=question,
                print_results=False,
                visualize=False,
                allow_llm_to_see_data=True
            )

            if sql is None:
                # No SQL: surface the LLM's explanation when it produced one.
                if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
                    explanation_message = vn.last_llm_explanation + "请尝试用其它方式提问。"
                    return jsonify(validation_failed_response(
                        response_text=explanation_message
                    )), 422
                return jsonify(validation_failed_response(
                    response_text="无法生成SQL查询,请检查问题描述或数据表结构"
                )), 422

            # Only successful generations are cached.
            app.cache.set(id=conversation_id, field="question", value=question)
            app.cache.set(id=conversation_id, field="sql", value=sql)
            app.cache.set(id=conversation_id, field="df", value=df)

            # Optional summary, only attempted when there is data; failures are non-fatal.
            summary = None
            if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
                try:
                    summary = vn.generate_summary(question=question, df=df)
                    print(f"[INFO] 成功生成摘要: {summary}")
                except Exception as e:
                    print(f"[WARNING] 生成摘要失败: {str(e)}")
                    summary = None

            app.cache.set(id=conversation_id, field="summary", value=summary)

        query_result = {
            "rows": [],
            "columns": [],
            "row_count": 0,
            "is_limited": False,
            "total_row_count": 0
        }

        if isinstance(df, pd.DataFrame):
            query_result["columns"] = list(df.columns)
            if not df.empty:
                total_rows = len(df)
                limited_df = df.head(MAX_RETURN_ROWS)
                query_result["rows"] = limited_df.to_dict(orient="records")
                query_result["row_count"] = len(limited_df)
                query_result["total_row_count"] = total_rows
                query_result["is_limited"] = total_rows > MAX_RETURN_ROWS

        response_data = {
            "sql": sql,
            "query_result": query_result,
            "conversation_id": conversation_id,
            "session_id": browser_session_id,
            "cached": from_cache  # whether the result came from the cache
        }

        if ENABLE_RESULT_SUMMARY and summary is not None:
            response_data["summary"] = summary
            response_data["response"] = summary  # also exposed under "response"

        return jsonify(success_response(
            response_text="查询执行完成" if summary is None else None,
            data=response_data
        ))

    except Exception as e:
        print(f"[ERROR] ask_cached执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="查询处理失败,请稍后重试"
        )), 500
-
@app.flask_app.route('/api/v0/citu_train_question_sql', methods=['POST'])
def citu_train_question_sql():
    """Store a question/SQL pair (or SQL alone) as Vanna training data.

    Request JSON:
        question (str, optional): natural-language question.
        sql (str, required): SQL statement that answers it.

    Returns:
        200 with ``training_id`` and a confirmation message; 400 if ``sql`` is
        missing; 500 if training fails.
    """
    try:
        req = request.get_json(force=True)
        question = req.get('question')
        sql = req.get('sql')

        if not sql:
            return jsonify(bad_request_response(
                response_text="缺少必需参数:sql",
                missing_params=["sql"]
            )), 400

        if question:
            # Pass both so the question is stored together with its SQL.
            training_id = vn.train(question=question, sql=sql)
            print(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
        else:
            training_id = vn.train(sql=sql)
            print(f"训练成功,训练ID为:{training_id},SQL:{sql}")

        return jsonify(success_response(
            response_text="问题-SQL对训练成功",
            data={
                "training_id": training_id,
                "message": "Question-SQL pair trained successfully"
            }
        ))

    except Exception as e:
        # The client only gets a generic message, so record the actual cause here.
        print(f"[ERROR] citu_train_question_sql执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="训练失败,请稍后重试"
        )), 500
-
# ============ LangGraph Agent integration ============
# Process-wide agent instance, created lazily by get_citu_langraph_agent().
citu_langraph_agent = None


def get_citu_langraph_agent():
    """Return the shared CituLangGraphAgent, creating it on first call.

    Returns:
        The cached ``CituLangGraphAgent`` instance.

    Raises:
        Exception: if ``agent.citu_agent`` cannot be imported ("Agent模块导入失败")
            or the agent constructor fails ("Agent初始化失败"). The underlying
            error is chained as ``__cause__`` so its traceback is preserved.

    NOTE(review): creation is not guarded by a lock; with a threaded server two
    concurrent first requests could each construct an agent.
    """
    global citu_langraph_agent
    if citu_langraph_agent is None:
        try:
            from agent.citu_agent import CituLangGraphAgent
            print("[CITU_APP] 开始创建LangGraph Agent实例...")
            citu_langraph_agent = CituLangGraphAgent()
            print("[CITU_APP] LangGraph Agent实例创建成功")
        except ImportError as e:
            print(f"[CRITICAL] Agent模块导入失败: {str(e)}")
            print("[CRITICAL] 请检查agent模块是否存在以及依赖是否正确安装")
            raise Exception(f"Agent模块导入失败: {str(e)}") from e
        except Exception as e:
            print(f"[CRITICAL] LangGraph Agent实例创建失败: {str(e)}")
            print(f"[CRITICAL] 错误类型: {type(e).__name__}")
            # Point at the likely culprit based on keywords in the error text.
            error_text = str(e).lower()
            if "config" in error_text:
                print("[CRITICAL] 可能是配置文件问题,请检查配置")
            elif "llm" in error_text:
                print("[CRITICAL] 可能是LLM连接问题,请检查LLM配置")
            elif "tool" in error_text:
                print("[CRITICAL] 可能是工具加载问题,请检查工具模块")
            raise Exception(f"Agent初始化失败: {str(e)}") from e
    return citu_langraph_agent
def _select_assistant_response(response_type, response_text, summary, query_result):
    """Choose the text stored as the assistant's turn in the conversation history.

    For ``DATABASE`` answers the priority is: explicit response text (errors or
    explanations such as SQL-generation failures) > summary > a row-count
    sentence built from ``query_result`` > a generic placeholder. Any other
    type stores ``response_text`` unchanged.
    """
    if response_type != "DATABASE":
        return response_text
    if response_text:
        return response_text
    if summary:
        return summary
    if query_result:
        row_count = query_result.get("row_count", 0)
        return f"查询执行完成,共返回 {row_count} 条记录。"
    return "数据库查询已处理。"


@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    """Answer a question through the LangGraph agent, with conversation context.

    Request JSON:
        question (str, required): the user's question.
        session_id (str, optional): browser session id.
        user_id (str, optional): explicit user id.
        conversation_id (str, optional): conversation to continue.
        continue_conversation (bool, optional, default False)
        routing_mode (str, optional): one of database_direct, chat_direct,
            hybrid, llm_only. If absent, app_config.QUESTION_ROUTING_MODE is
            used, falling back to "hybrid" if it cannot be imported.

    Flow: resolve user and conversation ids through the Redis manager, load the
    conversation context, return a cached answer when one is found, otherwise
    call the agent, persist both turns and offer the answer to the cache.

    Returns:
        200 ``agent_success_response``; 400 for a missing question or invalid
        routing_mode; 503 if the agent cannot be created; the agent's
        ``error_code`` (default 500) when the agent reports failure; 500 on any
        other exception.
    """
    req = request.get_json(force=True)
    question = req.get("question", None)
    browser_session_id = req.get("session_id", None)

    user_id_input = req.get("user_id", None)
    conversation_id_input = req.get("conversation_id", None)
    continue_conversation = req.get("continue_conversation", False)

    api_routing_mode = req.get("routing_mode", None)
    valid_routing_modes = ["database_direct", "chat_direct", "hybrid", "llm_only"]

    if not question:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:question",
            missing_params=["question"]
        )), 400

    if api_routing_mode and api_routing_mode not in valid_routing_modes:
        return jsonify(bad_request_response(
            response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {valid_routing_modes}",
            invalid_params=["routing_mode"]
        )), 400

    try:
        # 1. Logged-in user id from the Flask session, if any.
        login_user_id = session.get('user_id')

        # 2. Resolve the effective user and conversation ids.
        user_id = redis_conversation_manager.resolve_user_id(
            user_id_input, browser_session_id, request.remote_addr, login_user_id
        )
        conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(
            user_id, conversation_id_input, continue_conversation
        )

        # 3. Load the context before the cache lookup, which takes it as an argument.
        context = redis_conversation_manager.get_context(conversation_id)

        # Type recorded on the most recent assistant turn; forwarded to the agent.
        context_type = None
        if context:
            try:
                messages = redis_conversation_manager.get_messages(conversation_id, limit=10)
                for message in reversed(messages):  # newest first
                    if message.get("role") == "assistant":
                        metadata = message.get("metadata", {})
                        context_type = metadata.get("type")
                        if context_type:
                            print(f"[AGENT_API] 检测到上下文类型: {context_type}")
                            break
            except Exception as e:
                print(f"[WARNING] 获取上下文类型失败: {str(e)}")

        # 4. A cached answer short-circuits the agent call.
        cached_answer = redis_conversation_manager.get_cached_answer(question, context)
        if cached_answer:
            print("[AGENT_API] 使用缓存答案")

            cached_type = cached_answer.get("type", "UNKNOWN")
            assistant_response = _select_assistant_response(
                cached_type,
                cached_answer.get("response", ""),
                cached_answer.get("summary"),
                cached_answer.get("query_result"),
            )

            redis_conversation_manager.save_message(conversation_id, "user", question)
            # Record the same metadata as a live answer (plus from_cache) so the
            # context-type lookup above also recognises cached turns.
            redis_conversation_manager.save_message(
                conversation_id, "assistant",
                assistant_response,
                metadata={
                    "type": cached_type,
                    "sql": cached_answer.get("sql"),
                    "execution_path": cached_answer.get("execution_path", []),
                    "from_cache": True
                }
            )

            return jsonify(agent_success_response(
                response_type=cached_type,
                response=cached_answer.get("response", ""),
                sql=cached_answer.get("sql"),
                query_result=cached_answer.get("query_result"),
                summary=cached_answer.get("summary"),
                session_id=browser_session_id,
                execution_path=cached_answer.get("execution_path", []),
                classification_info=cached_answer.get("classification_info", {}),
                conversation_id=conversation_id,
                user_id=user_id,
                is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
                context_used=bool(context),
                from_cache=True,
                conversation_status=conversation_status["status"],
                conversation_message=conversation_status["message"],
                requested_conversation_id=conversation_status.get("requested_id")
            ))

        # 5. Persist the user's turn.
        redis_conversation_manager.save_message(conversation_id, "user", question)

        # 6. Prepend the conversation context to the question, if any.
        if context:
            enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
            print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
        else:
            enhanced_question = question
            print("[AGENT_API] 新对话,无上下文")

        # 7. Routing mode priority: request parameter > app_config > "hybrid".
        if api_routing_mode:
            effective_routing_mode = api_routing_mode
            print(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
        else:
            try:
                from app_config import QUESTION_ROUTING_MODE
                effective_routing_mode = QUESTION_ROUTING_MODE
                print(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
            except ImportError:
                effective_routing_mode = "hybrid"
                print(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")

        # 8. Obtain the agent; creation failures are reported as 503.
        try:
            agent = get_citu_langraph_agent()
        except Exception as e:
            print(f"[CRITICAL] Agent初始化失败: {str(e)}")
            return jsonify(service_unavailable_response(
                response_text="AI服务暂时不可用,请稍后重试",
                can_retry=True
            )), 503

        agent_result = agent.process_question(
            question=enhanced_question,
            session_id=browser_session_id,
            context_type=context_type,
            routing_mode=effective_routing_mode
        )

        # 9. Agent-reported failure: propagate its message and error code.
        if not agent_result.get("success", False):
            error_message = agent_result.get("error", "Agent处理失败")
            error_code = agent_result.get("error_code", 500)
            return jsonify(agent_error_response(
                response_text=error_message,
                error_type="agent_processing_failed",
                code=error_code,
                session_id=browser_session_id,
                conversation_id=conversation_id,
                user_id=user_id
            )), error_code

        # agent_result is the agent's final response dict; read fields directly.
        response_type = agent_result.get("type", "UNKNOWN")
        response_text = agent_result.get("response", "")
        sql = agent_result.get("sql")
        query_result = agent_result.get("query_result")
        summary = agent_result.get("summary")
        execution_path = agent_result.get("execution_path", [])
        classification_info = agent_result.get("classification_info", {})

        assistant_response = _select_assistant_response(
            response_type, response_text, summary, query_result
        )

        redis_conversation_manager.save_message(
            conversation_id, "assistant", assistant_response,
            metadata={
                "type": response_type,
                "sql": sql,
                "execution_path": execution_path
            }
        )

        # Offer the answer to the cache. Per the original design note only
        # context-free Q&A should be cached; presumably cache_answer applies
        # that rule using `context` (confirm in RedisConversationManager).
        redis_conversation_manager.cache_answer(question, agent_result, context)

        return jsonify(agent_success_response(
            response_type=response_type,
            response=response_text,
            sql=sql,
            query_result=query_result,
            summary=summary,
            session_id=browser_session_id,
            execution_path=execution_path,
            classification_info=classification_info,
            conversation_id=conversation_id,
            user_id=user_id,
            is_guest_user=(user_id == DEFAULT_ANONYMOUS_USER),
            context_used=bool(context),
            from_cache=False,
            conversation_status=conversation_status["status"],
            conversation_message=conversation_status["message"],
            requested_conversation_id=conversation_status.get("requested_id"),
            routing_mode_used=effective_routing_mode,  # routing mode actually applied
            routing_mode_source="api" if api_routing_mode else "config"  # where it came from
        ))

    except Exception as e:
        print(f"[ERROR] ask_agent执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="查询处理失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/agent_health', methods=['GET'])
def agent_health():
    """Report the health of the LangGraph agent and its components.

    Response shape (envelope produced by common.result.health_success_response /
    health_error_response):
        {
            "success": true/false,
            "code": 200/503,
            "message": "healthy/degraded/unhealthy",
            "data": {
                "status": "healthy/degraded/unhealthy",
                "test_result": true/false,
                "workflow_compiled": true/false,
                "tools_count": 4,
                "message": "details",
                "timestamp": "2024-01-01T12:00:00",
                "checks": {
                    "agent_creation": true/false,
                    "tools_import": true/false,
                    "llm_connection": true/false,
                    "classifier_ready": true/false
                }
            }
        }

    HTTP status: 200 when healthy, 503 when degraded or unhealthy, 500 if the
    check itself crashes. When several checks fail, ``message`` holds the last
    failure recorded.
    """
    try:
        from common.result import health_success_response, health_error_response

        health_data = {
            "status": "unknown",
            "test_result": False,
            "workflow_compiled": False,
            "tools_count": 0,
            "message": "",
            "timestamp": datetime.now().isoformat(),
            "checks": {
                "agent_creation": False,
                "tools_import": False,
                "llm_connection": False,
                "classifier_ready": False
            }
        }

        # Check 1: agent creation. Without an agent the remaining checks are moot.
        try:
            agent = get_citu_langraph_agent()
            health_data["checks"]["agent_creation"] = True
            health_data["workflow_compiled"] = agent.workflow is not None
            health_data["tools_count"] = len(agent.tools) if hasattr(agent, 'tools') else 0
        except Exception as e:
            health_data["message"] = f"Agent创建失败: {str(e)}"
            # health_data already carries a "status" key, so set it there rather than
            # also passing status=...: a duplicate keyword raises TypeError.
            health_data["status"] = "unhealthy"
            return jsonify(health_error_response(**health_data)), 503

        # Check 2: tool registry import.
        try:
            from agent.tools import TOOLS
            health_data["checks"]["tools_import"] = len(TOOLS) > 0
        except Exception as e:
            health_data["message"] = f"工具导入失败: {str(e)}"

        # Check 3: LLM availability (only checks that a non-None LLM object is returned).
        try:
            from agent.utils import get_compatible_llm
            llm = get_compatible_llm()
            health_data["checks"]["llm_connection"] = llm is not None
        except Exception as e:
            health_data["message"] = f"LLM连接失败: {str(e)}"

        # Check 4: the question classifier can be constructed.
        try:
            from agent.classifier import QuestionClassifier
            QuestionClassifier()
            health_data["checks"]["classifier_ready"] = True
        except Exception as e:
            health_data["message"] = f"分类器失败: {str(e)}"

        # Check 5: the agent's own end-to-end check, only when every component passed.
        try:
            if all(health_data["checks"].values()):
                test_result = agent.health_check()
                health_data["test_result"] = test_result.get("status") == "healthy"
                health_data["status"] = test_result.get("status", "unknown")
                health_data["message"] = test_result.get("message", "健康检查完成")
            else:
                health_data["status"] = "degraded"
                health_data["message"] = "部分组件异常"
        except Exception as e:
            health_data["status"] = "degraded"
            health_data["message"] = f"完整测试失败: {str(e)}"

        if health_data["status"] == "healthy":
            return jsonify(health_success_response(**health_data))

        # Anything other than healthy/degraded is reported as unhealthy.
        if health_data["status"] != "degraded":
            health_data["status"] = "unhealthy"
        return jsonify(health_error_response(**health_data)), 503

    except Exception as e:
        print(f"[ERROR] 健康检查异常: {str(e)}")
        return jsonify(internal_error_response(
            response_text="健康检查失败,请稍后重试"
        )), 500
# ==================== Routine administration APIs ====================
@app.flask_app.route('/api/v0/cache_overview', methods=['GET'])
def cache_overview():
    """Lightweight overview of the in-memory query cache for routine administration.

    Reports the total number of cached conversations and sessions, a
    per-session summary sorted by last activity (a session counts as active if
    it had activity within the last 30 minutes), and the 10 most recently
    started conversations.

    Returns:
        200 with the overview data; 500 if reading the cache fails.
    """
    try:
        cache = app.cache
        # Single reference time so every duration in the response is consistent.
        now = datetime.now()
        result_data = {
            'overview_summary': {
                'total_conversations': 0,
                'total_sessions': 0,
                'query_time': now.isoformat()
            },
            'recent_conversations': [],  # most recently started conversations
            'session_summary': []  # per-session summary
        }

        if hasattr(cache, 'cache') and isinstance(cache.cache, dict):
            # Snapshot the entries so concurrent requests that modify the cache
            # cannot break the iteration below.
            conversations = list(cache.cache.items())
            result_data['overview_summary']['total_conversations'] = len(conversations)

            if hasattr(cache, 'get_all_sessions'):
                all_sessions = cache.get_all_sessions()
                result_data['overview_summary']['total_sessions'] = len(all_sessions)

                session_list = []
                for session_id, session_data in all_sessions.items():
                    start_time = session_data['start_time']
                    last_activity = session_data.get('last_activity', start_time)
                    session_list.append({
                        'session_id': session_id,
                        'start_time': start_time.isoformat(),
                        'conversation_count': session_data.get('conversation_count', 0),
                        'duration_seconds': session_data.get('session_duration_seconds', 0),
                        'last_activity': last_activity.isoformat(),
                        'is_active': (now - last_activity).total_seconds() < 1800  # active within 30 minutes
                    })

                # Most recent activity first.
                session_list.sort(key=lambda x: x['last_activity'], reverse=True)
                result_data['session_summary'] = session_list

            conversation_list = []
            for conversation_id, conversation_data in conversations:
                conversation_start_time = cache.conversation_start_times.get(conversation_id)
                # Treat a stored None question like a missing one for the preview.
                question = conversation_data.get('question') or ''

                conversation_info = {
                    'conversation_id': conversation_id,
                    'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
                    'session_id': cache.conversation_to_session.get(conversation_id),
                    'has_question': 'question' in conversation_data,
                    'has_sql': 'sql' in conversation_data,
                    'has_data': conversation_data.get('df') is not None,
                    'question_preview': question[:80] + '...' if len(question) > 80 else question,
                }

                if conversation_start_time:
                    conversation_info['conversation_duration_seconds'] = (now - conversation_start_time).total_seconds()

                conversation_list.append(conversation_info)

            # Newest conversations first; entries without a start time sort last.
            conversation_list.sort(key=lambda x: x['conversation_start_time'] or '', reverse=True)
            result_data['recent_conversations'] = conversation_list[:10]

        return jsonify(success_response(
            response_text="缓存概览查询完成",
            data=result_data
        ))

    except Exception as e:
        print(f"[ERROR] cache_overview执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取缓存概览失败,请稍后重试"
        )), 500
def _cache_stats_age_bucket(age_hours):
    """Return the ``time_distribution`` bucket key for an age given in hours.

    An item lands in the first bucket whose upper bound (1h, 6h, 24h, 7 days)
    it does not exceed; anything older goes to ``'older'``.
    """
    for limit_hours, bucket in ((1, 'last_1_hour'),
                                (6, 'last_6_hours'),
                                (24, 'last_24_hours'),
                                (168, 'last_7_days')):
        if age_hours <= limit_hours:
            return bucket
    return 'older'


@app.flask_app.route('/api/v0/cache_stats', methods=['GET'])
def cache_stats():
    """Daily management: cache statistics (merges the old session_stats and cache_stats).

    Response ``data`` contains:

    * ``basic_stats`` -- session/conversation totals, sessions active in the
      last 30 minutes, and average conversations per session;
    * ``time_distribution`` -- session and conversation counts bucketed by
      age (<=1h, <=6h, <=24h, <=7d, older);
    * ``session_details`` -- per-session details, newest activity first;
    * ``time_ranges`` -- oldest/newest session and conversation start times.

    Unexpected errors are logged and answered with a 500 error payload.
    """
    try:
        cache = app.cache
        current_time = datetime.now()

        stats = {
            'basic_stats': {
                'total_sessions': len(getattr(cache, 'session_info', {})),
                'total_conversations': len(getattr(cache, 'cache', {})),
                'active_sessions': 0,  # activity within the last 30 minutes
                'average_conversations_per_session': 0
            },
            'time_distribution': {
                'sessions': {
                    'last_1_hour': 0,
                    'last_6_hours': 0,
                    'last_24_hours': 0,
                    'last_7_days': 0,
                    'older': 0
                },
                'conversations': {
                    'last_1_hour': 0,
                    'last_6_hours': 0,
                    'last_24_hours': 0,
                    'last_7_days': 0,
                    'older': 0
                }
            },
            'session_details': [],
            'time_ranges': {
                'oldest_session': None,
                'newest_session': None,
                'oldest_conversation': None,
                'newest_conversation': None
            }
        }

        # Session statistics
        if hasattr(cache, 'session_info'):
            session_times = []
            total_conversations = 0

            for session_id, session_data in cache.session_info.items():
                start_time = session_data['start_time']
                last_activity = session_data.get('last_activity', start_time)
                session_times.append(start_time)
                conversation_count = len(session_data.get('conversations', []))
                total_conversations += conversation_count

                # Active = any activity within the last 30 minutes.
                is_active = (current_time - last_activity).total_seconds() < 1800
                if is_active:
                    stats['basic_stats']['active_sessions'] += 1

                age_hours = (current_time - start_time).total_seconds() / 3600
                stats['time_distribution']['sessions'][_cache_stats_age_bucket(age_hours)] += 1

                session_duration = current_time - start_time
                stats['session_details'].append({
                    'session_id': session_id,
                    'start_time': start_time.isoformat(),
                    'last_activity': last_activity.isoformat(),
                    'conversation_count': conversation_count,
                    'duration_seconds': session_duration.total_seconds(),
                    'duration_formatted': str(session_duration),
                    'is_active': is_active,
                    'browser_session_id': session_data.get('browser_session_id')
                })

            if cache.session_info:
                stats['basic_stats']['average_conversations_per_session'] = total_conversations / len(cache.session_info)

            if session_times:
                stats['time_ranges']['oldest_session'] = min(session_times).isoformat()
                stats['time_ranges']['newest_session'] = max(session_times).isoformat()

        # Conversation statistics
        if hasattr(cache, 'conversation_start_times'):
            conversation_times = list(cache.conversation_start_times.values())
            for conv_time in conversation_times:
                age_hours = (current_time - conv_time).total_seconds() / 3600
                stats['time_distribution']['conversations'][_cache_stats_age_bucket(age_hours)] += 1

            if conversation_times:
                stats['time_ranges']['oldest_conversation'] = min(conversation_times).isoformat()
                stats['time_ranges']['newest_conversation'] = max(conversation_times).isoformat()

        # Most recent activity first.
        stats['session_details'].sort(key=lambda x: x['last_activity'], reverse=True)

        from common.result import success_response
        return jsonify(success_response(
            response_text="缓存统计信息查询完成",
            data=stats
        ))

    except Exception as e:
        print(f"[ERROR] 获取缓存统计失败: {str(e)}")
        from common.result import internal_error_response
        return jsonify(internal_error_response(
            response_text="获取缓存统计失败,请稍后重试"
        )), 500
# ==================== Advanced APIs ====================
def _cache_export_json_safe(obj):
    """Recursively convert *obj* into values the JSON encoder can emit as valid JSON.

    * ``None``, ``str``, ``bool`` and ``int`` pass through unchanged;
    * finite floats pass through; NaN and +/-inf become ``None`` because the
      encoder would otherwise write the bare tokens ``NaN``/``Infinity``,
      which are not valid JSON;
    * lists, tuples and sets become lists; dict keys are stringified;
    * objects with ``isoformat`` (datetime/date/timestamps) become ISO strings;
    * objects with ``tolist`` or ``item`` (numpy arrays/scalars) are unwrapped
      and converted again;
    * anything else falls back to ``str(obj)``.
    """
    import math

    if obj is None or isinstance(obj, (str, bool, int)):
        return obj
    if isinstance(obj, float):
        return obj if math.isfinite(obj) else None
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [_cache_export_json_safe(item) for item in obj]
    if isinstance(obj, dict):
        return {str(k): _cache_export_json_safe(v) for k, v in obj.items()}
    if hasattr(obj, 'isoformat'):
        return obj.isoformat()
    if hasattr(obj, 'tolist'):
        return _cache_export_json_safe(obj.tolist())
    if hasattr(obj, 'item'):
        return _cache_export_json_safe(obj.item())
    return str(obj)


def _cache_export_describe_dataframe(df):
    """Summarise a DataFrame-like object (anything with ``to_dict``) for export.

    Returns the ``dataframe_info``/``data``/``memory_usage`` entries merged into
    the field record. dtypes and memory usage are best-effort: on failure an
    ``error`` marker replaces them instead of failing the whole field.
    """
    try:
        # Column labels may be non-strings; stringify for JSON-compatible keys.
        dtypes_dict = {str(col): str(dtype) for col, dtype in df.dtypes.items()}
    except Exception:
        dtypes_dict = {"error": "无法序列化dtypes"}

    try:
        memory_dict = {
            str(idx): int(usage)
            for idx, usage in df.memory_usage(deep=True).items()
        }
    except Exception:
        memory_dict = {"error": "无法获取内存使用信息"}

    return {
        'dataframe_info': {
            'shape': list(df.shape),
            'columns': _cache_export_json_safe(list(df.columns)),
            'dtypes': dtypes_dict,
            'index_info': {
                'type': type(df.index).__name__,
                'length': len(df.index)
            }
        },
        'data': _cache_export_json_safe(df.to_dict('records')),
        'memory_usage': memory_dict
    }


def _cache_export_field_info(field_name, field_value):
    """Describe one cached conversation field in a JSON-safe form.

    Every record has ``field_name``, ``data_type`` and ``is_none``. Beyond that:
    ``None`` -> ``value``; time fields, ``followup_questions`` and unknown
    fields -> ``content``; ``df`` -> DataFrame summary; ``fig_json`` -> JSON
    validity and Plotly structure; ``question``/``sql``/``summary`` -> text
    length and content. A failure while processing a field is recorded in
    ``processing_error`` with a truncated string fallback.
    """
    import json

    field_info = {
        'field_name': field_name,
        'data_type': type(field_value).__name__,
        'is_none': field_value is None
    }

    try:
        if field_value is None:
            field_info['value'] = None

        elif field_name in ('conversation_start_time', 'session_start_time', 'followup_questions'):
            field_info['content'] = _cache_export_json_safe(field_value)

        elif field_name == 'df':
            if hasattr(field_value, 'to_dict'):
                field_info.update(_cache_export_describe_dataframe(field_value))
            else:
                field_info['value'] = str(field_value)
                field_info['note'] = 'not_standard_dataframe'

        elif field_name == 'fig_json':
            if isinstance(field_value, str):
                try:
                    parsed_fig = json.loads(field_value)
                except json.JSONDecodeError:
                    field_info.update({
                        'json_valid': False,
                        'raw_content': field_value
                    })
                else:
                    field_info.update({
                        'json_valid': True,
                        # Encoded size: len() of the str counts characters,
                        # which under-reports non-ASCII content.
                        'json_size_bytes': len(field_value.encode('utf-8')),
                        'plotly_structure': {
                            'has_data': 'data' in parsed_fig,
                            'has_layout': 'layout' in parsed_fig,
                            'data_traces_count': len(parsed_fig.get('data', [])),
                        },
                        'raw_json': field_value
                    })
            else:
                field_info['value'] = _cache_export_json_safe(field_value)

        elif field_name in ('question', 'sql', 'summary'):
            if isinstance(field_value, str):
                field_info.update({
                    'text_length': len(field_value),
                    'content': field_value
                })
            else:
                field_info['value'] = _cache_export_json_safe(field_value)

        else:
            field_info['content'] = _cache_export_json_safe(field_value)

    except Exception as e:
        text = str(field_value)
        field_info.update({
            'processing_error': str(e),
            'fallback_value': text[:500] + '...' if len(text) > 500 else text
        })

    return field_info


def _cache_export_statistics(conversations):
    """Aggregate field frequencies and data-type/content counts over exported conversations."""
    field_frequency = {}
    data_types_found = set()
    total_dataframes = 0
    total_questions = 0

    for conv_data in conversations.values():
        for field_name, field_info in conv_data['fields'].items():
            field_frequency[field_name] = field_frequency.get(field_name, 0) + 1
            data_types_found.add(field_info['data_type'])
            if not field_info['is_none']:
                if field_name == 'df':
                    total_dataframes += 1
                elif field_name == 'question':
                    total_questions += 1

    return {
        'field_frequency': field_frequency,
        # Sorted so repeated exports of the same cache are identical.
        'data_types_found': sorted(data_types_found),
        'total_dataframes': total_dataframes,
        'total_questions': total_questions,
        'has_session_timing': 'session_start_time' in field_frequency,
        'has_conversation_timing': 'conversation_start_time' in field_frequency
    }


@app.flask_app.route('/api/v0/cache_export', methods=['GET'])
def cache_export():
    """Advanced: full, JSON-safe export of the cache (the former cache_raw_export).

    Exports metadata, per-session info, conversation start times, the
    conversation->session mapping, a per-field description of every cached
    conversation, and aggregate field statistics.

    Returns 500 when ``app.cache`` lacks a dict ``cache`` attribute or on any
    unexpected error (the traceback is printed server-side).
    """
    try:
        cache = app.cache

        # Validate the cache's actual structure before reading it.
        if not hasattr(cache, 'cache'):
            from common.result import internal_error_response
            return jsonify(internal_error_response(
                response_text="缓存对象结构异常,请联系系统管理员"
            )), 500

        if not isinstance(cache.cache, dict):
            from common.result import internal_error_response
            return jsonify(internal_error_response(
                response_text="缓存数据类型异常,请联系系统管理员"
            )), 500

        raw_cache = cache.cache
        conversation_times = getattr(cache, 'conversation_start_times', {})
        session_info = getattr(cache, 'session_info', {})
        conversation_to_session = getattr(cache, 'conversation_to_session', {})
        # One reference instant for the export timestamp and all durations.
        now = datetime.now()

        export_data = {
            'export_metadata': {
                'export_time': now.isoformat(),
                'total_conversations': len(raw_cache),
                'total_sessions': len(session_info),
                'cache_type': type(cache).__name__,
                'cache_object_info': str(cache),
                'has_session_times': bool(session_info),
                'has_conversation_times': bool(conversation_times)
            },
            'session_info': {
                session_id: {
                    'start_time': session_data['start_time'].isoformat(),
                    'last_activity': session_data.get('last_activity', session_data['start_time']).isoformat(),
                    'conversations': _cache_export_json_safe(session_data.get('conversations', [])),
                    'conversation_count': len(session_data.get('conversations', [])),
                    'browser_session_id': session_data.get('browser_session_id'),
                    'user_info': _cache_export_json_safe(session_data.get('user_info', {}))
                }
                for session_id, session_data in session_info.items()
            },
            'conversation_times': {
                conversation_id: start_time.isoformat()
                for conversation_id, start_time in conversation_times.items()
            },
            'conversation_to_session_mapping': _cache_export_json_safe(conversation_to_session),
            'conversations': {}
        }

        for conversation_id, conversation_data in raw_cache.items():
            conversation_start_time = conversation_times.get(conversation_id)
            session_id = conversation_to_session.get(conversation_id)
            session_start_time = None
            if session_id and session_id in session_info:
                session_start_time = session_info[session_id]['start_time']

            processed_conversation = {
                'conversation_id': conversation_id,
                'conversation_start_time': conversation_start_time.isoformat() if conversation_start_time else None,
                'session_id': session_id,
                'session_start_time': session_start_time.isoformat() if session_start_time else None,
                'field_count': len(conversation_data),
                'fields': {}
            }

            if conversation_start_time:
                conversation_duration = now - conversation_start_time
                processed_conversation['conversation_duration_seconds'] = conversation_duration.total_seconds()
                processed_conversation['conversation_duration_formatted'] = str(conversation_duration)

            if session_start_time:
                session_duration = now - session_start_time
                processed_conversation['session_duration_seconds'] = session_duration.total_seconds()
                processed_conversation['session_duration_formatted'] = str(session_duration)

            processed_conversation['fields'].update(
                (field_name, _cache_export_field_info(field_name, field_value))
                for field_name, field_value in conversation_data.items()
            )

            export_data['conversations'][conversation_id] = processed_conversation

        export_data['cache_statistics'] = _cache_export_statistics(export_data['conversations'])

        from common.result import success_response
        return jsonify(success_response(
            response_text="缓存数据导出完成",
            data=export_data
        ))

    except Exception as e:
        import traceback
        # Keep the diagnostics server-side; the client only sees a generic message.
        print(f"[ERROR] 导出缓存失败: {type(e).__name__}: {str(e)}")
        print(traceback.format_exc())
        from common.result import internal_error_response
        return jsonify(internal_error_response(
            response_text="导出缓存失败,请稍后重试"
        )), 500
# ==================== Cleanup APIs ====================
@app.flask_app.route('/api/v0/cache_preview_cleanup', methods=['POST'])
def cache_preview_cleanup():
    """Cleanup: preview which sessions/conversations a cleanup would delete; nothing is modified.

    JSON body -- the first truthy condition wins, in this order:

    * ``older_than_hours``: sessions started more than N hours ago;
    * ``older_than_days``: sessions started more than N days ago;
    * ``before_timestamp``: sessions started before ``YYYY-MM-DD HH:MM:SS``.

    Sessions whose ``start_time`` is before the cutoff are listed under
    ``will_be_removed`` with a short preview of their cached conversations;
    the others are only counted under ``will_be_kept``.

    Responses: 400 without a time condition, 422 for a malformed timestamp or
    a non-numeric/negative amount, 503 when the cache has no session
    tracking, 500 on unexpected errors.
    """
    try:
        # silent=True: a missing or invalid body becomes "no condition" (400
        # below) rather than an exception reported as a 500.
        req = request.get_json(force=True, silent=True)
        if not isinstance(req, dict):
            req = {}

        older_than_hours = req.get('older_than_hours')
        older_than_days = req.get('older_than_days')
        before_timestamp = req.get('before_timestamp')  # YYYY-MM-DD HH:MM:SS

        cache = app.cache

        if not hasattr(cache, 'session_info'):
            from common.result import service_unavailable_response
            return jsonify(service_unavailable_response(
                response_text="缓存不支持会话功能"
            )), 503

        if older_than_hours or older_than_days:
            # Hours take precedence over days.
            unit = 'hours' if older_than_hours else 'days'
            raw_amount = older_than_hours if older_than_hours else older_than_days
            try:
                amount = float(raw_amount)
                # Reject negatives (a future cutoff matches every session), NaN and infinity.
                if not 0 <= amount < float('inf'):
                    raise ValueError(raw_amount)
                cutoff_time = datetime.now() - timedelta(**{unit: amount})
            except (TypeError, ValueError, OverflowError):
                from common.result import validation_failed_response
                return jsonify(validation_failed_response(
                    response_text=f"older_than_{unit}必须是非负数字"
                )), 422
            time_condition = f"older_than_{unit}: {raw_amount}"
        elif before_timestamp:
            try:
                cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
            except (TypeError, ValueError):
                from common.result import validation_failed_response
                return jsonify(validation_failed_response(
                    response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
                )), 422
            time_condition = f"before_timestamp: {before_timestamp}"
        else:
            from common.result import bad_request_response
            return jsonify(bad_request_response(
                response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
                missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
            )), 400

        preview = {
            'time_condition': time_condition,
            'cutoff_time': cutoff_time.isoformat(),
            'will_be_removed': {
                'sessions': []
            },
            'will_be_kept': {
                'sessions_count': 0,
                'conversations_count': 0
            },
            'summary': {
                'sessions_to_remove': 0,
                'conversations_to_remove': 0,
                'sessions_to_keep': 0,
                'conversations_to_keep': 0
            }
        }

        start_times = getattr(cache, 'conversation_start_times', {})
        removed_sessions = preview['will_be_removed']['sessions']
        kept = preview['will_be_kept']
        conversations_to_remove_count = 0

        for session_id, session_data in cache.session_info.items():
            conversation_ids = session_data.get('conversations', [])

            if session_data['start_time'] >= cutoff_time:
                kept['sessions_count'] += 1
                kept['conversations_count'] += len(conversation_ids)
                continue

            # Conversation details are only needed for sessions being removed.
            conversation_previews = []
            for conv_id in conversation_ids:
                if conv_id not in cache.cache:
                    continue
                question = cache.cache[conv_id].get('question') or ''
                conv_start = start_times.get(conv_id)
                conversation_previews.append({
                    'conversation_id': conv_id,
                    # Ellipsis only when the question was actually truncated.
                    'question': question[:50] + '...' if len(question) > 50 else question,
                    'start_time': conv_start.isoformat() if conv_start else ''
                })

            removed_sessions.append({
                'session_id': session_id,
                'start_time': session_data['start_time'].isoformat(),
                'conversation_count': len(conversation_ids),
                'conversations': conversation_previews
            })
            conversations_to_remove_count += len(conversation_ids)

        sessions_to_remove_count = len(removed_sessions)
        preview['summary'] = {
            'sessions_to_remove': sessions_to_remove_count,
            'conversations_to_remove': conversations_to_remove_count,
            'sessions_to_keep': kept['sessions_count'],
            'conversations_to_keep': kept['conversations_count']
        }

        from common.result import success_response
        return jsonify(success_response(
            response_text=f"清理预览完成,将删除 {sessions_to_remove_count} 个会话和 {conversations_to_remove_count} 个对话",
            data=preview
        ))

    except Exception as e:
        print(f"[ERROR] 预览清理操作失败: {str(e)}")
        from common.result import internal_error_response
        return jsonify(internal_error_response(
            response_text="预览清理操作失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/cache_cleanup', methods=['POST'])
def cache_cleanup():
    """Cleanup: delete sessions older than a cutoff together with all of their conversations.

    JSON body -- the first truthy condition wins, in this order:
    ``older_than_hours``, ``older_than_days``, ``before_timestamp``
    (``YYYY-MM-DD HH:MM:SS``). Every session whose ``start_time`` is before
    the cutoff is removed from ``cache.session_info``. Its conversations are
    removed from ``cache.cache``, together with their entries in
    ``conversation_start_times`` and ``conversation_to_session``.

    Responses: 400 without a time condition, 422 for a malformed timestamp or
    a non-numeric/negative amount, 503 when the cache has no session
    tracking, 500 on unexpected errors.
    """
    try:
        # silent=True: a missing or invalid body becomes "no condition" (400
        # below) rather than an exception reported as a 500.
        req = request.get_json(force=True, silent=True)
        if not isinstance(req, dict):
            req = {}

        older_than_hours = req.get('older_than_hours')
        older_than_days = req.get('older_than_days')
        before_timestamp = req.get('before_timestamp')  # YYYY-MM-DD HH:MM:SS

        cache = app.cache

        if not hasattr(cache, 'session_info'):
            from common.result import service_unavailable_response
            return jsonify(service_unavailable_response(
                response_text="缓存不支持会话功能"
            )), 503

        if older_than_hours or older_than_days:
            # Hours take precedence over days.
            unit = 'hours' if older_than_hours else 'days'
            raw_amount = older_than_hours if older_than_hours else older_than_days
            try:
                amount = float(raw_amount)
                # A negative amount would put the cutoff in the future and
                # delete every session; NaN/infinity are rejected too.
                if not 0 <= amount < float('inf'):
                    raise ValueError(raw_amount)
                cutoff_time = datetime.now() - timedelta(**{unit: amount})
            except (TypeError, ValueError, OverflowError):
                from common.result import validation_failed_response
                return jsonify(validation_failed_response(
                    response_text=f"older_than_{unit}必须是非负数字"
                )), 422
            time_condition = f"older_than_{unit}: {raw_amount}"
        elif before_timestamp:
            try:
                cutoff_time = datetime.strptime(before_timestamp, '%Y-%m-%d %H:%M:%S')
            except (TypeError, ValueError):
                from common.result import validation_failed_response
                return jsonify(validation_failed_response(
                    response_text="before_timestamp格式错误,请使用 YYYY-MM-DD HH:MM:SS 格式"
                )), 422
            time_condition = f"before_timestamp: {before_timestamp}"
        else:
            from common.result import bad_request_response
            return jsonify(bad_request_response(
                response_text="必须提供时间条件:older_than_hours, older_than_days 或 before_timestamp (YYYY-MM-DD HH:MM:SS)",
                missing_params=["older_than_hours", "older_than_days", "before_timestamp"]
            )), 400

        cleanup_stats = {
            'time_condition': time_condition,
            'cutoff_time': cutoff_time.isoformat(),
            'sessions_removed': 0,
            'conversations_removed': 0,
            'sessions_kept': 0,
            'conversations_kept': 0,
            'removed_session_ids': [],
            'removed_conversation_ids': []
        }

        # Collect first: session_info must not change size while iterated.
        sessions_to_remove = [
            session_id
            for session_id, session_data in cache.session_info.items()
            if session_data['start_time'] < cutoff_time
        ]

        start_times = getattr(cache, 'conversation_start_times', None)
        conversation_to_session = getattr(cache, 'conversation_to_session', None)

        for session_id in sessions_to_remove:
            session_data = cache.session_info[session_id]

            for conv_id in list(session_data.get('conversations', [])):
                if conv_id in cache.cache:
                    del cache.cache[conv_id]
                    cleanup_stats['conversations_removed'] += 1
                    cleanup_stats['removed_conversation_ids'].append(conv_id)

                # Drop bookkeeping even when the conversation body is already
                # gone from cache.cache, so these maps do not leak entries.
                if start_times is not None:
                    start_times.pop(conv_id, None)
                if conversation_to_session is not None:
                    conversation_to_session.pop(conv_id, None)

            del cache.session_info[session_id]
            cleanup_stats['sessions_removed'] += 1
            cleanup_stats['removed_session_ids'].append(session_id)

        cleanup_stats['sessions_kept'] = len(cache.session_info)
        cleanup_stats['conversations_kept'] = len(cache.cache)

        from common.result import success_response
        return jsonify(success_response(
            response_text=f"缓存清理完成,删除了 {cleanup_stats['sessions_removed']} 个会话和 {cleanup_stats['conversations_removed']} 个对话",
            data=cleanup_stats
        ))

    except Exception as e:
        print(f"[ERROR] 缓存清理失败: {str(e)}")
        from common.result import internal_error_response
        return jsonify(internal_error_response(
            response_text="缓存清理失败,请稍后重试"
        )), 500
-
@app.flask_app.route('/api/v0/training_error_question_sql', methods=['POST'])
def training_error_question_sql():
    """
    Store an incorrect question/SQL pair in the error_sql collection.

    The pair is handed to ``vn.train_error_sql`` so wrong SQL queries can be
    recorded and analysed.

    Args (JSON body):
        question (str, required): the user question
        sql (str, required): the incorrect SQL for that question

    Returns:
        JSON: the id returned by ``vn.train_error_sql`` plus a success message.
        400 when either field is missing (including a missing or non-JSON
        body); 500 when storing fails.
    """
    try:
        # silent=True: a missing/non-JSON body yields the 400 below instead of
        # an exception that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        question = data.get('question')
        sql = data.get('sql')

        print(f"[DEBUG] 接收到错误SQL训练请求: question={question}, sql={sql}")

        if not question or not sql:
            from common.result import bad_request_response
            missing_params = []
            if not question:
                missing_params.append("question")
            if not sql:
                missing_params.append("sql")

            return jsonify(bad_request_response(
                response_text="question和sql参数都是必需的",
                missing_params=missing_params
            )), 400

        error_sql_id = vn.train_error_sql(question=question, sql=sql)

        print(f"[INFO] 成功存储错误SQL,ID: {error_sql_id}")

        from common.result import success_response
        return jsonify(success_response(
            response_text="错误SQL对已成功存储",
            data={
                "id": error_sql_id,
                "message": "错误SQL对已成功存储到error_sql集合"
            }
        ))

    except Exception as e:
        print(f"[ERROR] 存储错误SQL失败: {str(e)}")
        from common.result import internal_error_response
        return jsonify(internal_error_response(
            response_text="存储错误SQL失败,请稍后重试"
        )), 500
# ==================== Redis conversation management APIs ====================
@app.flask_app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id: str):
    """Return a user's conversation list from Redis (ordering is whatever the manager returns; documented as newest first).

    Query parameters:
        limit (int, optional): maximum number of conversations; defaults to
            ``USER_MAX_CONVERSATIONS``.
    """
    try:
        limit = request.args.get('limit', USER_MAX_CONVERSATIONS, type=int)
        conversations = redis_conversation_manager.get_conversations(user_id, limit)

        return jsonify(success_response(
            response_text="获取用户对话列表成功",
            data={
                "user_id": user_id,
                "conversations": conversations,
                "total_count": len(conversations)
            }
        ))

    except Exception as e:
        # Log the cause; the client only receives a generic message.
        print(f"[ERROR] 获取用户对话列表失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取对话列表失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/conversation/<conversation_id>/messages', methods=['GET'])
def get_conversation_messages(conversation_id: str):
    """Return the message history and metadata of one conversation.

    Query parameters:
        limit (int, optional): maximum number of messages; omitted -> None is
            passed to the manager.
    """
    try:
        limit = request.args.get('limit', type=int)  # optional
        messages = redis_conversation_manager.get_conversation_messages(conversation_id, limit)
        meta = redis_conversation_manager.get_conversation_meta(conversation_id)

        return jsonify(success_response(
            response_text="获取对话消息成功",
            data={
                "conversation_id": conversation_id,
                "conversation_meta": meta,
                "messages": messages,
                "message_count": len(messages)
            }
        ))

    except Exception as e:
        # Log the cause; the client only receives a generic message.
        print(f"[ERROR] 获取对话消息失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取对话消息失败"
        )), 500
@app.flask_app.route('/api/v0/conversation/<conversation_id>/context', methods=['GET'])
def get_conversation_context(conversation_id: str):
    """Return a conversation's context as produced by ``get_context`` (intended for LLM prompts).

    Query parameters:
        count (int, optional): number of messages requested; defaults to
            ``CONVERSATION_CONTEXT_COUNT``. ``context_message_count`` echoes
            this requested value.
    """
    try:
        count = request.args.get('count', CONVERSATION_CONTEXT_COUNT, type=int)
        context = redis_conversation_manager.get_context(conversation_id, count)

        return jsonify(success_response(
            response_text="获取对话上下文成功",
            data={
                "conversation_id": conversation_id,
                "context": context,
                "context_message_count": count
            }
        ))

    except Exception as e:
        # Log the cause; the client only receives a generic message.
        print(f"[ERROR] 获取对话上下文失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取对话上下文失败"
        )), 500
@app.flask_app.route('/api/v0/conversation_stats', methods=['GET'])
def conversation_stats():
    """Return the statistics reported by ``redis_conversation_manager.get_stats()``."""
    try:
        stats = redis_conversation_manager.get_stats()

        return jsonify(success_response(
            response_text="获取统计信息成功",
            data=stats
        ))

    except Exception as e:
        # Log the cause; the client only receives a generic message.
        print(f"[ERROR] 获取对话统计信息失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取统计信息失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/conversation_cleanup', methods=['POST'])
def conversation_cleanup():
    """Manually trigger ``redis_conversation_manager.cleanup_expired_conversations()``."""
    try:
        redis_conversation_manager.cleanup_expired_conversations()

        return jsonify(success_response(
            response_text="对话清理完成"
        ))

    except Exception as e:
        # Log the cause; the client only receives a generic message.
        print(f"[ERROR] 对话清理失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="对话清理失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/user/<user_id>/conversations/full', methods=['GET'])
def get_user_conversations_with_messages(user_id: str):
    """
    Return every conversation of a user together with its messages, in one call.

    Args:
        user_id: user id (path parameter)
        conversation_limit: optional query parameter capping how many
            conversations are loaded; omitted -> None (all conversations)
        message_limit: optional query parameter capping messages per
            conversation; omitted -> None (all messages)

    Returns:
        The user's conversations, each merged with ``meta``, ``messages`` and
        ``message_count``, plus totals, the applied limits and the query time.
    """
    try:
        conv_cap = request.args.get('conversation_limit', type=int)
        msg_cap = request.args.get('message_limit', type=int)

        manager = redis_conversation_manager
        enriched = []
        for conv in manager.get_conversations(user_id, conv_cap):
            cid = conv['conversation_id']
            history = manager.get_conversation_messages(cid, msg_cap)
            # Base fields first; meta/messages/count are layered on top.
            enriched.append({
                **conv,
                'meta': manager.get_conversation_meta(cid),
                'messages': history,
                'message_count': len(history)
            })

        message_total = sum(item['message_count'] for item in enriched)

        return jsonify(success_response(
            response_text="获取用户完整对话数据成功",
            data={
                "user_id": user_id,
                "conversations": enriched,
                "total_conversations": len(enriched),
                "total_messages": message_total,
                "conversation_limit_applied": conv_cap,
                "message_limit_applied": msg_cap,
                "query_time": datetime.now().isoformat()
            }
        ))
    except Exception as e:
        print(f"[ERROR] 获取用户完整对话数据失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取用户对话数据失败,请稍后重试"
        )), 500
# ==================== Embedding cache management APIs ====================
@app.flask_app.route('/api/v0/embedding_cache_stats', methods=['GET'])
def embedding_cache_stats():
    """Report the statistics exposed by the embedding cache manager.

    Failures are printed and answered with a 500 error payload.
    """
    try:
        from common.embedding_cache_manager import get_embedding_cache_manager

        embedding_stats = get_embedding_cache_manager().get_cache_stats()
        payload = success_response(
            response_text="获取embedding缓存统计成功",
            data=embedding_stats
        )
        return jsonify(payload)
    except Exception as e:
        print(f"[ERROR] 获取embedding缓存统计失败: {str(e)}")
        failure = internal_error_response(
            response_text="获取embedding缓存统计失败,请稍后重试"
        )
        return jsonify(failure), 500
@app.flask_app.route('/api/v0/embedding_cache_cleanup', methods=['POST'])
def embedding_cache_cleanup():
    """Clear every entry of the embedding cache.

    Returns 503 when the embedding cache is disabled or unavailable (matching
    how ``cache_cleanup`` reports an unsupported cache), 500 when clearing
    reports failure or raises, otherwise ``{"cleared": True}``.
    """
    try:
        from common.embedding_cache_manager import get_embedding_cache_manager

        cache_manager = get_embedding_cache_manager()

        if not cache_manager.is_available():
            # Previously an internal-error payload was sent with HTTP 400;
            # unavailability is a service condition, so report it as 503.
            from common.result import service_unavailable_response
            return jsonify(service_unavailable_response(
                response_text="Embedding缓存功能未启用或不可用"
            )), 503

        if cache_manager.clear_all_cache():
            return jsonify(success_response(
                response_text="所有embedding缓存已清空",
                data={"cleared": True}
            ))

        return jsonify(internal_error_response(
            response_text="清空embedding缓存失败"
        )), 500

    except Exception as e:
        print(f"[ERROR] 清空embedding缓存失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="清空embedding缓存失败,请稍后重试"
        )), 500
# ==================== Question-answer cache management APIs ====================
@app.flask_app.route('/api/v0/qa_cache_stats', methods=['GET'])
def qa_cache_stats():
    """Report question-answer cache statistics from ``redis_conversation_manager``.

    Failures are printed and answered with a 500 error payload.
    """
    try:
        qa_stats = redis_conversation_manager.get_qa_cache_stats()
        payload = success_response(
            response_text="获取问答缓存统计成功",
            data=qa_stats
        )
        return jsonify(payload)
    except Exception as e:
        print(f"[ERROR] 获取问答缓存统计失败: {str(e)}")
        failure = internal_error_response(
            response_text="获取问答缓存统计失败,请稍后重试"
        )
        return jsonify(failure), 500
@app.flask_app.route('/api/v0/qa_cache_list', methods=['GET'])
def qa_cache_list():
    """List question-answer cache entries, newest first.

    Query parameters:
        limit (int, optional): number of entries, default 50. Values <= 0
            fall back to 50 and values above 500 are capped at 500 so a single
            call cannot return an unbounded payload.
    """
    try:
        requested = request.args.get('limit', 50, type=int)
        limit = 50 if requested <= 0 else min(requested, 500)

        entries = redis_conversation_manager.get_qa_cache_list(limit)
        payload = {
            "cache_list": entries,
            "total_returned": len(entries),
            "limit_applied": limit,
            "note": "按缓存时间倒序排列,最新的在前面"
        }
        return jsonify(success_response(
            response_text="获取问答缓存列表成功",
            data=payload
        ))
    except Exception as e:
        print(f"[ERROR] 获取问答缓存列表失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取问答缓存列表失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/qa_cache_cleanup', methods=['POST'])
def qa_cache_cleanup():
    """Delete every question-answer cache entry.

    Responds 500 when Redis is unavailable; otherwise reports how many entries
    were deleted and when.
    """
    try:
        manager = redis_conversation_manager
        if manager.is_available():
            removed = manager.clear_all_qa_cache()
            return jsonify(success_response(
                response_text="问答缓存清理完成",
                data={
                    "deleted_count": removed,
                    "cleared": removed > 0,
                    "cleanup_time": datetime.now().isoformat()
                }
            ))

        return jsonify(internal_error_response(
            response_text="Redis连接不可用,无法执行清理操作"
        )), 500
    except Exception as e:
        print(f"[ERROR] 清空问答缓存失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="清空问答缓存失败,请稍后重试"
        )), 500
@app.flask_app.route('/api/v0/cache_overview_full', methods=['GET'])
def cache_overview_full():
    """Return a combined overview of every cache layer.

    Sections: the in-process conversation-aware cache (``app.cache``), the
    Redis question-answer cache, the embedding cache, and Redis conversation
    statistics. Redis-backed sections report unavailability instead of failing.
    """
    try:
        from common.embedding_cache_manager import get_embedding_cache_manager

        cache = app.cache
        has_entries = hasattr(cache, 'cache')
        # Probe Redis once so both Redis-backed sections agree on availability.
        redis_available = redis_conversation_manager.is_available()

        cache_overview = {
            "conversation_aware_cache": {
                "enabled": True,
                "total_items": len(cache.cache) if has_entries else 0,
                # NOTE: these are the keys of cache.cache (conversation ids);
                # the "sessions" key name is kept for API compatibility.
                "sessions": list(cache.cache.keys()) if has_entries else [],
                "cache_type": type(cache).__name__
            },
            "question_answer_cache": redis_conversation_manager.get_qa_cache_stats() if redis_available else {"available": False},
            "embedding_cache": get_embedding_cache_manager().get_cache_stats(),
            "redis_conversation_stats": redis_conversation_manager.get_stats() if redis_available else None
        }

        return jsonify(success_response(
            response_text="获取综合缓存概览成功",
            data=cache_overview
        ))

    except Exception as e:
        print(f"[ERROR] 获取综合缓存概览失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="获取缓存概览失败,请稍后重试"
        )), 500
# Frontend JavaScript example: how a client keeps one chat session across requests
"""
// 前端需要维护一个会话ID
class ChatSession {
    constructor() {
        // 从localStorage获取或创建新的会话ID
        this.sessionId = localStorage.getItem('chat_session_id') || this.generateSessionId();
        localStorage.setItem('chat_session_id', this.sessionId);
    }

    generateSessionId() {
        return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
    }

    async askQuestion(question) {
        const response = await fetch('/api/v0/ask', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
            },
            body: JSON.stringify({
                question: question,
                session_id: this.sessionId  // 关键:传递会话ID
            })
        });
        return await response.json();
    }

    // 开始新会话
    startNewSession() {
        this.sessionId = this.generateSessionId();
        localStorage.setItem('chat_session_id', this.sessionId);
    }
}
// 使用示例
const chatSession = new ChatSession();
chatSession.askQuestion("各年龄段客户的流失率如何?");
"""

# Only start the development server when run as a script, so importing this
# module (e.g. from a WSGI server or tests) does not block on app.run().
if __name__ == "__main__":
    # NOTE(review): assuming app.run forwards these arguments to Flask's
    # development server, debug=True on host 0.0.0.0 exposes the interactive
    # Werkzeug debugger (PIN-protected, but able to run arbitrary code) on
    # every network interface. Keep this for local development only.
    print("正在启动Flask应用: http://localhost:8084")
    app.run(host="0.0.0.0", port=8084, debug=True)
|