ソースを参照

已经完成ask_react_agent_stream api的开发.

wangxq 2 週間 前
コミット
05e3b2f182

+ 1029 - 0
docs/ask_react_agent_stream_design.md

@@ -0,0 +1,1029 @@
+# React Agent 流式API设计方案 (ask_react_agent_stream)
+
+## 概述
+
+本文档描述了为React Agent添加流式API `ask_react_agent_stream` 的设计方案,该API与现有的 `ask_react_agent` 功能和参数完全相同,但采用流式输出模式。设计参考了现有的 `ask_agent_stream` 实现模式,确保系统的一致性和稳定性。
+
+## 背景与需求
+
+### 现有API局限性
+
+当前的 `ask_react_agent` API 存在以下问题:
+1. **同步执行**:客户端必须等待完整的React Agent执行完成
+2. **无进度反馈**:客户端无法了解LangGraph各节点的执行状态
+3. **用户体验差**:长时间执行没有实时反馈
+
+### 解决目标
+
+- 提供与 `ask_react_agent` 相同的功能和参数
+- 支持实时显示LangGraph节点执行进度
+- 采用SSE(Server-Sent Events)流式推送
+- 完全不影响现有 `ask_react_agent` API的功能
+
+## 技术方案
+
+### 1. API设计
+
+**端点**: `GET /api/v0/ask_react_agent_stream`
+
+**参数获取方式**: URL查询参数(因为EventSource只支持GET请求)
+
+**支持参数**:
+- `question` (必填): 用户问题
+- `user_id` (可选): 用户ID,默认为"guest"或从thread_id推断
+- `thread_id` (可选): 会话ID,不传则自动生成新会话
+
+**响应格式**: `text/event-stream` (SSE格式)
+
+### 2.1 同步版本API (推荐用于复杂数据库查询)
+
+**端点**: `GET /api/v0/ask_react_agent_stream_sync`
+
+**设计目的**: 解决Vector搜索异步冲突问题,专门用于复杂数据库查询
+
+**参数**: 与原API完全相同
+- `question` (必填): 用户问题
+- `user_id` (必填): 用户ID
+- `thread_id` (可选): 会话ID,不传则自动生成新会话
+- `routing_mode` (可选): 路由模式,默认'agent'
+- `continue_conversation` (可选): 是否继续对话,默认false
+
+**技术架构**: 
+- 使用同步LangGraph (`invoke()` 而不是 `ainvoke()`)
+- 同步LLM配置 (`streaming=False`, `enable_thinking=False`)
+- 完全避免异步依赖冲突
+
+**适用场景**:
+- ✅ 复杂数据库查询
+- ✅ 需要Vector搜索的问题
+- ✅ 对稳定性要求高的场景
+
+**使用示例**:
+```bash
+curl -X GET "http://localhost:8084/api/v0/ask_react_agent_stream_sync?question=请问当前系统中哪个服务区档口最多?&user_id=test_user" -H "Accept: text/event-stream"
+```
+
+### 2. 实现架构
+
+```mermaid
+sequenceDiagram
+    participant Client as 客户端
+    participant API as ask_react_agent_stream
+    participant ReactAgent as React Agent实例
+    participant LangGraph as LangGraph执行器
+    
+    Client->>API: GET /ask_react_agent_stream?question=...&user_id=...
+    API->>API: 参数验证和数据清洗
+    API->>ReactAgent: 调用chat_stream()方法
+    ReactAgent->>LangGraph: astream执行
+    
+    loop 每个节点执行
+        LangGraph->>ReactAgent: 节点状态更新
+        ReactAgent->>API: yield进度信息
+        API->>Client: SSE推送进度状态
+    end
+    
+    LangGraph->>ReactAgent: 执行完成
+    ReactAgent->>API: yield最终结果
+    API->>Client: SSE推送完整结果
+    Client->>Client: 关闭EventSource连接
+```
+
+### 3. 核心实现逻辑
+
+#### 3.1 API端点实现
+
+```python
+@app.route('/api/v0/ask_react_agent_stream', methods=['GET'])
+async def ask_react_agent_stream():
+    """React Agent 流式API - 支持实时进度显示
+    功能与ask_react_agent完全相同,除了采用流式输出
+    """
+    def generate():
+        try:
+            # 1. 参数获取和验证(从URL参数)
+            question = request.args.get('question')
+            user_id_input = request.args.get('user_id')
+            thread_id_input = request.args.get('thread_id')
+            
+            # 参数验证(复用现有validate_request_data逻辑)
+            if not question:
+                yield format_sse_error("缺少必需参数:question")
+                return
+                
+            # 2. 数据预处理(与ask_react_agent相同)
+            validated_data = validate_request_data({
+                'question': question,
+                'user_id': user_id_input,
+                'thread_id': thread_id_input
+            })
+            
+            # 3. Agent实例检查
+            if not await ensure_agent_ready():
+                yield format_sse_error("React Agent 初始化失败")
+                return
+            
+            # 4. 流式执行
+            async for chunk in _react_agent_instance.chat_stream(
+                message=validated_data['question'],
+                user_id=validated_data['user_id'],
+                thread_id=validated_data['thread_id']
+            ):
+                if chunk["type"] == "progress":
+                    yield format_sse_react_progress(chunk)
+                elif chunk["type"] == "completed":
+                    yield format_sse_react_completed(chunk)
+                elif chunk["type"] == "error":
+                    yield format_sse_error(chunk.get("error", "处理失败"))
+                    
+        except Exception as e:
+            yield format_sse_error(f"服务异常: {str(e)}")
+    
+    return Response(stream_with_context(generate()), mimetype='text/event-stream')
+```
+
+#### 3.2 React Agent流式方法
+
+需要在React Agent类中新增 `chat_stream()` 方法:
+
+```python
+async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None):
+    """
+    流式处理用户聊天请求 - 复用chat()方法的所有逻辑
+    """
+    # 1. 复用现有的初始化逻辑(thread_id生成、配置检查等)
+    if not thread_id:
+        now = pd.Timestamp.now()
+        milliseconds = int(now.microsecond / 1000)
+        thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
+    
+    # 2. 复用现有的配置和错误处理
+    self._recursion_count = 0
+    run_config = {
+        "configurable": {"thread_id": thread_id},
+        "recursion_limit": config.RECURSION_LIMIT
+    }
+    
+    # 3. 复用checkpointer检查逻辑
+    if self.checkpointer:
+        # ... checkpointer连接检查和重新初始化 ...
+    
+    # 4. 使用astream流式执行
+    final_state = None
+    async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"):
+        for node_name, node_data in chunk.items():
+            yield {
+                "type": "progress",
+                "node": node_name,
+                "data": node_data,
+                "thread_id": thread_id
+            }
+            final_state = node_data
+    
+    # 5. 复用现有的结果处理逻辑
+    if final_state and "messages" in final_state:
+        api_data = await self._async_generate_api_data(final_state)
+        yield {
+            "type": "completed",
+            "result": {"api_data": api_data, "thread_id": thread_id}
+        }
+```
+
+### 4. SSE响应格式设计
+
+#### 4.1 进度状态消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "正在执行: AI思考中",
+  "data": {
+    "type": "progress",
+    "node": "agent",
+    "display_name": "AI思考中",
+    "thread_id": "wang1:20250131103000001",
+    "timestamp": "2025-01-31T10:30:00.123Z"
+  }
+}
+```
+
+#### 4.2 最终结果消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "处理完成",
+  "data": {
+    "type": "completed",
+    "response": "根据销售数据分析...",
+    "conversation_id": "wang1:20250131103000001",
+    "user_id": "wang1",
+    "react_agent_meta": {
+      "thread_id": "wang1:20250131103000001",
+      "agent_version": "custom_react_v1_async"
+    },
+    "sql": "SELECT * FROM sales...",
+    "records": [...],
+    "timestamp": "2025-01-31T10:32:15.456Z"
+  }
+}
+```
+
+#### 4.3 错误消息
+
+```json
+{
+  "code": 500,
+  "success": false,
+  "message": "处理失败",
+  "data": {
+    "type": "error",
+    "error": "具体错误信息",
+    "timestamp": "2025-01-31T10:30:05.789Z"
+  }
+}
+```
+
+### 5. 节点状态映射
+
+基于LangGraph的节点执行,提供用户友好的状态显示:
+
+| 节点名称 | 显示名称 | 说明 |
+|----------|----------|------|
+| `__start__` | 开始处理 | 流程启动 |
+| `trim_messages` | 准备消息 | 消息预处理 |
+| `agent` | AI思考中 | LLM推理决策 |
+| `prepare_tool_input` | 准备工具 | 工具输入准备 |
+| `tools` | 执行查询 | SQL工具执行 |
+| `update_state_after_tool` | 处理结果 | 结果后处理 |
+| `format_final_response` | 生成回答 | 最终响应格式化 |
+| `__end__` | 完成 | 流程结束 |
+
+### 6. SSE格式化函数
+
+#### 6.1 进度格式化
+
+```python
+def format_sse_react_progress(chunk: dict) -> str:
+    """格式化React Agent进度事件为SSE格式"""
+    node = chunk.get("node")
+    thread_id = chunk.get("thread_id")
+    
+    # 节点显示名称映射
+    node_display_map = {
+        "__start__": "开始处理",
+        "trim_messages": "准备消息", 
+        "agent": "AI思考中",
+        "prepare_tool_input": "准备工具",
+        "tools": "执行查询",
+        "update_state_after_tool": "处理结果",
+        "format_final_response": "生成回答",
+        "__end__": "完成"
+    }
+    
+    display_name = node_display_map.get(node, "处理中")
+    
+    data = {
+        "code": 200,
+        "success": True,
+        "message": f"正在执行: {display_name}",
+        "data": {
+            "type": "progress",
+            "node": node,
+            "display_name": display_name,
+            "thread_id": thread_id,
+            "timestamp": datetime.now().isoformat()
+        }
+    }
+    
+    import json
+    return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
+```
+
+#### 6.2 完成结果格式化
+
+```python
+def format_sse_react_completed(chunk: dict) -> str:
+    """格式化React Agent完成事件为SSE格式"""
+    result = chunk.get("result", {})
+    api_data = result.get("api_data", {})
+    thread_id = result.get("thread_id")
+    
+    # 构建与ask_react_agent相同的响应格式
+    response_data = {
+        "response": api_data.get("response", ""),
+        "conversation_id": thread_id,
+        "react_agent_meta": api_data.get("react_agent_meta", {}),
+        "timestamp": datetime.now().isoformat()
+    }
+    
+    # 可选字段
+    if "sql" in api_data:
+        response_data["sql"] = api_data["sql"]
+    if "records" in api_data:
+        response_data["records"] = api_data["records"]
+    
+    data = {
+        "code": 200,
+        "success": True,
+        "message": "处理完成",
+        "data": {
+            "type": "completed",
+            **response_data
+        }
+    }
+    
+    import json
+    return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
+```
+
+## 前端集成
+
+### 现有API(保持不变)
+
+```javascript
+// 同步方式 - 现有代码无需修改
+const response = await fetch('/api/v0/ask_react_agent', {
+    method: 'POST',
+    headers: {'Content-Type': 'application/json'},
+    body: JSON.stringify({
+        question: "帮我查询销售数据",
+        user_id: "wang1"
+    })
+});
+
+const result = await response.json();
+displayResult(result.data);
+```
+
+### 流式API
+
+```javascript
+// 流式方式 - 新增功能
+const params = new URLSearchParams({
+    question: "帮我查询销售数据",
+    user_id: "wang1"
+});
+
+const eventSource = new EventSource(`/api/v0/ask_react_agent_stream?${params}`);
+
+let conversationId = null;
+
+eventSource.onmessage = function(event) {
+    const data = JSON.parse(event.data);
+    
+    switch(data.data.type) {
+        case 'progress':
+            updateProgress(data.data.display_name);
+            if (!conversationId) conversationId = data.data.thread_id;
+            break;
+            
+        case 'completed':
+            hideProgress();
+            displayResult(data.data);  // 格式与现有API完全一致
+            eventSource.close();
+            break;
+            
+        case 'error':
+            hideProgress();
+            showError(data.data.error);
+            eventSource.close();
+            break;
+    }
+};
+
+eventSource.onerror = function(error) {
+    console.error('EventSource failed:', error);
+    eventSource.close();
+};
+```
+
+### 完整前端集成示例
+
+#### HTML页面结构
+
+```html
+<!DOCTYPE html>
+<html lang="zh-CN">
+<head>
+    <meta charset="UTF-8">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>React Agent 流式API测试</title>
+    <style>
+        .container { max-width: 800px; margin: 0 auto; padding: 20px; }
+        .input-group { margin-bottom: 20px; }
+        .input-group label { display: block; margin-bottom: 5px; font-weight: bold; }
+        .input-group input, .input-group textarea { width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }
+        .input-group textarea { min-height: 100px; resize: vertical; }
+        .btn { padding: 10px 20px; background: #007bff; color: white; border: none; border-radius: 4px; cursor: pointer; }
+        .btn:hover { background: #0056b3; }
+        .btn:disabled { background: #ccc; cursor: not-allowed; }
+        .progress { display: none; margin: 20px 0; padding: 15px; background: #f8f9fa; border-radius: 4px; border-left: 4px solid #007bff; }
+        .result { margin: 20px 0; padding: 15px; background: #d4edda; border-radius: 4px; border-left: 4px solid #28a745; }
+        .error { margin: 20px 0; padding: 15px; background: #f8d7da; border-radius: 4px; border-left: 4px solid #dc3545; }
+        .sql-code { background: #f8f9fa; padding: 10px; border-radius: 4px; font-family: monospace; margin: 10px 0; }
+        .loading-spinner { display: inline-block; width: 20px; height: 20px; border: 3px solid #f3f3f3; border-top: 3px solid #007bff; border-radius: 50%; animation: spin 1s linear infinite; margin-right: 10px; }
+        @keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }
+    </style>
+</head>
+<body>
+    <div class="container">
+        <h1>React Agent 流式API测试</h1>
+        
+        <div class="input-group">
+            <label for="question">问题:</label>
+            <textarea id="question" placeholder="请输入您的问题,例如:帮我查询销售数据">帮我查询销售数据</textarea>
+        </div>
+        
+        <div class="input-group">
+            <label for="userId">用户ID:</label>
+            <input type="text" id="userId" placeholder="用户ID(可选)" value="test_user">
+        </div>
+        
+        <div class="input-group">
+            <label for="threadId">会话ID:</label>
+            <input type="text" id="threadId" placeholder="会话ID(可选,留空自动生成)">
+        </div>
+        
+        <button id="askBtn" class="btn" onclick="askReactAgent()">发送请求</button>
+        <button id="stopBtn" class="btn" onclick="stopRequest()" style="display:none; background:#dc3545;">停止请求</button>
+        
+        <div id="progress" class="progress">
+            <div class="loading-spinner"></div>
+            <span id="progressText">准备中...</span>
+        </div>
+        
+        <div id="result" class="result" style="display:none;">
+            <h3>响应结果</h3>
+            <div id="responseText"></div>
+            <div id="conversationInfo"></div>
+            <div id="sqlSection" style="display:none;">
+                <h4>执行的SQL:</h4>
+                <div id="sqlCode" class="sql-code"></div>
+            </div>
+            <div id="recordsSection" style="display:none;">
+                <h4>查询结果:</h4>
+                <div id="recordsData"></div>
+            </div>
+        </div>
+        
+        <div id="error" class="error" style="display:none;">
+            <h3>错误信息</h3>
+            <div id="errorText"></div>
+        </div>
+    </div>
+
+    <script>
+        let currentEventSource = null;
+        let currentConversationId = null;
+
+        function askReactAgent() {
+            const question = document.getElementById('question').value.trim();
+            const userId = document.getElementById('userId').value.trim() || 'guest';
+            const threadId = document.getElementById('threadId').value.trim();
+            
+            if (!question) {
+                showError('请输入问题');
+                return;
+            }
+            
+            // 重置界面
+            hideError();
+            hideResult();
+            showProgress('准备发送请求...');
+            
+            // 禁用发送按钮,显示停止按钮
+            document.getElementById('askBtn').disabled = true;
+            document.getElementById('stopBtn').style.display = 'inline-block';
+            
+            // 构建请求参数
+            const params = new URLSearchParams({
+                question: question,
+                user_id: userId
+            });
+            
+            if (threadId) {
+                params.append('thread_id', threadId);
+            }
+            
+            // 创建EventSource连接
+            const url = `/api/v0/ask_react_agent_stream?${params}`;
+            currentEventSource = new EventSource(url);
+            
+            currentEventSource.onmessage = function(event) {
+                try {
+                    const data = JSON.parse(event.data);
+                    handleStreamMessage(data);
+                } catch (e) {
+                    console.error('解析SSE消息失败:', e, event.data);
+                    showError('响应数据格式错误');
+                    stopRequest();
+                }
+            };
+            
+            currentEventSource.onerror = function(error) {
+                console.error('EventSource连接错误:', error);
+                showError('连接失败,请检查网络或服务状态');
+                stopRequest();
+            };
+        }
+
+        function handleStreamMessage(data) {
+            if (!data.success) {
+                showError(data.message || '处理失败');
+                stopRequest();
+                return;
+            }
+            
+            const messageData = data.data;
+            
+            switch(messageData.type) {
+                case 'progress':
+                    showProgress(messageData.display_name || '处理中...');
+                    // 记录会话ID
+                    if (messageData.thread_id && !currentConversationId) {
+                        currentConversationId = messageData.thread_id;
+                        document.getElementById('threadId').value = currentConversationId;
+                    }
+                    break;
+                    
+                case 'completed':
+                    hideProgress();
+                    showResult(messageData);
+                    stopRequest();
+                    break;
+                    
+                case 'error':
+                    hideProgress();
+                    showError(messageData.error || '处理失败');
+                    stopRequest();
+                    break;
+                    
+                default:
+                    console.warn('未知的消息类型:', messageData.type);
+            }
+        }
+
+        function showProgress(text) {
+            document.getElementById('progressText').textContent = text;
+            document.getElementById('progress').style.display = 'block';
+        }
+
+        function hideProgress() {
+            document.getElementById('progress').style.display = 'none';
+        }
+
+        function showResult(data) {
+            // 显示响应文本
+            document.getElementById('responseText').textContent = data.response || '无响应内容';
+            
+            // 显示会话信息
+            const conversationInfo = `会话ID: ${data.conversation_id || '未知'}`;
+            document.getElementById('conversationInfo').textContent = conversationInfo;
+            
+            // 显示SQL(如果有)
+            if (data.sql) {
+                document.getElementById('sqlCode').textContent = data.sql;
+                document.getElementById('sqlSection').style.display = 'block';
+            } else {
+                document.getElementById('sqlSection').style.display = 'none';
+            }
+            
+            // 显示查询结果(如果有)
+            if (data.records && data.records.rows && data.records.rows.length > 0) {
+                const recordsHtml = `
+                    <p>共 ${data.records.row_count || data.records.rows.length} 条记录</p>
+                    <table border="1" style="width:100%; border-collapse:collapse;">
+                        <thead>
+                            <tr>
+                                ${data.records.columns.map(col => `<th style="padding:8px; background:#f8f9fa;">${col}</th>`).join('')}
+                            </tr>
+                        </thead>
+                        <tbody>
+                            ${data.records.rows.slice(0, 10).map(row => 
+                                `<tr>${row.map(cell => `<td style="padding:8px;">${cell || ''}</td>`).join('')}</tr>`
+                            ).join('')}
+                        </tbody>
+                    </table>
+                    ${data.records.rows.length > 10 ? '<p>(仅显示前10条记录)</p>' : ''}
+                `;
+                document.getElementById('recordsData').innerHTML = recordsHtml;
+                document.getElementById('recordsSection').style.display = 'block';
+            } else {
+                document.getElementById('recordsSection').style.display = 'none';
+            }
+            
+            document.getElementById('result').style.display = 'block';
+        }
+
+        function showError(message) {
+            document.getElementById('errorText').textContent = message;
+            document.getElementById('error').style.display = 'block';
+        }
+
+        function hideError() {
+            document.getElementById('error').style.display = 'none';
+        }
+
+        function hideResult() {
+            document.getElementById('result').style.display = 'none';
+        }
+
+        function stopRequest() {
+            if (currentEventSource) {
+                currentEventSource.close();
+                currentEventSource = null;
+            }
+            
+            // 恢复按钮状态
+            document.getElementById('askBtn').disabled = false;
+            document.getElementById('stopBtn').style.display = 'none';
+            
+            hideProgress();
+        }
+
+        // 页面卸载时关闭连接
+        window.addEventListener('beforeunload', function() {
+            stopRequest();
+        });
+        
+        // 回车键发送请求
+        document.getElementById('question').addEventListener('keydown', function(e) {
+            if (e.ctrlKey && e.key === 'Enter') {
+                askReactAgent();
+            }
+        });
+    </script>
+</body>
+</html>
+```
+
+#### 生产环境集成建议
+
+**1. 错误处理和重连机制**
+
+```javascript
+class ReactAgentStreamClient {
+    constructor(baseUrl = '') {
+        this.baseUrl = baseUrl;
+        this.eventSource = null;
+        this.retryCount = 0;
+        this.maxRetries = 3;
+        this.retryDelay = 1000; // 1秒
+    }
+    
+    async ask(question, userId = 'guest', threadId = null, callbacks = {}) {
+        const {
+            onProgress = () => {},
+            onCompleted = () => {},
+            onError = () => {},
+            onRetry = () => {}
+        } = callbacks;
+        
+        const params = new URLSearchParams({
+            question: question,
+            user_id: userId
+        });
+        
+        if (threadId) {
+            params.append('thread_id', threadId);
+        }
+        
+        const url = `${this.baseUrl}/api/v0/ask_react_agent_stream?${params}`;
+        
+        return new Promise((resolve, reject) => {
+            this.eventSource = new EventSource(url);
+            
+            this.eventSource.onmessage = (event) => {
+                try {
+                    const data = JSON.parse(event.data);
+                    
+                    if (!data.success) {
+                        onError(data.message || '处理失败');
+                        reject(new Error(data.message || '处理失败'));
+                        return;
+                    }
+                    
+                    const messageData = data.data;
+                    
+                    switch(messageData.type) {
+                        case 'progress':
+                            onProgress(messageData);
+                            break;
+                        case 'completed':
+                            onCompleted(messageData);
+                            resolve(messageData);
+                            this.close();
+                            break;
+                        case 'error':
+                            onError(messageData.error || '处理失败');
+                            reject(new Error(messageData.error || '处理失败'));
+                            this.close();
+                            break;
+                    }
+                } catch (e) {
+                    console.error('解析SSE消息失败:', e);
+                    onError('响应数据格式错误');
+                    reject(e);
+                    this.close();
+                }
+            };
+            
+            this.eventSource.onerror = (error) => {
+                console.error('EventSource错误:', error);
+                
+                if (this.retryCount < this.maxRetries) {
+                    this.retryCount++;
+                    onRetry(this.retryCount, this.maxRetries);
+                    
+                    setTimeout(() => {
+                        this.close();
+                        this.ask(question, userId, threadId, callbacks)
+                            .then(resolve)
+                            .catch(reject);
+                    }, this.retryDelay * this.retryCount);
+                } else {
+                    onError('连接失败,已达到最大重试次数');
+                    reject(new Error('连接失败'));
+                    this.close();
+                }
+            };
+        });
+    }
+    
+    close() {
+        if (this.eventSource) {
+            this.eventSource.close();
+            this.eventSource = null;
+        }
+        this.retryCount = 0;
+    }
+}
+
+// 使用示例
+const client = new ReactAgentStreamClient();
+
+client.ask('帮我查询销售数据', 'user123', null, {
+    onProgress: (data) => {
+        console.log('进度:', data.display_name);
+        updateProgressUI(data.display_name);
+    },
+    onCompleted: (data) => {
+        console.log('完成:', data);
+        showResults(data);
+    },
+    onError: (error) => {
+        console.error('错误:', error);
+        showError(error);
+    },
+    onRetry: (attempt, maxRetries) => {
+        console.log(`重试 ${attempt}/${maxRetries}...`);
+        showRetryMessage(attempt, maxRetries);
+    }
+}).catch(error => {
+    console.error('最终失败:', error);
+});
+```
+
+**2. React组件集成示例**
+
+```jsx
+import React, { useState, useCallback, useEffect } from 'react';
+
+const ReactAgentChat = () => {
+    const [question, setQuestion] = useState('');
+    const [isLoading, setIsLoading] = useState(false);
+    const [progress, setProgress] = useState('');
+    const [result, setResult] = useState(null);
+    const [error, setError] = useState(null);
+    const [eventSource, setEventSource] = useState(null);
+    
+    const askAgent = useCallback(async () => {
+        if (!question.trim()) return;
+        
+        setIsLoading(true);
+        setError(null);
+        setResult(null);
+        setProgress('准备中...');
+        
+        const params = new URLSearchParams({
+            question: question,
+            user_id: 'react_user'
+        });
+        
+        const source = new EventSource(`/api/v0/ask_react_agent_stream?${params}`);
+        setEventSource(source);
+        
+        source.onmessage = (event) => {
+            try {
+                const data = JSON.parse(event.data);
+                
+                if (!data.success) {
+                    setError(data.message || '处理失败');
+                    setIsLoading(false);
+                    return;
+                }
+                
+                const messageData = data.data;
+                
+                switch(messageData.type) {
+                    case 'progress':
+                        setProgress(messageData.display_name || '处理中...');
+                        break;
+                    case 'completed':
+                        setResult(messageData);
+                        setProgress('');
+                        setIsLoading(false);
+                        source.close();
+                        break;
+                    case 'error':
+                        setError(messageData.error || '处理失败');
+                        setIsLoading(false);
+                        source.close();
+                        break;
+                }
+            } catch (e) {
+                setError('响应数据格式错误');
+                setIsLoading(false);
+                source.close();
+            }
+        };
+        
+        source.onerror = () => {
+            setError('连接失败');
+            setIsLoading(false);
+            source.close();
+        };
+    }, [question]);
+    
+    const stopRequest = useCallback(() => {
+        if (eventSource) {
+            eventSource.close();
+            setEventSource(null);
+        }
+        setIsLoading(false);
+        setProgress('');
+    }, [eventSource]);
+    
+    useEffect(() => {
+        return () => {
+            if (eventSource) {
+                eventSource.close();
+            }
+        };
+    }, [eventSource]);
+    
+    return (
+        <div className="react-agent-chat">
+            <div className="input-section">
+                <textarea
+                    value={question}
+                    onChange={(e) => setQuestion(e.target.value)}
+                    placeholder="请输入您的问题..."
+                    disabled={isLoading}
+                />
+                <button onClick={askAgent} disabled={isLoading || !question.trim()}>
+                    {isLoading ? '处理中...' : '发送'}
+                </button>
+                {isLoading && (
+                    <button onClick={stopRequest} className="stop-btn">
+                        停止
+                    </button>
+                )}
+            </div>
+            
+            {progress && (
+                <div className="progress">
+                    <div className="spinner" />
+                    <span>{progress}</span>
+                </div>
+            )}
+            
+            {error && (
+                <div className="error">
+                    <strong>错误:</strong>{error}
+                </div>
+            )}
+            
+            {result && (
+                <div className="result">
+                    <h3>回答:</h3>
+                    <p>{result.response}</p>
+                    
+                    {result.sql && (
+                        <div className="sql-section">
+                            <h4>执行的SQL:</h4>
+                            <pre><code>{result.sql}</code></pre>
+                        </div>
+                    )}
+                    
+                    {result.records && result.records.rows && (
+                        <div className="records-section">
+                            <h4>查询结果:</h4>
+                            <p>共 {result.records.row_count} 条记录</p>
+                            {/* 表格渲染逻辑 */}
+                        </div>
+                    )}
+                </div>
+            )}
+        </div>
+    );
+};
+
+export default ReactAgentChat;
+```
+
+## API测试
+
+### 测试用例
+
+#### 1. Postman测试
+
+**配置**:
+- Method: `GET`
+- URL: `http://localhost:8084/api/v0/ask_react_agent_stream?question=帮我查询销售数据&user_id=test_user`
+- Headers: `Accept: text/event-stream`
+
+**预期响应**:
+```
+data: {"code":200,"success":true,"message":"正在执行: 开始处理","data":{"type":"progress","display_name":"开始处理"}}
+
+data: {"code":200,"success":true,"message":"正在执行: AI思考中","data":{"type":"progress","display_name":"AI思考中"}}
+
+data: {"code":200,"success":true,"message":"正在执行: 执行查询","data":{"type":"progress","display_name":"执行查询"}}
+
+data: {"code":200,"success":true,"message":"处理完成","data":{"type":"completed","response":"...","conversation_id":"test_user:20250131..."}}
+```
+
+#### 2. curl测试
+
+```bash
+curl -N "http://localhost:8084/api/v0/ask_react_agent_stream?question=查询用户数量&user_id=test_user" \
+     -H "Accept: text/event-stream"
+```
+
+## 与现有系统的兼容性
+
+### 1. 完全向后兼容
+
+- **现有API不变**: `ask_react_agent` 保持所有现有功能
+- **响应格式一致**: 最终结果与现有API格式完全相同
+- **参数兼容**: 支持相同的参数验证和处理逻辑
+
+### 2. 代码复用
+
+- **验证逻辑**: 复用 `validate_request_data()` 函数
+- **Agent检查**: 复用 `ensure_agent_ready()` 函数
+- **错误处理**: 复用现有的异常处理机制
+
+### 3. 配置共享
+
+- **Agent实例**: 使用相同的 `_react_agent_instance`
+- **日志系统**: 使用相同的日志配置
+- **数据库连接**: 共享Redis等资源连接
+
+## 实施计划
+
+### 阶段1: 核心功能开发
+- [ ] 在React Agent类中新增 `chat_stream()` 方法
+- [ ] 在unified_api.py中新增 `ask_react_agent_stream` 端点  
+- [ ] 实现SSE格式化函数
+
+### 阶段2: 测试验证
+- [ ] 单元测试:验证流式方法的正确性
+- [ ] 集成测试:验证API端到端功能
+- [ ] 性能测试:验证流式推送性能
+
+### 阶段3: 文档和部署
+- [ ] 更新API文档
+- [ ] 前端集成示例
+- [ ] 生产环境部署
+
+## 风险评估
+
+### 低风险
+- **现有功能影响**: 新增功能,现有API完全不变
+- **代码质量**: 大量复用现有逻辑,风险较低
+
+### 需要注意
+- **异步处理**: 确保所有异步调用正确处理
+- **错误传播**: 确保Agent内部错误正确传递到API层
+- **资源管理**: 长连接的资源清理和超时处理
+- **LangGraph流式支持**: 确认React Agent支持astream模式
+
+## 总结
+
+本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为React Agent提供了实时状态监控能力。方案具有以下特点:
+
+1. **最小侵入**: 只需新增代码,不修改现有逻辑
+2. **用户友好**: 提供类似ChatGPT的实时反馈体验  
+3. **技术一致**: 与现有ask_agent_stream保持相同的实现模式
+4. **易于维护**: 统一的响应格式和错误处理
+
+该方案为React Agent的用户体验升级提供了完整的技术解决方案。
+
+---
+
+*本文档基于现有ask_agent_stream实现模式,为React Agent流式API提供详细的技术规范。*

