# agent.py
"""
Core implementation of a context-aware ReAct agent built on StateGraph.
"""
import json
import pandas as pd
import httpx
import sys
import os
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple
from contextlib import AsyncExitStack

# Put the project root (two levels above this file) on sys.path so that
# project modules can be imported.
try:
    project_root = Path(__file__).parent.parent
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
except Exception as e:
    pass  # Ignore any failure while adjusting the path.

# Use the unified logging system.
try:
    # First attempt (intended for the "imported as a module" case).
    from core.logging import get_react_agent_logger
except ImportError:
    # Fallback attempt (intended for the "run directly" case).
    # NOTE(review): this is the same absolute import as the one above, so it
    # cannot succeed where the first attempt failed — confirm the intent.
    from core.logging import get_react_agent_logger

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, ToolMessage, BaseMessage, SystemMessage, AIMessage
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
import redis.asyncio as redis

# The Redis checkpointer is optional: when the package is missing the name is
# bound to None and persistence is skipped later on.
try:
    from langgraph.checkpoint.redis import AsyncRedisSaver
except ImportError:
    AsyncRedisSaver = None

# Import configuration, state and tools from the sibling modules.
try:
    # Relative import (when imported as part of a package).
    from . import config
    from .state import AgentState
    from .sql_tools import sql_tools
except ImportError:
    # Absolute import (when the file is run directly).
    import config
    from state import AgentState
    from sql_tools import sql_tools

from langchain_core.runnables import RunnablePassthrough

