Bläddra i källkod

已经解决./custom_react_agent的chat返回结果取SQL错误的问题,现在准备彻底格式化输出.

wangxq 1 månad sedan
förälder
incheckning
419b47f544

+ 64 - 5
test/custom_react_agent/agent.py

@@ -153,7 +153,7 @@ class CustomReactAgent:
         """
         打印 state 的全部信息,用于调试
         """
-        logger.info(" =" * 20)
+        logger.info(" ~" * 10 + " State Print Start" + "~" * 10)
         logger.info(f"📋 [State Debug] {node_name} - 当前状态信息:")
         
         # 🎯 打印 state 中的所有字段
@@ -187,7 +187,7 @@ class CustomReactAgent:
                         logger.info(f"         工具调用: {tool_name}")
                         logger.info(f"         参数: {str(tool_args)[:200]}...")
         
-        logger.info(" =" * 20)
+        logger.info(" ~" * 10 + " State Print End" + "~" * 10)
 
     def _prepare_tool_input_node(self, state: AgentState) -> Dict[str, Any]:
         """
@@ -251,7 +251,7 @@ class CustomReactAgent:
         logger.info(f"📝 [Node] update_state_after_tool - Thread: {state['thread_id']}")
         
         # 🎯 打印 state 全部信息
-        # self._print_state_info(state, "update_state_after_tool")
+        self._print_state_info(state, "update_state_after_tool")
         
         last_tool_message = state['messages'][-1]
         tool_name = last_tool_message.name
@@ -297,12 +297,53 @@ class CustomReactAgent:
         last_message.content = f"[Formatted Output]\n{last_message.content}"
         return {"messages": [last_message]}
 
+    def _extract_latest_sql_data(self, messages: List[BaseMessage]) -> Optional[str]:
+        """从消息历史中提取最近的run_sql执行结果,但仅限于当前对话轮次。"""
+        logger.info("🔍 提取最新的SQL执行结果...")
+        
+        # 🎯 只查找最后一个HumanMessage之后的SQL执行结果
+        last_human_index = -1
+        for i in range(len(messages) - 1, -1, -1):
+            if isinstance(messages[i], HumanMessage):
+                last_human_index = i
+                break
+        
+        if last_human_index == -1:
+            logger.info("   未找到用户消息,跳过SQL数据提取")
+            return None
+        
+        # 只在当前对话轮次中查找SQL结果
+        current_conversation = messages[last_human_index:]
+        logger.info(f"   当前对话轮次包含 {len(current_conversation)} 条消息")
+        
+        for msg in reversed(current_conversation):
+            if isinstance(msg, ToolMessage) and msg.name == 'run_sql':
+                logger.info(f"   找到当前对话轮次的run_sql结果: {msg.content[:100]}...")
+                
+                # 🎯 处理Unicode转义序列,将其转换为正常的中文字符
+                try:
+                    # 先尝试解析JSON以验证格式
+                    parsed_data = json.loads(msg.content)
+                    # 重新序列化,确保中文字符正常显示
+                    formatted_content = json.dumps(parsed_data, ensure_ascii=False, separators=(',', ':'))
+                    logger.info(f"   已转换Unicode转义序列为中文字符")
+                    return formatted_content
+                except json.JSONDecodeError:
+                    # 如果不是有效JSON,直接返回原内容
+                    logger.warning(f"   SQL结果不是有效JSON格式,返回原始内容")
+                    return msg.content
+        
+        logger.info("   当前对话轮次中未找到run_sql执行结果")
+        return None
+
     async def chat(self, message: str, user_id: str, thread_id: Optional[str] = None) -> Dict[str, Any]:
         """
         处理用户聊天请求。
         """
         if not thread_id:
-            thread_id = f"{user_id}:{pd.Timestamp.now().strftime('%Y%m%d%H%M%S%f')}"
+            now = pd.Timestamp.now()
+            milliseconds = int(now.microsecond / 1000)
+            thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
             logger.info(f"🆕 新建会话,Thread ID: {thread_id}")
         
         config = {
@@ -321,8 +362,26 @@ class CustomReactAgent:
         try:
             final_state = await self.agent_executor.ainvoke(inputs, config)
             answer = final_state["messages"][-1].content
+            
+            # 🎯 提取最近的 run_sql 执行结果(不修改messages)
+            sql_data = self._extract_latest_sql_data(final_state["messages"])
+            
             logger.info(f"✅ 处理完成 - Final Answer: '{answer}'")
-            return {"success": True, "answer": answer, "thread_id": thread_id}
+            
+            # 构建返回结果
+            result = {
+                "success": True, 
+                "answer": answer, 
+                "thread_id": thread_id
+            }
+            
+            # 只有当存在SQL数据时才添加到返回结果中
+            if sql_data:
+                result["sql_data"] = sql_data
+                logger.info("   📊 已包含SQL原始数据")
+            
+            return result
+            
         except Exception as e:
             logger.error(f"❌ 处理过程中发生严重错误 - Thread: {thread_id}: {e}", exc_info=True)
             return {"success": False, "error": str(e), "thread_id": thread_id}

+ 116 - 0
test/custom_react_agent/result 输出结果.md

@@ -0,0 +1,116 @@
+1.成功生成SQL并执行查询
+POST  http://localhost:8084/api/v0/ask_agent 
+
+{
+    "question": "请按照收入给每个高速服务区进行排名?返回收入最多的前三名服务区?"
+}
+
+
+#正常生成SQL,并完成查询的返回结果
+
+{
+    "code": 200,
+    "data": {
+        "agent_version": "langgraph_v1",
+        "classification_info": {
+            "confidence": 0.9,
+            "method": "rule_based_strong_business",
+            "reason": "强业务特征 - 业务实体: ['核心业务实体:服务区', '支付业务:收入'], 查询意图: ['排名'], SQL: []"
+        },
+        "context_used": false,
+        "conversation_id": "conv_1751199617_5d37a647",
+        "conversation_message": "创建新对话",
+        "conversation_status": "new",
+        "execution_path": [
+            "start",
+            "classify",
+            "agent_sql_generation",
+            "agent_sql_execution",
+            "format_response"
+        ],
+        "records": {
+            "columns": [
+                "服务区名称",
+                "总收入"
+            ],
+            "is_limited": false,
+            "row_count": 3,
+            "rows": [
+                {
+                    "总收入": "7024226.1500",
+                    "服务区名称": "庐山服务区"
+                },
+                {
+                    "总收入": "6929288.3300",
+                    "服务区名称": "三清山服务区"
+                },
+                {
+                    "总收入": "6848435.6700",
+                    "服务区名称": "南城服务区"
+                }
+            ],
+            "total_row_count": 3
+        },
+        "response": "根据收入排名,前三名高速服务区依次为:庐山服务区(702.42万元)、三清山服务区(692.93万元)、南城服务区(684.84万元)。",
+        "routing_mode_source": "config",
+        "routing_mode_used": "hybrid",
+        "session_id": null,
+        "sql": "SELECT service_name AS 服务区名称, SUM(pay_sum) AS 总收入 \nFROM bss_business_day_data \nWHERE delete_ts IS NULL \nGROUP BY service_name \nORDER BY 总收入 DESC NULLS LAST \nLIMIT 3;",
+        "summary": "根据收入排名,前三名高速服务区依次为:庐山服务区(702.42万元)、三清山服务区(692.93万元)、南城服务区(684.84万元)。",
+        "timestamp": "2025-06-29T20:20:56.806141",
+        "type": "DATABASE",
+
+}
+
+前端UI应关注的参数:
+1."response": 它将代替原来的summary,会查询的结果进行总结。
+2."sql":执行查询SQL.
+3."data.records":查询返回的数据,包括表头(data.records.columns)和数据行(data.records.rows)
+
+
+2.未成功生成SQL
+POST  http://localhost:8084/api/v0/ask_agent 
+{
+    "question": "请问每个高速公路服务区的管理经理是谁?"
+}
+
+
+# 返回结果
+{
+    "code": 200,
+    "data": {
+        "agent_version": "langgraph_v1",
+        "classification_info": {
+            "confidence": 0.82,
+            "method": "rule_based_medium_business",
+            "reason": "中等业务特征 - 业务实体: ['核心业务实体:服务区', '核心业务实体:高速公路']"
+        },
+        "context_used": false,
+        "conversation_id": "conv_1751201276_e59f0a07",
+        "conversation_message": "创建新对话",
+        "conversation_status": "new",
+        "execution_path": [
+            "start",
+            "classify",
+            "agent_sql_generation",
+            "format_response"
+        ],
+        "from_cache": false,
+        "is_guest_user": true,
+        "response": "当前提供的上下文信息不足以生成查询服务区对应管理经理的SQL语句。原因如下:\n\n1. 在服务区管理公司表(bss_company)中虽然存在created_by/updated_by字段,但这些字段仅记录数据操作者(系统用户),而非实际的管理经理人员信息。\n\n2. 现有表结构中缺失以下关键实体:\n   - 员工/人员信息表(存储经理姓名等个人信息)\n   - 公司与人员的组织架构表(关联公司ID与员工ID)\n\n3. 当前表间关系仅能查询到服务区所属的管理公司名称(通过bss_service_area.company_id关联bss_company.id),但无法获取具体管理人员的姓名。\n\n需要补充以下信息才能继续:\n- 存储人员信息的表结构(特别是管理岗位人员)\n- 公司与人员的关联关系表结构 请尝试提问其它问题。",
+        "routing_mode_source": "config",
+        "routing_mode_used": "hybrid",
+        "session_id": null,
+        "timestamp": "2025-06-29T20:48:21.351324",
+        "type": "DATABASE",
+        "user_id": "guest"
+    },
+    "message": "操作成功",
+    "success": true
+}
+
+
+前端UI应关注的参数:
+1.没有返回"sql"和"data.records"。
+2."response":当没有返回"sql"和"data.records"的时候,response会返回未能生成SQL的原因,可以返回给客户端
+

+ 21 - 4
test/custom_react_agent/shell.py

@@ -5,15 +5,22 @@ import asyncio
 import logging
 import sys
 import os
+import json
 
 # 动态地将项目根目录添加到 sys.path,以支持跨模块导入
 # 这使得脚本更加健壮,无论从哪里执行
 PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))
 sys.path.insert(0, PROJECT_ROOT)
 
-# 从新模块导入 Agent 和配置 (使用相对导入)
-from .agent import CustomReactAgent
-from . import config
+# 从新模块导入 Agent 和配置
+try:
+    # 相对导入(当作为模块导入时)
+    from .agent import CustomReactAgent
+    from . import config
+except ImportError:
+    # 绝对导入(当直接运行时)
+    from test.custom_react_agent.agent import CustomReactAgent
+    from test.custom_react_agent import config
 
 # 配置日志
 logging.basicConfig(level=config.LOG_LEVEL, format=config.LOG_FORMAT)
@@ -82,7 +89,17 @@ class CustomAgentShell:
             result = await self.agent.chat(user_input, self.user_id, self.thread_id)
             
             if result.get("success"):
-                print(f"🤖 Agent: {result.get('answer')}")
+                answer = result.get('answer', '')
+                # 去除 [Formatted Output] 标记,只显示真正的回答
+                if answer.startswith("[Formatted Output]\n"):
+                    answer = answer.replace("[Formatted Output]\n", "")
+                
+                print(f"🤖 Agent: {answer}")
+                
+                # 如果包含 SQL 数据,也显示出来
+                if 'sql_data' in result:
+                    print(f"📊 SQL 查询结果: {result['sql_data']}")
+                    
                 # 更新 thread_id 以便在同一会话中继续
                 self.thread_id = result.get("thread_id")
             else:

+ 7 - 1
test/custom_react_agent/sql_tools.py

@@ -37,7 +37,7 @@ def generate_sql(question: str, history_messages: List[Dict[str, Any]] = None) -
 
     # Combine history and the current question to form a rich prompt
     history_str = "\n".join([f"{msg['type']}: {msg.get('content', '') or ''}" for msg in history_messages])
-    enriched_question = f"""Based on the following conversation history:
+    enriched_question = f"""\nBased on the following conversation history:
 ---
 {history_str}
 ---
@@ -121,6 +121,10 @@ def run_sql(sql: str) -> str:
         vn = get_vanna_instance()
         df = vn.run_sql(sql)
 
+        print("-------------run_sql() df -------------------")
+        print(df)
+        print("--------------------------------")
+
         if df is None:
             logger.warning("   SQL执行成功,但查询结果为空。")
             result = {"status": "success", "data": [], "message": "查询无结果"}
@@ -134,6 +138,8 @@ def run_sql(sql: str) -> str:
         logger.error(f"   SQL执行过程中发生异常: {e}", exc_info=True)
         error_result = {"status": "error", "error_message": str(e)}
         return json.dumps(error_result, ensure_ascii=False)
+    
+
 
 # 将所有工具函数收集到一个列表中,方便Agent导入和使用
 sql_tools = [generate_sql, valid_sql, run_sql]