+ 245 - 0
docs/ask_react_agent_stream_fix_solution.md

@@ -0,0 +1,245 @@
+# React Agent 流式 API 修复方案
+
+## 问题背景
+
+`ask_react_agent_stream` API 在使用 LangGraph 的原生 `astream` 方法时,出现了 "Event loop is closed" 错误。这是因为:
+
+1. LangGraph 的 Redis checkpointer 使用异步连接
+2. Vanna 的向量搜索是同步操作
+3. Flask 的 WSGI 模型与 asyncio 事件循环管理存在冲突
+
+用户创建了临时的 `ask_react_agent_stream_sync` API 作为变通方案,但这个方案使用的是同步 `invoke` 方法,并不是真正的流式输出。
+
+## 核心需求
+
+1. 修复 `ask_react_agent_stream`,使其能够使用 LangGraph 的原生 `astream` 方法
+2. 保留 checkpoint 功能(对话历史记录等)
+3. 不影响其他 API(特别是 `ask_react_agent`)
+4. 删除临时的 `ask_react_agent_stream_sync` API
+
+## 解决方案
+
+### 1. 创建异步 SQL 工具
+
+创建 `react_agent/async_sql_tools.py`,将同步的 Vanna 操作包装成异步函数:
+
+```python
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Dict, List, Optional
+from langchain_core.tools import tool
+from common.vanna_instance import get_vanna_instance
+from common.utils import log_to_db
+from core.logging import get_logger
+
+logger = get_logger(__name__)
+
+# 创建线程池执行器
+executor = ThreadPoolExecutor(max_workers=3)
+
+@tool
+async def generate_sql(question: str, history: Optional[List[Dict[str, str]]] = None) -> str:
+    """异步生成SQL查询语句"""
+    def _sync_generate():
+        vn = get_vanna_instance()
+        
+        # 构造完整的提问内容
+        if history and len(history) > 0:
+            context_parts = ["根据以下对话历史:"]
+            for h in history:
+                if h.get("role") == "assistant" and h.get("content"):
+                    context_parts.append(f"- {h['content']}")
+            context_parts.append(f"\n当前问题:{question}")
+            full_question = "\n".join(context_parts)
+        else:
+            full_question = question
+            
+        logger.info(f"📝 [Vanna Input] Complete question being sent to Vanna:")
+        logger.info(f"--- BEGIN VANNA INPUT ---")
+        logger.info(full_question)
+        logger.info(f"--- END VANNA INPUT ---")
+        
+        sql = vn.generate_sql(full_question, allow_llm_to_see_data=False)
+        
+        if sql:
+            logger.info(f"   ✅ SQL Generated Successfully:")
+            logger.info(f"   {sql}")
+            return sql
+        else:
+            logger.warning(f"   ⚠️ No SQL generated")
+            return "SQL生成失败,请检查问题描述是否准确。"
+    
+    # 在线程池中执行同步操作
+    loop = asyncio.get_event_loop()
+    return await loop.run_in_executor(executor, _sync_generate)
+
+@tool
+async def valid_sql(sql: str, question: str) -> str:
+    """异步验证SQL语句的语法正确性"""
+    def _sync_validate():
+        # ... 同步验证逻辑 ...
+        return validation_result
+    
+    loop = asyncio.get_event_loop()
+    return await loop.run_in_executor(executor, _sync_validate)
+
+@tool
+async def run_sql(sql: str, question: str) -> str:
+    """异步执行SQL查询并返回结果"""
+    def _sync_run():
+        # ... 同步执行逻辑 ...
+        return json.dumps(results_dict, ensure_ascii=False)
+    
+    loop = asyncio.get_event_loop()
+    return await loop.run_in_executor(executor, _sync_run)
+
+# 导出异步工具列表
+async_sql_tools = [generate_sql, valid_sql, run_sql]
+```
+
+### 2. 修改 unified_api.py
+
+#### 2.1 为流式 API 创建独立的 Agent 实例
+
+每个流式请求创建新的 Agent 实例,避免事件循环冲突:
+
+```python
+async def create_stream_agent_instance():
+    """为每个流式请求创建新的Agent实例(使用异步工具)"""
+    if CustomReactAgent is None:
+        logger.error("❌ CustomReactAgent 未能导入,无法初始化流式Agent")
+        raise ImportError("CustomReactAgent 未能导入")
+        
+    logger.info("🚀 正在为流式请求创建新的 React Agent 实例...")
+    try:
+        # 创建流式专用 Agent 实例
+        stream_agent = await CustomReactAgent.create()
+        
+        # 配置使用异步 SQL 工具
+        from react_agent.async_sql_tools import async_sql_tools
+        stream_agent.tools = async_sql_tools
+        stream_agent.llm_with_tools = stream_agent.llm.bind_tools(async_sql_tools)
+        
+        logger.info("✅ 流式 React Agent 实例创建完成(配置异步工具)")
+        return stream_agent
+        
+    except Exception as e:
+        logger.error(f"❌ 流式 React Agent 实例创建失败: {e}")
+        raise
+```
+
+#### 2.2 修改 ask_react_agent_stream 函数
+
+在 Flask 路由的 generate 函数中管理事件循环:
+
+```python
+@app.route('/api/v0/ask_react_agent_stream', methods=['GET'])
+def ask_react_agent_stream():
+    """React Agent 流式API - 使用异步工具的专用 Agent 实例"""
+    def generate():
+        try:
+            # ... 参数验证 ...
+            
+            # 3. 为当前请求创建新的事件循环和Agent实例
+            import asyncio
+            
+            # 创建新的事件循环
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            
+            stream_agent = None
+            try:
+                # 为当前请求创建新的Agent实例
+                stream_agent = loop.run_until_complete(create_stream_agent_instance())
+                
+                if not stream_agent:
+                    yield format_sse_error("流式 React Agent 初始化失败")
+                    return
+            except Exception as e:
+                logger.error(f"流式 Agent 初始化异常: {str(e)}")
+                yield format_sse_error(f"流式 Agent 初始化失败: {str(e)}")
+                return
+            
+            # 4. 在同一个事件循环中执行流式处理
+            try:
+                # 创建异步生成器
+                async def stream_worker():
+                    try:
+                        # 使用当前请求的 Agent 实例(已配置异步工具)
+                        async for chunk in stream_agent.chat_stream(
+                            message=validated_data['question'],
+                            user_id=validated_data['user_id'],
+                            thread_id=validated_data['thread_id']
+                        ):
+                            yield chunk
+                            if chunk.get("type") == "completed":
+                                break
+                    except Exception as e:
+                        logger.error(f"流式处理异常: {str(e)}", exc_info=True)
+                        yield {
+                            "type": "error", 
+                            "error": f"流式处理异常: {str(e)}"
+                        }
+                
+                # 在当前事件循环中运行异步生成器
+                async_gen = stream_worker()
+                
+                # 同步迭代异步生成器
+                while True:
+                    try:
+                        chunk = loop.run_until_complete(async_gen.__anext__())
+                        
+                        if chunk["type"] == "progress":
+                            yield format_sse_react_progress(chunk)
+                        elif chunk["type"] == "completed":
+                            yield format_sse_react_completed(chunk)
+                            break
+                        elif chunk["type"] == "error":
+                            yield format_sse_error(chunk.get("error", "未知错误"))
+                            break
+                            
+                    except StopAsyncIteration:
+                        break
+                    except Exception as e:
+                        logger.error(f"处理流式数据异常: {str(e)}")
+                        yield format_sse_error(f"处理异常: {str(e)}")
+                        break
+                        
+            except Exception as e:
+                logger.error(f"React Agent流式处理异常: {str(e)}")
+                yield format_sse_error(f"流式处理异常: {str(e)}")
+            finally:
+                # 清理:流式处理完成后关闭事件循环
+                try:
+                    loop.close()
+                except Exception as e:
+                    logger.warning(f"关闭事件循环时出错: {e}")
+                    
+        except Exception as e:
+            logger.error(f"React Agent流式API异常: {str(e)}")
+            yield format_sse_error(f"服务异常: {str(e)}")
+    
+    return Response(stream_with_context(generate()), mimetype='text/event-stream')
+```
+
+## 关键改进点
+
+1. **每个请求独立的 Agent 实例**:避免跨请求的事件循环冲突
+2. **异步工具包装**:使用 ThreadPoolExecutor 将同步的 Vanna 操作转换为异步
+3. **事件循环管理**:在同一个事件循环中创建和使用 Agent 实例
+4. **保留 checkpoint 功能**:每个 Agent 实例都有完整的 Redis checkpoint 支持
+
+## 测试结果
+
+修复后的测试结果显示:
+
+- 所有流式 API 测试用例都成功执行
+- 正确显示进度信息(准备消息 → AI思考中 → 准备工具 → 执行查询 → 处理结果 → 生成回答)
+- 没有再出现 "Event loop is closed" 错误
+- 保留了完整的 checkpoint 功能(thread_id、对话历史等)
+
+## 后续步骤
+
+1. ✅ 删除临时的 `ask_react_agent_stream_sync` API
+2. ✅ 在生产环境中使用修复后的 `ask_react_agent_stream` API
+3. 考虑性能优化:如果需要,可以实现 Agent 实例池来减少创建开销(当前先确保功能正常)