logger = get_react_agent_logger("CustomReactAgent")
  48. class CustomReactAgent:
  49. """
  50. 一个使用 StateGraph 构建的、具备上下文感知和持久化能力的 Agent。
  51. """
  52. def __init__(self):
  53. """私有构造函数,请使用 create() 类方法来创建实例。"""
  54. self.llm = None
  55. self.tools = None
  56. self.agent_executor = None
  57. self.checkpointer = None
  58. self._exit_stack = None
  59. self.redis_client = None
  60. @classmethod
  61. async def create(cls):
  62. """异步工厂方法,创建并初始化 CustomReactAgent 实例。"""
  63. instance = cls()
  64. await instance._async_init()
  65. return instance
  66. async def _async_init(self):
  67. """异步初始化所有组件。"""
  68. logger.info("🚀 开始初始化 CustomReactAgent...")
  69. # 1. 初始化异步Redis客户端
  70. self.redis_client = redis.from_url(config.REDIS_URL, decode_responses=True)
  71. try:
  72. await self.redis_client.ping()
  73. logger.info(f" ✅ Redis连接成功: {config.REDIS_URL}")
  74. except Exception as e:
  75. logger.error(f" ❌ Redis连接失败: {e}")
  76. raise
  77. # 2. 初始化 LLM
  78. self.llm = ChatOpenAI(
  79. api_key=config.QWEN_API_KEY,
  80. base_url=config.QWEN_BASE_URL,
  81. model=config.QWEN_MODEL,
  82. temperature=0.1,
  83. timeout=config.NETWORK_TIMEOUT, # 添加超时配置
  84. max_retries=0, # 禁用OpenAI客户端重试,改用Agent层统一重试
  85. streaming=True,
  86. extra_body={
  87. "enable_thinking": True, # False by wxq
  88. "misc": {
  89. "ensure_ascii": False
  90. }
  91. },
  92. # 新增:优化HTTP连接配置
  93. http_client=httpx.Client(
  94. limits=httpx.Limits(
  95. max_connections=config.HTTP_MAX_CONNECTIONS,
  96. max_keepalive_connections=config.HTTP_MAX_KEEPALIVE_CONNECTIONS,
  97. keepalive_expiry=config.HTTP_KEEPALIVE_EXPIRY, # 30秒keep-alive过期
  98. ),
  99. timeout=httpx.Timeout(
  100. connect=config.HTTP_CONNECT_TIMEOUT, # 连接超时
  101. read=config.NETWORK_TIMEOUT, # 读取超时
  102. write=config.HTTP_CONNECT_TIMEOUT, # 写入超时
  103. pool=config.HTTP_POOL_TIMEOUT # 连接池超时
  104. )
  105. )
  106. )
  107. logger.info(f" ReactAgent LLM 已初始化,模型: {config.QWEN_MODEL}")
  108. # 3. 绑定工具
  109. self.tools = sql_tools
  110. self.llm_with_tools = self.llm.bind_tools(self.tools)
  111. logger.info(f" 已绑定 {len(self.tools)} 个工具。")
  112. # 4. 初始化 Redis Checkpointer
  113. if config.REDIS_ENABLED and AsyncRedisSaver is not None:
  114. try:
  115. self._exit_stack = AsyncExitStack()
  116. checkpointer_manager = AsyncRedisSaver.from_conn_string(config.REDIS_URL)
  117. self.checkpointer = await self._exit_stack.enter_async_context(checkpointer_manager)
  118. await self.checkpointer.asetup()
  119. logger.info(f" AsyncRedisSaver 持久化已启用: {config.REDIS_URL}")
  120. except Exception as e:
  121. logger.error(f" ❌ RedisSaver 初始化失败: {e}", exc_info=True)
  122. if self._exit_stack:
  123. await self._exit_stack.aclose()
  124. self.checkpointer = None
  125. else:
  126. logger.warning(" Redis 持久化功能已禁用。")
  127. # 5. 构建 StateGraph
  128. self.agent_executor = self._create_graph()
  129. logger.info(" StateGraph 已构建并编译。")
  130. logger.info("✅ CustomReactAgent 初始化完成。")
  131. async def _reinitialize_checkpointer(self):
  132. """重新初始化checkpointer连接"""
  133. try:
  134. # 清理旧的连接
  135. if self._exit_stack:
  136. try:
  137. await self._exit_stack.aclose()
  138. except:
  139. pass
  140. # 重新创建
  141. if config.REDIS_ENABLED and AsyncRedisSaver is not None:
  142. self._exit_stack = AsyncExitStack()
  143. checkpointer_manager = AsyncRedisSaver.from_conn_string(config.REDIS_URL)
  144. self.checkpointer = await self._exit_stack.enter_async_context(checkpointer_manager)
  145. await self.checkpointer.asetup()
  146. logger.info("✅ Checkpointer重新初始化成功")
  147. else:
  148. self.checkpointer = None
  149. logger.warning("⚠️ Redis禁用,checkpointer设为None")
  150. except Exception as e:
  151. logger.error(f"❌ Checkpointer重新初始化失败: {e}")
  152. self.checkpointer = None
  153. async def close(self):
  154. """清理资源,关闭 Redis 连接。"""
  155. if self._exit_stack:
  156. await self._exit_stack.aclose()
  157. self._exit_stack = None
  158. self.checkpointer = None
  159. logger.info("✅ RedisSaver 资源已通过 AsyncExitStack 释放。")
  160. if self.redis_client:
  161. await self.redis_client.aclose()
  162. logger.info("✅ Redis客户端已关闭。")
  163. def _create_graph(self):
  164. """定义并编译最终的、正确的 StateGraph 结构。"""
  165. builder = StateGraph(AgentState)
  166. # 定义所有需要的节点 - 全部改为异步
  167. builder.add_node("agent", self._async_agent_node)
  168. builder.add_node("prepare_tool_input", self._async_prepare_tool_input_node)
  169. builder.add_node("tools", ToolNode(self.tools))
  170. builder.add_node("update_state_after_tool", self._async_update_state_after_tool_node)
  171. builder.add_node("format_final_response", self._async_format_final_response_node)
  172. # 建立正确的边连接
  173. builder.set_entry_point("agent")
  174. builder.add_conditional_edges(
  175. "agent",
  176. self._async_should_continue,
  177. {
  178. "continue": "prepare_tool_input",
  179. "end": "format_final_response"
  180. }
  181. )
  182. builder.add_edge("prepare_tool_input", "tools")
  183. builder.add_edge("tools", "update_state_after_tool")
  184. builder.add_edge("update_state_after_tool", "agent")
  185. builder.add_edge("format_final_response", END)
  186. return builder.compile(checkpointer=self.checkpointer)
  187. async def _async_should_continue(self, state: AgentState) -> str:
  188. """异步判断是继续调用工具还是结束。"""
  189. thread_id = state.get("thread_id", "unknown")
  190. messages = state["messages"]
  191. total_messages = len(messages)
  192. # 显示当前递归计数
  193. current_count = getattr(self, '_recursion_count', 0)
  194. logger.info(f"🔄 [Decision] _async_should_continue - Thread: {thread_id} | 递归计数: {current_count}/{config.RECURSION_LIMIT}")
  195. logger.info(f" 消息总数: {total_messages}")
  196. if not messages:
  197. logger.warning(" ⚠️ 消息列表为空,返回 'end'")
  198. return "end"
  199. last_message = messages[-1]
  200. message_type = type(last_message).__name__
  201. logger.info(f" 最后消息类型: {message_type}")
  202. # 检查是否有tool_calls
  203. has_tool_calls = hasattr(last_message, "tool_calls") and last_message.tool_calls
  204. if has_tool_calls:
  205. tool_calls_count = len(last_message.tool_calls)
  206. logger.info(f" 发现工具调用: {tool_calls_count} 个")
  207. # 详细记录每个工具调用
  208. for i, tool_call in enumerate(last_message.tool_calls):
  209. tool_name = tool_call.get('name', 'unknown')
  210. tool_id = tool_call.get('id', 'unknown')
  211. logger.info(f" 工具调用[{i}]: {tool_name} (ID: {tool_id})")
  212. logger.info(" 🔄 决策: continue (继续工具调用)")
  213. return "continue"
  214. else:
  215. logger.info(" ✅ 无工具调用")
  216. # 检查消息内容以了解为什么结束
  217. if hasattr(last_message, 'content'):
  218. content_preview = str(last_message.content)[:100] + "..." if len(str(last_message.content)) > 100 else str(last_message.content)
  219. logger.info(f" 消息内容预览: {content_preview}")
  220. logger.info(" 🏁 决策: end (结束对话)")
  221. return "end"
  222. async def _async_agent_node(self, state: AgentState) -> Dict[str, Any]:
  223. """异步Agent 节点:使用异步LLM调用。"""
  224. # 增加递归计数
  225. if hasattr(self, '_recursion_count'):
  226. self._recursion_count += 1
  227. else:
  228. self._recursion_count = 1
  229. logger.info(f"🧠 [Async Node] agent - Thread: {state['thread_id']} | 递归计数: {self._recursion_count}/{config.RECURSION_LIMIT}")
  230. # 获取建议的下一步操作
  231. next_step = state.get("suggested_next_step")
  232. # 构建发送给LLM的消息列表
  233. messages_for_llm = state["messages"].copy()
  234. # 🎯 添加数据库范围系统提示词(每次用户提问时添加)
  235. if isinstance(state["messages"][-1], HumanMessage):
  236. db_scope_prompt = self._get_database_scope_prompt()
  237. if db_scope_prompt:
  238. messages_for_llm.insert(0, SystemMessage(content=db_scope_prompt))
  239. logger.info(" ✅ 已添加数据库范围判断提示词")
  240. # 检查是否需要分析验证错误
  241. # 行为指令与工具建议分离
  242. real_tools = {'valid_sql', 'run_sql'}
  243. if next_step:
  244. if next_step in real_tools:
  245. # 场景1: 建议调用一个真实的工具
  246. instruction = f"Suggestion: Based on the previous step, please use the '{next_step}' tool to continue."
  247. messages_for_llm.append(SystemMessage(content=instruction))
  248. logger.info(f" ✅ 已添加工具建议: {next_step}")
  249. elif next_step == "analyze_validation_error":
  250. # 场景2: 分析SQL验证错误(特殊指令)
  251. for msg in reversed(state["messages"]):
  252. if isinstance(msg, ToolMessage) and msg.name == "valid_sql":
  253. error_guidance = self._generate_validation_error_guidance(msg.content)
  254. messages_for_llm.append(SystemMessage(content=error_guidance))
  255. logger.info(" ✅ 已添加SQL验证错误指导")
  256. break
  257. elif next_step == 'summarize_final_answer':
  258. # 场景3: 总结最终答案(行为指令)
  259. instruction = "System Instruction: The SQL query was executed successfully. Please analyze the JSON data in the last message and summarize it in natural, user-friendly language as the final answer. Do not expose the raw JSON data or SQL statements in your response."
  260. messages_for_llm.append(SystemMessage(content=instruction))
  261. logger.info(" ✅ 已添加 '总结答案' 行为指令")
  262. elif next_step == 'answer_with_common_sense':
  263. # 场景4: 基于常识回答(特殊指令)
  264. instruction = (
  265. "无法为当前问题生成有效的SQL查询。失败原因已在上下文中提供。"
  266. "请你直接利用自身的知识库来回答用户的问题,不要再重复解释失败的原因。"
  267. )
  268. messages_for_llm.append(SystemMessage(content=instruction))
  269. logger.info("✅ 已添加 '常识回答' 行为指令")
  270. # 🛡️ 添加防幻觉系统提示词(重点防止参数篡改)
  271. anti_hallucination_prompt = self._get_anti_hallucination_prompt(state)
  272. if anti_hallucination_prompt:
  273. messages_for_llm.append(SystemMessage(content=anti_hallucination_prompt))
  274. logger.info(" 🛡️ 已添加防幻觉系统提示词")
  275. # 🔍 【新增】详细日志:发送给LLM的完整消息列表(按实际提交顺序)
  276. logger.info("📤 发送给LLM的完整消息列表和参数:")
  277. logger.info(f" 总消息数: {len(messages_for_llm)}")
  278. logger.info(" 消息详情:")
  279. for i, msg in enumerate(messages_for_llm):
  280. msg_type = type(msg).__name__
  281. content = str(msg.content)
  282. # 对于长内容,显示前500字符并标记
  283. if len(content) > 500:
  284. content_display = content[:500] + f"... (内容被截断,完整长度: {len(content)}字符)"
  285. else:
  286. content_display = content
  287. logger.info(f" [{i}] {msg_type}:")
  288. # 多行显示内容,便于阅读
  289. for line in content_display.split('\n'):
  290. logger.info(f" {line}")
  291. # 添加重试机制处理网络连接问题
  292. import asyncio
  293. max_retries = config.MAX_RETRIES
  294. for attempt in range(max_retries):
  295. try:
  296. # 🔍 【调试】打印LLM调用的详细信息
  297. logger.info(f"🚀 准备调用LLM (尝试 {attempt + 1}/{max_retries})")
  298. logger.info(f" LLM实例: {type(self.llm_with_tools)}")
  299. logger.info(f" 消息数量: {len(messages_for_llm)}")
  300. # 🔍 【调试】检查消息格式是否正确
  301. for i, msg in enumerate(messages_for_llm):
  302. logger.debug(f" 消息[{i}] 类型: {type(msg)}")
  303. logger.debug(f" 消息[{i}] 有content: {hasattr(msg, 'content')}")
  304. if hasattr(msg, 'content'):
  305. logger.debug(f" 消息[{i}] content类型: {type(msg.content)}")
  306. logger.debug(f" 消息[{i}] content长度: {len(str(msg.content))}")
  307. # 使用异步调用
  308. logger.info("🔄 开始调用LLM...")
  309. response = await self.llm_with_tools.ainvoke(messages_for_llm)
  310. logger.info("✅ LLM调用完成")
  311. # 🔍 【调试】详细的响应检查和日志
  312. logger.info(f" 响应类型: {type(response)}")
  313. logger.info(f" 响应有content: {hasattr(response, 'content')}")
  314. logger.info(f" 响应有tool_calls: {hasattr(response, 'tool_calls')}")
  315. logger.info(f" LLM原始响应内容: '{response.content}'")
  316. logger.info(f" 响应内容长度: {len(response.content) if response.content else 0}")
  317. logger.info(f" 响应内容类型: {type(response.content)}")
  318. if hasattr(response, 'tool_calls'):
  319. logger.info(f" LLM是否有工具调用: {response.tool_calls}")
  320. else:
  321. logger.info(f" LLM是否有工具调用: 无tool_calls属性")
  322. if hasattr(response, 'tool_calls') and response.tool_calls:
  323. logger.info(f" 工具调用数量: {len(response.tool_calls)}")
  324. for i, tool_call in enumerate(response.tool_calls):
  325. logger.info(f" 工具调用[{i}]: {tool_call.get('name', 'Unknown')}")
  326. # 🎯 改进的响应检查和重试逻辑
  327. # 检查空响应情况 - 将空响应也视为需要重试的情况
  328. if not response.content and not (hasattr(response, 'tool_calls') and response.tool_calls):
  329. logger.warning(" ⚠️ LLM返回空响应且无工具调用")
  330. if attempt < max_retries - 1:
  331. # 空响应也进行重试
  332. wait_time = config.RETRY_BASE_DELAY * (2 ** attempt)
  333. logger.info(f" 🔄 空响应重试,{wait_time}秒后重试...")
  334. await asyncio.sleep(wait_time)
  335. continue
  336. else:
  337. # 所有重试都失败,返回降级回答
  338. logger.error(f" ❌ 多次尝试仍返回空响应,返回降级回答")
  339. fallback_content = "抱歉,我现在无法正确处理您的问题。请稍后重试或重新表述您的问题。"
  340. fallback_response = AIMessage(content=fallback_content)
  341. return {"messages": [fallback_response]}
  342. elif response.content and response.content.strip() == "":
  343. logger.warning(" ⚠️ LLM返回只包含空白字符的内容")
  344. if attempt < max_retries - 1:
  345. # 空白字符也进行重试
  346. wait_time = config.RETRY_BASE_DELAY * (2 ** attempt)
  347. logger.info(f" 🔄 空白字符重试,{wait_time}秒后重试...")
  348. await asyncio.sleep(wait_time)
  349. continue
  350. else:
  351. # 所有重试都失败,返回降级回答
  352. logger.error(f" ❌ 多次尝试仍返回空白字符,返回降级回答")
  353. fallback_content = "抱歉,我现在无法正确处理您的问题。请稍后重试或重新表述您的问题。"
  354. fallback_response = AIMessage(content=fallback_content)
  355. return {"messages": [fallback_response]}
  356. elif not response.content and hasattr(response, 'tool_calls') and response.tool_calls:
  357. logger.info(" ✅ LLM只返回工具调用,无文本内容(正常情况)")
  358. # 🎯 最终检查:确保响应是有效的
  359. if ((response.content and response.content.strip()) or
  360. (hasattr(response, 'tool_calls') and response.tool_calls)):
  361. logger.info(f" ✅ 异步LLM调用成功,返回有效响应")
  362. return {"messages": [response]}
  363. else:
  364. # 这种情况理论上不应该发生,但作为最后的保障
  365. logger.error(f" ❌ 意外的响应格式,进行重试")
  366. if attempt < max_retries - 1:
  367. wait_time = config.RETRY_BASE_DELAY * (2 ** attempt)
  368. logger.info(f" 🔄 意外响应格式重试,{wait_time}秒后重试...")
  369. await asyncio.sleep(wait_time)
  370. continue
  371. else:
  372. fallback_content = "抱歉,我现在无法正确处理您的问题。请稍后重试或重新表述您的问题。"
  373. fallback_response = AIMessage(content=fallback_content)
  374. return {"messages": [fallback_response]}
  375. except Exception as e:
  376. error_msg = str(e)
  377. error_type = type(e).__name__
  378. logger.warning(f" ⚠️ LLM调用失败 (尝试 {attempt + 1}/{max_retries}): {error_type}: {error_msg}")
  379. # 🎯 改进的错误分类逻辑:检查异常类型和错误消息
  380. is_network_error = False
  381. is_parameter_error = False
  382. # 1. 检查异常类型
  383. network_exception_types = [
  384. 'APIConnectionError', 'ConnectTimeout', 'ReadTimeout',
  385. 'TimeoutError', 'APITimeoutError', 'ConnectError',
  386. 'HTTPError', 'RequestException', 'ConnectionError'
  387. ]
  388. if error_type in network_exception_types:
  389. is_network_error = True
  390. logger.info(f" 📊 根据异常类型判断为网络错误: {error_type}")
  391. # 2. 检查BadRequestError中的参数错误
  392. if error_type == 'BadRequestError':
  393. # 检查是否是消息格式错误
  394. if any(keyword in error_msg.lower() for keyword in [
  395. 'must be followed by tool messages',
  396. 'invalid_parameter_error',
  397. 'assistant message with "tool_calls"',
  398. 'tool_call_id',
  399. 'message format'
  400. ]):
  401. is_parameter_error = True
  402. logger.info(f" 📊 根据错误消息判断为参数格式错误: {error_msg[:100]}...")
  403. # 3. 检查错误消息内容(不区分大小写)
  404. error_msg_lower = error_msg.lower()
  405. network_keywords = [
  406. 'connection error', 'connect error', 'timeout', 'timed out',
  407. 'network', 'connection refused', 'connection reset',
  408. 'remote host', '远程主机', '网络连接', '连接超时',
  409. 'request timed out', 'read timeout', 'connect timeout'
  410. ]
  411. for keyword in network_keywords:
  412. if keyword in error_msg_lower:
  413. is_network_error = True
  414. logger.info(f" 📊 根据错误消息判断为网络错误: '{keyword}' in '{error_msg}'")
  415. break
  416. # 处理可重试的错误
  417. if is_network_error or is_parameter_error:
  418. if attempt < max_retries - 1:
  419. # 渐进式重试间隔:3, 6, 12秒
  420. wait_time = config.RETRY_BASE_DELAY * (2 ** attempt)
  421. error_type_desc = "网络错误" if is_network_error else "参数格式错误"
  422. logger.info(f" 🔄 {error_type_desc},{wait_time}秒后重试...")
  423. # 🎯 对于参数错误,修复消息历史后重试
  424. if is_parameter_error:
  425. try:
  426. messages_for_llm = await self._handle_parameter_error_with_retry(
  427. messages_for_llm, error_msg, attempt
  428. )
  429. logger.info(f" 🔧 消息历史修复完成,继续重试...")
  430. except Exception as fix_error:
  431. logger.error(f" ❌ 消息历史修复失败: {fix_error}")
  432. # 修复失败,使用原始消息继续重试
  433. await asyncio.sleep(wait_time)
  434. continue
  435. else:
  436. # 所有重试都失败了,返回一个降级的回答
  437. error_type_desc = "网络连接" if is_network_error else "请求格式"
  438. logger.error(f" ❌ {error_type_desc}持续失败,返回降级回答")
  439. # 检查是否有SQL执行结果可以利用
  440. sql_data = await self._async_extract_latest_sql_data(state["messages"])
  441. if sql_data:
  442. fallback_content = f"抱歉,由于{error_type_desc}问题,无法生成完整的文字总结。不过查询已成功执行,结果如下:\n\n" + sql_data
  443. else:
  444. fallback_content = f"抱歉,由于{error_type_desc}问题,无法完成此次请求。请稍后重试或检查网络连接。"
  445. fallback_response = AIMessage(content=fallback_content)
  446. return {"messages": [fallback_response]}
  447. else:
  448. # 非网络错误,直接抛出
  449. logger.error(f" ❌ LLM调用出现非可重试错误: {error_type}: {error_msg}")
  450. raise e
  451. def _print_state_info(self, state: AgentState, node_name: str) -> None:
  452. """
  453. 打印 state 的全部信息,用于调试
  454. """
  455. logger.info(" ~" * 10 + " State Print Start" + " ~" * 10)
  456. logger.info(f"📋 [State Debug] {node_name} - 当前状态信息:")
  457. # 🎯 打印 state 中的所有字段
  458. logger.info(" State中的所有字段:")
  459. for key, value in state.items():
  460. if key == "messages":
  461. logger.info(f" {key}: {len(value)} 条消息")
  462. else:
  463. logger.info(f" {key}: {value}")
  464. # 原有的详细消息信息
  465. logger.info(f" 用户ID: {state.get('user_id', 'N/A')}")
  466. logger.info(f" 线程ID: {state.get('thread_id', 'N/A')}")
  467. logger.info(f" 建议下一步: {state.get('suggested_next_step', 'N/A')}")
  468. messages = state.get("messages", [])
  469. logger.info(f" 消息历史数量: {len(messages)}")
  470. if messages:
  471. logger.info(" 最近的消息:")
  472. for i, msg in enumerate(messages[-10:], start=max(0, len(messages)-10)): # 显示最后10条消息
  473. msg_type = type(msg).__name__
  474. content = str(msg.content)
  475. # 对于长内容,使用多行显示
  476. if len(content) > 200:
  477. logger.info(f" [{i}] {msg_type}:")
  478. logger.info(f" {content}")
  479. else:
  480. logger.info(f" [{i}] {msg_type}: {content}")
  481. # 如果是 AIMessage 且有工具调用,显示工具调用信息
  482. if hasattr(msg, 'tool_calls') and msg.tool_calls:
  483. for tool_call in msg.tool_calls:
  484. tool_name = tool_call.get('name', 'Unknown')
  485. tool_args = tool_call.get('args', {})
  486. logger.info(f" 工具调用: {tool_name}")
  487. # 对于复杂参数,使用JSON格式化
  488. import json
  489. try:
  490. formatted_args = json.dumps(tool_args, ensure_ascii=False, indent=2)
  491. logger.info(f" 参数:")
  492. for line in formatted_args.split('\n'):
  493. logger.info(f" {line}")
  494. except Exception:
  495. logger.info(f" 参数: {str(tool_args)}")
  496. logger.info(" ~" * 10 + " State Print End" + " ~" * 10)
  497. async def _async_prepare_tool_input_node(self, state: AgentState) -> Dict[str, Any]:
  498. """异步准备工具输入节点:为generate_sql工具注入history_messages。"""
  499. # 增加递归计数
  500. if hasattr(self, '_recursion_count'):
  501. self._recursion_count += 1
  502. else:
  503. self._recursion_count = 1
  504. logger.info(f"🔧 [Async Node] prepare_tool_input - Thread: {state['thread_id']} | 递归计数: {self._recursion_count}/{config.RECURSION_LIMIT}")
  505. # 获取最后一条消息(应该是来自agent的AIMessage)
  506. last_message = state["messages"][-1]
  507. if not isinstance(last_message, AIMessage) or not last_message.tool_calls:
  508. return {"messages": [last_message]}
  509. # 强制修正LLM幻觉出的问题
  510. for tool_call in last_message.tool_calls:
  511. if tool_call['name'] == 'generate_sql':
  512. original_user_question = next((msg.content for msg in reversed(state['messages']) if isinstance(msg, HumanMessage)), None)
  513. if original_user_question and tool_call['args'].get('question') != original_user_question:
  514. logger.warning(
  515. f"修正 'generate_sql' 的问题参数。\n"
  516. f" - LLM提供: '{tool_call['args'].get('question')}'\n"
  517. f" + 修正为: '{original_user_question}'"
  518. )
  519. tool_call['args']['question'] = original_user_question
  520. # 恢复原始的、更健壮的历史消息过滤和注入逻辑
  521. new_tool_calls = []
  522. for tool_call in last_message.tool_calls:
  523. if tool_call["name"] == "generate_sql":
  524. logger.info("检测到 generate_sql 调用,开始注入历史消息。")
  525. modified_args = tool_call["args"].copy()
  526. clean_history = []
  527. # 找到当前用户问题,确保不包含在历史上下文中
  528. current_user_question = None
  529. messages_for_history = []
  530. # 从最后开始找到当前用户问题
  531. for i in range(len(state["messages"]) - 1, -1, -1):
  532. msg = state["messages"][i]
  533. if isinstance(msg, HumanMessage):
  534. current_user_question = msg.content
  535. messages_for_history = state["messages"][:i] # 排除当前用户问题及之后的消息
  536. break
  537. # 处理历史消息,确保不包含当前用户问题
  538. for msg in messages_for_history:
  539. if isinstance(msg, HumanMessage):
  540. clean_history.append({"type": "human", "content": msg.content})
  541. elif isinstance(msg, AIMessage):
  542. if not msg.tool_calls and msg.content:
  543. # 注释掉 [Formatted Output] 清理逻辑 - 源头已不生成前缀
  544. # clean_content = msg.content.replace("[Formatted Output]\n", "").strip()
  545. clean_content = msg.content.strip()
  546. if clean_content:
  547. clean_history.append({"type": "ai", "content": clean_content})
  548. modified_args["history_messages"] = clean_history
  549. logger.info(f"注入了 {len(clean_history)} 条过滤后的历史消息")
  550. new_tool_calls.append({
  551. "name": tool_call["name"],
  552. "args": modified_args,
  553. "id": tool_call["id"],
  554. })
  555. else:
  556. new_tool_calls.append(tool_call)
  557. last_message.tool_calls = new_tool_calls
  558. return {"messages": [last_message]}
  559. def _filter_and_format_history(self, messages: list) -> list:
  560. """
  561. 过滤和格式化历史消息,为generate_sql工具提供干净的上下文。
  562. 只保留历史中的用户提问和AI的最终回答。
  563. """
  564. clean_history = []
  565. # 处理除最后一个(即当前的工具调用)之外的所有消息
  566. messages_to_process = messages[:-1]
  567. for msg in messages_to_process:
  568. if isinstance(msg, HumanMessage):
  569. clean_history.append({"type": "human", "content": msg.content})
  570. elif isinstance(msg, AIMessage):
  571. # 只保留最终的、面向用户的回答(不包含工具调用的纯文本回答)
  572. if not msg.tool_calls and msg.content:
  573. # 注释掉 [Formatted Output] 清理逻辑 - 源头已不生成前缀
  574. # clean_content = msg.content.replace("[Formatted Output]\n", "").strip()
  575. clean_content = msg.content.strip()
  576. if clean_content:
  577. clean_history.append({"type": "ai", "content": clean_content})
  578. return clean_history
  579. async def _async_update_state_after_tool_node(self, state: AgentState) -> Dict[str, Any]:
  580. """异步更新工具执行后的状态。"""
  581. # 增加递归计数
  582. if hasattr(self, '_recursion_count'):
  583. self._recursion_count += 1
  584. else:
  585. self._recursion_count = 1
  586. logger.info(f"📝 [Async Node] update_state_after_tool - Thread: {state['thread_id']} | 递归计数: {self._recursion_count}/{config.RECURSION_LIMIT}")
  587. # 获取最后一条工具消息
  588. last_message = state["messages"][-1]
  589. tool_name = last_message.name
  590. tool_output = last_message.content
  591. next_step = None
  592. if tool_name == 'generate_sql':
  593. # 使用 .lower() 将输出转为小写,可以同时捕获 "failed" 和 "Failed" 等情况
  594. tool_output_lower = tool_output.lower()
  595. if "failed" in tool_output_lower or "无法生成" in tool_output_lower or "失败" in tool_output_lower:
  596. next_step = 'answer_with_common_sense'
  597. else:
  598. next_step = 'valid_sql'
  599. elif tool_name == 'valid_sql':
  600. if "失败" in tool_output:
  601. next_step = 'analyze_validation_error'
  602. else:
  603. next_step = 'run_sql'
  604. elif tool_name == 'run_sql':
  605. next_step = 'summarize_final_answer'
  606. logger.info(f" Tool '{tool_name}' executed. Suggested next step: {next_step}")
  607. return {"suggested_next_step": next_step}
  608. def _clear_history_messages_parameter(self, messages: List[BaseMessage]) -> None:
  609. """
  610. 将 generate_sql 工具的 history_messages 参数设置为空字符串
  611. """
  612. for message in messages:
  613. if hasattr(message, "tool_calls") and message.tool_calls:
  614. for tool_call in message.tool_calls:
  615. if tool_call["name"] == "generate_sql" and "history_messages" in tool_call["args"]:
  616. tool_call["args"]["history_messages"] = ""
  617. logger.info(f" 已将 generate_sql 的 history_messages 设置为空字符串")
  618. async def _async_format_final_response_node(self, state: AgentState) -> Dict[str, Any]:
  619. """异步格式化最终响应节点。"""
  620. # 增加递归计数
  621. if hasattr(self, '_recursion_count'):
  622. self._recursion_count += 1
  623. else:
  624. self._recursion_count = 1
  625. logger.info(f"✨ [Async Node] format_final_response - Thread: {state['thread_id']} | 递归计数: {self._recursion_count}/{config.RECURSION_LIMIT}")
  626. # 这个节点主要用于最终处理,通常不需要修改状态
  627. return {"messages": state["messages"]}
  async def _async_generate_api_data(self, state: AgentState) -> Dict[str, Any]:
      """Assemble the API-shaped payload from the final agent state.

      Args:
          state: Final agent state; ``messages`` and ``suggested_next_step`` are read.

      Returns:
          A dict with ``response`` and ``react_agent_meta``, plus ``sql`` and
          ``records`` when the current turn produced them.
      """
      logger.info("📊 异步生成API格式数据...")

      history = state['messages']
      response_content = history[-1].content
      api_data = {"response": response_content}

      # In the common-sense fallback path, prepend the generate_sql failure
      # reason (minus its fixed prefix) to the model's answer.
      if state.get("suggested_next_step") == 'answer_with_common_sense':
          failure_reason = self._find_generate_sql_failure_reason(history)
          if failure_reason:
              cleaned_reason = failure_reason.replace("Database query failed. Reason:", "").strip()
              api_data["response"] = f"{cleaned_reason}\n\n{response_content}"
              logger.info(" ✅ 已成功拼接 '失败原因' 和 '常识回答'")

      # Copy over only the SQL fields that actually carry a value.
      sql_info = await self._async_extract_sql_and_data(history)
      for field in ("sql", "records"):
          if sql_info[field]:
              api_data[field] = sql_info[field]

      api_data["react_agent_meta"] = await self._async_collect_agent_metadata(state)

      logger.info(f" API数据生成完成,包含字段: {list(api_data.keys())}")
      return api_data
  def _find_generate_sql_failure_reason(self, messages: List[BaseMessage]) -> Optional[str]:
      """Return the text of the most recent ``generate_sql`` result if it reports a failure.

      The history is scanned from newest to oldest and only the first
      ``generate_sql`` tool message encountered is considered.

      Args:
          messages: Message history to inspect.

      Returns:
          The failing tool output as a string, or ``None`` when the latest
          ``generate_sql`` result is not a failure or no such message exists.
      """
      # Same markers the routing step uses to send a turn to
      # 'answer_with_common_sense'; matched case-insensitively.
      failure_markers = ("failed", "无法生成", "失败")

      for msg in reversed(messages):
          if not (isinstance(msg, ToolMessage) and msg.name == 'generate_sql'):
              continue
          # Content is normally a string; coerce defensively so .lower() cannot fail.
          text = msg.content if isinstance(msg.content, str) else str(msg.content)
          lowered = text.lower()
          if any(marker in lowered for marker in failure_markers):
              return text
          # The latest generate_sql call succeeded, so there is nothing to report.
          return None
      return None
  async def _async_extract_sql_and_data(self, messages: List[BaseMessage]) -> Dict[str, Any]:
      """Extract the generated SQL and query records for the current conversation turn.

      Only messages from the last ``HumanMessage`` onward are inspected. Later
      tool messages overwrite values found earlier in the same turn.

      Args:
          messages: Full message history.

      Returns:
          ``{"sql": ..., "records": ...}``. ``sql`` is the stripped output of a
          non-failing ``generate_sql`` call. ``records`` is a dict with
          ``columns``, ``rows``, ``total_row_count`` and ``is_limited`` built
          from a ``run_sql`` result that is a non-empty JSON list of objects.
          Either value stays ``None`` when nothing usable was found.
      """
      import json  # kept local, as in the original, so no module-level import is assumed

      result = {"sql": None, "records": None}

      # Locate the start of the current turn.
      last_human_index = -1
      for i in range(len(messages) - 1, -1, -1):
          if isinstance(messages[i], HumanMessage):
              last_human_index = i
              break
      if last_human_index == -1:
          return result

      # Same case-insensitive markers the routing step uses, so an output that
      # was routed as a failure is never reported as SQL.
      failure_markers = ("failed", "无法生成", "失败")

      sql_query = None
      sql_data = None
      for msg in messages[last_human_index:]:
          if not isinstance(msg, ToolMessage):
              continue

          if msg.name == 'generate_sql':
              content = msg.content
              if isinstance(content, str) and content:
                  lowered = content.lower()
                  if not any(marker in lowered for marker in failure_markers):
                      sql_query = content.strip()

          elif msg.name == 'run_sql':
              try:
                  parsed_data = json.loads(msg.content)
              except (TypeError, ValueError) as e:
                  # json.JSONDecodeError is a ValueError; TypeError covers non-string content.
                  logger.warning(f" 解析SQL结果失败: {e}")
                  continue
              # Expected shape: a list of row dicts (DataFrame.to_json(orient='records')).
              if isinstance(parsed_data, list) and parsed_data and isinstance(parsed_data[0], dict):
                  sql_data = {
                      "columns": list(parsed_data[0].keys()),
                      "rows": parsed_data,
                      "total_row_count": len(parsed_data),
                      "is_limited": False,  # no row limiting is implemented in this version
                  }

      if sql_query:
          result["sql"] = sql_query
      if sql_data:
          result["records"] = sql_data
      return result
  async def _async_collect_agent_metadata(self, state: AgentState) -> Dict[str, Any]:
      """Summarise the run recorded in ``state`` as a metadata dictionary.

      Args:
          state: Agent state; ``messages`` and ``thread_id`` are read.

      Returns:
          A dict with the thread id, number of human messages, distinct tool
          names in first-seen order, a simplified execution path, total message
          count, number of ``run_sql`` tool messages, whether any
          ``generate_sql`` call carried a non-empty ``history_messages``
          argument, and a fixed agent version label.
      """
      history = state['messages']

      seen_tools = []
      run_sql_calls = 0
      history_was_injected = False

      # One conversation round per human message.
      rounds = len([m for m in history if isinstance(m, HumanMessage)])

      for msg in history:
          if isinstance(msg, ToolMessage):
              if msg.name not in seen_tools:
                  seen_tools.append(msg.name)
              if msg.name == 'run_sql':
                  run_sql_calls += 1
              continue

          if not (isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls):
              continue

          for call in msg.tool_calls:
              name = call.get('name')
              if name and name not in seen_tools:
                  seen_tools.append(name)
              # A non-empty history_messages argument means prior context was passed to the tool.
              if name == 'generate_sql' and call.get('args', {}).get('history_messages'):
                  history_was_injected = True

      # Simplified path: the tool stages appear only if any tool was involved.
      path = ["agent"]
      if seen_tools:
          path += ["prepare_tool_input", "tools"]
      path.append("format_final_response")

      return {
          "thread_id": state['thread_id'],
          "conversation_rounds": rounds,
          "tools_used": seen_tools,
          "execution_path": path,
          "total_messages": len(history),
          "sql_execution_count": run_sql_calls,
          "context_injected": history_was_injected,
          "agent_version": "custom_react_v1"
      }
  async def _async_extract_latest_sql_data(self, messages: List[BaseMessage]) -> Optional[str]:
      """Return the newest ``run_sql`` result of the current conversation turn.

      Only messages from the last ``HumanMessage`` onward are searched, newest
      first. A JSON result is re-serialised with ``ensure_ascii=False`` and
      compact separators; a non-JSON result is returned unchanged.

      Args:
          messages: Full message history.

      Returns:
          The result text, or ``None`` when there is no human message or no
          ``run_sql`` tool message in the current turn.
      """
      logger.info("🔍 提取最新的SQL执行结果...")

      human_positions = [idx for idx, m in enumerate(messages) if isinstance(m, HumanMessage)]
      if not human_positions:
          logger.info(" 未找到用户消息,跳过SQL数据提取")
          return None

      current_turn = messages[human_positions[-1]:]
      logger.info(f" 当前对话轮次包含 {len(current_turn)} 条消息")

      for msg in reversed(current_turn):
          if not (isinstance(msg, ToolMessage) and msg.name == 'run_sql'):
              continue

          logger.info(f" 找到当前对话轮次的run_sql结果: {msg.content[:100]}...")
          try:
              parsed = json.loads(msg.content)
          except json.JSONDecodeError:
              # Not valid JSON: hand back the raw tool output.
              logger.warning(" SQL结果不是有效JSON格式,返回原始内容")
              return msg.content
          else:
              # Re-dump so escaped Unicode sequences become readable characters.
              readable = json.dumps(parsed, ensure_ascii=False, separators=(',', ':'))
              logger.info(" 已转换Unicode转义序列为中文字符")
              return readable

      logger.info(" 当前对话轮次中未找到run_sql执行结果")
      return None
  async def chat(self, message: str, user_id: str, thread_id: Optional[str] = None) -> Dict[str, Any]:
      """
      Handle one user chat request by running the agent graph.

      Args:
          message: The user's message text.
          user_id: Identifier of the user; used to build a new thread id.
          thread_id: Existing thread id. When falsy, a new one of the form
              ``<user_id>:<YYYYmmddHHMMSS><mmm>`` is generated.

      Returns:
          On success: ``success=True``, ``answer``, ``thread_id``, ``api_data``
          and, when a ``run_sql`` result exists for this turn, ``sql_data``.
          On failure: ``success=False``, ``error`` and ``thread_id``; an
          "Event loop is closed" failure additionally sets ``retry_suggested``.
      """
      if not thread_id:
          now = pd.Timestamp.now()
          milliseconds = int(now.microsecond / 1000)
          thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
          logger.info(f"🆕 新建会话,Thread ID: {thread_id}")
      # Reset the recursion counter (used for log output).
      self._recursion_count = 0
      run_config = {
          "configurable": {
              "thread_id": thread_id,
          },
          "recursion_limit": config.RECURSION_LIMIT
      }
      logger.info(f"🔢 递归限制设置: {config.RECURSION_LIMIT}")
      inputs = {
          "messages": [HumanMessage(content=message)],
          "user_id": user_id,
          "thread_id": thread_id,
          "suggested_next_step": None,
      }
      try:
          logger.info(f"🚀 开始处理用户消息: {message[:50]}...")
          # Probe the checkpointer connection; reinitialise it if the connection looks closed.
          if self.checkpointer:
              try:
                  # Simple connectivity test: ping the connection directly
                  # (aget_tuple is not used because there may be no data yet).
                  if hasattr(self.checkpointer, 'conn') and self.checkpointer.conn:
                      await self.checkpointer.conn.ping()
              except Exception as checkpoint_error:
                  if "Event loop is closed" in str(checkpoint_error) or "closed" in str(checkpoint_error).lower():
                      logger.warning(f"⚠️ Checkpointer连接异常,尝试重新初始化: {checkpoint_error}")
                      await self._reinitialize_checkpointer()
                      # Rebuild the graph so it uses the new checkpointer.
                      self.agent_executor = self._create_graph()
                  else:
                      # Any other probe failure is logged and execution continues.
                      logger.warning(f"⚠️ Checkpointer测试失败,但继续执行: {checkpoint_error}")
          final_state = await self.agent_executor.ainvoke(inputs, run_config)
          # Debug: log all keys of final_state.
          logger.info(f"🔍 Final state keys: {list(final_state.keys())}")
          answer = final_state["messages"][-1].content
          # Extract the latest run_sql result (messages are not modified).
          sql_data = await self._async_extract_latest_sql_data(final_state["messages"])
          logger.info(f"✅ 处理完成 - Final Answer: '{answer}'")
          # Build the result (simplified format kept for shell.py).
          result = {
              "success": True,
              "answer": answer,
              "thread_id": thread_id
          }
          # Include SQL data only when there is any.
          if sql_data:
              result["sql_data"] = sql_data
              logger.info(" 📊 已包含SQL原始数据")
          # Generate the API-format data.
          api_data = await self._async_generate_api_data(final_state)
          result["api_data"] = api_data
          logger.info(" 🔌 已生成API格式数据")
          return result
      except Exception as e:
          # Special handling for the "Event loop is closed" error.
          if "Event loop is closed" in str(e):
              logger.error(f"❌ Redis Event loop已关闭 - Thread: {thread_id}: {e}")
              # Try to reinitialise the checkpointer so that a retry can succeed.
              try:
                  await self._reinitialize_checkpointer()
                  self.agent_executor = self._create_graph()
                  logger.info("🔄 已重新初始化checkpointer,请重试请求")
              except Exception as reinit_error:
                  logger.error(f"❌ 重新初始化失败: {reinit_error}")
              # The current request is not retried here; the caller is asked to retry.
              return {
                  "success": False,
                  "error": "Redis连接问题,请重试",
                  "thread_id": thread_id,
                  "retry_suggested": True
              }
          else:
              logger.error(f"❌ 处理过程中发生严重错误 - Thread: {thread_id}: {e}", exc_info=True)
              return {"success": False, "error": str(e), "thread_id": thread_id}
  async def get_conversation_history(self, thread_id: str, include_tools: bool = False) -> Dict[str, Any]:
      """
      Load the conversation history of a thread from the checkpointer.

      Args:
          thread_id: Thread identifier.
          include_tools: When False (default), tool messages and content-less
              AI messages that only carry tool calls are left out.

      Returns:
          Dict with:
              "messages": list of dicts (id, type, content, timestamp),
              "thread_created_at": creation time parsed from the thread id,
              "total_checkpoints": number of checkpoints found.
          An empty result is returned when there is no checkpointer, no
          checkpoint, or an error occurs.

      Raises:
          RuntimeError: re-raised unless its message contains "Event loop is closed".
      """
      def _empty_history() -> Dict[str, Any]:
          return {"messages": [], "thread_created_at": None, "total_checkpoints": 0}

      if not self.checkpointer:
          return _empty_history()

      thread_config = {"configurable": {"thread_id": thread_id}}
      try:
          # Collect every checkpoint of the thread; the first one is treated as the latest.
          checkpoints = [cp async for cp in self.checkpointer.alist(thread_config)]
          if not checkpoints:
              return _empty_history()

          thread_created_at = self._parse_thread_creation_time(thread_id)
          message_timestamps = self._build_message_timestamp_map(checkpoints)

          latest_checkpoint = checkpoints[0]
          messages = latest_checkpoint.checkpoint.get('channel_values', {}).get('messages', [])

          filtered_messages = []
          for msg in messages:
              if isinstance(msg, HumanMessage):
                  msg_type = "human"
              elif isinstance(msg, ToolMessage):
                  if not include_tools:
                      continue
                  msg_type = "tool"
              else:
                  # Everything else is reported as an AI message.
                  msg_type = "ai"
                  only_tool_calls = not msg.content and hasattr(msg, 'tool_calls') and msg.tool_calls
                  if not include_tools and only_tool_calls:
                      continue

              # Messages without an id cannot be matched to a timestamp and are dropped.
              msg_id = getattr(msg, 'id', None)
              if not msg_id:
                  continue

              # Fall back to the latest checkpoint's time when no exact timestamp is known.
              timestamp = message_timestamps.get(msg_id) or latest_checkpoint.checkpoint.get('ts')

              filtered_messages.append({
                  "id": msg_id,
                  "type": msg_type,
                  "content": msg.content,
                  "timestamp": timestamp
              })

          return {
              "messages": filtered_messages,
              "thread_created_at": thread_created_at,
              "total_checkpoints": len(checkpoints)
          }
      except RuntimeError as e:
          if "Event loop is closed" not in str(e):
              raise
          logger.warning(f"⚠️ Event loop已关闭,尝试重新获取对话历史: {thread_id}")
          return _empty_history()
      except Exception as e:
          logger.error(f"❌ 获取对话历史失败: {e}")
          return _empty_history()
  940. def _parse_thread_creation_time(self, thread_id: str) -> str:
  941. """解析thread_id中的创建时间,返回带毫秒的格式"""
  942. try:
  943. if ':' in thread_id:
  944. parts = thread_id.split(':')
  945. if len(parts) >= 2:
  946. timestamp_part = parts[1]
  947. if len(timestamp_part) >= 14:
  948. year = timestamp_part[:4]
  949. month = timestamp_part[4:6]
  950. day = timestamp_part[6:8]
  951. hour = timestamp_part[8:10]
  952. minute = timestamp_part[10:12]
  953. second = timestamp_part[12:14]
  954. ms = timestamp_part[14:17] if len(timestamp_part) > 14 else "000"
  955. return f"{year}-{month}-{day} {hour}:{minute}:{second}.{ms}"
  956. except Exception as e:
  957. logger.warning(f"⚠️ 解析thread创建时间失败: {e}")
  958. return None
  959. def _build_message_timestamp_map(self, checkpoints: List) -> Dict[str, str]:
  960. """构建消息ID到时间戳的映射"""
  961. message_timestamps = {}
  962. # 按时间正序排列checkpoint(最早的在前)
  963. checkpoints_sorted = sorted(checkpoints, key=lambda x: x.checkpoint.get('ts', ''))
  964. for checkpoint_tuple in checkpoints_sorted:
  965. checkpoint_ts = checkpoint_tuple.checkpoint.get('ts')
  966. messages = checkpoint_tuple.checkpoint.get('channel_values', {}).get('messages', [])
  967. # 为这个checkpoint中的新消息分配时间戳
  968. for msg in messages:
  969. msg_id = getattr(msg, 'id', None)
  970. if msg_id and msg_id not in message_timestamps:
  971. message_timestamps[msg_id] = checkpoint_ts
  972. return message_timestamps
  async def get_user_recent_conversations(self, user_id: str, limit: int = 10) -> List[Dict[str, Any]]:
      """
      List a user's most recent conversations.

      Relies on the thread id format ``user_id:timestamp``: Redis keys matching
      ``checkpoint:<user_id>:*`` are scanned, grouped by thread, sorted by the
      timestamp segment (newest first) and truncated to ``limit``. The state
      of each remaining thread is then loaded from the checkpointer.

      Args:
          user_id: User whose conversations are listed.
          limit: Maximum number of conversations to return.

      Returns:
          One dict per conversation (ids, message count, last human message,
          update/creation times, title preview). An empty list is returned when
          there is no checkpointer or the scan fails.
      """
      if not self.checkpointer:
          return []
      try:
          # Use the shared async Redis client.
          redis_client = self.redis_client
          # 1. Scan all checkpoint keys belonging to this user.
          # Checkpointer keys usually look like: checkpoint:thread_id:checkpoint_id
          pattern = f"checkpoint:{user_id}:*"
          logger.info(f"🔍 扫描模式: {pattern}")
          user_threads = {}
          cursor = 0
          while True:
              cursor, keys = await redis_client.scan(
                  cursor=cursor,
                  match=pattern,
                  count=1000
              )
              for key in keys:
                  try:
                      # Parse the key to obtain the thread id and checkpoint info.
                      # Key format: checkpoint:user_id:timestamp:status:checkpoint_id
                      key_str = key.decode() if isinstance(key, bytes) else key
                      parts = key_str.split(':')
                      if len(parts) >= 4:
                          # thread_id = user_id:timestamp
                          thread_id = f"{parts[1]}:{parts[2]}"
                          timestamp = parts[2]
                          # Track the latest checkpoint key of each thread.
                          # NOTE(review): "latest_key" is not read again later in this method.
                          if thread_id not in user_threads:
                              user_threads[thread_id] = {
                                  "thread_id": thread_id,
                                  "timestamp": timestamp,
                                  "latest_key": key_str
                              }
                          else:
                              # Keep the key whose fifth segment compares greater (string comparison).
                              # NOTE(review): if the stored key has only four segments, the [4]
                              # lookup raises IndexError, which the except below logs and skips.
                              if len(parts) > 4 and parts[4] > user_threads[thread_id]["latest_key"].split(':')[4]:
                                  user_threads[thread_id]["latest_key"] = key_str
                  except Exception as e:
                      logger.warning(f"解析key {key} 失败: {e}")
                      continue
              # SCAN signals completion by returning cursor 0.
              if cursor == 0:
                  break
          # NOTE: the shared Redis client is deliberately not closed here,
          # because other operations may still be using it.
          # 2. Sort by timestamp, newest first.
          sorted_threads = sorted(
              user_threads.values(),
              key=lambda x: x["timestamp"],
              reverse=True
          )[:limit]
          # 3. Load the details of each thread.
          conversations = []
          for thread_info in sorted_threads:
              try:
                  thread_id = thread_info["thread_id"]
                  thread_config = {"configurable": {"thread_id": thread_id}}
                  try:
                      state = await self.checkpointer.aget(thread_config)
                  except RuntimeError as e:
                      if "Event loop is closed" in str(e):
                          logger.warning(f"⚠️ Event loop已关闭,跳过thread: {thread_id}")
                          continue
                      else:
                          raise
                  if state and state.get('channel_values', {}).get('messages'):
                      messages = state['channel_values']['messages']
                      # Build the conversation preview.
                      preview = self._generate_conversation_preview(messages)
                      # Find the last human message.
                      last_human_message = None
                      if messages:
                          for msg in reversed(messages):
                              if isinstance(msg, HumanMessage):
                                  last_human_message = msg.content
                                  break
                      conversations.append({
                          "conversation_id": thread_id,
                          "thread_id": thread_id,
                          "user_id": user_id,
                          "message_count": len(messages),
                          "last_message": last_human_message,
                          "updated_at": self._format_utc_to_china_time(state.get('ts')) if state.get('ts') else None,
                          "conversation_title": preview,
                          "created_at": self._format_timestamp(thread_info["timestamp"])
                      })
              except Exception as e:
                  # A failure on one thread does not abort the whole listing.
                  logger.error(f"获取thread {thread_info['thread_id']} 详情失败: {e}")
                  continue
          logger.info(f"✅ 找到用户 {user_id} 的 {len(conversations)} 个对话")
          return conversations
      except Exception as e:
          logger.error(f"❌ 获取用户 {user_id} 对话列表失败: {e}")
          return []
  def _generate_conversation_preview(self, messages: List[BaseMessage]) -> str:
      """Build a short preview from the first human message of a conversation.

      Args:
          messages: Conversation messages in order.

      Returns:
          The first human message's text, cut to 50 characters plus ``"..."``
          when longer; ``"空对话"`` for an empty list; ``"系统消息"`` when no
          human message is present.
      """
      if not messages:
          return "空对话"

      first_human = next((m for m in messages if isinstance(m, HumanMessage)), None)
      if first_human is None:
          return "系统消息"

      text = str(first_human.content)
      if len(text) > 50:
          return text[:50] + "..."
      return text
  1082. def _format_timestamp(self, timestamp: str) -> str:
  1083. """格式化时间戳为可读格式,包含毫秒"""
  1084. try:
  1085. # timestamp格式: 20250710123137984
  1086. if len(timestamp) >= 14:
  1087. year = timestamp[:4]
  1088. month = timestamp[4:6]
  1089. day = timestamp[6:8]
  1090. hour = timestamp[8:10]
  1091. minute = timestamp[10:12]
  1092. second = timestamp[12:14]
  1093. # 提取毫秒部分(如果存在)
  1094. millisecond = timestamp[14:17] if len(timestamp) > 14 else "000"
  1095. return f"{year}-{month}-{day} {hour}:{minute}:{second}.{millisecond}"
  1096. except Exception:
  1097. pass
  1098. return timestamp
  1099. def _format_utc_to_china_time(self, utc_time_str: str) -> str:
  1100. """将UTC时间转换为中国时区时间格式"""
  1101. try:
  1102. from datetime import datetime, timezone, timedelta
  1103. # 解析UTC时间字符串
  1104. # 格式: "2025-07-17T13:21:52.868292+00:00"
  1105. dt = datetime.fromisoformat(utc_time_str.replace('Z', '+00:00'))
  1106. # 转换为中国时区 (UTC+8)
  1107. china_tz = timezone(timedelta(hours=8))
  1108. china_time = dt.astimezone(china_tz)
  1109. # 格式化为目标格式: "2025-07-17 21:12:02.456"
  1110. return china_time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # 只保留3位毫秒
  1111. except Exception as e:
  1112. logger.warning(f"时间格式转换失败: {e}")
  1113. return utc_time_str
  def _get_database_scope_prompt(self) -> str:
      """Build the prompt that tells the model when to use the database query tools.

      Reads ``agent/tools/db_query_decision_prompt.txt`` below the directory
      obtained by applying ``dirname`` three times to this file's absolute
      path, and embeds its stripped content into a fixed English template.

      Returns:
          The assembled prompt, or an empty string when anything raises
          (a warning is logged in that case).
      """
      try:
          import os
          # Read agent/tools/db_query_decision_prompt.txt
          project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
          db_scope_file = os.path.join(project_root, "agent", "tools", "db_query_decision_prompt.txt")
          with open(db_scope_file, 'r', encoding='utf-8') as f:
              db_scope_content = f.read().strip()
          # The file content is inserted verbatim under the DATABASE BUSINESS SCOPE heading.
          prompt = f"""You are an intelligent database query assistant. When deciding whether to use database query tools, please follow these rules:
=== DATABASE BUSINESS SCOPE ===
{db_scope_content}
=== DECISION RULES ===
1. If the question involves data within the above business scope (service areas, branches, revenue, traffic flow, etc.), use the generate_sql tool
2. If the question is about general knowledge (like "when do lychees ripen?", weather, historical events, etc.), answer directly based on your knowledge WITHOUT using database tools
3. When answering general knowledge questions, provide clear and helpful answers without any special prefixes
=== FALLBACK STRATEGY ===
When generate_sql returns an error message or when queries return no results:
1. First, check if the question is within the database scope described above
2. For questions clearly OUTSIDE the database scope (world events, general knowledge, etc.):
- Provide the answer based on your knowledge immediately
- Give a direct, natural answer without any prefixes or disclaimers
3. For questions within database scope but queries return no results:
- If it's a reasonable question that might have a general answer, provide it naturally
4. For questions that definitely require specific database data:
- Acknowledge the limitation and suggest the data may not be available
- Do not attempt to guess or fabricate specific data
Please intelligently choose whether to query the database based on the nature of the user's question,
not on explaining your decision-making process.
"""
          return prompt
      except Exception as e:
          # Best effort: a missing or unreadable scope file yields an empty prompt.
          logger.warning(f"⚠️ Unable to read database scope description file: {e}")
          return ""
  1148. def _generate_validation_error_guidance(self, validation_error: str) -> str:
  1149. """根据验证错误类型生成具体的修复指导"""
  1150. # 优先处理最常见的语法错误
  1151. if "语法错误" in validation_error or "syntax error" in validation_error.lower():
  1152. return """SQL验证失败:语法错误。
  1153. 处理建议:
  1154. 1. 仔细检查SQL语法(括号、引号、关键词等)
  1155. 2. 修复语法错误后,调用 valid_sql 工具重新验证
  1156. 3. 常见问题:缺少逗号、括号不匹配、关键词拼写错误"""
  1157. # 新增的合并条件,处理所有"不存在"类型的错误
  1158. elif ("不存在" in validation_error or
  1159. "no such table" in validation_error.lower() or
  1160. "does not exist" in validation_error.lower()):
  1161. return """SQL验证失败:表或字段不存在。
  1162. 处理建议:
  1163. 1. 请明确告知用户,因数据库缺少相应的表或字段,无法通过SQL查询获取准确答案。
  1164. 2. 请基于你的通用知识和常识,直接回答用户的问题或提供相关解释。
  1165. 3. 请不要再尝试生成或修复SQL。"""
  1166. # 其他原有分支可以被新逻辑覆盖,故移除
  1167. # Fallback 到通用的错误处理
  1168. else:
  1169. return f"""SQL验证失败:{validation_error}
  1170. 处理建议:
  1171. 1. 如果这是一个可以修复的错误,请尝试修正并再次验证。
  1172. 2. 如果错误表明数据缺失,请直接向用户说明情况。
  1173. 3. 避免猜测或编造数据库中不存在的信息。"""
  1174. # === 参数错误诊断和修复函数 ===
  def _diagnose_parameter_error(self, messages: List[BaseMessage], error_msg: str) -> Dict[str, Any]:
      """
      Diagnose a parameter error by checking how tool calls and tool messages pair up.

      Every message is logged at error level. For each tool call of an
      AIMessage, the following messages are searched for a ToolMessage with the
      same ``tool_call_id``; for each ToolMessage, the preceding messages are
      searched for an AIMessage that issued that id.

      Args:
          messages: Message history to analyse.
          error_msg: The error text that triggered the diagnosis (only logged).

      Returns:
          Dict with ``error_type``, ``incomplete_tool_calls``,
          ``orphaned_tool_messages``, ``total_messages`` and
          ``recommended_action`` ("fix_incomplete_tool_calls",
          "remove_orphaned_tool_messages" or "unknown").
      """
      logger.error("🔍 开始诊断参数错误...")
      logger.error(f" 错误消息: {error_msg}")
      diagnosis = {
          "error_type": "parameter_error",
          "incomplete_tool_calls": [],
          "orphaned_tool_messages": [],
          "total_messages": len(messages),
          "recommended_action": None
      }
      # Analyse the message history.
      logger.error("📋 消息历史分析:")
      for i, msg in enumerate(messages):
          msg_type = type(msg).__name__
          if isinstance(msg, AIMessage):
              has_tool_calls = hasattr(msg, 'tool_calls') and msg.tool_calls
              content_summary = f"'{msg.content[:50]}...'" if msg.content else "空内容"
              logger.error(f" [{i}] {msg_type}: {content_summary}")
              if has_tool_calls:
                  logger.error(f" 工具调用: {len(msg.tool_calls)} 个")
                  for j, tc in enumerate(msg.tool_calls):
                      tool_name = tc.get('name', 'Unknown')
                      tool_id = tc.get('id', 'Unknown')
                      logger.error(f" [{j}] {tool_name} (ID: {tool_id})")
                      # Look for the matching ToolMessage after this AIMessage.
                      found_response = False
                      for k in range(i + 1, len(messages)):
                          if (isinstance(messages[k], ToolMessage) and
                                  messages[k].tool_call_id == tool_id):
                              found_response = True
                              break
                          elif isinstance(messages[k], (HumanMessage, AIMessage)):
                              # A human or AI message starts a new step; stop searching.
                              break
                      if not found_response:
                          diagnosis["incomplete_tool_calls"].append({
                              "message_index": i,
                              "tool_name": tool_name,
                              "tool_id": tool_id,
                              "ai_message_content": msg.content
                          })
                          logger.error(f" ❌ 未找到对应的ToolMessage!")
                      else:
                          logger.error(f" ✅ 找到对应的ToolMessage")
          elif isinstance(msg, ToolMessage):
              logger.error(f" [{i}] {msg_type}: {msg.name} (ID: {msg.tool_call_id})")
              # Look backwards for the AIMessage that issued this tool call.
              found_ai_message = False
              for k in range(i - 1, -1, -1):
                  if (isinstance(messages[k], AIMessage) and
                          hasattr(messages[k], 'tool_calls') and
                          messages[k].tool_calls):
                      if any(tc.get('id') == msg.tool_call_id for tc in messages[k].tool_calls):
                          found_ai_message = True
                          break
                  elif isinstance(messages[k], HumanMessage):
                      # Do not search past the start of the current turn.
                      break
              if not found_ai_message:
                  diagnosis["orphaned_tool_messages"].append({
                      "message_index": i,
                      "tool_name": msg.name,
                      "tool_call_id": msg.tool_call_id
                  })
                  logger.error(f" ❌ 未找到对应的AIMessage!")
          elif isinstance(msg, HumanMessage):
              logger.error(f" [{i}] {msg_type}: '{msg.content[:50]}...'")
      # Derive the recommended repair; incomplete tool calls take precedence.
      if diagnosis["incomplete_tool_calls"]:
          logger.error(f"🔧 发现 {len(diagnosis['incomplete_tool_calls'])} 个不完整的工具调用")
          diagnosis["recommended_action"] = "fix_incomplete_tool_calls"
      elif diagnosis["orphaned_tool_messages"]:
          logger.error(f"🔧 发现 {len(diagnosis['orphaned_tool_messages'])} 个孤立的工具消息")
          diagnosis["recommended_action"] = "remove_orphaned_tool_messages"
      else:
          logger.error("🔧 未发现明显的消息格式问题")
          diagnosis["recommended_action"] = "unknown"
      return diagnosis
  def _fix_by_adding_missing_tool_messages(self, messages: List[BaseMessage], diagnosis: Dict) -> List[BaseMessage]:
      """
      Repair the history by inserting a placeholder ToolMessage for every unanswered tool call.

      Args:
          messages: Original message history (not modified).
          diagnosis: Diagnosis dict whose ``incomplete_tool_calls`` entries
              carry ``message_index``, ``tool_id`` and ``tool_name``.

      Returns:
          A new list in which each incomplete tool call is followed by an
          error ToolMessage placed after the AIMessage that issued it.
      """
      logger.info("🔧 策略1: 补充缺失的ToolMessage")
      fixed_messages = list(messages)

      # message_index refers to positions in the ORIGINAL list. Handle entries
      # in ascending order and shift each target by the number of placeholders
      # already inserted, so later ones still land after their own AIMessage.
      ordered = sorted(
          diagnosis["incomplete_tool_calls"],
          key=lambda item: item["message_index"],
      )
      for already_inserted, incomplete in enumerate(ordered):
          # Error response standing in for the missing tool result.
          error_tool_message = ToolMessage(
              content="工具调用已超时或失败,请重新尝试。",
              tool_call_id=incomplete["tool_id"],
              name=incomplete["tool_name"]
          )
          insert_index = incomplete["message_index"] + 1 + already_inserted
          fixed_messages.insert(insert_index, error_tool_message)
          logger.info(f" ✅ 为工具调用 {incomplete['tool_name']}({incomplete['tool_id']}) 添加错误响应")
      return fixed_messages
  def _fix_by_removing_incomplete_tool_calls(self, messages: List[BaseMessage], diagnosis: Dict) -> List[BaseMessage]:
      """
      Repair the history by stripping tool calls from AI messages flagged as incomplete.

      Args:
          messages: Original message history (not modified).
          diagnosis: Diagnosis dict; the ``message_index`` of each
              ``incomplete_tool_calls`` entry marks a message to rewrite.

      Returns:
          A new list where each flagged AIMessage is replaced by a plain
          AIMessage: its own text when it has any, otherwise a fixed
          explanatory sentence. All other messages are kept as they are.
      """
      logger.info("🔧 策略2: 删除不完整的工具调用")

      flagged_indices = {inc["message_index"] for inc in diagnosis["incomplete_tool_calls"]}

      repaired = []
      for idx, msg in enumerate(messages):
          calls_tools = isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls
          if not calls_tools or idx not in flagged_indices:
              repaired.append(msg)
              continue

          if msg.content and msg.content.strip():
              # Keep the text, drop the tool calls.
              logger.info(f" 🔧 保留文本内容,删除工具调用: '{msg.content[:50]}...'")
              replacement = AIMessage(content=msg.content)
          else:
              # No text to keep: substitute an explanatory message.
              logger.info(" 🔧 创建说明性消息替换空的工具调用")
              replacement = AIMessage(content="我需要重新分析这个问题。")
          repaired.append(replacement)
      return repaired
  def _fix_by_rebuilding_history(self, messages: List[BaseMessage]) -> List[BaseMessage]:
      """
      Rebuild the history, keeping only conversation rounds that are complete.

      A round starts at each HumanMessage. Incomplete earlier rounds are
      dropped entirely; an incomplete final round is reduced to its human
      message(s).

      Args:
          messages: Original message history (not modified).

      Returns:
          The cleaned message list.
      """
      logger.info("🔧 策略3: 重建消息历史")

      # Split the history into rounds; anything before the first human
      # message forms a leading round of its own.
      rounds = []
      for msg in messages:
          if isinstance(msg, HumanMessage) or not rounds:
              rounds.append([msg])
          else:
              rounds[-1].append(msg)

      clean_messages = []
      for earlier_round in rounds[:-1]:
          if self._is_conversation_complete(earlier_round):
              clean_messages.extend(earlier_round)
              logger.info(f" ✅ 保留完整的对话轮次 ({len(earlier_round)} 条消息)")
          else:
              logger.info(f" ❌ 跳过不完整的对话轮次 ({len(earlier_round)} 条消息)")

      if rounds:
          final_round = rounds[-1]
          if self._is_conversation_complete(final_round):
              clean_messages.extend(final_round)
          else:
              # Keep only what the user said in the unfinished last round.
              clean_messages.extend(m for m in final_round if isinstance(m, HumanMessage))

      logger.info(f" 📊 重建完成: {len(messages)} -> {len(clean_messages)} 条消息")
      return clean_messages
  def _is_conversation_complete(self, conversation: List[BaseMessage]) -> bool:
      """
      Tell whether every tool call in a conversation round has a tool response.

      Args:
          conversation: Messages of one round.

      Returns:
          False as soon as an AIMessage has more tool calls than there are
          ToolMessages in the round referencing its call ids; True otherwise.
      """
      for msg in conversation:
          if not isinstance(msg, AIMessage):
              continue
          calls = getattr(msg, 'tool_calls', None)
          if not calls:
              continue

          expected_ids = [call.get('id') for call in calls]
          answered = len([
              reply for reply in conversation
              if isinstance(reply, ToolMessage) and reply.tool_call_id in expected_ids
          ])
          if answered < len(expected_ids):
              return False
      return True
  async def _handle_parameter_error_with_retry(self, messages: List[BaseMessage], error_msg: str, attempt: int) -> List[BaseMessage]:
      """
      Diagnose a parameter error and apply the repair strategy for this attempt.

      Args:
          messages: Message history to repair.
          error_msg: The error text, passed on to the diagnosis.
          attempt: Zero-based retry number. 0 adds missing ToolMessages,
              1 removes incomplete tool calls, anything else rebuilds the history.

      Returns:
          The repaired message list.
      """
      logger.error(f"🔧 处理参数错误 (重试 {attempt + 1}/3)")

      # Step 1: diagnose.
      diagnosis = self._diagnose_parameter_error(messages, error_msg)

      # Step 2: escalate the repair with each retry.
      if attempt not in (0, 1):
          fixed_messages = self._fix_by_rebuilding_history(messages)
      elif attempt == 0:
          fixed_messages = self._fix_by_adding_missing_tool_messages(messages, diagnosis)
      else:
          fixed_messages = self._fix_by_removing_incomplete_tool_calls(messages, diagnosis)

      logger.info(f"🔧 修复完成: {len(messages)} -> {len(fixed_messages)} 条消息")
      return fixed_messages
  def _generate_contextual_fallback(self, messages: List[BaseMessage], diagnosis: Dict) -> str:
      """
      Produce an apology message based on the user's latest question.

      Args:
          messages: Message history; the last HumanMessage is used.
          diagnosis: Diagnosis dict (not read by this method).

      Returns:
          A message quoting the question when it contains a database-related
          keyword, a generic apology otherwise, or a "cannot understand"
          message when there is no human message.
      """
      last_human_message = next(
          (m for m in reversed(messages) if isinstance(m, HumanMessage)),
          None,
      )
      if not last_human_message:
          return "抱歉,我无法理解您的问题。"

      # Keywords that mark the question as database-related.
      db_keywords = ['查询', '数据', '服务区', '收入', '车流量']
      question = last_human_message.content.lower()
      looks_like_db_question = any(keyword in question for keyword in db_keywords)

      if looks_like_db_question:
          return f"抱歉,在处理您关于「{last_human_message.content}」的查询时遇到了技术问题。请稍后重试,或者重新描述您的问题。"
      return "抱歉,我现在无法正确处理您的问题。请稍后重试或重新表述您的问题。"
  def _get_anti_hallucination_prompt(self, state: AgentState) -> str:
      """
      Build a prompt instructing the model to pass the user's question to tools unchanged.

      Args:
          state: Agent state; the content of the last HumanMessage in
              ``messages`` is embedded in the prompt.

      Returns:
          The prompt text, or an empty string when there is no human message
          or its content is empty.
      """
      last_user_message = next(
          (m.content for m in reversed(state["messages"]) if isinstance(m, HumanMessage)),
          None,
      )
      if not last_user_message:
          return ""

      prompt_lines = [
          "🛡️ 关键指令:工具调用参数必须完全准确",
          f"用户当前问题:「{last_user_message}」",
          "调用工具时的严格要求:",
          "1. **原样传递原则**:question 参数必须与用户问题完全一致,一字不差",
          "2. **禁止任何改写**:不得进行同义词替换、语言优化或任何形式的修改",
          "❌ 错误示例:",
          '- 用户问"充电桩",不得改为"充电栋"',
          "✅ 正确做法:",
          "- 完全复制用户的原始问题作为question参数",
          "请严格遵守此要求,确保工具调用的准确性。",
      ]
      return "\n".join(prompt_lines)