Explorar o código

准备开发langgraph agent的astream功能。

wangxq hai 2 semanas
pai
achega
c777698d27

+ 557 - 0
docs/agent_streaming_api_design.md

@@ -0,0 +1,557 @@
+# Agent 流式API设计方案
+
+## 概述
+
+本文档描述了Citu LangGraph Agent流式状态监控API的设计方案,解决当前同步API无法显示执行进度的问题。通过新增流式API,在保持现有API不变的前提下,为客户端提供实时的业务查询处理状态监控能力。
+
+## 背景问题
+
+### 当前API的局限性
+
+当前的 `ask_agent` API存在以下问题:
+
+1. **同步执行**:客户端必须等待整个Agent执行完成
+2. **无进度显示**:无法了解问题分类、SQL生成、查询执行等关键步骤的进度
+3. **用户体验差**:复杂查询时长时间等待没有任何反馈
+4. **调试困难**:开发者无法实时观察Agent的决策和执行过程
+
+### 业务场景需求
+
+- **复杂SQL生成**:表结构分析、SQL生成可能耗时较长
+- **数据库查询监控**:大数据量查询需要执行进度反馈
+- **错误及时反馈**:SQL验证失败、执行异常需要立即通知
+- **分类决策透明**:让用户了解系统如何判断问题类型
+
+## 解决方案
+
+### 设计原则
+
+1. **向后兼容**:现有 `ask_agent` API保持完全不变
+2. **业务导向**:重点关注SQL生成、执行等高价值业务环节
+3. **统一格式**:与现有API保持相同的响应格式
+4. **实时性**:基于LangGraph原生的流式能力
+5. **调试友好**:提供丰富的执行状态信息
+
+### 技术方案
+
+#### 1. 新增流式API
+
+**端点**:`POST /api/v0/ask_agent_stream`
+
+**技术栈**:
+- 基于 LangGraph 的 `astream` 功能
+- 使用 Server-Sent Events (SSE) 推送状态
+- 复用现有的所有业务逻辑和错误处理
+
+#### 2. 执行流程
+
+```mermaid
+sequenceDiagram
+    participant Client as 客户端
+    participant API as 流式API
+    participant Agent as Citu Agent
+    participant LangGraph as LangGraph执行器
+
+    Client->>API: POST /ask_agent_stream
+    API->>API: 生成conversation_id
+    API->>Client: 推送开始状态
+    API->>Agent: 调用process_question_stream()
+    Agent->>LangGraph: astream执行
+    
+    Note over LangGraph: 问题分类节点
+    LangGraph->>Agent: classify_question状态
+    Agent->>API: yield分类进度
+    API->>Client: 推送"正在分析问题类型"
+    
+    Note over LangGraph: 条件路由决策
+    LangGraph->>Agent: 路由决策结果
+    Agent->>API: yield路由信息
+    API->>Client: 推送问题类型结果
+    
+    alt DATABASE类型
+        Note over LangGraph: SQL生成验证节点
+        LangGraph->>Agent: sql_generation状态
+        Agent->>API: yield SQL生成进度
+        API->>Client: 推送"正在生成SQL"
+        
+        Note over LangGraph: SQL执行节点
+        LangGraph->>Agent: sql_execution状态
+        Agent->>API: yield执行进度
+        API->>Client: 推送"正在执行查询"
+    else CHAT类型
+        Note over LangGraph: 聊天处理节点
+        LangGraph->>Agent: chat状态
+        Agent->>API: yield聊天进度
+        API->>Client: 推送"正在思考回答"
+    end
+    
+    Note over LangGraph: 响应格式化节点
+    LangGraph->>Agent: format_response状态
+    Agent->>API: yield最终结果
+    API->>Client: 推送完整结果
+    API->>Client: 推送结束标记
+```
+
+## 实现方案
+
+### 1. 修改 `agent/citu_agent.py`
+
+**新增方法**:`process_question_stream()`
+
+```python
+async def process_question_stream(self, question: str, conversation_id: str = None, 
+                                context_type: str = None, routing_mode: str = None):
+    """
+    流式处理用户问题 - 复用process_question()的所有逻辑
+    """
+    try:
+        # 1. 复用现有的初始化逻辑
+        if not conversation_id:
+            conversation_id = self._generate_conversation_id()
+        
+        # 2. 动态创建workflow(复用现有逻辑)
+        workflow = self._create_workflow(routing_mode)
+        
+        # 3. 创建初始状态(复用现有逻辑)
+        initial_state = self._create_initial_state(question, conversation_id, context_type, routing_mode)
+        
+        # 4. 使用astream流式执行
+        async for chunk in workflow.astream(
+            initial_state,
+            config={"configurable": {"conversation_id": conversation_id}} if conversation_id else None
+        ):
+            for node_name, node_data in chunk.items():
+                # 映射节点状态为用户友好的进度信息
+                progress_info = self._map_node_to_progress(node_name, node_data)
+                if progress_info:
+                    yield {
+                        "type": "progress",
+                        "node": node_name,
+                        "progress": progress_info,
+                        "state_data": self._extract_relevant_state(node_data),
+                        "conversation_id": conversation_id
+                    }
+        
+        # 5. 最终结果处理(复用现有的结果提取逻辑)
+        final_result = node_data.get("final_response", {})
+        yield {
+            "type": "completed",
+            "result": final_result,
+            "conversation_id": conversation_id
+        }
+        
+    except Exception as e:
+        yield {
+            "type": "error", 
+            "error": str(e),
+            "conversation_id": conversation_id
+        }
+```
+
+**关键特性**:
+- 完全复用现有的工作流创建和状态管理逻辑
+- 使用 `astream` 而非 `ainvoke` 获取每个节点状态
+- 通过生成器模式实时yield状态信息
+- 保持所有现有的错误处理和业务逻辑
+
+### 2. 修改 `unified_api.py`
+
+**新增API端点**:`ask_agent_stream()`
+
+```python
+@app.route("/api/v0/ask_agent_stream", methods=["POST"])
+async def ask_agent_stream():
+    """Citu Agent 流式API"""
+    
+    async def generate():
+        try:
+            # 复用现有的数据验证逻辑
+            data = request.get_json(force=True)
+            validated_data = validate_agent_request_data(data)
+            
+            # 复用现有的Agent初始化检查
+            if not await ensure_agent_ready():
+                yield format_error_event("Agent服务不可用")
+                return
+            
+            # 流式执行
+            async for chunk in _agent_instance.process_question_stream(
+                question=validated_data['question'],
+                conversation_id=validated_data.get('conversation_id'),
+                context_type=validated_data.get('context_type'),
+                routing_mode=validated_data.get('routing_mode')
+            ):
+                if chunk["type"] == "progress":
+                    yield format_progress_event(chunk)
+                elif chunk["type"] == "completed":
+                    yield format_completion_event(chunk, validated_data)
+                elif chunk["type"] == "error":
+                    yield format_error_event(chunk["error"])
+            
+        except Exception as e:
+            yield format_exception_event(e)
+    
+    return Response(generate(), mimetype='text/event-stream')
+```
+
+### 3. 节点状态映射设计
+
+#### 核心节点监控
+
+| 节点名称 | 显示名称 | 图标 | 业务价值 | 监控重点 |
+|----------|----------|------|----------|----------|
+| `classify_question` | 分析问题类型 | 🤔 | ⭐⭐⭐ | 分类结果、置信度、路由模式 |
+| `agent_sql_generation` | 生成SQL查询 | 🔧 | ⭐⭐⭐⭐⭐ | SQL生成成功/失败、验证结果 |
+| `agent_sql_execution` | 执行数据查询 | ⚙️ | ⭐⭐⭐⭐⭐ | 查询进度、结果行数、摘要生成 |
+| `agent_chat` | 思考回答 | 💭 | ⭐⭐⭐ | 聊天处理进度 |
+| `format_response` | 整理结果 | 📝 | ⭐⭐ | 最终格式化进度 |
+
+#### 条件路由监控
+
+```python
+def _map_node_to_progress(self, node_name: str, node_data: dict) -> dict:
+    """将节点执行状态映射为用户友好的进度信息"""
+    
+    if node_name == "classify_question":
+        question_type = node_data.get("question_type", "UNCERTAIN")
+        confidence = node_data.get("classification_confidence", 0)
+        return {
+            "display_name": "分析问题类型",
+            "icon": "🤔",
+            "details": f"问题类型: {question_type} (置信度: {confidence:.2f})",
+            "sub_status": f"使用{node_data.get('classification_method', '未知')}方法分类"
+        }
+    
+    elif node_name == "agent_sql_generation":
+        if node_data.get("sql_generation_success"):
+            return {
+                "display_name": "SQL生成成功",
+                "icon": "✅",
+                "details": f"生成SQL: {node_data.get('sql', '')[:50]}...",
+                "sub_status": "验证通过,准备执行"
+            }
+        else:
+            error_type = node_data.get("validation_error_type", "unknown")
+            return {
+                "display_name": "SQL生成处理中",
+                "icon": "🔧",
+                "details": f"验证状态: {error_type}",
+                "sub_status": node_data.get("user_prompt", "正在处理")
+            }
+    
+    elif node_name == "agent_sql_execution":
+        query_result = node_data.get("query_result", {})
+        row_count = query_result.get("row_count", 0)
+        return {
+            "display_name": "执行数据查询", 
+            "icon": "⚙️",
+            "details": f"查询完成,返回 {row_count} 行数据",
+            "sub_status": "正在生成摘要" if row_count > 0 else "查询执行完成"
+        }
+    
+    elif node_name == "agent_chat":
+        return {
+            "display_name": "思考回答",
+            "icon": "💭", 
+            "details": "正在处理您的问题",
+            "sub_status": "使用智能对话模式"
+        }
+    
+    elif node_name == "format_response":
+        return {
+            "display_name": "整理结果",
+            "icon": "📝",
+            "details": "正在格式化响应结果",
+            "sub_status": "即将完成"
+        }
+    
+    return None
+```
+
+### 4. 响应格式设计
+
+#### 进度状态消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "正在执行: 分析问题类型",
+  "data": {
+    "type": "progress",
+    "node": "classify_question",
+    "display_name": "分析问题类型",
+    "icon": "🤔",
+    "details": "问题类型: DATABASE (置信度: 0.85)",
+    "sub_status": "使用hybrid方法分类",
+    "conversation_id": "citu_20250131103000001",
+    "timestamp": "2025-01-31T10:30:05",
+    "state_summary": {
+      "current_step": "classified",
+      "execution_path": ["start", "classify"],
+      "routing_mode": "hybrid"
+    }
+  }
+}
+```
+
+#### 最终结果消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "处理完成",
+  "data": {
+    "type": "completed",
+    "response": "根据查询结果,服务区营业收入...",
+    "type": "DATABASE",
+    "sql": "SELECT service_area, SUM(revenue) FROM ...",
+    "query_result": {
+      "columns": [...],
+      "rows": [...],
+      "row_count": 25
+    },
+    "summary": "查询到25个服务区的营业数据...",
+    "conversation_id": "citu_20250131103000001",
+    "execution_path": ["start", "classify", "agent_sql_generation", "agent_sql_execution", "format_response"],
+    "classification_info": {
+      "question_type": "DATABASE",
+      "confidence": 0.85,
+      "method": "rule_based_strong_business"
+    },
+    "timestamp": "2025-01-31T10:30:15"
+  }
+}
+```
+
+#### 错误状态消息
+
+```json
+{
+  "code": 500,
+  "success": false,
+  "message": "SQL验证失败",
+  "data": {
+    "type": "error",
+    "error": "不允许的操作: DELETE。本系统只支持查询操作(SELECT)。",
+    "error_type": "forbidden_keywords",
+    "conversation_id": "citu_20250131103000001",
+    "node": "agent_sql_generation",
+    "timestamp": "2025-01-31T10:30:10",
+    "state_summary": {
+      "sql": "DELETE FROM sales WHERE...",
+      "validation_error_type": "forbidden_keywords"
+    }
+  }
+}
+```
+
+## 前端集成
+
+### 现有API(保持不变)
+
+```javascript
+// 同步方式 - 适合简单查询场景
+const response = await fetch('/api/v0/ask_agent', {
+    method: 'POST',
+    headers: {'Content-Type': 'application/json'},
+    body: JSON.stringify({question, user_id})
+});
+
+const result = await response.json();
+displayResult(result.data);
+```
+
+### 流式API
+
+```javascript
+// 流式方式 - 适合复杂查询和需要进度显示的场景
+const eventSource = new EventSource('/api/v0/ask_agent_stream', {
+    method: 'POST',
+    headers: {'Content-Type': 'application/json'},
+    body: JSON.stringify({question, user_id})
+});
+
+let conversationId = null;
+
+eventSource.onmessage = function(event) {
+    const data = JSON.parse(event.data);
+    
+    switch(data.data.type) {
+        case 'progress':
+            // 显示执行进度
+            updateProgress(
+                `${data.data.icon} ${data.data.display_name}`,
+                data.data.details,
+                data.data.sub_status
+            );
+            
+            // 特殊处理关键节点
+            if (data.data.node === 'classify_question') {
+                showClassificationResult(data.data.state_summary);
+            } else if (data.data.node === 'agent_sql_generation') {
+                if (data.data.details.includes('SQL生成成功')) {
+                    showSQL(data.data.state_summary.sql);
+                }
+            }
+            
+            if (!conversationId) conversationId = data.data.conversation_id;
+            break;
+            
+        case 'completed':
+            hideProgress();
+            displayResult(data.data);  // 格式与现有API完全一致
+            eventSource.close();
+            break;
+            
+        case 'error':
+            hideProgress();
+            showError(data.message, data.data.error_type);
+            eventSource.close();
+            break;
+    }
+};
+
+// 错误处理
+eventSource.onerror = function(event) {
+    hideProgress();
+    showError("连接异常,请重试");
+    eventSource.close();
+};
+```
+
+## API测试
+
+### Postman测试流式API
+
+**Postman完全支持测试SSE流式API**:
+
+#### 测试配置
+
+1. **Method**: `POST`
+2. **URL**: `http://localhost:8084/api/v0/ask_agent_stream`
+3. **Headers**:
+   ```
+   Content-Type: application/json
+   Accept: text/event-stream
+   ```
+4. **Body** (JSON):
+   ```json
+   {
+     "question": "查询各服务区的营业收入排行",
+     "user_id": "test_user",
+     "routing_mode": "hybrid"
+   }
+   ```
+
+#### 预期响应
+
+Postman会逐步显示流式响应:
+
+```
+data: {"code":200,"success":true,"message":"正在执行: 分析问题类型","data":{"type":"progress","display_name":"分析问题类型","icon":"🤔"}}
+
+data: {"code":200,"success":true,"message":"正在执行: 生成SQL查询","data":{"type":"progress","display_name":"生成SQL查询","icon":"🔧"}}
+
+data: {"code":200,"success":true,"message":"正在执行: 执行数据查询","data":{"type":"progress","display_name":"执行数据查询","icon":"⚙️"}}
+
+data: {"code":200,"success":true,"message":"处理完成","data":{"type":"completed","response":"...","conversation_id":"citu_20250131..."}}
+```
+
+#### 测试要点
+
+**适合测试**:
+- ✅ SSE格式正确性
+- ✅ 节点状态转换序列
+- ✅ 错误处理(SQL验证失败等)
+- ✅ 不同路由模式的执行路径
+- ✅ 分类决策和置信度
+
+**特殊测试场景**:
+- 🔧 SQL生成失败场景
+- ⚙️ 数据库连接异常场景  
+- 🤔 问题分类边界情况
+- 💭 聊天类型问题处理
+
+## 技术优势
+
+### 1. 业务价值最大化
+
+- **复杂查询可视化**:SQL生成、验证、执行全程可见
+- **智能分类透明化**:用户了解系统如何判断问题类型
+- **错误及时反馈**:SQL验证失败立即通知,不用等待整个流程
+- **调试友好**:开发者可实时观察Agent决策过程
+
+### 2. 技术架构优势
+
+- **零风险改造**:现有API完全不变,新功能独立部署
+- **代码复用最大化**:所有业务逻辑、错误处理完全复用
+- **状态管理完善**:`AgentState` 天然适合流式监控
+- **性能影响最小**:流式执行不增加计算开销
+
+### 3. 用户体验提升
+
+- **进度可视化**:关键业务步骤实时显示
+- **智能等待**:用户知道系统在做什么,减少焦虑
+- **错误友好**:问题分类、SQL验证错误立即显示
+- **调试便利**:执行路径、状态变更全程记录
+
+## 实施计划
+
+### 阶段1:核心功能开发(1-2周)
+- [ ] 在 `CituLangGraphAgent` 中新增 `process_question_stream()` 方法
+- [ ] 在 `unified_api.py` 中新增流式API端点
+- [ ] 实现节点状态映射和响应格式化
+- [ ] 基础错误处理和异常捕获
+
+### 阶段2:业务逻辑完善(1周)
+- [ ] 完善SQL生成/验证/修复的流式监控
+- [ ] 实现分类决策的详细状态推送
+- [ ] 优化聊天类型问题的进度显示
+- [ ] 添加条件路由的状态推送
+
+### 阶段3:测试验证(1周)
+- [ ] 单元测试:验证流式方法的正确性
+- [ ] 集成测试:验证API端到端功能
+- [ ] 业务场景测试:复杂SQL、分类边界、错误处理
+- [ ] 性能测试:验证流式推送性能影响
+
+### 阶段4:文档和部署(0.5周)
+- [ ] 更新API文档和使用示例
+- [ ] 前端集成指南和最佳实践
+- [ ] 生产环境部署和监控配置
+
+## 风险评估
+
+### 低风险
+- **现有功能影响**:新增功能,现有API完全不变
+- **代码质量**:大量复用现有逻辑,经过验证的业务代码
+- **部署复杂度**:仅需要API层面的更新
+
+### 需要注意
+- **异步处理**:确保所有节点的异步调用正确处理
+- **状态一致性**:确保流式状态与最终结果一致
+- **资源管理**:长连接的资源清理和超时处理
+- **错误传播**:确保节点内部错误正确传递到流式API
+
+### 监控要点
+- **连接数监控**:SSE长连接数量和持续时间
+- **节点执行时间**:各个节点的执行耗时分布
+- **错误率统计**:不同类型错误的发生频率
+- **用户体验指标**:完整流程的端到端耗时
+
+## 总结
+
+本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为Citu LangGraph Agent提供了实时状态监控能力。方案具有以下特点:
+
+1. **业务价值最大化**:重点关注SQL生成、执行等高价值环节
+2. **技术实现可靠**:基于LangGraph原生流式能力,复用所有现有逻辑
+3. **用户体验优秀**:提供类似ChatGPT的实时反馈体验
+4. **维护成本低**:统一的响应格式和错误处理机制
+
+该方案为Agent系统的用户体验升级提供了完整的技术解决方案,特别适合处理复杂业务查询的场景。
+
+---
+
+*本文档描述了Citu LangGraph Agent流式API的完整设计方案,为实际开发提供详细的技术规范。*