+ 162 - 0
docs/ask_react_agent_stream_sync_solution.md

@@ -0,0 +1,162 @@
+# React Agent 同步流式API解决方案
+
+## 问题背景
+
+原有的 `ask_react_agent_stream` API 在处理复杂数据库查询时,会出现 Vector 搜索异步冲突错误:
+```
+RuntimeError: Task <Task pending...> got Future <Future pending> attached to a different loop
+```
+
+**根本原因**:LangGraph异步流式处理与Vanna Vector搜索的同步操作冲突。
+
+## 解决方案
+
+创建了完全独立的同步流式API,彻底避免异步冲突问题。
+
+### 新API端点
+
+```
+GET /api/v0/ask_react_agent_stream_sync
+```
+
+### 核心技术架构
+
+1. **同步Agent类**:`SyncCustomReactAgent`
+2. **同步LangGraph**:使用 `graph.invoke()` 而不是 `ainvoke()`
+3. **同步LLM配置**:
+   - `streaming=False`
+   - `enable_thinking=False` (千问模型非流式调用要求)
+4. **完全避免异步依赖**:不使用checkpointer
+
+## 使用方法
+
+### API调用示例
+
+```bash
+# PowerShell
+Invoke-WebRequest -Uri 'http://localhost:8084/api/v0/ask_react_agent_stream_sync?question=请问当前系统中哪个服务区档口最多?&user_id=test_user' -Headers @{'Accept'='text/event-stream'} -Method GET
+
+# curl (在支持的终端中)
+curl -X GET "http://localhost:8084/api/v0/ask_react_agent_stream_sync?question=请查询所有服务区的名称和档口数量&user_id=test_user" -H "Accept: text/event-stream"
+```
+
+### 参数说明
+
+与原API完全兼容:
+
+| 参数 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| `question` | string | ✅ | 用户问题 |
+| `user_id` | string | ✅ | 用户ID |
+| `thread_id` | string | ❌ | 对话线程ID |
+| `routing_mode` | string | ❌ | 路由模式,默认'agent' |
+| `continue_conversation` | boolean | ❌ | 是否继续对话,默认false |
+
+### 响应格式
+
+返回SSE流式响应:
+
+```
+data: {"type": "start", "message": "开始处理请求...", "timestamp": 1754412317.7335036}
+
+data: {"type": "thinking", "content": "正在分析问题并准备查询数据库...", "timestamp": 1754412317.7335036}
+
+data: {"type": "response", "content": "查询结果...", "metadata": {...}, "timestamp": 1754412317.7335036}
+
+data: {"type": "completed", "message": "请求处理完成", "user_id": "test_user", "thread_id": "...", "timestamp": 1754412317.7335036}
+```
+
+## 验证结果
+
+### ✅ 成功解决的问题
+
+1. **Vector搜索异步冲突**:完全消除
+2. **复杂数据库查询**:正常执行SQL生成、查询、结果返回
+3. **LLM配置兼容性**:解决千问模型非流式调用限制
+4. **系统稳定性**:不影响现有API
+
+### ✅ 实际测试验证
+
+**测试查询**:
+```
+请查询所有服务区的名称和档口数量,按档口数量降序排列
+```
+
+**执行结果**:
+- SQL生成成功
+- Vector搜索无异步冲突
+- 数据库查询正常执行
+- 返回正确结果:南城服务区39个档口
+
+## API选择建议
+
+### 使用同步API的场景
+
+- ✅ **复杂数据库查询**
+- ✅ **需要Vector搜索的问题**
+- ✅ **对稳定性要求高的场景**
+
+### 继续使用原API的场景
+
+- ✅ **简单对话查询**
+- ✅ **不涉及数据库的问题**
+- ✅ **现有集成代码暂时不变更的场景**
+
+## 技术细节
+
+### 文件结构
+
+```
+react_agent/
+├── sync_agent.py          # 新增:同步Agent类
+├── agent.py              # 原有:异步Agent类
+└── ...
+
+unified_api.py             # 新增:同步流式API端点
+```
+
+### 关键配置
+
+**LLM配置** (`sync_agent.py`):
+```python
+ChatOpenAI(
+    model=config.QWEN_MODEL,
+    temperature=0.1,
+    streaming=False,  # 关键:禁用流式
+    extra_body={
+        "enable_thinking": False,  # 关键:非流式调用必须为false
+        "misc": {"ensure_ascii": False}
+    }
+)
+```
+
+**LangGraph配置**:
+```python
+# 使用同步invoke而不是异步ainvoke
+final_state = self.agent_executor.invoke(inputs, run_config)
+```
+
+## 影响范围
+
+### ✅ 零风险部署
+
+- **不修改任何现有代码**
+- **原有API继续正常工作**
+- **完全独立的实现**
+- **可以渐进式切换使用**
+
+### 文件变更
+
+- **新增文件**:`react_agent/sync_agent.py`
+- **修改文件**:`unified_api.py` (新增API端点)
+- **未修改**:所有现有核心功能代码
+
+## 结论
+
+通过创建同步版本的React Agent,彻底解决了Vector搜索异步冲突问题,为复杂数据库查询提供了稳定可靠的解决方案。
+
+---
+
+**创建时间**:2025-08-06  
+**解决的核心问题**:React Agent流式API的Vector搜索异步冲突  
+**技术方案**:同步LangGraph + 同步LLM配置

