utils.py 9.5 KB


  1. # agent/utils.py
  2. """
  3. Agent相关的工具函数
  4. """
  5. import functools
  6. import json
  7. from typing import Dict, Any, Callable, List, Optional
  8. from langchain_core.messages import BaseMessage, AIMessage, HumanMessage, SystemMessage, ToolMessage
  9. from langchain_core.tools import BaseTool
  10. def handle_tool_errors(func: Callable) -> Callable:
  11. """
  12. 工具函数错误处理装饰器
  13. """
  14. @functools.wraps(func)
  15. def wrapper(*args, **kwargs) -> Dict[str, Any]:
  16. try:
  17. return func(*args, **kwargs)
  18. except Exception as e:
  19. print(f"[ERROR] 工具 {func.__name__} 执行失败: {str(e)}")
  20. return {
  21. "success": False,
  22. "error": f"工具执行异常: {str(e)}",
  23. "error_type": "tool_exception"
  24. }
  25. return wrapper
  26. class LLMWrapper:
  27. """自定义LLM的LangChain兼容包装器,支持工具调用"""
  28. def __init__(self, llm_instance):
  29. self.llm = llm_instance
  30. self._model_name = getattr(llm_instance, 'model', 'custom_llm')
  31. self._bound_tools = []
  32. def invoke(self, input_data, **kwargs):
  33. """LangChain invoke接口"""
  34. try:
  35. if isinstance(input_data, str):
  36. messages = [HumanMessage(content=input_data)]
  37. elif isinstance(input_data, list):
  38. messages = input_data
  39. else:
  40. messages = [HumanMessage(content=str(input_data))]
  41. # 检查是否需要工具调用
  42. if self._bound_tools and self._should_use_tools(messages):
  43. return self._invoke_with_tools(messages, **kwargs)
  44. else:
  45. return self._invoke_without_tools(messages, **kwargs)
  46. except Exception as e:
  47. print(f"[ERROR] LLM包装器调用失败: {str(e)}")
  48. return AIMessage(content=f"LLM调用失败: {str(e)}")
  49. def _should_use_tools(self, messages: List[BaseMessage]) -> bool:
  50. """判断是否应该使用工具"""
  51. # 检查最后一条消息是否包含工具相关的指令
  52. if messages:
  53. last_message = messages[-1]
  54. if isinstance(last_message, HumanMessage):
  55. content = last_message.content.lower()
  56. # 检查是否包含工具相关的关键词
  57. tool_keywords = ["生成sql", "执行sql", "generate sql", "execute sql", "查询", "数据库"]
  58. return any(keyword in content for keyword in tool_keywords)
  59. return True # 默认使用工具
  60. def _invoke_with_tools(self, messages: List[BaseMessage], **kwargs):
  61. """使用工具调用的方式"""
  62. try:
  63. # 构建工具调用提示
  64. tool_prompt = self._build_tool_prompt(messages)
  65. # 调用底层LLM
  66. response = self.llm.submit_prompt(tool_prompt, **kwargs)
  67. # 解析工具调用
  68. tool_calls = self._parse_tool_calls(response)
  69. if tool_calls:
  70. # 如果有工具调用,返回包含工具调用的AIMessage
  71. return AIMessage(
  72. content=response,
  73. tool_calls=tool_calls
  74. )
  75. else:
  76. # 没有工具调用,返回普通响应
  77. return AIMessage(content=response)
  78. except Exception as e:
  79. print(f"[ERROR] 工具调用失败: {str(e)}")
  80. return self._invoke_without_tools(messages, **kwargs)
  81. def _invoke_without_tools(self, messages: List[BaseMessage], **kwargs):
  82. """不使用工具的普通调用"""
  83. # 转换消息格式
  84. prompt = []
  85. for msg in messages:
  86. if isinstance(msg, SystemMessage):
  87. prompt.append(self.llm.system_message(msg.content))
  88. elif isinstance(msg, HumanMessage):
  89. prompt.append(self.llm.user_message(msg.content))
  90. elif isinstance(msg, AIMessage):
  91. prompt.append(self.llm.assistant_message(msg.content))
  92. else:
  93. prompt.append(self.llm.user_message(str(msg.content)))
  94. # 调用底层LLM
  95. response = self.llm.submit_prompt(prompt, **kwargs)
  96. # 返回LangChain格式的结果
  97. return AIMessage(content=response)
  98. def _build_tool_prompt(self, messages: List[BaseMessage]) -> List[Dict[str, str]]:
  99. """构建包含工具信息的提示"""
  100. prompt = []
  101. # 添加系统消息,包含工具定义
  102. system_content = self._get_system_message_with_tools(messages)
  103. prompt.append(self.llm.system_message(system_content))
  104. # 添加用户消息
  105. for msg in messages:
  106. if isinstance(msg, HumanMessage):
  107. prompt.append(self.llm.user_message(msg.content))
  108. elif isinstance(msg, AIMessage) and not isinstance(msg, SystemMessage):
  109. prompt.append(self.llm.assistant_message(msg.content))
  110. return prompt
  111. def _get_system_message_with_tools(self, messages: List[BaseMessage]) -> str:
  112. """获取包含工具定义的系统消息"""
  113. # 查找原始系统消息
  114. original_system = ""
  115. for msg in messages:
  116. if isinstance(msg, SystemMessage):
  117. original_system = msg.content
  118. break
  119. # 构建工具定义
  120. tool_definitions = []
  121. for tool in self._bound_tools:
  122. tool_def = {
  123. "name": tool.name,
  124. "description": tool.description,
  125. "parameters": getattr(tool, 'args_schema', {})
  126. }
  127. tool_definitions.append(f"- {tool.name}: {tool.description}")
  128. # 组合系统消息
  129. if tool_definitions:
  130. tools_text = "\n".join(tool_definitions)
  131. return f"""{original_system}
  132. 你有以下工具可以使用:
  133. {tools_text}
  134. 使用工具时,请明确说明你要调用哪个工具以及需要的参数。对于数据库查询问题,请按照以下步骤:
  135. 1. 使用 generate_sql 工具生成SQL查询
  136. 2. 使用 execute_sql 工具执行SQL查询
  137. 3. 使用 generate_summary 工具生成结果摘要
  138. 请直接开始执行工具调用,不要只是提供指导。"""
  139. else:
  140. return original_system
  141. def _parse_tool_calls(self, response: str) -> List[Dict[str, Any]]:
  142. """解析LLM响应中的工具调用"""
  143. tool_calls = []
  144. # 简单的工具调用解析逻辑
  145. # 这里可以根据实际的LLM响应格式进行调整
  146. response_lower = response.lower()
  147. if "generate_sql" in response_lower:
  148. tool_calls.append({
  149. "name": "generate_sql",
  150. "args": {},
  151. "id": "generate_sql_call"
  152. })
  153. return tool_calls
  154. @property
  155. def model_name(self) -> str:
  156. return self._model_name
  157. def bind_tools(self, tools):
  158. """绑定工具(用于支持工具调用)"""
  159. self._bound_tools = tools if isinstance(tools, list) else [tools]
  160. return self
  161. def get_compatible_llm():
  162. """获取兼容的LLM实例"""
  163. try:
  164. from common.utils import get_current_llm_config
  165. llm_config = get_current_llm_config()
  166. # 尝试使用标准的OpenAI兼容API
  167. if llm_config.get("base_url") and llm_config.get("api_key"):
  168. try:
  169. from langchain_openai import ChatOpenAI
  170. llm = ChatOpenAI(
  171. base_url=llm_config.get("base_url"),
  172. api_key=llm_config.get("api_key"),
  173. model=llm_config.get("model"),
  174. temperature=llm_config.get("temperature", 0.7)
  175. )
  176. print("[INFO] 使用标准OpenAI兼容API")
  177. return llm
  178. except ImportError:
  179. print("[WARNING] langchain_openai 未安装,使用 Vanna 实例包装器")
  180. # 优先使用统一的 Vanna 实例
  181. from common.vanna_instance import get_vanna_instance
  182. vn = get_vanna_instance()
  183. print("[INFO] 使用Vanna实例包装器")
  184. return LLMWrapper(vn)
  185. except Exception as e:
  186. print(f"[ERROR] 获取 Vanna 实例失败: {str(e)}")
  187. # 回退到原有逻辑
  188. from common.utils import get_current_llm_config
  189. from customllm.qianwen_chat import QianWenChat
  190. llm_config = get_current_llm_config()
  191. custom_llm = QianWenChat(config=llm_config)
  192. print("[INFO] 使用QianWen包装器")
  193. return LLMWrapper(custom_llm)
  194. def _is_valid_sql_format(sql_text: str) -> bool:
  195. """验证文本是否为有效的SQL查询格式"""
  196. if not sql_text or not sql_text.strip():
  197. return False
  198. sql_clean = sql_text.strip().upper()
  199. # 检查是否以SQL关键字开头
  200. sql_keywords = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP', 'WITH']
  201. starts_with_sql = any(sql_clean.startswith(keyword) for keyword in sql_keywords)
  202. # 检查是否包含解释性语言
  203. explanation_phrases = [
  204. '无法', '不能', '抱歉', 'SORRY', 'UNABLE', 'CANNOT',
  205. '需要更多信息', '请提供', '表不存在', '字段不存在',
  206. '不清楚', '不确定', '没有足够', '无法理解', '无法生成',
  207. '无法确定', '不支持', '不可用', '缺少', '未找到'
  208. ]
  209. contains_explanation = any(phrase in sql_clean for phrase in explanation_phrases)
  210. return starts_with_sql and not contains_explanation