utils.py 9.6 KB


  1. # agent/tools/utils.py
  2. """
  3. Agent相关的工具函数
  4. """
  5. import functools
  6. import json
  7. from typing import Dict, Any, Callable, List, Optional
  8. from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, SystemMessage, ToolMessage
  9. from langchain_core.tools import BaseTool
  10. from core.logging import get_agent_logger
  11. # Initialize logger
  12. logger = get_agent_logger("AgentUtils")
  13. def handle_tool_errors(func: Callable) -> Callable:
  14. """
  15. 工具函数错误处理装饰器
  16. """
  17. @functools.wraps(func)
  18. def wrapper(*args, **kwargs) -> Dict[str, Any]:
  19. try:
  20. return func(*args, **kwargs)
  21. except Exception as e:
  22. logger.error(f"工具 {func.__name__} 执行失败: {str(e)}")
  23. return {
  24. "success": False,
  25. "error": f"工具执行异常: {str(e)}",
  26. "error_type": "tool_exception"
  27. }
  28. return wrapper
  29. class LLMWrapper:
  30. """自定义LLM的LangChain兼容包装器,支持工具调用"""
  31. def __init__(self, llm_instance):
  32. self.llm = llm_instance
  33. self._model_name = getattr(llm_instance, 'model', 'custom_llm')
  34. self._bound_tools = []
  35. def invoke(self, input_data, **kwargs):
  36. """LangChain invoke接口"""
  37. try:
  38. if isinstance(input_data, str):
  39. messages = [HumanMessage(content=input_data)]
  40. elif isinstance(input_data, list):
  41. messages = input_data
  42. else:
  43. messages = [HumanMessage(content=str(input_data))]
  44. # 检查是否需要工具调用
  45. if self._bound_tools and self._should_use_tools(messages):
  46. return self._invoke_with_tools(messages, **kwargs)
  47. else:
  48. return self._invoke_without_tools(messages, **kwargs)
  49. except Exception as e:
  50. logger.error(f"LLM包装器调用失败: {str(e)}")
  51. return AIMessage(content=f"LLM调用失败: {str(e)}")
  52. def _should_use_tools(self, messages: List[BaseMessage]) -> bool:
  53. """判断是否应该使用工具"""
  54. # 检查最后一条消息是否包含工具相关的指令
  55. if messages:
  56. last_message = messages[-1]
  57. if isinstance(last_message, HumanMessage):
  58. content = last_message.content.lower()
  59. # 检查是否包含工具相关的关键词
  60. tool_keywords = ["生成sql", "执行sql", "generate sql", "execute sql", "查询", "数据库"]
  61. return any(keyword in content for keyword in tool_keywords)
  62. return True # 默认使用工具
  63. def _invoke_with_tools(self, messages: List[BaseMessage], **kwargs):
  64. """使用工具调用的方式"""
  65. try:
  66. # 构建工具调用提示
  67. tool_prompt = self._build_tool_prompt(messages)
  68. # 调用底层LLM
  69. response = self.llm.submit_prompt(tool_prompt, **kwargs)
  70. # 解析工具调用
  71. tool_calls = self._parse_tool_calls(response)
  72. if tool_calls:
  73. # 如果有工具调用,返回包含工具调用的AIMessage
  74. return AIMessage(
  75. content=response,
  76. tool_calls=tool_calls
  77. )
  78. else:
  79. # 没有工具调用,返回普通响应
  80. return AIMessage(content=response)
  81. except Exception as e:
  82. logger.error(f"工具调用失败: {str(e)}")
  83. return self._invoke_without_tools(messages, **kwargs)
  84. def _invoke_without_tools(self, messages: List[BaseMessage], **kwargs):
  85. """不使用工具的普通调用"""
  86. # 转换消息格式
  87. prompt = []
  88. for msg in messages:
  89. if isinstance(msg, SystemMessage):
  90. prompt.append(self.llm.system_message(msg.content))
  91. elif isinstance(msg, HumanMessage):
  92. prompt.append(self.llm.user_message(msg.content))
  93. elif isinstance(msg, AIMessage):
  94. prompt.append(self.llm.assistant_message(msg.content))
  95. else:
  96. prompt.append(self.llm.user_message(str(msg.content)))
  97. # 调用底层LLM
  98. response = self.llm.submit_prompt(prompt, **kwargs)
  99. # 返回LangChain格式的结果
  100. return AIMessage(content=response)
  101. def _build_tool_prompt(self, messages: List[BaseMessage]) -> List[Dict[str, str]]:
  102. """构建包含工具信息的提示"""
  103. prompt = []
  104. # 添加系统消息,包含工具定义
  105. system_content = self._get_system_message_with_tools(messages)
  106. prompt.append(self.llm.system_message(system_content))
  107. # 添加用户消息
  108. for msg in messages:
  109. if isinstance(msg, HumanMessage):
  110. prompt.append(self.llm.user_message(msg.content))
  111. elif isinstance(msg, AIMessage) and not isinstance(msg, SystemMessage):
  112. prompt.append(self.llm.assistant_message(msg.content))
  113. return prompt
  114. def _get_system_message_with_tools(self, messages: List[BaseMessage]) -> str:
  115. """获取包含工具定义的系统消息"""
  116. # 查找原始系统消息
  117. original_system = ""
  118. for msg in messages:
  119. if isinstance(msg, SystemMessage):
  120. original_system = msg.content
  121. break
  122. # 构建工具定义
  123. tool_definitions = []
  124. for tool in self._bound_tools:
  125. tool_def = {
  126. "name": tool.name,
  127. "description": tool.description,
  128. "parameters": getattr(tool, 'args_schema', {})
  129. }
  130. tool_definitions.append(f"- {tool.name}: {tool.description}")
  131. # 组合系统消息
  132. if tool_definitions:
  133. tools_text = "\n".join(tool_definitions)
  134. return f"""{original_system}
  135. 你有以下工具可以使用:
  136. {tools_text}
  137. 使用工具时,请明确说明你要调用哪个工具以及需要的参数。对于数据库查询问题,请按照以下步骤:
  138. 1. 使用 generate_sql 工具生成SQL查询
  139. 2. 使用 execute_sql 工具执行SQL查询
  140. 3. 使用 generate_summary 工具生成结果摘要
  141. 请直接开始执行工具调用,不要只是提供指导。"""
  142. else:
  143. return original_system
  144. def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
  145. """解析LLM响应中的工具调用"""
  146. tool_calls = []
  147. # 简单的工具调用解析逻辑
  148. # 这里可以根据实际的LLM响应格式进行调整
  149. response_lower = response.lower()
  150. if "generate_sql" in response_lower:
  151. tool_calls.append({
  152. "name": "generate_sql",
  153. "args": {},
  154. "id": "generate_sql_call"
  155. })
  156. return tool_calls
  157. @property
  158. def model_name(self) -> str:
  159. return self._model_name
  160. def bind_tools(self, tools):
  161. """绑定工具(用于支持工具调用)"""
  162. self._bound_tools = tools if isinstance(tools, list) else [tools]
  163. return self
  164. def get_compatible_llm():
  165. """获取兼容的LLM实例"""
  166. try:
  167. from common.utils import get_current_llm_config
  168. llm_config = get_current_llm_config()
  169. # 尝试使用标准的OpenAI兼容API
  170. if llm_config.get("base_url") and llm_config.get("api_key"):
  171. try:
  172. from langchain_openai import ChatOpenAI
  173. llm = ChatOpenAI(
  174. base_url=llm_config.get("base_url"),
  175. api_key=llm_config.get("api_key"),
  176. model=llm_config.get("model"),
  177. temperature=llm_config.get("temperature", 0.7)
  178. )
  179. logger.info("使用标准OpenAI兼容API")
  180. return llm
  181. except ImportError:
  182. logger.warning("langchain_openai 未安装,使用 Vanna 实例包装器")
  183. # 优先使用统一的 Vanna 实例
  184. from common.vanna_instance import get_vanna_instance
  185. vn = get_vanna_instance()
  186. logger.info("使用Vanna实例包装器")
  187. return LLMWrapper(vn)
  188. except Exception as e:
  189. logger.error(f"获取 Vanna 实例失败: {str(e)}")
  190. # 回退到原有逻辑
  191. from common.utils import get_current_llm_config
  192. from customllm.qianwen_chat import QianWenChat
  193. llm_config = get_current_llm_config()
  194. custom_llm = QianWenChat(config=llm_config)
  195. logger.info("使用QianWen包装器")
  196. return LLMWrapper(custom_llm)
  197. def _is_valid_sql_format(sql_text: str) -> bool:
  198. """验证文本是否为有效的SQL查询格式"""
  199. if not sql_text or not sql_text.strip():
  200. return False
  201. sql_clean = sql_text.strip().upper()
  202. # 检查是否以SQL关键字开头
  203. sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP', 'WITH']
  204. starts_with_sql = any(sql_clean.startswith(keyword) for keyword in sql_keywords)
  205. # 检查是否包含解释性语言
  206. explanation_phrases = [
  207. '无法', '不能', '抱歉', 'SORRY', 'UNABLE', 'CANNOT',
  208. '需要更多信息', '请提供', '表不存在', '字段不存在',
  209. '不清楚', '不确定', '没有足够', '无法理解', '无法生成',
  210. '无法确定', '不支持', '不可用', '缺少', '未找到'
  211. ]
  212. contains_explanation = any(phrase in sql_clean for phrase in explanation_phrases)
  213. return starts_with_sql and not contains_explanation