+ 121 - 0
docs/react_agent_api_quick_reference.md

@@ -0,0 +1,121 @@
+# React Agent API 快速参考指南
+
+## API端点总览
+
+| API端点 | 方法 | 适用场景 | 状态 |
+|---------|------|----------|------|
+| `/api/v0/ask_react_agent` | POST | 同步调用,JSON响应 | ✅ 正常使用 |
+| `/api/v0/ask_react_agent_stream` | GET | 异步流式,简单查询 | ✅ 正常使用 |
+| `/api/v0/ask_react_agent_stream_sync` | GET | 同步流式,复杂数据库查询 | ✅ **推荐** |
+
+## 使用建议
+
+### 🚀 复杂数据库查询 (推荐)
+```bash
+GET /api/v0/ask_react_agent_stream_sync?question=请问当前系统中哪个服务区档口最多?&user_id=your_user_id
+```
+- ✅ 解决Vector搜索异步冲突
+- ✅ 适用于需要数据库查询的问题
+- ✅ 稳定可靠,无异步错误
+
+### 💬 简单对话查询
+```bash
+GET /api/v0/ask_react_agent_stream?question=你好&user_id=your_user_id
+```
+- ✅ 适用于不涉及数据库的简单问题
+- ✅ 异步流式响应
+
+### 📊 标准同步调用
+```bash
+POST /api/v0/ask_react_agent
+Content-Type: application/json
+
+{
+  "question": "你的问题",
+  "user_id": "your_user_id"
+}
+```
+- ✅ 标准JSON响应
+- ✅ 适用于集成已有的同步调用代码
+
+## 参数说明
+
+所有API支持相同的核心参数:
+
+| 参数 | 类型 | 必填 | 说明 |
+|------|------|------|------|
+| `question` | string | ✅ | 用户问题 |
+| `user_id` | string | ✅ | 用户ID |
+| `thread_id` | string | ❌ | 对话线程ID,不传则自动生成 |
+| `routing_mode` | string | ❌ | 路由模式,默认'agent' |
+| `continue_conversation` | boolean | ❌ | 是否继续对话,默认false |
+
+## 技术架构对比
+
+| 特性 | 原异步API | 新同步API |
+|------|-----------|-----------|
+| LangGraph执行 | `ainvoke()` 异步 | `invoke()` 同步 |
+| LLM配置 | `streaming=True` | `streaming=False` |
+| Vector搜索 | 异步冲突 ❌ | 同步执行 ✅ |
+| 复杂查询 | 可能出错 ⚠️ | 稳定可靠 ✅ |
+| Checkpointer | AsyncRedisSaver | 无 (避免异步依赖) |
+
+## 问题排查
+
+### Vector搜索异步冲突错误
+```
+RuntimeError: Task <Task pending...> got Future <Future pending> attached to a different loop
+```
+**解决方案**: 使用 `/api/v0/ask_react_agent_stream_sync`
+
+### LLM配置错误
+```
+parameter.enable_thinking must be set to false for non-streaming calls
+```
+**解决方案**: 已在同步API中修复,设置 `enable_thinking=False`
+
+## 示例代码
+
+### JavaScript (前端)
+```javascript
+// 同步流式API (推荐)
+const eventSource = new EventSource(
+  `/api/v0/ask_react_agent_stream_sync?question=${encodeURIComponent(question)}&user_id=${userId}`
+);
+
+eventSource.onmessage = function(event) {
+  const data = JSON.parse(event.data);
+  console.log('收到数据:', data);
+  
+  if (data.type === 'completed') {
+    eventSource.close();
+  }
+};
+```
+
+### Python (后端调用)
+```python
+import requests
+
+# 同步API调用
+response = requests.get(
+    'http://localhost:8084/api/v0/ask_react_agent_stream_sync',
+    params={
+        'question': '请问当前系统中哪个服务区档口最多?',
+        'user_id': 'test_user'
+    },
+    headers={'Accept': 'text/event-stream'},
+    stream=True
+)
+
+for line in response.iter_lines():
+    if line.startswith(b'data: '):
+        data = json.loads(line[6:].decode())
+        print(f"收到: {data}")
+```
+
+---
+
+**更新时间**: 2025-08-06  
+**版本**: v1.0  
+**问题解决**: React Agent Vector搜索异步冲突问题

