|
@@ -27,20 +27,11 @@ class CituLangGraphAgent:
|
|
|
self.tools = TOOLS
|
|
|
self.llm = get_compatible_llm()
|
|
|
|
|
|
- # 预创建Agent实例以提升性能
|
|
|
- enable_reuse = self.config.get("performance", {}).get("enable_agent_reuse", True)
|
|
|
- if enable_reuse:
|
|
|
- print("[CITU_AGENT] 预创建Agent实例中...")
|
|
|
- self._database_executor = self._create_database_agent()
|
|
|
- self._chat_executor = self._create_chat_agent()
|
|
|
- print("[CITU_AGENT] Agent实例预创建完成")
|
|
|
- else:
|
|
|
- self._database_executor = None
|
|
|
- self._chat_executor = None
|
|
|
- print("[CITU_AGENT] Agent实例重用已禁用,将在运行时创建")
|
|
|
+ # 注意:现在使用直接工具调用模式,不再需要预创建Agent执行器
|
|
|
+ print("[CITU_AGENT] 使用直接工具调用模式")
|
|
|
|
|
|
self.workflow = self._create_workflow()
|
|
|
- print("[CITU_AGENT] LangGraph Agent with Tools初始化完成")
|
|
|
+ print("[CITU_AGENT] LangGraph Agent with Direct Tools初始化完成")
|
|
|
|
|
|
def _create_workflow(self) -> StateGraph:
|
|
|
"""创建LangGraph工作流"""
|
|
@@ -99,63 +90,81 @@ class CituLangGraphAgent:
|
|
|
state["execution_path"].append("classify_error")
|
|
|
return state
|
|
|
|
|
|
- def _create_database_agent(self):
|
|
|
- """创建数据库专用Agent(预创建)"""
|
|
|
- from agent.config import get_nested_config
|
|
|
-
|
|
|
- # 获取配置
|
|
|
- max_iterations = get_nested_config(self.config, "database_agent.max_iterations", 5)
|
|
|
- enable_verbose = get_nested_config(self.config, "database_agent.enable_verbose", True)
|
|
|
- early_stopping_method = get_nested_config(self.config, "database_agent.early_stopping_method", "generate")
|
|
|
-
|
|
|
- database_prompt = ChatPromptTemplate.from_messages([
|
|
|
- SystemMessage(content="""
|
|
|
-你是一个专业的数据库查询助手。你的任务是帮助用户查询数据库并生成报告。
|
|
|
-
|
|
|
-工具使用流程:
|
|
|
-1. 首先使用 generate_sql 工具将用户问题转换为SQL
|
|
|
-2. 然后使用 execute_sql 工具执行SQL查询
|
|
|
-3. 最后使用 generate_summary 工具为结果生成自然语言摘要
|
|
|
-
|
|
|
-如果任何步骤失败,请提供清晰的错误信息并建议解决方案。
|
|
|
-"""),
|
|
|
- MessagesPlaceholder(variable_name="chat_history", optional=True),
|
|
|
- HumanMessage(content="{input}"),
|
|
|
- MessagesPlaceholder(variable_name="agent_scratchpad")
|
|
|
- ])
|
|
|
-
|
|
|
- database_tools = [generate_sql, execute_sql, generate_summary]
|
|
|
- agent = create_openai_tools_agent(self.llm, database_tools, database_prompt)
|
|
|
-
|
|
|
- return AgentExecutor(
|
|
|
- agent=agent,
|
|
|
- tools=database_tools,
|
|
|
- verbose=enable_verbose,
|
|
|
- handle_parsing_errors=True,
|
|
|
- max_iterations=max_iterations,
|
|
|
- early_stopping_method=early_stopping_method
|
|
|
- )
|
|
|
-
|
|
|
def _agent_database_node(self, state: AgentState) -> AgentState:
|
|
|
- """数据库Agent节点 - 使用预创建或动态创建的Agent"""
|
|
|
+ """数据库Agent节点 - 直接工具调用模式"""
|
|
|
try:
|
|
|
print(f"[DATABASE_AGENT] 开始处理数据库查询: {state['question']}")
|
|
|
|
|
|
- # 使用预创建的Agent或动态创建
|
|
|
- if self._database_executor is not None:
|
|
|
- executor = self._database_executor
|
|
|
- print(f"[DATABASE_AGENT] 使用预创建的Agent实例")
|
|
|
- else:
|
|
|
- executor = self._create_database_agent()
|
|
|
- print(f"[DATABASE_AGENT] 动态创建Agent实例")
|
|
|
-
|
|
|
- # 执行Agent
|
|
|
- result = executor.invoke({
|
|
|
- "input": state["question"]
|
|
|
+ question = state["question"]
|
|
|
+
|
|
|
+ # 步骤1:生成SQL
|
|
|
+ print(f"[DATABASE_AGENT] 步骤1:生成SQL")
|
|
|
+ sql_result = generate_sql.invoke({"question": question, "allow_llm_to_see_data": True})
|
|
|
+
|
|
|
+ if not sql_result.get("success"):
|
|
|
+ print(f"[DATABASE_AGENT] SQL生成失败: {sql_result.get('error')}")
|
|
|
+ state["error"] = sql_result.get("error", "SQL生成失败")
|
|
|
+ state["error_code"] = 500
|
|
|
+ state["current_step"] = "database_error"
|
|
|
+ state["execution_path"].append("agent_database_error")
|
|
|
+ return state
|
|
|
+
|
|
|
+ sql = sql_result.get("sql")
|
|
|
+ state["sql"] = sql
|
|
|
+ print(f"[DATABASE_AGENT] SQL生成成功: {sql}")
|
|
|
+
|
|
|
+ # 步骤1.5:检查是否为解释性响应而非SQL
|
|
|
+ error_type = sql_result.get("error_type")
|
|
|
+ if error_type == "llm_explanation":
|
|
|
+ # LLM返回了解释性文本,直接作为最终答案
|
|
|
+ explanation = sql_result.get("error", "")
|
|
|
+ state["summary"] = explanation + " 请尝试提问其它问题。"
|
|
|
+ state["current_step"] = "database_completed"
|
|
|
+ state["execution_path"].append("agent_database")
|
|
|
+ print(f"[DATABASE_AGENT] 返回LLM解释性答案: {explanation}")
|
|
|
+ return state
|
|
|
+
|
|
|
+ # 额外验证:检查SQL格式(防止工具误判)
|
|
|
+ from agent.utils import _is_valid_sql_format
|
|
|
+ if not _is_valid_sql_format(sql):
|
|
|
+ # 内容看起来不是SQL,当作解释性响应处理
|
|
|
+ state["summary"] = sql + " 请尝试提问其它问题。"
|
|
|
+ state["current_step"] = "database_completed"
|
|
|
+ state["execution_path"].append("agent_database")
|
|
|
+ print(f"[DATABASE_AGENT] 内容不是有效SQL,当作解释返回: {sql}")
|
|
|
+ return state
|
|
|
+
|
|
|
+ # 步骤2:执行SQL
|
|
|
+ print(f"[DATABASE_AGENT] 步骤2:执行SQL")
|
|
|
+ execute_result = execute_sql.invoke({"sql": sql, "max_rows": 200})
|
|
|
+
|
|
|
+ if not execute_result.get("success"):
|
|
|
+ print(f"[DATABASE_AGENT] SQL执行失败: {execute_result.get('error')}")
|
|
|
+ state["error"] = execute_result.get("error", "SQL执行失败")
|
|
|
+ state["error_code"] = 500
|
|
|
+ state["current_step"] = "database_error"
|
|
|
+ state["execution_path"].append("agent_database_error")
|
|
|
+ return state
|
|
|
+
|
|
|
+ data_result = execute_result.get("data_result")
|
|
|
+ state["data_result"] = data_result
|
|
|
+ print(f"[DATABASE_AGENT] SQL执行成功,返回 {data_result.get('row_count', 0)} 行数据")
|
|
|
+
|
|
|
+ # 步骤3:生成摘要
|
|
|
+ print(f"[DATABASE_AGENT] 步骤3:生成摘要")
|
|
|
+ summary_result = generate_summary.invoke({
|
|
|
+ "question": question,
|
|
|
+ "data_result": data_result,
|
|
|
+ "sql": sql
|
|
|
})
|
|
|
|
|
|
- # 解析Agent执行结果
|
|
|
- self._parse_database_agent_result(state, result)
|
|
|
+ if not summary_result.get("success"):
|
|
|
+ print(f"[DATABASE_AGENT] 摘要生成失败: {summary_result.get('message')}")
|
|
|
+ # 摘要生成失败不是致命错误,使用默认摘要
|
|
|
+ state["summary"] = f"查询执行完成,共返回 {data_result.get('row_count', 0)} 条记录。"
|
|
|
+ else:
|
|
|
+ state["summary"] = summary_result.get("summary")
|
|
|
+ print(f"[DATABASE_AGENT] 摘要生成成功")
|
|
|
|
|
|
state["current_step"] = "database_completed"
|
|
|
state["execution_path"].append("agent_database")
|
|
@@ -165,60 +174,20 @@ class CituLangGraphAgent:
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"[ERROR] 数据库Agent异常: {str(e)}")
|
|
|
+ import traceback
|
|
|
+ print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
|
|
|
state["error"] = f"数据库查询失败: {str(e)}"
|
|
|
state["error_code"] = 500
|
|
|
state["current_step"] = "database_error"
|
|
|
state["execution_path"].append("agent_database_error")
|
|
|
return state
|
|
|
|
|
|
- def _create_chat_agent(self):
|
|
|
- """创建聊天专用Agent(预创建)"""
|
|
|
- from agent.config import get_nested_config
|
|
|
-
|
|
|
- # 获取配置
|
|
|
- max_iterations = get_nested_config(self.config, "chat_agent.max_iterations", 3)
|
|
|
- enable_verbose = get_nested_config(self.config, "chat_agent.enable_verbose", True)
|
|
|
-
|
|
|
- chat_prompt = ChatPromptTemplate.from_messages([
|
|
|
- SystemMessage(content="""
|
|
|
-你是Citu智能数据问答平台的友好助手。
|
|
|
-
|
|
|
-使用 general_chat 工具来处理用户的一般性问题、概念解释、操作指导等。
|
|
|
-
|
|
|
-特别注意:
|
|
|
-- 如果用户的问题可能涉及数据查询,建议他们尝试数据库查询功能
|
|
|
-- 如果问题不够明确,主动询问更多细节以便更好地帮助用户
|
|
|
-- 对于模糊的问题,可以提供多种可能的解决方案
|
|
|
-- 当遇到不确定的问题时,通过友好的对话来澄清用户意图
|
|
|
-"""),
|
|
|
- MessagesPlaceholder(variable_name="chat_history", optional=True),
|
|
|
- HumanMessage(content="{input}"),
|
|
|
- MessagesPlaceholder(variable_name="agent_scratchpad")
|
|
|
- ])
|
|
|
-
|
|
|
- chat_tools = [general_chat]
|
|
|
- agent = create_openai_tools_agent(self.llm, chat_tools, chat_prompt)
|
|
|
-
|
|
|
- return AgentExecutor(
|
|
|
- agent=agent,
|
|
|
- tools=chat_tools,
|
|
|
- verbose=enable_verbose,
|
|
|
- handle_parsing_errors=True,
|
|
|
- max_iterations=max_iterations
|
|
|
- )
|
|
|
-
|
|
|
def _agent_chat_node(self, state: AgentState) -> AgentState:
|
|
|
- """聊天Agent节点 - 使用预创建或动态创建的Agent"""
|
|
|
+ """聊天Agent节点 - 直接工具调用模式"""
|
|
|
try:
|
|
|
print(f"[CHAT_AGENT] 开始处理聊天: {state['question']}")
|
|
|
|
|
|
- # 使用预创建的Agent或动态创建
|
|
|
- if self._chat_executor is not None:
|
|
|
- executor = self._chat_executor
|
|
|
- print(f"[CHAT_AGENT] 使用预创建的Agent实例")
|
|
|
- else:
|
|
|
- executor = self._create_chat_agent()
|
|
|
- print(f"[CHAT_AGENT] 动态创建Agent实例")
|
|
|
+ question = state["question"]
|
|
|
|
|
|
# 构建上下文
|
|
|
enable_context_injection = self.config.get("chat_agent", {}).get("enable_context_injection", True)
|
|
@@ -226,17 +195,21 @@ class CituLangGraphAgent:
|
|
|
if enable_context_injection and state.get("classification_reason"):
|
|
|
context = f"分类原因: {state['classification_reason']}"
|
|
|
|
|
|
- # 执行Agent
|
|
|
- input_text = state["question"]
|
|
|
- if context:
|
|
|
- input_text = f"{state['question']}\n\n上下文: {context}"
|
|
|
-
|
|
|
- result = executor.invoke({
|
|
|
- "input": input_text
|
|
|
+ # 直接调用general_chat工具
|
|
|
+ print(f"[CHAT_AGENT] 调用general_chat工具")
|
|
|
+ chat_result = general_chat.invoke({
|
|
|
+ "question": question,
|
|
|
+ "context": context
|
|
|
})
|
|
|
|
|
|
- # 提取聊天响应
|
|
|
- state["chat_response"] = result.get("output", "")
|
|
|
+ if chat_result.get("success"):
|
|
|
+ state["chat_response"] = chat_result.get("response", "")
|
|
|
+ print(f"[CHAT_AGENT] 聊天处理成功")
|
|
|
+ else:
|
|
|
+ # 处理失败,使用备用响应
|
|
|
+ state["chat_response"] = chat_result.get("response", "抱歉,我暂时无法处理您的问题。请稍后再试。")
|
|
|
+ print(f"[CHAT_AGENT] 聊天处理失败,使用备用响应: {chat_result.get('error')}")
|
|
|
+
|
|
|
state["current_step"] = "chat_completed"
|
|
|
state["execution_path"].append("agent_chat")
|
|
|
|
|
@@ -245,6 +218,8 @@ class CituLangGraphAgent:
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"[ERROR] 聊天Agent异常: {str(e)}")
|
|
|
+ import traceback
|
|
|
+ print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
|
|
|
state["chat_response"] = "抱歉,我暂时无法处理您的问题。请稍后再试,或者尝试询问数据相关的问题。"
|
|
|
state["current_step"] = "chat_error"
|
|
|
state["execution_path"].append("agent_chat_error")
|
|
@@ -276,14 +251,14 @@ class CituLangGraphAgent:
|
|
|
|
|
|
elif state["question_type"] == "DATABASE":
|
|
|
# 数据库查询类型
|
|
|
- if state.get("data_result") and state.get("summary"):
|
|
|
- # 完整的数据库查询流程
|
|
|
+ if state.get("summary"):
|
|
|
+ # 有摘要的情况(包括解释性响应和完整查询结果)
|
|
|
state["final_response"] = {
|
|
|
"success": True,
|
|
|
"response": state["summary"],
|
|
|
"type": "DATABASE",
|
|
|
"sql": state.get("sql"),
|
|
|
- "data_result": state["data_result"],
|
|
|
+ "data_result": state.get("data_result"), # 可能为None(解释性响应)
|
|
|
"summary": state["summary"],
|
|
|
"execution_path": state["execution_path"],
|
|
|
"classification_info": {
|
|
@@ -293,7 +268,7 @@ class CituLangGraphAgent:
|
|
|
}
|
|
|
}
|
|
|
else:
|
|
|
- # 数据库查询失败,但有部分结果
|
|
|
+ # 数据库查询失败,没有任何结果
|
|
|
state["final_response"] = {
|
|
|
"success": False,
|
|
|
"error": state.get("error", "数据库查询未完成"),
|
|
@@ -351,47 +326,6 @@ class CituLangGraphAgent:
|
|
|
# 聊天Agent可以处理不确定的情况,并在必要时引导用户提供更多信息
|
|
|
return "CHAT"
|
|
|
|
|
|
- def _parse_database_agent_result(self, state: AgentState, agent_result: Dict[str, Any]):
|
|
|
- """解析数据库Agent的执行结果"""
|
|
|
- try:
|
|
|
- output = agent_result.get("output", "")
|
|
|
- intermediate_steps = agent_result.get("intermediate_steps", [])
|
|
|
-
|
|
|
- # 从intermediate_steps中提取工具调用结果
|
|
|
- for step in intermediate_steps:
|
|
|
- if len(step) >= 2:
|
|
|
- action, observation = step[0], step[1]
|
|
|
-
|
|
|
- if hasattr(action, 'tool') and hasattr(action, 'tool_input'):
|
|
|
- tool_name = action.tool
|
|
|
- tool_result = observation
|
|
|
-
|
|
|
- # 解析工具结果
|
|
|
- if tool_name == "generate_sql" and isinstance(tool_result, dict):
|
|
|
- if tool_result.get("success"):
|
|
|
- state["sql"] = tool_result.get("sql")
|
|
|
- else:
|
|
|
- state["error"] = tool_result.get("error")
|
|
|
-
|
|
|
- elif tool_name == "execute_sql" and isinstance(tool_result, dict):
|
|
|
- if tool_result.get("success"):
|
|
|
- state["data_result"] = tool_result.get("data_result")
|
|
|
- else:
|
|
|
- state["error"] = tool_result.get("error")
|
|
|
-
|
|
|
- elif tool_name == "generate_summary" and isinstance(tool_result, dict):
|
|
|
- if tool_result.get("success"):
|
|
|
- state["summary"] = tool_result.get("summary")
|
|
|
-
|
|
|
- # 如果没有从工具结果中获取到摘要,使用Agent的最终输出
|
|
|
- if not state.get("summary") and output:
|
|
|
- state["summary"] = output
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"[WARNING] 解析数据库Agent结果失败: {str(e)}")
|
|
|
- # 使用Agent的输出作为摘要
|
|
|
- state["summary"] = agent_result.get("output", "查询处理完成")
|
|
|
-
|
|
|
def process_question(self, question: str, session_id: str = None) -> Dict[str, Any]:
|
|
|
"""
|
|
|
统一的问题处理入口
|
|
@@ -489,7 +423,7 @@ class CituLangGraphAgent:
|
|
|
"test_result": test_result.get("success", False),
|
|
|
"workflow_compiled": self.workflow is not None,
|
|
|
"tools_count": len(self.tools),
|
|
|
- "agent_reuse_enabled": self._database_executor is not None and self._chat_executor is not None,
|
|
|
+ "agent_reuse_enabled": False,
|
|
|
"message": "Agent健康检查完成"
|
|
|
}
|
|
|
else:
|
|
@@ -499,7 +433,7 @@ class CituLangGraphAgent:
|
|
|
"test_result": True,
|
|
|
"workflow_compiled": self.workflow is not None,
|
|
|
"tools_count": len(self.tools),
|
|
|
- "agent_reuse_enabled": self._database_executor is not None and self._chat_executor is not None,
|
|
|
+ "agent_reuse_enabled": False,
|
|
|
"message": "Agent简单健康检查完成"
|
|
|
}
|
|
|
|