+ 378 - 0
docs/react_agent_streaming_api_design.md

@@ -0,0 +1,378 @@
+# React Agent 流式API设计方案
+
+## 概述
+
+本文档描述了React Agent流式状态监控API的设计方案,解决当前同步API无法显示执行进度的问题。通过新增流式API,在保持现有API不变的前提下,为客户端提供实时的执行状态监控能力。
+
+## 背景问题
+
+### 当前API的局限性
+
+当前的 `ask_react_agent` API存在以下问题:
+
+1. **同步执行**:客户端必须等待整个Agent执行完成
+2. **无进度显示**:客户端无法了解当前执行状态
+3. **用户体验差**:长时间等待没有任何反馈
+
+### 用户需求
+
+- 立即获得 `thread_id` 用于标识对话
+- 实时显示Agent执行进度(如"AI思考中"、"执行查询"等)
+- 保持现有API的兼容性
+
+## 解决方案
+
+### 设计原则
+
+1. **向后兼容**:现有 `ask_react_agent` API保持完全不变
+2. **最小侵入**:尽可能复用现有代码逻辑
+3. **统一格式**:新API与现有API保持相同的响应格式
+4. **实时性**:基于LangGraph原生的流式能力
+
+### 技术方案
+
+#### 1. 新增流式API
+
+**端点**:`POST /api/v0/ask_react_agent_stream`
+
+**技术栈**:
+- 基于 LangGraph 的 `astream` 功能
+- 使用 Server-Sent Events (SSE) 推送状态
+- 保持统一的标准API响应格式
+
+#### 2. 执行流程
+
+```mermaid
+sequenceDiagram
+    participant Client as 客户端
+    participant API as 流式API
+    participant Agent as React Agent
+    participant LangGraph as LangGraph执行器
+
+    Client->>API: POST /ask_react_agent_stream
+    API->>API: 生成thread_id
+    API->>Client: 推送thread_id
+    API->>Agent: 调用chat_stream()
+    Agent->>LangGraph: astream执行
+    
+    loop 每个节点执行
+        LangGraph->>Agent: 节点状态更新
+        Agent->>API: yield进度信息
+        API->>Client: 推送进度状态
+    end
+    
+    LangGraph->>Agent: 执行完成
+    Agent->>API: yield最终结果
+    API->>Client: 推送完整结果
+    API->>Client: 推送结束标记
+```
+
+## 实现方案
+
+### 1. 修改 `react_agent/agent.py`
+
+**新增方法**:`chat_stream()`
+
+```python
+async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None):
+    """
+    流式处理用户聊天请求 - 复用chat()方法的所有逻辑
+    """
+    # 1. 复用现有的初始化逻辑
+    if not thread_id:
+        now = pd.Timestamp.now()
+        milliseconds = int(now.microsecond / 1000)
+        thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
+    
+    # 2. 复用现有的配置和错误处理
+    self._recursion_count = 0
+    run_config = {
+        "configurable": {"thread_id": thread_id},
+        "recursion_limit": config.RECURSION_LIMIT
+    }
+    
+    # 3. 复用checkpointer检查逻辑
+    if self.checkpointer:
+        # ... checkpointer连接检查和重新初始化 ...
+    
+    # 4. 使用astream流式执行
+    final_state = None
+    async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"):
+        for node_name, node_data in chunk.items():
+            yield {
+                "type": "node_progress",
+                "node": node_name,
+                "data": node_data,
+                "thread_id": thread_id
+            }
+            final_state = node_data
+    
+    # 5. 复用现有的结果处理逻辑
+    if final_state and "messages" in final_state:
+        api_data = await self._async_generate_api_data(final_state)
+        yield {
+            "type": "final_result",
+            "result": {"api_data": api_data, "thread_id": thread_id}
+        }
+```
+
+**关键特性**:
+- 完全复用 `chat()` 方法的初始化和错误处理逻辑
+- 使用 `astream` 而非 `ainvoke` 获取每个节点状态
+- 通过生成器模式实时yield状态信息
+
+### 2. 修改 `unified_api.py`
+
+**新增API端点**:`ask_react_agent_stream()`
+
+```python
+@app.route("/api/v0/ask_react_agent_stream", methods=["POST"])
+async def ask_react_agent_stream():
+    """React Agent 流式API"""
+    
+    async def generate():
+        try:
+            # 复用现有的数据验证逻辑
+            data = request.get_json(force=True)
+            validated_data = validate_request_data(data)
+            
+            # 复用现有的Agent初始化检查
+            if not await ensure_agent_ready():
+                yield error_response()
+                return
+            
+            # 流式执行
+            async for chunk in _react_agent_instance.chat_stream(
+                message=validated_data['question'],
+                user_id=validated_data['user_id'],
+                thread_id=validated_data.get('thread_id')
+            ):
+                if chunk["type"] == "node_progress":
+                    yield format_progress_message(chunk)
+                elif chunk["type"] == "final_result":
+                    yield format_final_result(chunk, validated_data)
+                elif chunk["type"] == "error":
+                    yield format_error_message(chunk)
+            
+        except Exception as e:
+            yield format_exception(e)
+    
+    return Response(generate(), mimetype='text/event-stream')
+```
+
+### 3. 响应格式设计
+
+#### 进度状态消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "正在执行: AI思考中",
+  "data": {
+    "type": "progress",
+    "node": "agent",
+    "display_name": "AI思考中",
+    "icon": "🤖",
+    "thread_id": "wang1:20250131103000001",
+    "timestamp": "2025-01-31T10:30:00"
+  }
+}
+```
+
+#### 最终结果消息
+
+```json
+{
+  "code": 200,
+  "success": true,
+  "message": "处理成功",
+  "data": {
+    "type": "completed",
+    "response": "根据销售数据分析...",
+    "conversation_id": "wang1:20250131103000001",
+    "user_id": "wang1",
+    "react_agent_meta": {...},
+    "sql": "SELECT * FROM sales...",
+    "records": [...],
+    "timestamp": "2025-01-31T10:32:15"
+  }
+}
+```
+
+#### 节点状态映射
+
+| 节点名称 | 显示名称 | 图标 | 说明 |
+|----------|----------|------|------|
+| `__start__` | 开始 | 🚀 | 流程启动 |
+| `trim_messages` | 准备中 | 📝 | 消息裁剪 |
+| `agent` | AI思考中 | 🤖 | LLM推理决策 |
+| `prepare_tool_input` | 准备工具 | 🔧 | 工具输入准备 |
+| `tools` | 执行查询 | ⚙️ | SQL工具执行 |
+| `update_state_after_tool` | 处理结果 | 🔄 | 结果处理 |
+| `format_final_response` | 生成回答 | 📝 | 最终响应格式化 |
+| `__end__` | 完成 | ✅ | 流程结束 |
+
+## 前端集成
+
+### 现有API(保持不变)
+
+```javascript
+// 同步方式 - 适合不需要显示进度的场景
+const response = await fetch('/api/v0/ask_react_agent', {
+    method: 'POST',
+    headers: {'Content-Type': 'application/json'},
+    body: JSON.stringify({question, user_id})
+});
+
+const result = await response.json();
+displayResult(result.data);
+```
+
+### 流式API
+
+```javascript
+// 流式方式 - 适合需要显示进度的场景
+const eventSource = new EventSource('/api/v0/ask_react_agent_stream', {
+    method: 'POST',
+    headers: {'Content-Type': 'application/json'},
+    body: JSON.stringify({question, user_id})
+});
+
+let threadId = null;
+
+eventSource.onmessage = function(event) {
+    const data = JSON.parse(event.data);
+    
+    switch(data.data.type) {
+        case 'progress':
+            updateProgress(`${data.data.icon} ${data.data.display_name}`);
+            if (!threadId) threadId = data.data.thread_id;
+            break;
+            
+        case 'completed':
+            hideProgress();
+            displayResult(data.data);  // 格式与现有API完全一致
+            eventSource.close();
+            break;
+            
+        case 'error':
+            hideProgress();
+            showError(data.message);
+            eventSource.close();
+            break;
+    }
+};
+```
+
+## API测试
+
+### Postman测试流式API
+
+**Postman完全支持测试SSE流式API**:
+
+#### 测试配置
+
+1. **Method**: `POST`
+2. **URL**: `http://localhost:8084/api/v0/ask_react_agent_stream`
+3. **Headers**:
+   ```
+   Content-Type: application/json
+   Accept: text/event-stream
+   ```
+4. **Body** (JSON):
+   ```json
+   {
+     "question": "帮我查询销售数据",
+     "user_id": "test_user"
+   }
+   ```
+
+#### 预期响应
+
+Postman会逐步显示流式响应:
+
+```
+data: {"code":200,"success":true,"message":"任务已启动","data":{"type":"started","thread_id":"test_user:20250131..."}}
+
+data: {"code":200,"success":true,"message":"正在执行: AI思考中","data":{"type":"progress","display_name":"AI思考中","icon":"🤖"}}
+
+data: {"code":200,"success":true,"message":"正在执行: 执行查询","data":{"type":"progress","display_name":"执行查询","icon":"⚙️"}}
+
+data: {"code":200,"success":true,"message":"处理成功","data":{"type":"completed","response":"...","conversation_id":"test_user:20250131..."}}
+```
+
+#### 测试要点
+
+**适合测试**:
+- ✅ SSE格式是否正确
+- ✅ JSON结构验证
+- ✅ 错误情况处理
+- ✅ 流式事件序列
+
+**局限性**:
+- ❌ 无法测试前端EventSource逻辑
+- ❌ 显示原始SSE格式,不够直观
+
+## 技术优势
+
+### 1. 向后兼容性
+
+- **现有API完全不变**:`ask_react_agent` 保持所有现有功能
+- **响应格式一致**:最终结果与现有API格式完全相同
+- **代码复用**:复用所有现有的验证、格式化、错误处理逻辑
+
+### 2. 实时性能
+
+- **基于LangGraph原生能力**:利用 `astream` 获取真实的节点执行状态
+- **零延迟推送**:每个节点执行时立即推送状态
+- **无需轮询**:Server-Sent Events 实现实时推送
+
+### 3. 用户体验
+
+- **立即响应**:客户端立即获得 `thread_id`
+- **进度可视化**:实时显示执行进度和当前步骤
+- **错误及时反馈**:执行异常时立即通知客户端
+
+## 实施计划
+
+### 阶段1:核心功能开发
+- [ ] 在 `react_agent/agent.py` 中新增 `chat_stream()` 方法
+- [ ] 在 `unified_api.py` 中新增流式API端点
+- [ ] 实现节点状态映射和响应格式化
+
+### 阶段2:测试验证
+- [ ] 单元测试:验证流式方法的正确性
+- [ ] 集成测试:验证API端到端功能
+- [ ] 性能测试:验证流式推送性能
+
+### 阶段3:文档和部署
+- [ ] 更新API文档
+- [ ] 前端集成示例
+- [ ] 生产环境部署
+
+## 风险评估
+
+### 低风险
+- **现有功能影响**:新增功能,现有API完全不变
+- **代码质量**:大量复用现有逻辑,风险较低
+
+### 需要注意
+- **异步处理**:确保所有异步调用正确处理
+- **错误传播**:确保Agent内部错误正确传递到API层
+- **资源管理**:长连接的资源清理和超时处理
+
+## 总结
+
+本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为React Agent提供了实时状态监控能力。方案具有以下特点:
+
+1. **最小侵入**:只需新增代码,不修改现有逻辑
+2. **用户友好**:提供类似ChatGPT的实时反馈体验
+3. **技术先进**:基于LangGraph原生流式能力
+4. **易于维护**:统一的响应格式和错误处理
+
+该方案为React Agent的用户体验升级提供了完整的技术解决方案。
+
+---
+
+*本文档描述了React Agent流式API的完整设计方案,为实际开发提供详细的技术规范。*

+ 3 - 3
react_agent/agent.py

@@ -1429,9 +1429,9 @@ class CustomReactAgent:
         """Get database scope prompt for intelligent query decision making"""
         try:
             import os
-            # Read agent/tools/db_query_decision_prompt.txt
-            project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-            db_scope_file = os.path.join(project_root, "agent", "tools", "db_query_decision_prompt.txt")
+            # Read local db_query_decision_prompt.txt
+            current_dir = os.path.dirname(os.path.abspath(__file__))
+            db_scope_file = os.path.join(current_dir, "db_query_decision_prompt.txt")
             
             with open(db_scope_file, 'r', encoding='utf-8') as f:
                 db_scope_content = f.read().strip()