+ 185 - 0
react_agent/async_sql_tools.py

@@ -0,0 +1,185 @@
+"""
+异步版本的 SQL 工具 - 解决 Vector 搜索异步冲突
+通过线程池执行同步操作,避免 LangGraph 事件循环冲突
+"""
+import json
+import asyncio
+from typing import List, Dict, Any
+from concurrent.futures import ThreadPoolExecutor
+from langchain_core.tools import tool
+from pydantic import BaseModel, Field
+from core.logging import get_react_agent_logger
+
+logger = get_react_agent_logger("AsyncSQLTools")
+
+# 创建线程池执行器
+_executor = ThreadPoolExecutor(max_workers=3)
+
+class GenerateSqlArgs(BaseModel):
+    question: str = Field(description="The user's question in natural language")
+    history_messages: List[Dict[str, Any]] = Field(
+        default_factory=list,
+        description="The conversation history messages for context."
+    )
+
+async def _run_in_executor(func, *args, **kwargs):
+    """在线程池中运行同步函数,避免事件循环冲突"""
+    loop = asyncio.get_event_loop()
+    return await loop.run_in_executor(_executor, func, *args, **kwargs)
+
+@tool(args_schema=GenerateSqlArgs)
+async def generate_sql(question: str, history_messages: List[Dict[str, Any]] = None) -> str:
+    """
+    异步生成 SQL 查询 - 通过线程池调用同步的 Vanna
+    Generates an SQL query based on the user's question and the conversation history.
+    """
+    logger.info(f"🔧 [Async Tool] generate_sql - Question: '{question}'")
+    
+    # 在线程池中执行,避免事件循环冲突
+    def _sync_generate():
+        from common.vanna_instance import get_vanna_instance
+        
+        if history_messages is None:
+            history_messages_local = []
+        else:
+            history_messages_local = history_messages
+        
+        logger.info(f"   History contains {len(history_messages_local)} messages.")
+        
+        # 构建增强问题(与同步版本相同的逻辑)
+        if history_messages_local:
+            history_str = "\n".join([f"{msg['type']}: {msg.get('content', '') or ''}" for msg in history_messages_local])
+            enriched_question = f"""Previous conversation context:
+{history_str}
+
+Current user question:
+human: {question}
+
+Please analyze the conversation history to understand any references (like "this service area", "that branch", etc.) in the current question, and generate the appropriate SQL query."""
+        else:
+            enriched_question = question
+        
+        # 记录 Vanna 输入
+        logger.info("📝 [Async Vanna Input] Complete question being sent to Vanna:")
+        logger.info("--- BEGIN VANNA INPUT ---")
+        logger.info(enriched_question)
+        logger.info("--- END VANNA INPUT ---")
+        
+        try:
+            vn = get_vanna_instance()
+            sql = vn.generate_sql(enriched_question)
+            
+            if not sql or sql.strip() == "":
+                if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
+                    error_info = vn.last_llm_explanation
+                    logger.warning(f"   Vanna returned an explanation instead of SQL: {error_info}")
+                    return f"Database query failed. Reason: {error_info}"
+                else:
+                    logger.warning("   Vanna failed to generate SQL and provided no explanation.")
+                    return "Could not generate SQL: The question may not be suitable for a database query."
+            
+            sql_upper = sql.upper().strip()
+            if not any(keyword in sql_upper for keyword in ['SELECT', 'WITH']):
+                logger.warning(f"   Vanna returned a message that does not appear to be a valid SQL query: {sql}")
+                return f"Database query failed. Reason: {sql}"
+            
+            logger.info(f"   ✅ SQL Generated Successfully:")
+            logger.info(f"   {sql}")
+            return sql
+            
+        except Exception as e:
+            logger.error(f"   An exception occurred during SQL generation: {e}", exc_info=True)
+            return f"SQL generation failed: {str(e)}"
+    
+    # 在线程池中执行
+    return await _run_in_executor(_sync_generate)
+
+# 导入同步版本的验证函数
+def _import_validation_functions():
+    """动态导入验证函数,避免循环导入"""
+    from react_agent.sql_tools import _check_basic_syntax, _check_table_existence, _validate_with_limit_zero
+    return _check_basic_syntax, _check_table_existence, _validate_with_limit_zero
+
+@tool
+async def valid_sql(sql: str) -> str:
+    """
+    异步验证 SQL 语句的有效性
+    Validates the SQL statement by checking syntax and executing with LIMIT 0.
+    """
+    logger.info(f"🔧 [Async Tool] valid_sql - Validating SQL")
+    
+    def _sync_validate():
+        # 导入验证函数
+        _check_basic_syntax, _check_table_existence, _validate_with_limit_zero = _import_validation_functions()
+        
+        # 规则1:基本语法检查
+        if not _check_basic_syntax(sql):
+            logger.warning(f"   SQL基本语法检查失败: {sql[:100]}...")
+            return json.dumps({
+                "result": "invalid",
+                "error": "SQL语句格式错误:必须是SELECT或WITH开头的查询语句"
+            })
+        
+        # 规则2:表存在性检查
+        if not _check_table_existence(sql):
+            logger.warning(f"   SQL表存在性检查失败")
+            return json.dumps({
+                "result": "invalid",
+                "error": "SQL中引用的表不存在于数据库中"
+            })
+        
+        # 规则3:LIMIT 0执行测试
+        return _validate_with_limit_zero(sql)
+    
+    return await _run_in_executor(_sync_validate)
+
+@tool
+async def run_sql(sql: str) -> str:
+    """
+    异步执行 SQL 查询并返回结果
+    执行SQL查询并以JSON字符串格式返回结果。
+    
+    Args:
+        sql: 待执行的SQL语句。
+        
+    Returns:
+        JSON字符串格式的查询结果,或包含错误的JSON字符串。
+    """
+    logger.info(f"🔧 [Async Tool] run_sql - 待执行SQL:")
+    logger.info(f"   {sql}")
+    
+    def _sync_run():
+        from common.vanna_instance import get_vanna_instance
+        
+        try:
+            vn = get_vanna_instance()
+            df = vn.run_sql(sql)
+            
+            logger.debug(f"SQL执行结果:\n{df}")
+            
+            if df is None:
+                logger.warning("   SQL执行成功,但查询结果为空。")
+                result = {"status": "success", "data": [], "message": "查询无结果"}
+                return json.dumps(result, ensure_ascii=False)
+            
+            logger.info(f"   ✅ SQL执行成功,返回 {len(df)} 条记录。")
+            # 将DataFrame转换为JSON,并妥善处理datetime等特殊类型
+            return df.to_json(orient='records', date_format='iso')
+            
+        except Exception as e:
+            logger.error(f"   SQL执行过程中发生异常: {e}", exc_info=True)
+            error_result = {"status": "error", "error_message": str(e)}
+            return json.dumps(error_result, ensure_ascii=False)
+    
+    return await _run_in_executor(_sync_run)
+
+# 将所有异步工具函数收集到一个列表中
+async_sql_tools = [generate_sql, valid_sql, run_sql]
+
+# 清理函数(可选)
+def cleanup():
+    """清理线程池资源"""
+    global _executor
+    if _executor:
+        _executor.shutdown(wait=False)
+        logger.info("异步SQL工具线程池已关闭")

