citu_agent.py 54 KB


  1. # agent/citu_agent.py
  2. from typing import Dict, Any, Literal
  3. from langgraph.graph import StateGraph, END
  4. from langchain.agents import AgentExecutor, create_openai_tools_agent
  5. from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
  6. from langchain_core.messages import SystemMessage, HumanMessage
  7. from agent.state import AgentState
  8. from agent.classifier import QuestionClassifier
  9. from agent.tools import TOOLS, generate_sql, execute_sql, generate_summary, general_chat
  10. from agent.tools.utils import get_compatible_llm
  11. from app_config import ENABLE_RESULT_SUMMARY
  12. class CituLangGraphAgent:
  13. """Citu LangGraph智能助手主类 - 使用@tool装饰器 + Agent工具调用"""
  14. def __init__(self):
  15. # 加载配置
  16. try:
  17. from agent.config import get_current_config, get_nested_config
  18. self.config = get_current_config()
  19. print("[CITU_AGENT] 加载Agent配置完成")
  20. except ImportError:
  21. self.config = {}
  22. print("[CITU_AGENT] 配置文件不可用,使用默认配置")
  23. self.classifier = QuestionClassifier()
  24. self.tools = TOOLS
  25. self.llm = get_compatible_llm()
  26. # 注意:现在使用直接工具调用模式,不再需要预创建Agent执行器
  27. print("[CITU_AGENT] 使用直接工具调用模式")
  28. # 不在构造时创建workflow,改为动态创建以支持路由模式参数
  29. # self.workflow = self._create_workflow()
  30. print("[CITU_AGENT] LangGraph Agent with Direct Tools初始化完成")
  31. def _create_workflow(self, routing_mode: str = None) -> StateGraph:
  32. """根据路由模式创建不同的工作流"""
  33. # 确定使用的路由模式
  34. if routing_mode:
  35. QUESTION_ROUTING_MODE = routing_mode
  36. print(f"[CITU_AGENT] 创建工作流,使用传入的路由模式: {QUESTION_ROUTING_MODE}")
  37. else:
  38. try:
  39. from app_config import QUESTION_ROUTING_MODE
  40. print(f"[CITU_AGENT] 创建工作流,使用配置文件路由模式: {QUESTION_ROUTING_MODE}")
  41. except ImportError:
  42. QUESTION_ROUTING_MODE = "hybrid"
  43. print(f"[CITU_AGENT] 配置导入失败,使用默认路由模式: {QUESTION_ROUTING_MODE}")
  44. workflow = StateGraph(AgentState)
  45. # 根据路由模式创建不同的工作流
  46. if QUESTION_ROUTING_MODE == "database_direct":
  47. # 直接数据库模式:跳过分类,直接进入数据库处理(使用新的拆分节点)
  48. workflow.add_node("init_direct_database", self._init_direct_database_node)
  49. workflow.add_node("agent_sql_generation", self._agent_sql_generation_node)
  50. workflow.add_node("agent_sql_execution", self._agent_sql_execution_node)
  51. workflow.add_node("format_response", self._format_response_node)
  52. workflow.set_entry_point("init_direct_database")
  53. # 添加条件路由
  54. workflow.add_edge("init_direct_database", "agent_sql_generation")
  55. workflow.add_conditional_edges(
  56. "agent_sql_generation",
  57. self._route_after_sql_generation,
  58. {
  59. "continue_execution": "agent_sql_execution",
  60. "return_to_user": "format_response"
  61. }
  62. )
  63. workflow.add_edge("agent_sql_execution", "format_response")
  64. workflow.add_edge("format_response", END)
  65. elif QUESTION_ROUTING_MODE == "chat_direct":
  66. # 直接聊天模式:跳过分类,直接进入聊天处理
  67. workflow.add_node("init_direct_chat", self._init_direct_chat_node)
  68. workflow.add_node("agent_chat", self._agent_chat_node)
  69. workflow.add_node("format_response", self._format_response_node)
  70. workflow.set_entry_point("init_direct_chat")
  71. workflow.add_edge("init_direct_chat", "agent_chat")
  72. workflow.add_edge("agent_chat", "format_response")
  73. workflow.add_edge("format_response", END)
  74. else:
  75. # 其他模式(hybrid, llm_only):使用新的拆分工作流
  76. workflow.add_node("classify_question", self._classify_question_node)
  77. workflow.add_node("agent_chat", self._agent_chat_node)
  78. workflow.add_node("agent_sql_generation", self._agent_sql_generation_node)
  79. workflow.add_node("agent_sql_execution", self._agent_sql_execution_node)
  80. workflow.add_node("format_response", self._format_response_node)
  81. workflow.set_entry_point("classify_question")
  82. # 添加条件边:分类后的路由
  83. workflow.add_conditional_edges(
  84. "classify_question",
  85. self._route_after_classification,
  86. {
  87. "DATABASE": "agent_sql_generation",
  88. "CHAT": "agent_chat"
  89. }
  90. )
  91. # 添加条件边:SQL生成后的路由
  92. workflow.add_conditional_edges(
  93. "agent_sql_generation",
  94. self._route_after_sql_generation,
  95. {
  96. "continue_execution": "agent_sql_execution",
  97. "return_to_user": "format_response"
  98. }
  99. )
  100. # 普通边
  101. workflow.add_edge("agent_chat", "format_response")
  102. workflow.add_edge("agent_sql_execution", "format_response")
  103. workflow.add_edge("format_response", END)
  104. return workflow.compile()
  105. def _init_direct_database_node(self, state: AgentState) -> AgentState:
  106. """初始化直接数据库模式的状态"""
  107. try:
  108. # 从state中获取路由模式,而不是从配置文件读取
  109. routing_mode = state.get("routing_mode", "database_direct")
  110. # 设置直接数据库模式的分类状态
  111. state["question_type"] = "DATABASE"
  112. state["classification_confidence"] = 1.0
  113. state["classification_reason"] = "配置为直接数据库查询模式"
  114. state["classification_method"] = "direct_database"
  115. state["routing_mode"] = routing_mode
  116. state["current_step"] = "direct_database_init"
  117. state["execution_path"].append("init_direct_database")
  118. print(f"[DIRECT_DATABASE] 直接数据库模式初始化完成")
  119. return state
  120. except Exception as e:
  121. print(f"[ERROR] 直接数据库模式初始化异常: {str(e)}")
  122. state["error"] = f"直接数据库模式初始化失败: {str(e)}"
  123. state["error_code"] = 500
  124. state["execution_path"].append("init_direct_database_error")
  125. return state
  126. def _init_direct_chat_node(self, state: AgentState) -> AgentState:
  127. """初始化直接聊天模式的状态"""
  128. try:
  129. # 从state中获取路由模式,而不是从配置文件读取
  130. routing_mode = state.get("routing_mode", "chat_direct")
  131. # 设置直接聊天模式的分类状态
  132. state["question_type"] = "CHAT"
  133. state["classification_confidence"] = 1.0
  134. state["classification_reason"] = "配置为直接聊天模式"
  135. state["classification_method"] = "direct_chat"
  136. state["routing_mode"] = routing_mode
  137. state["current_step"] = "direct_chat_init"
  138. state["execution_path"].append("init_direct_chat")
  139. print(f"[DIRECT_CHAT] 直接聊天模式初始化完成")
  140. return state
  141. except Exception as e:
  142. print(f"[ERROR] 直接聊天模式初始化异常: {str(e)}")
  143. state["error"] = f"直接聊天模式初始化失败: {str(e)}"
  144. state["error_code"] = 500
  145. state["execution_path"].append("init_direct_chat_error")
  146. return state
  147. def _classify_question_node(self, state: AgentState) -> AgentState:
  148. """问题分类节点 - 支持渐进式分类策略"""
  149. try:
  150. # 从state中获取路由模式,而不是从配置文件读取
  151. routing_mode = state.get("routing_mode", "hybrid")
  152. print(f"[CLASSIFY_NODE] 开始分类问题: {state['question']}")
  153. # 获取上下文类型(如果有的话)
  154. context_type = state.get("context_type")
  155. if context_type:
  156. print(f"[CLASSIFY_NODE] 检测到上下文类型: {context_type}")
  157. # 使用渐进式分类策略,传递路由模式
  158. classification_result = self.classifier.classify(state["question"], context_type, routing_mode)
  159. # 更新状态
  160. state["question_type"] = classification_result.question_type
  161. state["classification_confidence"] = classification_result.confidence
  162. state["classification_reason"] = classification_result.reason
  163. state["classification_method"] = classification_result.method
  164. state["routing_mode"] = routing_mode
  165. state["current_step"] = "classified"
  166. state["execution_path"].append("classify")
  167. print(f"[CLASSIFY_NODE] 分类结果: {classification_result.question_type}, 置信度: {classification_result.confidence}")
  168. print(f"[CLASSIFY_NODE] 路由模式: {routing_mode}, 分类方法: {classification_result.method}")
  169. return state
  170. except Exception as e:
  171. print(f"[ERROR] 问题分类异常: {str(e)}")
  172. state["error"] = f"问题分类失败: {str(e)}"
  173. state["error_code"] = 500
  174. state["execution_path"].append("classify_error")
  175. return state
  176. async def _agent_sql_generation_node(self, state: AgentState) -> AgentState:
  177. """SQL生成验证节点 - 负责生成SQL、验证SQL和决定路由"""
  178. try:
  179. print(f"[SQL_GENERATION] 开始处理SQL生成和验证: {state['question']}")
  180. question = state["question"]
  181. # 步骤1:生成SQL
  182. print(f"[SQL_GENERATION] 步骤1:生成SQL")
  183. sql_result = generate_sql.invoke({"question": question, "allow_llm_to_see_data": True})
  184. if not sql_result.get("success"):
  185. # SQL生成失败的统一处理
  186. error_message = sql_result.get("error", "")
  187. error_type = sql_result.get("error_type", "")
  188. #print(f"[SQL_GENERATION] SQL生成失败: {error_message}")
  189. print(f"[DEBUG] error_type = '{error_type}'")
  190. # 根据错误类型生成用户提示
  191. if "no relevant tables" in error_message.lower() or "table not found" in error_message.lower():
  192. user_prompt = "数据库中没有相关的表或字段信息,请您提供更多具体信息或修改问题。"
  193. failure_reason = "missing_database_info"
  194. elif "ambiguous" in error_message.lower() or "more information" in error_message.lower():
  195. user_prompt = "您的问题需要更多信息才能准确查询,请提供更详细的描述。"
  196. failure_reason = "ambiguous_question"
  197. elif error_type == "llm_explanation" or error_type == "generation_failed_with_explanation":
  198. # 对于解释性文本,直接设置为聊天响应
  199. state["chat_response"] = error_message + " 请尝试提问其它问题。"
  200. state["sql_generation_success"] = False
  201. state["validation_error_type"] = "llm_explanation"
  202. state["current_step"] = "sql_generation_completed"
  203. state["execution_path"].append("agent_sql_generation")
  204. print(f"[SQL_GENERATION] 返回LLM解释性答案: {error_message}")
  205. return state
  206. else:
  207. user_prompt = "无法生成有效的SQL查询,请尝试重新描述您的问题。"
  208. failure_reason = "unknown_generation_failure"
  209. # 统一返回失败状态
  210. state["sql_generation_success"] = False
  211. state["user_prompt"] = user_prompt
  212. state["validation_error_type"] = failure_reason
  213. state["current_step"] = "sql_generation_failed"
  214. state["execution_path"].append("agent_sql_generation_failed")
  215. print(f"[SQL_GENERATION] 生成失败: {failure_reason} - {user_prompt}")
  216. return state
  217. sql = sql_result.get("sql")
  218. state["sql"] = sql
  219. # 步骤1.5:检查是否为解释性响应而非SQL
  220. error_type = sql_result.get("error_type")
  221. if error_type == "llm_explanation" or error_type == "generation_failed_with_explanation":
  222. # LLM返回了解释性文本,直接作为最终答案
  223. explanation = sql_result.get("error", "")
  224. state["chat_response"] = explanation + " 请尝试提问其它问题。"
  225. state["sql_generation_success"] = False
  226. state["validation_error_type"] = "llm_explanation"
  227. state["current_step"] = "sql_generation_completed"
  228. state["execution_path"].append("agent_sql_generation")
  229. print(f"[SQL_GENERATION] 返回LLM解释性答案: {explanation}")
  230. return state
  231. if sql:
  232. print(f"[SQL_GENERATION] SQL生成成功: {sql}")
  233. else:
  234. print(f"[SQL_GENERATION] SQL为空,但不是解释性响应")
  235. # 这种情况应该很少见,但为了安全起见保留原有的错误处理
  236. return state
  237. # 额外验证:检查SQL格式(防止工具误判)
  238. from agent.tools.utils import _is_valid_sql_format
  239. if not _is_valid_sql_format(sql):
  240. # 内容看起来不是SQL,当作解释性响应处理
  241. state["chat_response"] = sql + " 请尝试提问其它问题。"
  242. state["sql_generation_success"] = False
  243. state["validation_error_type"] = "invalid_sql_format"
  244. state["current_step"] = "sql_generation_completed"
  245. state["execution_path"].append("agent_sql_generation")
  246. print(f"[SQL_GENERATION] 内容不是有效SQL,当作解释返回: {sql}")
  247. return state
  248. # 步骤2:SQL验证(如果启用)
  249. if self._is_sql_validation_enabled():
  250. print(f"[SQL_GENERATION] 步骤2:验证SQL")
  251. validation_result = await self._validate_sql_with_custom_priority(sql)
  252. if not validation_result.get("valid"):
  253. # 验证失败,检查是否可以修复
  254. error_type = validation_result.get("error_type")
  255. error_message = validation_result.get("error_message")
  256. can_repair = validation_result.get("can_repair", False)
  257. print(f"[SQL_GENERATION] SQL验证失败: {error_type} - {error_message}")
  258. if error_type == "forbidden_keywords":
  259. # 禁止词错误,直接失败,不尝试修复
  260. state["sql_generation_success"] = False
  261. state["sql_validation_success"] = False
  262. state["user_prompt"] = error_message
  263. state["validation_error_type"] = "forbidden_keywords"
  264. state["current_step"] = "sql_validation_failed"
  265. state["execution_path"].append("forbidden_keywords_failed")
  266. print(f"[SQL_GENERATION] 禁止词验证失败,直接结束")
  267. return state
  268. elif error_type == "syntax_error" and can_repair and self._is_auto_repair_enabled():
  269. # 语法错误,尝试修复(仅一次)
  270. print(f"[SQL_GENERATION] 尝试修复SQL语法错误(仅一次): {error_message}")
  271. state["sql_repair_attempted"] = True
  272. repair_result = await self._attempt_sql_repair_once(sql, error_message)
  273. if repair_result.get("success"):
  274. # 修复成功
  275. repaired_sql = repair_result.get("repaired_sql")
  276. state["sql"] = repaired_sql
  277. state["sql_generation_success"] = True
  278. state["sql_validation_success"] = True
  279. state["sql_repair_success"] = True
  280. state["current_step"] = "sql_generation_completed"
  281. state["execution_path"].append("sql_repair_success")
  282. print(f"[SQL_GENERATION] SQL修复成功: {repaired_sql}")
  283. return state
  284. else:
  285. # 修复失败,直接结束
  286. repair_error = repair_result.get("error", "修复失败")
  287. print(f"[SQL_GENERATION] SQL修复失败: {repair_error}")
  288. state["sql_generation_success"] = False
  289. state["sql_validation_success"] = False
  290. state["sql_repair_success"] = False
  291. state["user_prompt"] = f"SQL语法修复失败: {repair_error}"
  292. state["validation_error_type"] = "syntax_repair_failed"
  293. state["current_step"] = "sql_repair_failed"
  294. state["execution_path"].append("sql_repair_failed")
  295. return state
  296. else:
  297. # 不启用修复或其他错误类型,直接失败
  298. state["sql_generation_success"] = False
  299. state["sql_validation_success"] = False
  300. state["user_prompt"] = f"SQL验证失败: {error_message}"
  301. state["validation_error_type"] = error_type
  302. state["current_step"] = "sql_validation_failed"
  303. state["execution_path"].append("sql_validation_failed")
  304. print(f"[SQL_GENERATION] SQL验证失败,不尝试修复")
  305. return state
  306. else:
  307. print(f"[SQL_GENERATION] SQL验证通过")
  308. state["sql_validation_success"] = True
  309. else:
  310. print(f"[SQL_GENERATION] 跳过SQL验证(未启用)")
  311. state["sql_validation_success"] = True
  312. # 生成和验证都成功
  313. state["sql_generation_success"] = True
  314. state["current_step"] = "sql_generation_completed"
  315. state["execution_path"].append("agent_sql_generation")
  316. print(f"[SQL_GENERATION] SQL生成验证完成,准备执行")
  317. return state
  318. except Exception as e:
  319. print(f"[ERROR] SQL生成验证节点异常: {str(e)}")
  320. import traceback
  321. print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
  322. state["sql_generation_success"] = False
  323. state["sql_validation_success"] = False
  324. state["user_prompt"] = f"SQL生成验证异常: {str(e)}"
  325. state["validation_error_type"] = "node_exception"
  326. state["current_step"] = "sql_generation_error"
  327. state["execution_path"].append("agent_sql_generation_error")
  328. return state
  329. def _agent_sql_execution_node(self, state: AgentState) -> AgentState:
  330. """SQL执行节点 - 负责执行已验证的SQL和生成摘要"""
  331. try:
  332. print(f"[SQL_EXECUTION] 开始执行SQL: {state.get('sql', 'N/A')}")
  333. sql = state.get("sql")
  334. question = state["question"]
  335. if not sql:
  336. print(f"[SQL_EXECUTION] 没有可执行的SQL")
  337. state["error"] = "没有可执行的SQL语句"
  338. state["error_code"] = 500
  339. state["current_step"] = "sql_execution_error"
  340. state["execution_path"].append("agent_sql_execution_error")
  341. return state
  342. # 步骤1:执行SQL
  343. print(f"[SQL_EXECUTION] 步骤1:执行SQL")
  344. execute_result = execute_sql.invoke({"sql": sql})
  345. if not execute_result.get("success"):
  346. print(f"[SQL_EXECUTION] SQL执行失败: {execute_result.get('error')}")
  347. state["error"] = execute_result.get("error", "SQL执行失败")
  348. state["error_code"] = 500
  349. state["current_step"] = "sql_execution_error"
  350. state["execution_path"].append("agent_sql_execution_error")
  351. return state
  352. query_result = execute_result.get("data_result")
  353. state["query_result"] = query_result
  354. print(f"[SQL_EXECUTION] SQL执行成功,返回 {query_result.get('row_count', 0)} 行数据")
  355. # 步骤2:生成摘要(根据配置和数据情况)
  356. if ENABLE_RESULT_SUMMARY and query_result.get('row_count', 0) > 0:
  357. print(f"[SQL_EXECUTION] 步骤2:生成摘要")
  358. # 重要:提取原始问题用于摘要生成,避免历史记录循环嵌套
  359. original_question = self._extract_original_question(question)
  360. print(f"[SQL_EXECUTION] 原始问题: {original_question}")
  361. summary_result = generate_summary.invoke({
  362. "question": original_question, # 使用原始问题而不是enhanced_question
  363. "query_result": query_result,
  364. "sql": sql
  365. })
  366. if not summary_result.get("success"):
  367. print(f"[SQL_EXECUTION] 摘要生成失败: {summary_result.get('message')}")
  368. # 摘要生成失败不是致命错误,使用默认摘要
  369. state["summary"] = f"查询执行完成,共返回 {query_result.get('row_count', 0)} 条记录。"
  370. else:
  371. state["summary"] = summary_result.get("summary")
  372. print(f"[SQL_EXECUTION] 摘要生成成功")
  373. else:
  374. print(f"[SQL_EXECUTION] 跳过摘要生成(ENABLE_RESULT_SUMMARY={ENABLE_RESULT_SUMMARY},数据行数={query_result.get('row_count', 0)})")
  375. # 不生成摘要时,不设置summary字段,让格式化响应节点决定如何处理
  376. state["current_step"] = "sql_execution_completed"
  377. state["execution_path"].append("agent_sql_execution")
  378. print(f"[SQL_EXECUTION] SQL执行完成")
  379. return state
  380. except Exception as e:
  381. print(f"[ERROR] SQL执行节点异常: {str(e)}")
  382. import traceback
  383. print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
  384. state["error"] = f"SQL执行失败: {str(e)}"
  385. state["error_code"] = 500
  386. state["current_step"] = "sql_execution_error"
  387. state["execution_path"].append("agent_sql_execution_error")
  388. return state
  389. def _agent_database_node(self, state: AgentState) -> AgentState:
  390. """
  391. 数据库Agent节点 - 直接工具调用模式 [已废弃]
  392. 注意:此方法已被拆分为 _agent_sql_generation_node 和 _agent_sql_execution_node
  393. 保留此方法仅为向后兼容,新的工作流使用拆分后的节点
  394. """
  395. try:
  396. print(f"[DATABASE_AGENT] ⚠️ 使用已废弃的database节点,建议使用新的拆分节点")
  397. print(f"[DATABASE_AGENT] 开始处理数据库查询: {state['question']}")
  398. question = state["question"]
  399. # 步骤1:生成SQL
  400. print(f"[DATABASE_AGENT] 步骤1:生成SQL")
  401. sql_result = generate_sql.invoke({"question": question, "allow_llm_to_see_data": True})
  402. if not sql_result.get("success"):
  403. print(f"[DATABASE_AGENT] SQL生成失败: {sql_result.get('error')}")
  404. state["error"] = sql_result.get("error", "SQL生成失败")
  405. state["error_code"] = 500
  406. state["current_step"] = "database_error"
  407. state["execution_path"].append("agent_database_error")
  408. return state
  409. sql = sql_result.get("sql")
  410. state["sql"] = sql
  411. print(f"[DATABASE_AGENT] SQL生成成功: {sql}")
  412. # 步骤1.5:检查是否为解释性响应而非SQL
  413. error_type = sql_result.get("error_type")
  414. if error_type == "llm_explanation":
  415. # LLM返回了解释性文本,直接作为最终答案
  416. explanation = sql_result.get("error", "")
  417. state["chat_response"] = explanation + " 请尝试提问其它问题。"
  418. state["current_step"] = "database_completed"
  419. state["execution_path"].append("agent_database")
  420. print(f"[DATABASE_AGENT] 返回LLM解释性答案: {explanation}")
  421. return state
  422. # 额外验证:检查SQL格式(防止工具误判)
  423. from agent.tools.utils import _is_valid_sql_format
  424. if not _is_valid_sql_format(sql):
  425. # 内容看起来不是SQL,当作解释性响应处理
  426. state["chat_response"] = sql + " 请尝试提问其它问题。"
  427. state["current_step"] = "database_completed"
  428. state["execution_path"].append("agent_database")
  429. print(f"[DATABASE_AGENT] 内容不是有效SQL,当作解释返回: {sql}")
  430. return state
  431. # 步骤2:执行SQL
  432. print(f"[DATABASE_AGENT] 步骤2:执行SQL")
  433. execute_result = execute_sql.invoke({"sql": sql})
  434. if not execute_result.get("success"):
  435. print(f"[DATABASE_AGENT] SQL执行失败: {execute_result.get('error')}")
  436. state["error"] = execute_result.get("error", "SQL执行失败")
  437. state["error_code"] = 500
  438. state["current_step"] = "database_error"
  439. state["execution_path"].append("agent_database_error")
  440. return state
  441. query_result = execute_result.get("data_result")
  442. state["query_result"] = query_result
  443. print(f"[DATABASE_AGENT] SQL执行成功,返回 {query_result.get('row_count', 0)} 行数据")
  444. # 步骤3:生成摘要(可通过配置控制,仅在有数据时生成)
  445. if ENABLE_RESULT_SUMMARY and query_result.get('row_count', 0) > 0:
  446. print(f"[DATABASE_AGENT] 步骤3:生成摘要")
  447. # 重要:提取原始问题用于摘要生成,避免历史记录循环嵌套
  448. original_question = self._extract_original_question(question)
  449. print(f"[DATABASE_AGENT] 原始问题: {original_question}")
  450. summary_result = generate_summary.invoke({
  451. "question": original_question, # 使用原始问题而不是enhanced_question
  452. "query_result": query_result,
  453. "sql": sql
  454. })
  455. if not summary_result.get("success"):
  456. print(f"[DATABASE_AGENT] 摘要生成失败: {summary_result.get('message')}")
  457. # 摘要生成失败不是致命错误,使用默认摘要
  458. state["summary"] = f"查询执行完成,共返回 {query_result.get('row_count', 0)} 条记录。"
  459. else:
  460. state["summary"] = summary_result.get("summary")
  461. print(f"[DATABASE_AGENT] 摘要生成成功")
  462. else:
  463. print(f"[DATABASE_AGENT] 跳过摘要生成(ENABLE_RESULT_SUMMARY={ENABLE_RESULT_SUMMARY},数据行数={query_result.get('row_count', 0)})")
  464. # 不生成摘要时,不设置summary字段,让格式化响应节点决定如何处理
  465. state["current_step"] = "database_completed"
  466. state["execution_path"].append("agent_database")
  467. print(f"[DATABASE_AGENT] 数据库查询完成")
  468. return state
  469. except Exception as e:
  470. print(f"[ERROR] 数据库Agent异常: {str(e)}")
  471. import traceback
  472. print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
  473. state["error"] = f"数据库查询失败: {str(e)}"
  474. state["error_code"] = 500
  475. state["current_step"] = "database_error"
  476. state["execution_path"].append("agent_database_error")
  477. return state
  478. def _agent_chat_node(self, state: AgentState) -> AgentState:
  479. """聊天Agent节点 - 直接工具调用模式"""
  480. try:
  481. print(f"[CHAT_AGENT] 开始处理聊天: {state['question']}")
  482. question = state["question"]
  483. # 构建上下文 - 仅使用真实的对话历史上下文
  484. # 注意:不要将分类原因传递给LLM,那是系统内部的路由信息
  485. enable_context_injection = self.config.get("chat_agent", {}).get("enable_context_injection", True)
  486. context = None
  487. if enable_context_injection:
  488. # TODO: 在这里可以添加真实的对话历史上下文
  489. # 例如从Redis或其他存储中获取最近的对话记录
  490. # context = get_conversation_history(state.get("session_id"))
  491. pass
  492. # 直接调用general_chat工具
  493. print(f"[CHAT_AGENT] 调用general_chat工具")
  494. chat_result = general_chat.invoke({
  495. "question": question,
  496. "context": context
  497. })
  498. if chat_result.get("success"):
  499. state["chat_response"] = chat_result.get("response", "")
  500. print(f"[CHAT_AGENT] 聊天处理成功")
  501. else:
  502. # 处理失败,使用备用响应
  503. state["chat_response"] = chat_result.get("response", "抱歉,我暂时无法处理您的问题。请稍后再试。")
  504. print(f"[CHAT_AGENT] 聊天处理失败,使用备用响应: {chat_result.get('error')}")
  505. state["current_step"] = "chat_completed"
  506. state["execution_path"].append("agent_chat")
  507. print(f"[CHAT_AGENT] 聊天处理完成")
  508. return state
  509. except Exception as e:
  510. print(f"[ERROR] 聊天Agent异常: {str(e)}")
  511. import traceback
  512. print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
  513. state["chat_response"] = "抱歉,我暂时无法处理您的问题。请稍后再试,或者尝试询问数据相关的问题。"
  514. state["current_step"] = "chat_error"
  515. state["execution_path"].append("agent_chat_error")
  516. return state
  517. def _format_response_node(self, state: AgentState) -> AgentState:
  518. """格式化最终响应节点"""
  519. try:
  520. print(f"[FORMAT_NODE] 开始格式化响应,问题类型: {state['question_type']}")
  521. state["current_step"] = "completed"
  522. state["execution_path"].append("format_response")
  523. # 根据问题类型和执行状态格式化响应
  524. if state.get("error"):
  525. # 有错误的情况
  526. state["final_response"] = {
  527. "success": False,
  528. "error": state["error"],
  529. "error_code": state.get("error_code", 500),
  530. "question_type": state["question_type"],
  531. "execution_path": state["execution_path"],
  532. "classification_info": {
  533. "confidence": state.get("classification_confidence", 0),
  534. "reason": state.get("classification_reason", ""),
  535. "method": state.get("classification_method", "")
  536. }
  537. }
  538. elif state["question_type"] == "DATABASE":
  539. # 数据库查询类型
  540. # 处理SQL生成失败的情况
  541. if not state.get("sql_generation_success", True) and state.get("user_prompt"):
  542. state["final_response"] = {
  543. "success": False,
  544. "response": state["user_prompt"],
  545. "type": "DATABASE",
  546. "sql_generation_failed": True,
  547. "validation_error_type": state.get("validation_error_type"),
  548. "sql": state.get("sql"),
  549. "execution_path": state["execution_path"],
  550. "classification_info": {
  551. "confidence": state["classification_confidence"],
  552. "reason": state["classification_reason"],
  553. "method": state["classification_method"]
  554. },
  555. "sql_validation_info": {
  556. "sql_generation_success": state.get("sql_generation_success", False),
  557. "sql_validation_success": state.get("sql_validation_success", False),
  558. "sql_repair_attempted": state.get("sql_repair_attempted", False),
  559. "sql_repair_success": state.get("sql_repair_success", False)
  560. }
  561. }
  562. elif state.get("chat_response"):
  563. # SQL生成失败的解释性响应(不受ENABLE_RESULT_SUMMARY配置影响)
  564. state["final_response"] = {
  565. "success": True,
  566. "response": state["chat_response"],
  567. "type": "DATABASE",
  568. "sql": state.get("sql"),
  569. "query_result": state.get("query_result"), # 保持内部字段名不变
  570. "execution_path": state["execution_path"],
  571. "classification_info": {
  572. "confidence": state["classification_confidence"],
  573. "reason": state["classification_reason"],
  574. "method": state["classification_method"]
  575. }
  576. }
  577. elif state.get("summary"):
  578. # 正常的数据库查询结果,有摘要的情况
  579. # 将summary的值同时赋给response字段(为将来移除summary字段做准备)
  580. state["final_response"] = {
  581. "success": True,
  582. "type": "DATABASE",
  583. "response": state["summary"], # 新增:将summary的值赋给response
  584. "sql": state.get("sql"),
  585. "query_result": state.get("query_result"), # 保持内部字段名不变
  586. "summary": state["summary"], # 暂时保留summary字段
  587. "execution_path": state["execution_path"],
  588. "classification_info": {
  589. "confidence": state["classification_confidence"],
  590. "reason": state["classification_reason"],
  591. "method": state["classification_method"]
  592. }
  593. }
  594. elif state.get("query_result"):
  595. # 有数据但没有摘要(摘要被配置禁用)
  596. query_result = state.get("query_result")
  597. row_count = query_result.get("row_count", 0)
  598. # 构建基本响应,不包含summary字段和response字段
  599. # 用户应该直接从query_result.columns和query_result.rows获取数据
  600. state["final_response"] = {
  601. "success": True,
  602. "type": "DATABASE",
  603. "sql": state.get("sql"),
  604. "query_result": query_result, # 保持内部字段名不变
  605. "execution_path": state["execution_path"],
  606. "classification_info": {
  607. "confidence": state["classification_confidence"],
  608. "reason": state["classification_reason"],
  609. "method": state["classification_method"]
  610. }
  611. }
  612. else:
  613. # 数据库查询失败,没有任何结果
  614. state["final_response"] = {
  615. "success": False,
  616. "error": state.get("error", "数据库查询未完成"),
  617. "type": "DATABASE",
  618. "sql": state.get("sql"),
  619. "execution_path": state["execution_path"]
  620. }
  621. else:
  622. # 聊天类型
  623. state["final_response"] = {
  624. "success": True,
  625. "response": state.get("chat_response", ""),
  626. "type": "CHAT",
  627. "execution_path": state["execution_path"],
  628. "classification_info": {
  629. "confidence": state["classification_confidence"],
  630. "reason": state["classification_reason"],
  631. "method": state["classification_method"]
  632. }
  633. }
  634. print(f"[FORMAT_NODE] 响应格式化完成")
  635. return state
  636. except Exception as e:
  637. print(f"[ERROR] 响应格式化异常: {str(e)}")
  638. state["final_response"] = {
  639. "success": False,
  640. "error": f"响应格式化异常: {str(e)}",
  641. "error_code": 500,
  642. "execution_path": state["execution_path"]
  643. }
  644. return state
  645. def _route_after_sql_generation(self, state: AgentState) -> Literal["continue_execution", "return_to_user"]:
  646. """
  647. SQL生成后的路由决策
  648. 根据SQL生成和验证的结果决定后续流向:
  649. - SQL生成验证成功 → 继续执行SQL
  650. - SQL生成验证失败 → 直接返回用户提示
  651. """
  652. sql_generation_success = state.get("sql_generation_success", False)
  653. print(f"[ROUTE] SQL生成路由: success={sql_generation_success}")
  654. if sql_generation_success:
  655. return "continue_execution" # 路由到SQL执行节点
  656. else:
  657. return "return_to_user" # 路由到format_response,结束流程
  658. def _route_after_classification(self, state: AgentState) -> Literal["DATABASE", "CHAT"]:
  659. """
  660. 分类后的路由决策
  661. 完全信任QuestionClassifier的决策:
  662. - DATABASE类型 → 数据库Agent
  663. - CHAT和UNCERTAIN类型 → 聊天Agent
  664. 这样避免了双重决策的冲突,所有分类逻辑都集中在QuestionClassifier中
  665. """
  666. question_type = state["question_type"]
  667. confidence = state["classification_confidence"]
  668. print(f"[ROUTE] 分类路由: {question_type}, 置信度: {confidence} (完全信任分类器决策)")
  669. if question_type == "DATABASE":
  670. return "DATABASE"
  671. else:
  672. # 将 "CHAT" 和 "UNCERTAIN" 类型都路由到聊天流程
  673. # 聊天Agent可以处理不确定的情况,并在必要时引导用户提供更多信息
  674. return "CHAT"
  675. async def process_question(self, question: str, session_id: str = None, context_type: str = None, routing_mode: str = None) -> Dict[str, Any]:
  676. """
  677. 统一的问题处理入口
  678. Args:
  679. question: 用户问题
  680. session_id: 会话ID
  681. context_type: 上下文类型 ("DATABASE" 或 "CHAT"),用于渐进式分类
  682. routing_mode: 路由模式,可选,用于覆盖配置文件设置
  683. Returns:
  684. Dict包含完整的处理结果
  685. """
  686. try:
  687. print(f"[CITU_AGENT] 开始处理问题: {question}")
  688. if context_type:
  689. print(f"[CITU_AGENT] 上下文类型: {context_type}")
  690. if routing_mode:
  691. print(f"[CITU_AGENT] 使用指定路由模式: {routing_mode}")
  692. # 动态创建workflow(基于路由模式)
  693. workflow = self._create_workflow(routing_mode)
  694. # 初始化状态
  695. initial_state = self._create_initial_state(question, session_id, context_type, routing_mode)
  696. # 执行工作流
  697. final_state = await workflow.ainvoke(
  698. initial_state,
  699. config={
  700. "configurable": {"session_id": session_id}
  701. } if session_id else None
  702. )
  703. # 提取最终结果
  704. result = final_state["final_response"]
  705. print(f"[CITU_AGENT] 问题处理完成: {result.get('success', False)}")
  706. return result
  707. except Exception as e:
  708. print(f"[ERROR] Agent执行异常: {str(e)}")
  709. return {
  710. "success": False,
  711. "error": f"Agent系统异常: {str(e)}",
  712. "error_code": 500,
  713. "execution_path": ["error"]
  714. }
  715. def _create_initial_state(self, question: str, session_id: str = None, context_type: str = None, routing_mode: str = None) -> AgentState:
  716. """创建初始状态 - 支持渐进式分类"""
  717. # 确定使用的路由模式
  718. if routing_mode:
  719. effective_routing_mode = routing_mode
  720. else:
  721. try:
  722. from app_config import QUESTION_ROUTING_MODE
  723. effective_routing_mode = QUESTION_ROUTING_MODE
  724. except ImportError:
  725. effective_routing_mode = "hybrid"
  726. return AgentState(
  727. # 输入信息
  728. question=question,
  729. session_id=session_id,
  730. # 上下文信息
  731. context_type=context_type,
  732. # 分类结果 (初始值,会在分类节点或直接模式初始化节点中更新)
  733. question_type="UNCERTAIN",
  734. classification_confidence=0.0,
  735. classification_reason="",
  736. classification_method="",
  737. # 数据库查询流程状态
  738. sql=None,
  739. sql_generation_attempts=0,
  740. query_result=None,
  741. summary=None,
  742. # SQL验证和修复相关状态
  743. sql_generation_success=False,
  744. sql_validation_success=False,
  745. sql_repair_attempted=False,
  746. sql_repair_success=False,
  747. validation_error_type=None,
  748. user_prompt=None,
  749. # 聊天响应
  750. chat_response=None,
  751. # 最终输出
  752. final_response={},
  753. # 错误处理
  754. error=None,
  755. error_code=None,
  756. # 流程控制
  757. current_step="initialized",
  758. execution_path=["start"],
  759. retry_count=0,
  760. max_retries=3,
  761. # 调试信息
  762. debug_info={},
  763. # 路由模式
  764. routing_mode=effective_routing_mode
  765. )
  766. # ==================== SQL验证和修复相关方法 ====================
  767. def _is_sql_validation_enabled(self) -> bool:
  768. """检查是否启用SQL验证"""
  769. from agent.config import get_nested_config
  770. return (get_nested_config(self.config, "sql_validation.enable_syntax_validation", False) or
  771. get_nested_config(self.config, "sql_validation.enable_forbidden_check", False))
  772. def _is_auto_repair_enabled(self) -> bool:
  773. """检查是否启用自动修复"""
  774. from agent.config import get_nested_config
  775. return (get_nested_config(self.config, "sql_validation.enable_auto_repair", False) and
  776. get_nested_config(self.config, "sql_validation.enable_syntax_validation", False))
  777. async def _validate_sql_with_custom_priority(self, sql: str) -> Dict[str, Any]:
  778. """
  779. 按照自定义优先级验证SQL:先禁止词,再语法
  780. Args:
  781. sql: 要验证的SQL语句
  782. Returns:
  783. 验证结果字典
  784. """
  785. try:
  786. from agent.config import get_nested_config
  787. # 1. 优先检查禁止词(您要求的优先级)
  788. if get_nested_config(self.config, "sql_validation.enable_forbidden_check", True):
  789. forbidden_result = self._check_forbidden_keywords(sql)
  790. if not forbidden_result.get("valid"):
  791. return {
  792. "valid": False,
  793. "error_type": "forbidden_keywords",
  794. "error_message": forbidden_result.get("error"),
  795. "can_repair": False # 禁止词错误不能修复
  796. }
  797. # 2. 再检查语法(EXPLAIN SQL)
  798. if get_nested_config(self.config, "sql_validation.enable_syntax_validation", True):
  799. syntax_result = await self._validate_sql_syntax(sql)
  800. if not syntax_result.get("valid"):
  801. return {
  802. "valid": False,
  803. "error_type": "syntax_error",
  804. "error_message": syntax_result.get("error"),
  805. "can_repair": True # 语法错误可以尝试修复
  806. }
  807. return {"valid": True}
  808. except Exception as e:
  809. return {
  810. "valid": False,
  811. "error_type": "validation_exception",
  812. "error_message": str(e),
  813. "can_repair": False
  814. }
  815. def _check_forbidden_keywords(self, sql: str) -> Dict[str, Any]:
  816. """检查禁止的SQL关键词"""
  817. try:
  818. from agent.config import get_nested_config
  819. forbidden_operations = get_nested_config(
  820. self.config,
  821. "sql_validation.forbidden_operations",
  822. ['UPDATE', 'DELETE', 'DROP', 'ALTER', 'INSERT']
  823. )
  824. sql_upper = sql.upper().strip()
  825. for operation in forbidden_operations:
  826. if sql_upper.startswith(operation.upper()):
  827. return {
  828. "valid": False,
  829. "error": f"不允许的操作: {operation}。本系统只支持查询操作(SELECT)。"
  830. }
  831. return {"valid": True}
  832. except Exception as e:
  833. return {
  834. "valid": False,
  835. "error": f"禁止词检查异常: {str(e)}"
  836. }
  837. async def _validate_sql_syntax(self, sql: str) -> Dict[str, Any]:
  838. """语法验证 - 使用EXPLAIN SQL"""
  839. try:
  840. from common.vanna_instance import get_vanna_instance
  841. import asyncio
  842. vn = get_vanna_instance()
  843. # 构建EXPLAIN查询
  844. explain_sql = f"EXPLAIN {sql}"
  845. # 异步执行验证
  846. result = await asyncio.to_thread(vn.run_sql, explain_sql)
  847. if result is not None:
  848. return {"valid": True}
  849. else:
  850. return {
  851. "valid": False,
  852. "error": "SQL语法验证失败"
  853. }
  854. except Exception as e:
  855. return {
  856. "valid": False,
  857. "error": str(e)
  858. }
  859. async def _attempt_sql_repair_once(self, sql: str, error_message: str) -> Dict[str, Any]:
  860. """
  861. 使用LLM尝试修复SQL - 只修复一次
  862. Args:
  863. sql: 原始SQL
  864. error_message: 错误信息
  865. Returns:
  866. 修复结果字典
  867. """
  868. try:
  869. from common.vanna_instance import get_vanna_instance
  870. from agent.config import get_nested_config
  871. import asyncio
  872. vn = get_vanna_instance()
  873. # 构建修复提示词
  874. repair_prompt = f"""你是一个PostgreSQL SQL专家,请修复以下SQL语句的语法错误。
  875. 当前数据库类型: PostgreSQL
  876. 错误信息: {error_message}
  877. 需要修复的SQL:
  878. {sql}
  879. 修复要求:
  880. 1. 只修复语法错误和表结构错误
  881. 2. 保持SQL的原始业务逻辑不变
  882. 3. 使用PostgreSQL标准语法
  883. 4. 确保修复后的SQL语法正确
  884. 请直接输出修复后的SQL语句,不要添加其他说明文字。"""
  885. # 获取超时配置
  886. timeout = get_nested_config(self.config, "sql_validation.repair_timeout", 60)
  887. # 异步调用LLM修复
  888. response = await asyncio.wait_for(
  889. asyncio.to_thread(
  890. vn.chat_with_llm,
  891. question=repair_prompt,
  892. system_prompt="你是一个专业的PostgreSQL SQL专家,专门负责修复SQL语句中的语法错误。"
  893. ),
  894. timeout=timeout
  895. )
  896. if response and response.strip():
  897. repaired_sql = response.strip()
  898. # 验证修复后的SQL
  899. validation_result = await self._validate_sql_syntax(repaired_sql)
  900. if validation_result.get("valid"):
  901. return {
  902. "success": True,
  903. "repaired_sql": repaired_sql,
  904. "error": None
  905. }
  906. else:
  907. return {
  908. "success": False,
  909. "repaired_sql": None,
  910. "error": f"修复后的SQL仍然无效: {validation_result.get('error')}"
  911. }
  912. else:
  913. return {
  914. "success": False,
  915. "repaired_sql": None,
  916. "error": "LLM返回空响应"
  917. }
  918. except asyncio.TimeoutError:
  919. return {
  920. "success": False,
  921. "repaired_sql": None,
  922. "error": f"修复超时({get_nested_config(self.config, 'sql_validation.repair_timeout', 60)}秒)"
  923. }
  924. except Exception as e:
  925. return {
  926. "success": False,
  927. "repaired_sql": None,
  928. "error": f"修复异常: {str(e)}"
  929. }
  930. # ==================== 原有方法 ====================
  931. def _extract_original_question(self, question: str) -> str:
  932. """
  933. 从enhanced_question中提取原始问题
  934. Args:
  935. question: 可能包含上下文的问题
  936. Returns:
  937. str: 原始问题
  938. """
  939. try:
  940. # 检查是否为enhanced_question格式
  941. if "\n[CONTEXT]\n" in question and "\n[CURRENT]\n" in question:
  942. # 提取[CURRENT]标签后的内容
  943. current_start = question.find("\n[CURRENT]\n")
  944. if current_start != -1:
  945. original_question = question[current_start + len("\n[CURRENT]\n"):].strip()
  946. return original_question
  947. # 如果不是enhanced_question格式,直接返回原问题
  948. return question.strip()
  949. except Exception as e:
  950. print(f"[WARNING] 提取原始问题失败: {str(e)}")
  951. return question.strip()
  952. async def health_check(self) -> Dict[str, Any]:
  953. """健康检查"""
  954. try:
  955. # 从配置获取健康检查参数
  956. from agent.config import get_nested_config
  957. test_question = get_nested_config(self.config, "health_check.test_question", "你好")
  958. enable_full_test = get_nested_config(self.config, "health_check.enable_full_test", True)
  959. if enable_full_test:
  960. # 完整流程测试
  961. test_result = await self.process_question(test_question, "health_check")
  962. return {
  963. "status": "healthy" if test_result.get("success") else "degraded",
  964. "test_result": test_result.get("success", False),
  965. "workflow_compiled": True, # 动态创建,始终可用
  966. "tools_count": len(self.tools),
  967. "agent_reuse_enabled": False,
  968. "message": "Agent健康检查完成"
  969. }
  970. else:
  971. # 简单检查
  972. return {
  973. "status": "healthy",
  974. "test_result": True,
  975. "workflow_compiled": True, # 动态创建,始终可用
  976. "tools_count": len(self.tools),
  977. "agent_reuse_enabled": False,
  978. "message": "Agent简单健康检查完成"
  979. }
  980. except Exception as e:
  981. return {
  982. "status": "unhealthy",
  983. "error": str(e),
  984. "workflow_compiled": True, # 动态创建,始终可用
  985. "tools_count": len(self.tools) if hasattr(self, 'tools') else 0,
  986. "agent_reuse_enabled": False,
  987. "message": "Agent健康检查失败"
  988. }