citu_agent.py 54 KB


  1. # agent/citu_agent.py
  2. from typing import Dict, Any, Literal
  3. from langgraph.graph import StateGraph, END
  4. from langchain.agents import AgentExecutor, create_openai_tools_agent
  5. from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
  6. from langchain_core.messages import SystemMessage, HumanMessage
  7. from core.logging import get_agent_logger
  8. from agent.state import AgentState
  9. from agent.classifier import QuestionClassifier
  10. from agent.tools import TOOLS, generate_sql, execute_sql, generate_summary, general_chat
  11. from agent.tools.utils import get_compatible_llm
  12. from app_config import ENABLE_RESULT_SUMMARY
  13. class CituLangGraphAgent:
  14. """Citu LangGraph智能助手主类 - 使用@tool装饰器 + Agent工具调用"""
  15. def __init__(self):
  16. # 初始化日志
  17. self.logger = get_agent_logger("CituAgent")
  18. # 加载配置
  19. try:
  20. from agent.config import get_current_config, get_nested_config
  21. self.config = get_current_config()
  22. self.logger.info("加载Agent配置完成")
  23. except ImportError:
  24. self.config = {}
  25. self.logger.warning("配置文件不可用,使用默认配置")
  26. self.classifier = QuestionClassifier()
  27. self.tools = TOOLS
  28. self.llm = get_compatible_llm()
  29. # 注意:现在使用直接工具调用模式,不再需要预创建Agent执行器
  30. self.logger.info("使用直接工具调用模式")
  31. # 不在构造时创建workflow,改为动态创建以支持路由模式参数
  32. # self.workflow = self._create_workflow()
  33. self.logger.info("LangGraph Agent with Direct Tools初始化完成")
  34. def _create_workflow(self, routing_mode: str = None) -> StateGraph:
  35. """根据路由模式创建不同的工作流"""
  36. self.logger.info(f"🏗️ [WORKFLOW] 动态创建workflow被调用")
  37. # 确定使用的路由模式
  38. if routing_mode:
  39. QUESTION_ROUTING_MODE = routing_mode
  40. self.logger.info(f"使用传入的路由模式: {QUESTION_ROUTING_MODE}")
  41. else:
  42. try:
  43. from app_config import QUESTION_ROUTING_MODE
  44. self.logger.info(f"使用配置文件路由模式: {QUESTION_ROUTING_MODE}")
  45. except ImportError:
  46. QUESTION_ROUTING_MODE = "hybrid"
  47. self.logger.warning(f"配置导入失败,使用默认路由模式: {QUESTION_ROUTING_MODE}")
  48. workflow = StateGraph(AgentState)
  49. # 根据路由模式创建不同的工作流
  50. if QUESTION_ROUTING_MODE == "database_direct":
  51. # 直接数据库模式:跳过分类,直接进入数据库处理(使用新的拆分节点)
  52. workflow.add_node("init_direct_database", self._init_direct_database_node)
  53. workflow.add_node("agent_sql_generation", self._agent_sql_generation_node)
  54. workflow.add_node("agent_sql_execution", self._agent_sql_execution_node)
  55. workflow.add_node("format_response", self._format_response_node)
  56. workflow.set_entry_point("init_direct_database")
  57. # 添加条件路由
  58. workflow.add_edge("init_direct_database", "agent_sql_generation")
  59. workflow.add_conditional_edges(
  60. "agent_sql_generation",
  61. self._route_after_sql_generation,
  62. {
  63. "continue_execution": "agent_sql_execution",
  64. "return_to_user": "format_response"
  65. }
  66. )
  67. workflow.add_edge("agent_sql_execution", "format_response")
  68. workflow.add_edge("format_response", END)
  69. elif QUESTION_ROUTING_MODE == "chat_direct":
  70. # 直接聊天模式:跳过分类,直接进入聊天处理
  71. workflow.add_node("init_direct_chat", self._init_direct_chat_node)
  72. workflow.add_node("agent_chat", self._agent_chat_node)
  73. workflow.add_node("format_response", self._format_response_node)
  74. workflow.set_entry_point("init_direct_chat")
  75. workflow.add_edge("init_direct_chat", "agent_chat")
  76. workflow.add_edge("agent_chat", "format_response")
  77. workflow.add_edge("format_response", END)
  78. else:
  79. self.logger.info(f"🧠 [WORKFLOW] 构建hybrid模式的workflow...")
  80. # 其他模式(hybrid, llm_only):使用新的拆分工作流
  81. workflow.add_node("classify_question", self._classify_question_node)
  82. workflow.add_node("agent_chat", self._agent_chat_node)
  83. workflow.add_node("agent_sql_generation", self._agent_sql_generation_node)
  84. workflow.add_node("agent_sql_execution", self._agent_sql_execution_node)
  85. workflow.add_node("format_response", self._format_response_node)
  86. workflow.set_entry_point("classify_question")
  87. # 添加条件边:分类后的路由
  88. workflow.add_conditional_edges(
  89. "classify_question",
  90. self._route_after_classification,
  91. {
  92. "DATABASE": "agent_sql_generation",
  93. "CHAT": "agent_chat"
  94. }
  95. )
  96. # 添加条件边:SQL生成后的路由
  97. workflow.add_conditional_edges(
  98. "agent_sql_generation",
  99. self._route_after_sql_generation,
  100. {
  101. "continue_execution": "agent_sql_execution",
  102. "return_to_user": "format_response"
  103. }
  104. )
  105. # 普通边
  106. workflow.add_edge("agent_chat", "format_response")
  107. workflow.add_edge("agent_sql_execution", "format_response")
  108. workflow.add_edge("format_response", END)
  109. return workflow.compile()
  110. def _init_direct_database_node(self, state: AgentState) -> AgentState:
  111. """初始化直接数据库模式的状态"""
  112. try:
  113. # 从state中获取路由模式,而不是从配置文件读取
  114. routing_mode = state.get("routing_mode", "database_direct")
  115. # 设置直接数据库模式的分类状态
  116. state["question_type"] = "DATABASE"
  117. state["classification_confidence"] = 1.0
  118. state["classification_reason"] = "配置为直接数据库查询模式"
  119. state["classification_method"] = "direct_database"
  120. state["routing_mode"] = routing_mode
  121. state["current_step"] = "direct_database_init"
  122. state["execution_path"].append("init_direct_database")
  123. self.logger.info("直接数据库模式初始化完成")
  124. return state
  125. except Exception as e:
  126. self.logger.error(f"直接数据库模式初始化异常: {str(e)}")
  127. state["error"] = f"直接数据库模式初始化失败: {str(e)}"
  128. state["error_code"] = 500
  129. state["execution_path"].append("init_direct_database_error")
  130. return state
  131. def _init_direct_chat_node(self, state: AgentState) -> AgentState:
  132. """初始化直接聊天模式的状态"""
  133. try:
  134. # 从state中获取路由模式,而不是从配置文件读取
  135. routing_mode = state.get("routing_mode", "chat_direct")
  136. # 设置直接聊天模式的分类状态
  137. state["question_type"] = "CHAT"
  138. state["classification_confidence"] = 1.0
  139. state["classification_reason"] = "配置为直接聊天模式"
  140. state["classification_method"] = "direct_chat"
  141. state["routing_mode"] = routing_mode
  142. state["current_step"] = "direct_chat_init"
  143. state["execution_path"].append("init_direct_chat")
  144. self.logger.info("直接聊天模式初始化完成")
  145. return state
  146. except Exception as e:
  147. self.logger.error(f"直接聊天模式初始化异常: {str(e)}")
  148. state["error"] = f"直接聊天模式初始化失败: {str(e)}"
  149. state["error_code"] = 500
  150. state["execution_path"].append("init_direct_chat_error")
  151. return state
  152. def _classify_question_node(self, state: AgentState) -> AgentState:
  153. """问题分类节点 - 支持渐进式分类策略"""
  154. try:
  155. # 从state中获取路由模式,而不是从配置文件读取
  156. routing_mode = state.get("routing_mode", "hybrid")
  157. self.logger.info(f"开始分类问题: {state['question']}")
  158. # 获取上下文类型(如果有的话)
  159. context_type = state.get("context_type")
  160. if context_type:
  161. self.logger.info(f"检测到上下文类型: {context_type}")
  162. # 使用渐进式分类策略,传递路由模式
  163. classification_result = self.classifier.classify(state["question"], context_type, routing_mode)
  164. # 更新状态
  165. state["question_type"] = classification_result.question_type
  166. state["classification_confidence"] = classification_result.confidence
  167. state["classification_reason"] = classification_result.reason
  168. state["classification_method"] = classification_result.method
  169. state["routing_mode"] = routing_mode
  170. state["current_step"] = "classified"
  171. state["execution_path"].append("classify")
  172. self.logger.info(f"分类结果: {classification_result.question_type}, 置信度: {classification_result.confidence}")
  173. self.logger.info(f"路由模式: {routing_mode}, 分类方法: {classification_result.method}")
  174. return state
  175. except Exception as e:
  176. self.logger.error(f"问题分类异常: {str(e)}")
  177. state["error"] = f"问题分类失败: {str(e)}"
  178. state["error_code"] = 500
  179. state["execution_path"].append("classify_error")
  180. return state
  181. async def _agent_sql_generation_node(self, state: AgentState) -> AgentState:
  182. """SQL生成验证节点 - 负责生成SQL、验证SQL和决定路由"""
  183. try:
  184. self.logger.info(f"开始处理SQL生成和验证: {state['question']}")
  185. question = state["question"]
  186. # 步骤1:生成SQL
  187. self.logger.info("步骤1:生成SQL")
  188. sql_result = generate_sql.invoke({"question": question, "allow_llm_to_see_data": True})
  189. if not sql_result.get("success"):
  190. # SQL生成失败的统一处理
  191. error_message = sql_result.get("error", "")
  192. error_type = sql_result.get("error_type", "")
  193. self.logger.debug(f"error_type = '{error_type}'")
  194. # 根据错误类型生成用户提示
  195. if "no relevant tables" in error_message.lower() or "table not found" in error_message.lower():
  196. user_prompt = "数据库中没有相关的表或字段信息,请您提供更多具体信息或修改问题。"
  197. failure_reason = "missing_database_info"
  198. elif "ambiguous" in error_message.lower() or "more information" in error_message.lower():
  199. user_prompt = "您的问题需要更多信息才能准确查询,请提供更详细的描述。"
  200. failure_reason = "ambiguous_question"
  201. elif error_type == "llm_explanation" or error_type == "generation_failed_with_explanation":
  202. # 对于解释性文本,直接设置为聊天响应
  203. state["chat_response"] = error_message + " 请尝试提问其它问题。"
  204. state["sql_generation_success"] = False
  205. state["validation_error_type"] = "llm_explanation"
  206. state["current_step"] = "sql_generation_completed"
  207. state["execution_path"].append("agent_sql_generation")
  208. self.logger.info(f"返回LLM解释性答案: {error_message}")
  209. return state
  210. else:
  211. user_prompt = "无法生成有效的SQL查询,请尝试重新描述您的问题。"
  212. failure_reason = "unknown_generation_failure"
  213. # 统一返回失败状态
  214. state["sql_generation_success"] = False
  215. state["user_prompt"] = user_prompt
  216. state["validation_error_type"] = failure_reason
  217. state["current_step"] = "sql_generation_failed"
  218. state["execution_path"].append("agent_sql_generation_failed")
  219. self.logger.warning(f"生成失败: {failure_reason} - {user_prompt}")
  220. return state
  221. sql = sql_result.get("sql")
  222. state["sql"] = sql
  223. # 步骤1.5:检查是否为解释性响应而非SQL
  224. error_type = sql_result.get("error_type")
  225. if error_type == "llm_explanation" or error_type == "generation_failed_with_explanation":
  226. # LLM返回了解释性文本,直接作为最终答案
  227. explanation = sql_result.get("error", "")
  228. state["chat_response"] = explanation + " 请尝试提问其它问题。"
  229. state["sql_generation_success"] = False
  230. state["validation_error_type"] = "llm_explanation"
  231. state["current_step"] = "sql_generation_completed"
  232. state["execution_path"].append("agent_sql_generation")
  233. self.logger.info(f"返回LLM解释性答案: {explanation}")
  234. return state
  235. if sql:
  236. self.logger.info(f"SQL生成成功: {sql}")
  237. else:
  238. self.logger.warning("SQL为空,但不是解释性响应")
  239. # 这种情况应该很少见,但为了安全起见保留原有的错误处理
  240. return state
  241. # 额外验证:检查SQL格式(防止工具误判)
  242. from agent.tools.utils import _is_valid_sql_format
  243. if not _is_valid_sql_format(sql):
  244. # 内容看起来不是SQL,当作解释性响应处理
  245. state["chat_response"] = sql + " 请尝试提问其它问题。"
  246. state["sql_generation_success"] = False
  247. state["validation_error_type"] = "invalid_sql_format"
  248. state["current_step"] = "sql_generation_completed"
  249. state["execution_path"].append("agent_sql_generation")
  250. self.logger.info(f"内容不是有效SQL,当作解释返回: {sql}")
  251. return state
  252. # 步骤2:SQL验证(如果启用)
  253. if self._is_sql_validation_enabled():
  254. self.logger.info("步骤2:验证SQL")
  255. validation_result = await self._validate_sql_with_custom_priority(sql)
  256. if not validation_result.get("valid"):
  257. # 验证失败,检查是否可以修复
  258. error_type = validation_result.get("error_type")
  259. error_message = validation_result.get("error_message")
  260. can_repair = validation_result.get("can_repair", False)
  261. self.logger.warning(f"SQL验证失败: {error_type} - {error_message}")
  262. if error_type == "forbidden_keywords":
  263. # 禁止词错误,直接失败,不尝试修复
  264. state["sql_generation_success"] = False
  265. state["sql_validation_success"] = False
  266. state["user_prompt"] = error_message
  267. state["validation_error_type"] = "forbidden_keywords"
  268. state["current_step"] = "sql_validation_failed"
  269. state["execution_path"].append("forbidden_keywords_failed")
  270. self.logger.warning("禁止词验证失败,直接结束")
  271. return state
  272. elif error_type == "syntax_error" and can_repair and self._is_auto_repair_enabled():
  273. # 语法错误,尝试修复(仅一次)
  274. self.logger.info(f"尝试修复SQL语法错误(仅一次): {error_message}")
  275. state["sql_repair_attempted"] = True
  276. repair_result = await self._attempt_sql_repair_once(sql, error_message)
  277. if repair_result.get("success"):
  278. # 修复成功
  279. repaired_sql = repair_result.get("repaired_sql")
  280. state["sql"] = repaired_sql
  281. state["sql_generation_success"] = True
  282. state["sql_validation_success"] = True
  283. state["sql_repair_success"] = True
  284. state["current_step"] = "sql_generation_completed"
  285. state["execution_path"].append("sql_repair_success")
  286. self.logger.info(f"SQL修复成功: {repaired_sql}")
  287. return state
  288. else:
  289. # 修复失败,直接结束
  290. repair_error = repair_result.get("error", "修复失败")
  291. self.logger.warning(f"SQL修复失败: {repair_error}")
  292. state["sql_generation_success"] = False
  293. state["sql_validation_success"] = False
  294. state["sql_repair_success"] = False
  295. state["user_prompt"] = f"SQL语法修复失败: {repair_error}"
  296. state["validation_error_type"] = "syntax_repair_failed"
  297. state["current_step"] = "sql_repair_failed"
  298. state["execution_path"].append("sql_repair_failed")
  299. return state
  300. else:
  301. # 不启用修复或其他错误类型,直接失败
  302. state["sql_generation_success"] = False
  303. state["sql_validation_success"] = False
  304. state["user_prompt"] = f"SQL验证失败: {error_message}"
  305. state["validation_error_type"] = error_type
  306. state["current_step"] = "sql_validation_failed"
  307. state["execution_path"].append("sql_validation_failed")
  308. self.logger.warning("SQL验证失败,不尝试修复")
  309. return state
  310. else:
  311. self.logger.info("SQL验证通过")
  312. state["sql_validation_success"] = True
  313. else:
  314. self.logger.info("跳过SQL验证(未启用)")
  315. state["sql_validation_success"] = True
  316. # 生成和验证都成功
  317. state["sql_generation_success"] = True
  318. state["current_step"] = "sql_generation_completed"
  319. state["execution_path"].append("agent_sql_generation")
  320. self.logger.info("SQL生成验证完成,准备执行")
  321. return state
  322. except Exception as e:
  323. self.logger.error(f"SQL生成验证节点异常: {str(e)}")
  324. import traceback
  325. self.logger.error(f"详细错误信息: {traceback.format_exc()}")
  326. state["sql_generation_success"] = False
  327. state["sql_validation_success"] = False
  328. state["user_prompt"] = f"SQL生成验证异常: {str(e)}"
  329. state["validation_error_type"] = "node_exception"
  330. state["current_step"] = "sql_generation_error"
  331. state["execution_path"].append("agent_sql_generation_error")
  332. return state
  333. def _agent_sql_execution_node(self, state: AgentState) -> AgentState:
  334. """SQL执行节点 - 负责执行已验证的SQL和生成摘要"""
  335. try:
  336. self.logger.info(f"开始执行SQL: {state.get('sql', 'N/A')}")
  337. sql = state.get("sql")
  338. question = state["question"]
  339. if not sql:
  340. self.logger.warning("没有可执行的SQL")
  341. state["error"] = "没有可执行的SQL语句"
  342. state["error_code"] = 500
  343. state["current_step"] = "sql_execution_error"
  344. state["execution_path"].append("agent_sql_execution_error")
  345. return state
  346. # 步骤1:执行SQL
  347. self.logger.info("步骤1:执行SQL")
  348. execute_result = execute_sql.invoke({"sql": sql})
  349. if not execute_result.get("success"):
  350. self.logger.error(f"SQL执行失败: {execute_result.get('error')}")
  351. state["error"] = execute_result.get("error", "SQL执行失败")
  352. state["error_code"] = 500
  353. state["current_step"] = "sql_execution_error"
  354. state["execution_path"].append("agent_sql_execution_error")
  355. return state
  356. query_result = execute_result.get("data_result")
  357. state["query_result"] = query_result
  358. self.logger.info(f"SQL执行成功,返回 {query_result.get('row_count', 0)} 行数据")
  359. # 步骤2:生成摘要(根据配置和数据情况)
  360. if ENABLE_RESULT_SUMMARY and query_result.get('row_count', 0) > 0:
  361. self.logger.info("步骤2:生成摘要")
  362. # 重要:提取原始问题用于摘要生成,避免历史记录循环嵌套
  363. original_question = self._extract_original_question(question)
  364. self.logger.debug(f"原始问题: {original_question}")
  365. summary_result = generate_summary.invoke({
  366. "question": original_question, # 使用原始问题而不是enhanced_question
  367. "query_result": query_result,
  368. "sql": sql
  369. })
  370. if not summary_result.get("success"):
  371. self.logger.warning(f"摘要生成失败: {summary_result.get('message')}")
  372. # 摘要生成失败不是致命错误,使用默认摘要
  373. state["summary"] = f"查询执行完成,共返回 {query_result.get('row_count', 0)} 条记录。"
  374. else:
  375. state["summary"] = summary_result.get("summary")
  376. self.logger.info("摘要生成成功")
  377. else:
  378. self.logger.info(f"跳过摘要生成(ENABLE_RESULT_SUMMARY={ENABLE_RESULT_SUMMARY},数据行数={query_result.get('row_count', 0)})")
  379. # 不生成摘要时,不设置summary字段,让格式化响应节点决定如何处理
  380. state["current_step"] = "sql_execution_completed"
  381. state["execution_path"].append("agent_sql_execution")
  382. self.logger.info("SQL执行完成")
  383. return state
  384. except Exception as e:
  385. self.logger.error(f"SQL执行节点异常: {str(e)}")
  386. import traceback
  387. self.logger.error(f"详细错误信息: {traceback.format_exc()}")
  388. state["error"] = f"SQL执行失败: {str(e)}"
  389. state["error_code"] = 500
  390. state["current_step"] = "sql_execution_error"
  391. state["execution_path"].append("agent_sql_execution_error")
  392. return state
  393. def _agent_database_node(self, state: AgentState) -> AgentState:
  394. """
  395. 数据库Agent节点 - 直接工具调用模式 [已废弃]
  396. 注意:此方法已被拆分为 _agent_sql_generation_node 和 _agent_sql_execution_node
  397. 保留此方法仅为向后兼容,新的工作流使用拆分后的节点
  398. """
  399. try:
  400. self.logger.warning("使用已废弃的database节点,建议使用新的拆分节点")
  401. self.logger.info(f"开始处理数据库查询: {state['question']}")
  402. question = state["question"]
  403. # 步骤1:生成SQL
  404. self.logger.info("步骤1:生成SQL")
  405. sql_result = generate_sql.invoke({"question": question, "allow_llm_to_see_data": True})
  406. if not sql_result.get("success"):
  407. self.logger.error(f"SQL生成失败: {sql_result.get('error')}")
  408. state["error"] = sql_result.get("error", "SQL生成失败")
  409. state["error_code"] = 500
  410. state["current_step"] = "database_error"
  411. state["execution_path"].append("agent_database_error")
  412. return state
  413. sql = sql_result.get("sql")
  414. state["sql"] = sql
  415. self.logger.info(f"SQL生成成功: {sql}")
  416. # 步骤1.5:检查是否为解释性响应而非SQL
  417. error_type = sql_result.get("error_type")
  418. if error_type == "llm_explanation":
  419. # LLM返回了解释性文本,直接作为最终答案
  420. explanation = sql_result.get("error", "")
  421. state["chat_response"] = explanation + " 请尝试提问其它问题。"
  422. state["current_step"] = "database_completed"
  423. state["execution_path"].append("agent_database")
  424. self.logger.info(f"返回LLM解释性答案: {explanation}")
  425. return state
  426. # 额外验证:检查SQL格式(防止工具误判)
  427. from agent.tools.utils import _is_valid_sql_format
  428. if not _is_valid_sql_format(sql):
  429. # 内容看起来不是SQL,当作解释性响应处理
  430. state["chat_response"] = sql + " 请尝试提问其它问题。"
  431. state["current_step"] = "database_completed"
  432. state["execution_path"].append("agent_database")
  433. self.logger.info(f"内容不是有效SQL,当作解释返回: {sql}")
  434. return state
  435. # 步骤2:执行SQL
  436. self.logger.info("步骤2:执行SQL")
  437. execute_result = execute_sql.invoke({"sql": sql})
  438. if not execute_result.get("success"):
  439. self.logger.error(f"SQL执行失败: {execute_result.get('error')}")
  440. state["error"] = execute_result.get("error", "SQL执行失败")
  441. state["error_code"] = 500
  442. state["current_step"] = "database_error"
  443. state["execution_path"].append("agent_database_error")
  444. return state
  445. query_result = execute_result.get("data_result")
  446. state["query_result"] = query_result
  447. self.logger.info(f"SQL执行成功,返回 {query_result.get('row_count', 0)} 行数据")
  448. # 步骤3:生成摘要(可通过配置控制,仅在有数据时生成)
  449. if ENABLE_RESULT_SUMMARY and query_result.get('row_count', 0) > 0:
  450. self.logger.info("步骤3:生成摘要")
  451. # 重要:提取原始问题用于摘要生成,避免历史记录循环嵌套
  452. original_question = self._extract_original_question(question)
  453. self.logger.debug(f"原始问题: {original_question}")
  454. summary_result = generate_summary.invoke({
  455. "question": original_question, # 使用原始问题而不是enhanced_question
  456. "query_result": query_result,
  457. "sql": sql
  458. })
  459. if not summary_result.get("success"):
  460. self.logger.warning(f"摘要生成失败: {summary_result.get('message')}")
  461. # 摘要生成失败不是致命错误,使用默认摘要
  462. state["summary"] = f"查询执行完成,共返回 {query_result.get('row_count', 0)} 条记录。"
  463. else:
  464. state["summary"] = summary_result.get("summary")
  465. self.logger.info("摘要生成成功")
  466. else:
  467. self.logger.info(f"跳过摘要生成(ENABLE_RESULT_SUMMARY={ENABLE_RESULT_SUMMARY},数据行数={query_result.get('row_count', 0)})")
  468. # 不生成摘要时,不设置summary字段,让格式化响应节点决定如何处理
  469. state["current_step"] = "database_completed"
  470. state["execution_path"].append("agent_database")
  471. self.logger.info("数据库查询完成")
  472. return state
  473. except Exception as e:
  474. self.logger.error(f"数据库Agent异常: {str(e)}")
  475. import traceback
  476. self.logger.error(f"详细错误信息: {traceback.format_exc()}")
  477. state["error"] = f"数据库查询失败: {str(e)}"
  478. state["error_code"] = 500
  479. state["current_step"] = "database_error"
  480. state["execution_path"].append("agent_database_error")
  481. return state
  482. def _agent_chat_node(self, state: AgentState) -> AgentState:
  483. """聊天Agent节点 - 直接工具调用模式"""
  484. try:
  485. self.logger.info(f"开始处理聊天: {state['question']}")
  486. question = state["question"]
  487. # 构建上下文 - 仅使用真实的对话历史上下文
  488. # 注意:不要将分类原因传递给LLM,那是系统内部的路由信息
  489. enable_context_injection = self.config.get("chat_agent", {}).get("enable_context_injection", True)
  490. context = None
  491. if enable_context_injection:
  492. # TODO: 在这里可以添加真实的对话历史上下文
  493. # 例如从Redis或其他存储中获取最近的对话记录
  494. # context = get_conversation_history(state.get("session_id"))
  495. pass
  496. # 直接调用general_chat工具
  497. self.logger.info("调用general_chat工具")
  498. chat_result = general_chat.invoke({
  499. "question": question,
  500. "context": context
  501. })
  502. if chat_result.get("success"):
  503. state["chat_response"] = chat_result.get("response", "")
  504. self.logger.info("聊天处理成功")
  505. else:
  506. # 处理失败,使用备用响应
  507. state["chat_response"] = chat_result.get("response", "抱歉,我暂时无法处理您的问题。请稍后再试。")
  508. self.logger.warning(f"聊天处理失败,使用备用响应: {chat_result.get('error')}")
  509. state["current_step"] = "chat_completed"
  510. state["execution_path"].append("agent_chat")
  511. self.logger.info("聊天处理完成")
  512. return state
  513. except Exception as e:
  514. self.logger.error(f"聊天Agent异常: {str(e)}")
  515. import traceback
  516. self.logger.error(f"详细错误信息: {traceback.format_exc()}")
  517. state["chat_response"] = "抱歉,我暂时无法处理您的问题。请稍后再试,或者尝试询问数据相关的问题。"
  518. state["current_step"] = "chat_error"
  519. state["execution_path"].append("agent_chat_error")
  520. return state
  521. def _format_response_node(self, state: AgentState) -> AgentState:
  522. """格式化最终响应节点"""
  523. try:
  524. self.logger.info(f"开始格式化响应,问题类型: {state['question_type']}")
  525. state["current_step"] = "completed"
  526. state["execution_path"].append("format_response")
  527. # 根据问题类型和执行状态格式化响应
  528. if state.get("error"):
  529. # 有错误的情况
  530. state["final_response"] = {
  531. "success": False,
  532. "error": state["error"],
  533. "error_code": state.get("error_code", 500),
  534. "question_type": state["question_type"],
  535. "execution_path": state["execution_path"],
  536. "classification_info": {
  537. "confidence": state.get("classification_confidence", 0),
  538. "reason": state.get("classification_reason", ""),
  539. "method": state.get("classification_method", "")
  540. }
  541. }
  542. elif state["question_type"] == "DATABASE":
  543. # 数据库查询类型
  544. # 处理SQL生成失败的情况
  545. if not state.get("sql_generation_success", True) and state.get("user_prompt"):
  546. state["final_response"] = {
  547. "success": False,
  548. "response": state["user_prompt"],
  549. "type": "DATABASE",
  550. "sql_generation_failed": True,
  551. "validation_error_type": state.get("validation_error_type"),
  552. "sql": state.get("sql"),
  553. "execution_path": state["execution_path"],
  554. "classification_info": {
  555. "confidence": state["classification_confidence"],
  556. "reason": state["classification_reason"],
  557. "method": state["classification_method"]
  558. },
  559. "sql_validation_info": {
  560. "sql_generation_success": state.get("sql_generation_success", False),
  561. "sql_validation_success": state.get("sql_validation_success", False),
  562. "sql_repair_attempted": state.get("sql_repair_attempted", False),
  563. "sql_repair_success": state.get("sql_repair_success", False)
  564. }
  565. }
  566. elif state.get("chat_response"):
  567. # SQL生成失败的解释性响应(不受ENABLE_RESULT_SUMMARY配置影响)
  568. state["final_response"] = {
  569. "success": True,
  570. "response": state["chat_response"],
  571. "type": "DATABASE",
  572. "sql": state.get("sql"),
  573. "query_result": state.get("query_result"), # 保持内部字段名不变
  574. "execution_path": state["execution_path"],
  575. "classification_info": {
  576. "confidence": state["classification_confidence"],
  577. "reason": state["classification_reason"],
  578. "method": state["classification_method"]
  579. }
  580. }
  581. elif state.get("summary"):
  582. # 正常的数据库查询结果,有摘要的情况
  583. # 将summary的值同时赋给response字段(为将来移除summary字段做准备)
  584. state["final_response"] = {
  585. "success": True,
  586. "type": "DATABASE",
  587. "response": state["summary"], # 新增:将summary的值赋给response
  588. "sql": state.get("sql"),
  589. "query_result": state.get("query_result"), # 保持内部字段名不变
  590. "summary": state["summary"], # 暂时保留summary字段
  591. "execution_path": state["execution_path"],
  592. "classification_info": {
  593. "confidence": state["classification_confidence"],
  594. "reason": state["classification_reason"],
  595. "method": state["classification_method"]
  596. }
  597. }
  598. elif state.get("query_result"):
  599. # 有数据但没有摘要(摘要被配置禁用)
  600. query_result = state.get("query_result")
  601. row_count = query_result.get("row_count", 0)
  602. # 构建基本响应,不包含summary字段和response字段
  603. # 用户应该直接从query_result.columns和query_result.rows获取数据
  604. state["final_response"] = {
  605. "success": True,
  606. "type": "DATABASE",
  607. "sql": state.get("sql"),
  608. "query_result": query_result, # 保持内部字段名不变
  609. "execution_path": state["execution_path"],
  610. "classification_info": {
  611. "confidence": state["classification_confidence"],
  612. "reason": state["classification_reason"],
  613. "method": state["classification_method"]
  614. }
  615. }
  616. else:
  617. # 数据库查询失败,没有任何结果
  618. state["final_response"] = {
  619. "success": False,
  620. "error": state.get("error", "数据库查询未完成"),
  621. "type": "DATABASE",
  622. "sql": state.get("sql"),
  623. "execution_path": state["execution_path"]
  624. }
  625. else:
  626. # 聊天类型
  627. state["final_response"] = {
  628. "success": True,
  629. "response": state.get("chat_response", ""),
  630. "type": "CHAT",
  631. "execution_path": state["execution_path"],
  632. "classification_info": {
  633. "confidence": state["classification_confidence"],
  634. "reason": state["classification_reason"],
  635. "method": state["classification_method"]
  636. }
  637. }
  638. self.logger.info("响应格式化完成")
  639. return state
  640. except Exception as e:
  641. self.logger.error(f"响应格式化异常: {str(e)}")
  642. state["final_response"] = {
  643. "success": False,
  644. "error": f"响应格式化异常: {str(e)}",
  645. "error_code": 500,
  646. "execution_path": state["execution_path"]
  647. }
  648. return state
  649. def _route_after_sql_generation(self, state: AgentState) -> Literal["continue_execution", "return_to_user"]:
  650. """
  651. SQL生成后的路由决策
  652. 根据SQL生成和验证的结果决定后续流向:
  653. - SQL生成验证成功 → 继续执行SQL
  654. - SQL生成验证失败 → 直接返回用户提示
  655. """
  656. sql_generation_success = state.get("sql_generation_success", False)
  657. self.logger.debug(f"SQL生成路由: success={sql_generation_success}")
  658. if sql_generation_success:
  659. return "continue_execution" # 路由到SQL执行节点
  660. else:
  661. return "return_to_user" # 路由到format_response,结束流程
  662. def _route_after_classification(self, state: AgentState) -> Literal["DATABASE", "CHAT"]:
  663. """
  664. 分类后的路由决策
  665. 完全信任QuestionClassifier的决策:
  666. - DATABASE类型 → 数据库Agent
  667. - CHAT和UNCERTAIN类型 → 聊天Agent
  668. 这样避免了双重决策的冲突,所有分类逻辑都集中在QuestionClassifier中
  669. """
  670. question_type = state["question_type"]
  671. confidence = state["classification_confidence"]
  672. self.logger.debug(f"分类路由: {question_type}, 置信度: {confidence} (完全信任分类器决策)")
  673. if question_type == "DATABASE":
  674. return "DATABASE"
  675. else:
  676. # 将 "CHAT" 和 "UNCERTAIN" 类型都路由到聊天流程
  677. # 聊天Agent可以处理不确定的情况,并在必要时引导用户提供更多信息
  678. return "CHAT"
  679. async def process_question(self, question: str, session_id: str = None, context_type: str = None, routing_mode: str = None) -> Dict[str, Any]:
  680. """
  681. 统一的问题处理入口
  682. Args:
  683. question: 用户问题
  684. session_id: 会话ID
  685. context_type: 上下文类型 ("DATABASE" 或 "CHAT"),用于渐进式分类
  686. routing_mode: 路由模式,可选,用于覆盖配置文件设置
  687. Returns:
  688. Dict包含完整的处理结果
  689. """
  690. try:
  691. self.logger.info(f"开始处理问题: {question}")
  692. if context_type:
  693. self.logger.info(f"上下文类型: {context_type}")
  694. if routing_mode:
  695. self.logger.info(f"使用指定路由模式: {routing_mode}")
  696. # 动态创建workflow(基于路由模式)
  697. self.logger.info(f"🔄 [PROCESS] 调用动态创建workflow")
  698. workflow = self._create_workflow(routing_mode)
  699. # 初始化状态
  700. initial_state = self._create_initial_state(question, session_id, context_type, routing_mode)
  701. # 执行工作流
  702. final_state = await workflow.ainvoke(
  703. initial_state,
  704. config={
  705. "configurable": {"session_id": session_id}
  706. } if session_id else None
  707. )
  708. # 提取最终结果
  709. result = final_state["final_response"]
  710. self.logger.info(f"问题处理完成: {result.get('success', False)}")
  711. return result
  712. except Exception as e:
  713. self.logger.error(f"Agent执行异常: {str(e)}")
  714. return {
  715. "success": False,
  716. "error": f"Agent系统异常: {str(e)}",
  717. "error_code": 500,
  718. "execution_path": ["error"]
  719. }
  720. def _create_initial_state(self, question: str, session_id: str = None, context_type: str = None, routing_mode: str = None) -> AgentState:
  721. """创建初始状态 - 支持渐进式分类"""
  722. # 确定使用的路由模式
  723. if routing_mode:
  724. effective_routing_mode = routing_mode
  725. else:
  726. try:
  727. from app_config import QUESTION_ROUTING_MODE
  728. effective_routing_mode = QUESTION_ROUTING_MODE
  729. except ImportError:
  730. effective_routing_mode = "hybrid"
  731. return AgentState(
  732. # 输入信息
  733. question=question,
  734. session_id=session_id,
  735. # 上下文信息
  736. context_type=context_type,
  737. # 分类结果 (初始值,会在分类节点或直接模式初始化节点中更新)
  738. question_type="UNCERTAIN",
  739. classification_confidence=0.0,
  740. classification_reason="",
  741. classification_method="",
  742. # 数据库查询流程状态
  743. sql=None,
  744. sql_generation_attempts=0,
  745. query_result=None,
  746. summary=None,
  747. # SQL验证和修复相关状态
  748. sql_generation_success=False,
  749. sql_validation_success=False,
  750. sql_repair_attempted=False,
  751. sql_repair_success=False,
  752. validation_error_type=None,
  753. user_prompt=None,
  754. # 聊天响应
  755. chat_response=None,
  756. # 最终输出
  757. final_response={},
  758. # 错误处理
  759. error=None,
  760. error_code=None,
  761. # 流程控制
  762. current_step="initialized",
  763. execution_path=["start"],
  764. retry_count=0,
  765. max_retries=3,
  766. # 调试信息
  767. debug_info={},
  768. # 路由模式
  769. routing_mode=effective_routing_mode
  770. )
  771. # ==================== SQL验证和修复相关方法 ====================
  772. def _is_sql_validation_enabled(self) -> bool:
  773. """检查是否启用SQL验证"""
  774. from agent.config import get_nested_config
  775. return (get_nested_config(self.config, "sql_validation.enable_syntax_validation", False) or
  776. get_nested_config(self.config, "sql_validation.enable_forbidden_check", False))
  777. def _is_auto_repair_enabled(self) -> bool:
  778. """检查是否启用自动修复"""
  779. from agent.config import get_nested_config
  780. return (get_nested_config(self.config, "sql_validation.enable_auto_repair", False) and
  781. get_nested_config(self.config, "sql_validation.enable_syntax_validation", False))
  782. async def _validate_sql_with_custom_priority(self, sql: str) -> Dict[str, Any]:
  783. """
  784. 按照自定义优先级验证SQL:先禁止词,再语法
  785. Args:
  786. sql: 要验证的SQL语句
  787. Returns:
  788. 验证结果字典
  789. """
  790. try:
  791. from agent.config import get_nested_config
  792. # 1. 优先检查禁止词(您要求的优先级)
  793. if get_nested_config(self.config, "sql_validation.enable_forbidden_check", True):
  794. forbidden_result = self._check_forbidden_keywords(sql)
  795. if not forbidden_result.get("valid"):
  796. return {
  797. "valid": False,
  798. "error_type": "forbidden_keywords",
  799. "error_message": forbidden_result.get("error"),
  800. "can_repair": False # 禁止词错误不能修复
  801. }
  802. # 2. 再检查语法(EXPLAIN SQL)
  803. if get_nested_config(self.config, "sql_validation.enable_syntax_validation", True):
  804. syntax_result = await self._validate_sql_syntax(sql)
  805. if not syntax_result.get("valid"):
  806. return {
  807. "valid": False,
  808. "error_type": "syntax_error",
  809. "error_message": syntax_result.get("error"),
  810. "can_repair": True # 语法错误可以尝试修复
  811. }
  812. return {"valid": True}
  813. except Exception as e:
  814. return {
  815. "valid": False,
  816. "error_type": "validation_exception",
  817. "error_message": str(e),
  818. "can_repair": False
  819. }
  820. def _check_forbidden_keywords(self, sql: str) -> Dict[str, Any]:
  821. """检查禁止的SQL关键词"""
  822. try:
  823. from agent.config import get_nested_config
  824. forbidden_operations = get_nested_config(
  825. self.config,
  826. "sql_validation.forbidden_operations",
  827. ['UPDATE', 'DELETE', 'DROP', 'ALTER', 'INSERT']
  828. )
  829. sql_upper = sql.upper().strip()
  830. for operation in forbidden_operations:
  831. if sql_upper.startswith(operation.upper()):
  832. return {
  833. "valid": False,
  834. "error": f"不允许的操作: {operation}。本系统只支持查询操作(SELECT)。"
  835. }
  836. return {"valid": True}
  837. except Exception as e:
  838. return {
  839. "valid": False,
  840. "error": f"禁止词检查异常: {str(e)}"
  841. }
  842. async def _validate_sql_syntax(self, sql: str) -> Dict[str, Any]:
  843. """语法验证 - 使用EXPLAIN SQL"""
  844. try:
  845. from common.vanna_instance import get_vanna_instance
  846. import asyncio
  847. vn = get_vanna_instance()
  848. # 构建EXPLAIN查询
  849. explain_sql = f"EXPLAIN {sql}"
  850. # 异步执行验证
  851. result = await asyncio.to_thread(vn.run_sql, explain_sql)
  852. if result is not None:
  853. return {"valid": True}
  854. else:
  855. return {
  856. "valid": False,
  857. "error": "SQL语法验证失败"
  858. }
  859. except Exception as e:
  860. return {
  861. "valid": False,
  862. "error": str(e)
  863. }
  864. async def _attempt_sql_repair_once(self, sql: str, error_message: str) -> Dict[str, Any]:
  865. """
  866. 使用LLM尝试修复SQL - 只修复一次
  867. Args:
  868. sql: 原始SQL
  869. error_message: 错误信息
  870. Returns:
  871. 修复结果字典
  872. """
  873. try:
  874. from common.vanna_instance import get_vanna_instance
  875. from agent.config import get_nested_config
  876. import asyncio
  877. vn = get_vanna_instance()
  878. # 构建修复提示词
  879. repair_prompt = f"""你是一个PostgreSQL SQL专家,请修复以下SQL语句的语法错误。
  880. 当前数据库类型: PostgreSQL
  881. 错误信息: {error_message}
  882. 需要修复的SQL:
  883. {sql}
  884. 修复要求:
  885. 1. 只修复语法错误和表结构错误
  886. 2. 保持SQL的原始业务逻辑不变
  887. 3. 使用PostgreSQL标准语法
  888. 4. 确保修复后的SQL语法正确
  889. 请直接输出修复后的SQL语句,不要添加其他说明文字。"""
  890. # 获取超时配置
  891. timeout = get_nested_config(self.config, "sql_validation.repair_timeout", 60)
  892. # 异步调用LLM修复
  893. response = await asyncio.wait_for(
  894. asyncio.to_thread(
  895. vn.chat_with_llm,
  896. question=repair_prompt,
  897. system_prompt="你是一个专业的PostgreSQL SQL专家,专门负责修复SQL语句中的语法错误。"
  898. ),
  899. timeout=timeout
  900. )
  901. if response and response.strip():
  902. repaired_sql = response.strip()
  903. # 验证修复后的SQL
  904. validation_result = await self._validate_sql_syntax(repaired_sql)
  905. if validation_result.get("valid"):
  906. return {
  907. "success": True,
  908. "repaired_sql": repaired_sql,
  909. "error": None
  910. }
  911. else:
  912. return {
  913. "success": False,
  914. "repaired_sql": None,
  915. "error": f"修复后的SQL仍然无效: {validation_result.get('error')}"
  916. }
  917. else:
  918. return {
  919. "success": False,
  920. "repaired_sql": None,
  921. "error": "LLM返回空响应"
  922. }
  923. except asyncio.TimeoutError:
  924. return {
  925. "success": False,
  926. "repaired_sql": None,
  927. "error": f"修复超时({get_nested_config(self.config, 'sql_validation.repair_timeout', 60)}秒)"
  928. }
  929. except Exception as e:
  930. return {
  931. "success": False,
  932. "repaired_sql": None,
  933. "error": f"修复异常: {str(e)}"
  934. }
  935. # ==================== 原有方法 ====================
  936. def _extract_original_question(self, question: str) -> str:
  937. """
  938. 从enhanced_question中提取原始问题
  939. Args:
  940. question: 可能包含上下文的问题
  941. Returns:
  942. str: 原始问题
  943. """
  944. try:
  945. # 检查是否为enhanced_question格式
  946. if "\n[CONTEXT]\n" in question and "\n[CURRENT]\n" in question:
  947. # 提取[CURRENT]标签后的内容
  948. current_start = question.find("\n[CURRENT]\n")
  949. if current_start != -1:
  950. original_question = question[current_start + len("\n[CURRENT]\n"):].strip()
  951. return original_question
  952. # 如果不是enhanced_question格式,直接返回原问题
  953. return question.strip()
  954. except Exception as e:
  955. self.logger.warning(f"提取原始问题失败: {str(e)}")
  956. return question.strip()
  957. async def health_check(self) -> Dict[str, Any]:
  958. """健康检查"""
  959. try:
  960. # 从配置获取健康检查参数
  961. from agent.config import get_nested_config
  962. test_question = get_nested_config(self.config, "health_check.test_question", "你好")
  963. enable_full_test = get_nested_config(self.config, "health_check.enable_full_test", True)
  964. if enable_full_test:
  965. # 完整流程测试
  966. test_result = await self.process_question(test_question, "health_check")
  967. return {
  968. "status": "healthy" if test_result.get("success") else "degraded",
  969. "test_result": test_result.get("success", False),
  970. "workflow_compiled": True, # 动态创建,始终可用
  971. "tools_count": len(self.tools),
  972. "agent_reuse_enabled": False,
  973. "message": "Agent健康检查完成"
  974. }
  975. else:
  976. # 简单检查
  977. return {
  978. "status": "healthy",
  979. "test_result": True,
  980. "workflow_compiled": True, # 动态创建,始终可用
  981. "tools_count": len(self.tools),
  982. "agent_reuse_enabled": False,
  983. "message": "Agent简单健康检查完成"
  984. }
  985. except Exception as e:
  986. return {
  987. "status": "unhealthy",
  988. "error": str(e),
  989. "workflow_compiled": True, # 动态创建,始终可用
  990. "tools_count": len(self.tools) if hasattr(self, 'tools') else 0,
  991. "agent_reuse_enabled": False,
  992. "message": "Agent健康检查失败"
  993. }