+ 382 - 0
react_agent/sync_agent.py

@@ -0,0 +1,382 @@
+"""
+同步版本的React Agent - 解决Vector搜索异步冲突问题
+基于原有CustomReactAgent,但使用完全同步的实现
+"""
+import json
+import sys
+import os
+from pathlib import Path
+from typing import List, Optional, Dict, Any
+import redis
+
+# 添加项目根目录到sys.path
+try:
+    project_root = Path(__file__).parent.parent
+    if str(project_root) not in sys.path:
+        sys.path.insert(0, str(project_root))
+except Exception as e:
+    pass
+
+from core.logging import get_react_agent_logger
+from langchain_openai import ChatOpenAI
+from langchain_core.messages import HumanMessage, ToolMessage, BaseMessage, SystemMessage, AIMessage
+from langgraph.graph import StateGraph, END
+from langgraph.prebuilt import ToolNode
+
+# 导入同步版本的依赖
+try:
+    from . import config
+    from .state import AgentState
+    from .sql_tools import sql_tools
+except ImportError:
+    import config
+    from state import AgentState
+    from sql_tools import sql_tools
+
+logger = get_react_agent_logger("SyncCustomReactAgent")
+
+class SyncCustomReactAgent:
+    """
+    同步版本的React Agent
+    专门解决Vector搜索的异步事件循环冲突问题
+    """
+    
+    def __init__(self):
+        """私有构造函数,请使用 create() 类方法来创建实例。"""
+        self.llm = None
+        self.tools = None
+        self.agent_executor = None
+        self.checkpointer = None
+        self.redis_client = None
+
+    @classmethod
+    def create(cls):
+        """同步工厂方法,创建并初始化 SyncCustomReactAgent 实例。"""
+        instance = cls()
+        instance._sync_init()
+        return instance
+
+    def _sync_init(self):
+        """同步初始化所有组件。"""
+        logger.info("🚀 开始初始化 SyncCustomReactAgent...")
+
+        # 1. 初始化同步Redis客户端(如果需要)
+        try:
+            self.redis_client = redis.from_url(config.REDIS_URL, decode_responses=True)
+            self.redis_client.ping()
+            logger.info(f"   ✅ Redis连接成功: {config.REDIS_URL}")
+        except Exception as e:
+            logger.warning(f"   ⚠️ Redis连接失败,将不使用checkpointer: {e}")
+            self.redis_client = None
+
+        # 2. 初始化 LLM(同步版本)
+        self.llm = ChatOpenAI(
+            api_key=config.QWEN_API_KEY,
+            base_url=config.QWEN_BASE_URL,
+            model=config.QWEN_MODEL,
+            temperature=0.1,
+            timeout=config.NETWORK_TIMEOUT,
+            max_retries=0,
+            streaming=False,  # 关键:禁用流式处理
+            extra_body={
+                "enable_thinking": False,  # 明确设置为False:非流式调用必须设为false
+                "misc": {
+                    "ensure_ascii": False
+                }
+            }
+        )
+        logger.info(f"   ✅ 同步LLM已初始化,模型: {config.QWEN_MODEL}")
+
+        # 3. 绑定工具
+        self.tools = sql_tools
+        self.llm_with_tools = self.llm.bind_tools(self.tools)
+        logger.info(f"   ✅ 已绑定 {len(self.tools)} 个工具")
+
+        # 4. 创建StateGraph(不使用checkpointer避免异步依赖)
+        self.agent_executor = self._create_sync_graph()
+        logger.info("   ✅ 同步StateGraph已创建")
+
+        logger.info("✅ SyncCustomReactAgent 初始化完成")
+
+    def _create_sync_graph(self):
+        """创建同步的StateGraph"""
+        graph = StateGraph(AgentState)
+        
+        # 添加同步节点
+        graph.add_node("agent", self._sync_agent_node)
+        graph.add_node("tools", ToolNode(self.tools))
+        graph.add_node("prepare_tool_input", self._sync_prepare_tool_input_node)
+        graph.add_node("update_state_after_tool", self._sync_update_state_after_tool_node)
+        graph.add_node("format_final_response", self._sync_format_final_response_node)
+
+        # 设置入口点
+        graph.set_entry_point("agent")
+
+        # 添加条件边
+        graph.add_conditional_edges(
+            "agent",
+            self._sync_should_continue,
+            {
+                "tools": "prepare_tool_input",
+                "end": "format_final_response"
+            }
+        )
+
+        # 添加普通边
+        graph.add_edge("prepare_tool_input", "tools")
+        graph.add_edge("tools", "update_state_after_tool")
+        graph.add_edge("update_state_after_tool", "agent")
+        graph.add_edge("format_final_response", END)
+
+        # 关键:使用同步编译,不传入checkpointer
+        return graph.compile()
+
+    def _sync_agent_node(self, state: AgentState) -> Dict[str, Any]:
+        """同步Agent节点"""
+        logger.info(f"🧠 [Sync Node] agent - Thread: {state.get('thread_id', 'unknown')}")
+        
+        messages_for_llm = state["messages"].copy()
+        
+        # 添加数据库范围提示词
+        if isinstance(state["messages"][-1], HumanMessage):
+            db_scope_prompt = self._get_database_scope_prompt()
+            if db_scope_prompt:
+                messages_for_llm.insert(0, SystemMessage(content=db_scope_prompt))
+                logger.info("   ✅ 已添加数据库范围判断提示词")
+
+        # 同步LLM调用
+        response = self.llm_with_tools.invoke(messages_for_llm)
+        
+        return {"messages": [response]}
+
+    def _sync_should_continue(self, state: AgentState):
+        """同步条件判断"""
+        messages = state["messages"]
+        last_message = messages[-1]
+        
+        if not last_message.tool_calls:
+            return "end"
+        else:
+            return "tools"
+
+    def _sync_prepare_tool_input_node(self, state: AgentState) -> Dict[str, Any]:
+        """同步准备工具输入节点"""
+        logger.info(f"🔧 [Sync Node] prepare_tool_input - Thread: {state.get('thread_id', 'unknown')}")
+        
+        last_message = state["messages"][-1]
+        
+        if hasattr(last_message, 'tool_calls') and last_message.tool_calls:
+            for tool_call in last_message.tool_calls:
+                if tool_call.get('name') == 'generate_sql':
+                    # 注入历史消息
+                    history_messages = self._filter_and_format_history(state["messages"])
+                    if 'args' not in tool_call:
+                        tool_call['args'] = {}
+                    tool_call['args']['history_messages'] = history_messages
+                    logger.info(f"   ✅ 为generate_sql注入了 {len(history_messages)} 条历史消息")
+
+        return {"messages": [last_message]}
+
+    def _sync_update_state_after_tool_node(self, state: AgentState) -> Dict[str, Any]:
+        """同步更新工具执行后的状态"""
+        logger.info(f"📝 [Sync Node] update_state_after_tool - Thread: {state.get('thread_id', 'unknown')}")
+        
+        last_message = state["messages"][-1]
+        tool_name = last_message.name
+        tool_output = last_message.content
+        next_step = None
+
+        if tool_name == 'generate_sql':
+            tool_output_lower = tool_output.lower()
+            if "failed" in tool_output_lower or "无法生成" in tool_output_lower or "失败" in tool_output_lower:
+                next_step = 'answer_with_common_sense'
+            else:
+                next_step = 'valid_sql'
+        elif tool_name == 'valid_sql':
+            if "失败" in tool_output:
+                next_step = 'analyze_validation_error'
+            else:
+                next_step = 'run_sql'
+        elif tool_name == 'run_sql':
+            next_step = 'summarize_final_answer'
+            
+        logger.info(f"   Tool '{tool_name}' executed. Suggested next step: {next_step}")
+        return {"suggested_next_step": next_step}
+
+    def _sync_format_final_response_node(self, state: AgentState) -> Dict[str, Any]:
+        """同步格式化最终响应节点"""
+        logger.info(f"📄 [Sync Node] format_final_response - Thread: {state.get('thread_id', 'unknown')}")
+        
+        messages = state["messages"]
+        last_message = messages[-1]
+        
+        # 构建最终响应
+        final_response = last_message.content
+        
+        logger.info(f"   ✅ 最终响应已准备完成")
+        return {"final_answer": final_response}
+
+    def _filter_and_format_history(self, messages: list) -> list:
+        """过滤和格式化历史消息"""
+        clean_history = []
+        for msg in messages[:-1]:  # 排除最后一条消息
+            if isinstance(msg, HumanMessage):
+                clean_history.append({"type": "human", "content": msg.content})
+            elif isinstance(msg, AIMessage):
+                clean_content = msg.content if not hasattr(msg, 'tool_calls') or not msg.tool_calls else ""
+                if clean_content.strip():
+                    clean_history.append({"type": "ai", "content": clean_content})
+        
+        return clean_history
+
+    def _get_database_scope_prompt(self) -> str:
+        """获取数据库范围判断提示词"""
+        return """你是一个专门处理高速公路收费数据查询的AI助手。在回答用户问题时,请首先判断这个问题是否可以通过查询数据库来回答。
+
+数据库包含以下类型的数据:
+- 服务区信息(名称、位置、档口数量等)
+- 收费站数据
+- 车流量统计
+- 业务数据分析
+
+如果用户的问题与这些数据相关,请使用工具生成SQL查询。
+如果问题与数据库内容无关(如常识性问题、天气、新闻等),请直接用你的知识回答,不要尝试生成SQL。"""
+
+    def chat(self, message: str, user_id: str, thread_id: Optional[str] = None) -> Dict[str, Any]:
+        """
+        同步聊天方法 - 关键:使用 graph.invoke() 而不是 ainvoke()
+        """
+        if thread_id is None:
+            import uuid
+            thread_id = str(uuid.uuid4())
+
+        # 构建输入
+        inputs = {
+            "messages": [HumanMessage(content=message)],
+            "user_id": user_id,
+            "thread_id": thread_id,
+            "suggested_next_step": None
+        }
+
+        # 构建运行配置(不使用checkpointer)
+        run_config = {
+            "recursion_limit": config.RECURSION_LIMIT,
+        }
+
+        try:
+            logger.info(f"🚀 开始同步处理用户消息: {message[:50]}...")
+            
+            # 关键:使用同步的 invoke() 方法
+            final_state = self.agent_executor.invoke(inputs, run_config)
+            
+            logger.info(f"🔍 Final state keys: {list(final_state.keys())}")
+            
+            # 提取答案
+            if final_state["messages"]:
+                answer = final_state["messages"][-1].content
+            else:
+                answer = "抱歉,无法处理您的请求。"
+            
+            # 提取SQL数据(如果有)
+            sql_data = self._extract_latest_sql_data(final_state["messages"])
+            
+            logger.info(f"✅ 同步处理完成 - Final Answer: '{answer[:100]}...'")
+            
+            # 构建返回结果
+            result = {
+                "success": True, 
+                "answer": answer, 
+                "thread_id": thread_id
+            }
+            
+            # 只有当存在SQL数据时才添加到返回结果中
+            if sql_data:
+                try:
+                    # 尝试解析SQL数据
+                    sql_parsed = json.loads(sql_data)
+                    
+                    # 检查数据格式:run_sql工具返回的是数组格式 [{"col1":"val1"}]
+                    if isinstance(sql_parsed, list):
+                        # 数组格式:直接作为records使用
+                        result["api_data"] = {
+                            "response": answer,
+                            "records": sql_parsed,
+                            "react_agent_meta": {
+                                "thread_id": thread_id,
+                                "agent_version": "sync_react_v1"
+                            }
+                        }
+                    elif isinstance(sql_parsed, dict):
+                        # 字典格式:按原逻辑处理
+                        result["api_data"] = {
+                            "response": answer,
+                            "sql": sql_parsed.get("sql", ""),
+                            "records": sql_parsed.get("records", []),
+                            "react_agent_meta": {
+                                "thread_id": thread_id,
+                                "agent_version": "sync_react_v1"
+                            }
+                        }
+                    else:
+                        logger.warning(f"SQL数据格式未知: {type(sql_parsed)}")
+                        raise ValueError("Unknown SQL data format")
+                        
+                except (json.JSONDecodeError, AttributeError, ValueError) as e:
+                    logger.warning(f"SQL数据格式处理失败: {str(e)}, 跳过API数据构建")
+            else:
+                result["api_data"] = {
+                    "response": answer,
+                    "react_agent_meta": {
+                        "thread_id": thread_id,
+                        "agent_version": "sync_react_v1"
+                    }
+                }
+            
+            return result
+            
+        except Exception as e:
+            logger.error(f"❌ 同步处理失败: {str(e)}", exc_info=True)
+            return {
+                "success": False,
+                "error": f"同步处理失败: {str(e)}",
+                "thread_id": thread_id,
+                "retry_suggested": True
+            }
+
+    def _extract_latest_sql_data(self, messages: List[BaseMessage]) -> Optional[str]:
+        """从消息历史中提取最近的run_sql执行结果(同步版本)"""
+        logger.info("🔍 提取最新的SQL执行结果...")
+        
+        # 查找最后一个HumanMessage之后的SQL执行结果
+        last_human_index = -1
+        for i in range(len(messages) - 1, -1, -1):
+            if isinstance(messages[i], HumanMessage):
+                last_human_index = i
+                break
+        
+        if last_human_index == -1:
+            logger.info("   未找到用户消息,跳过SQL数据提取")
+            return None
+        
+        # 只在当前对话轮次中查找SQL结果
+        current_conversation = messages[last_human_index:]
+        logger.info(f"   当前对话轮次包含 {len(current_conversation)} 条消息")
+        
+        for msg in reversed(current_conversation):
+            if isinstance(msg, ToolMessage) and msg.name == 'run_sql':
+                logger.info(f"   找到当前对话轮次的run_sql结果: {msg.content[:100]}...")
+                
+                try:
+                    # 尝试解析JSON以验证格式
+                    parsed_data = json.loads(msg.content)
+                    # 重新序列化,确保中文字符正常显示
+                    formatted_content = json.dumps(parsed_data, ensure_ascii=False, separators=(',', ':'))
+                    logger.info(f"   已转换Unicode转义序列为中文字符")
+                    return formatted_content
+                except json.JSONDecodeError:
+                    # 如果不是有效JSON,直接返回原内容
+                    logger.warning(f"   SQL结果不是有效JSON格式,返回原始内容")
+                    return msg.content
+        
+        logger.info("   当前对话轮次中未找到run_sql执行结果")
+        return None

+ 50 - 8
unified_api.py

@@ -1830,21 +1830,59 @@ def format_sse_react_progress(chunk: dict) -> str:
     """格式化React Agent进度事件为SSE格式"""
     node = chunk.get("node")
     thread_id = chunk.get("thread_id")
+    node_data = chunk.get("data", {})
     
-    # 节点显示名称映射
-    node_display_map = {
+    # 基础节点显示名称映射
+    base_node_display_map = {
         "__start__": "开始处理",
-        "trim_messages": "准备消息", 
-        "agent": "AI思考中",
-        "prepare_tool_input": "准备工具",
-        "tools": "执行查询",
-        "update_state_after_tool": "处理结果",
+        "trim_messages": "检查问题上下文", 
+        "agent": "AI分析中",
+        "prepare_tool_input": "准备工具参数",
+        "tools": "执行操作",
+        "update_state_after_tool": "检查结果",
         "format_final_response": "生成回答",
         "__end__": "完成"
     }
     
-    display_name = node_display_map.get(node, "处理中")
+    # 工具名称映射
+    tool_display_map = {
+        "generate_sql": "生成SQL语句",
+        "valid_sql": "验证SQL语法", 
+        "run_sql": "执行SQL查询"
+    }
+    
+    display_name = base_node_display_map.get(node, "处理中")
+    tool_name = None
+    
+    # 特殊处理:提取具体工具信息
+    if node_data and "messages" in node_data and node_data["messages"]:
+        messages = node_data["messages"]
+        last_message = messages[-1]
+        
+        # 方法1:从 AIMessage 的 tool_calls 中提取(agent节点输出)
+        if (hasattr(last_message, 'tool_calls') and 
+            last_message.tool_calls and 
+            len(last_message.tool_calls) > 0):
+            tool_call = last_message.tool_calls[0]
+            tool_name = tool_call.get('name')
+            
+        # 方法2:从 ToolMessage 的 name 属性中提取(tools节点输出)
+        elif (hasattr(last_message, 'name') and 
+              last_message.name):
+            tool_name = last_message.name
     
+    # 根据节点和工具信息生成更精确的显示名称
+    if tool_name and tool_name in tool_display_map:
+        if node == "agent":
+            display_name = f"准备{tool_display_map[tool_name]}"
+        elif node == "tools":
+            display_name = tool_display_map[tool_name]  # 去掉"正在"前缀
+        elif node == "update_state_after_tool":
+            display_name = "检查结果"  # 统一为"检查结果"
+        elif node == "prepare_tool_input":
+            display_name = "准备工具参数"  # 统一为通用描述
+    
+    # 构建响应数据
     data = {
         "code": 200,
         "success": True,
@@ -1858,6 +1896,10 @@ def format_sse_react_progress(chunk: dict) -> str:
         }
     }
     
+    # 可选:在调试模式下添加工具信息
+    if tool_name:
+        data["data"]["tool_name"] = tool_name
+    
     import json
     return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"