# React Agent 流式API设计方案 (ask_react_agent_stream) ## 概述 本文档描述了为React Agent添加流式API `ask_react_agent_stream` 的设计方案,该API与现有的 `ask_react_agent` 功能和参数完全相同,但采用流式输出模式。设计参考了现有的 `ask_agent_stream` 实现模式,确保系统的一致性和稳定性。 ## 背景与需求 ### 现有API局限性 当前的 `ask_react_agent` API 存在以下问题: 1. **同步执行**:客户端必须等待完整的React Agent执行完成 2. **无进度反馈**:客户端无法了解LangGraph各节点的执行状态 3. **用户体验差**:长时间执行没有实时反馈 ### 解决目标 - 提供与 `ask_react_agent` 相同的功能和参数 - 支持实时显示LangGraph节点执行进度 - 采用SSE(Server-Sent Events)流式推送 - 完全不影响现有 `ask_react_agent` API的功能 ## 技术方案 ### 1. API设计 **端点**: `GET /api/v0/ask_react_agent_stream` **参数获取方式**: URL查询参数(因为EventSource只支持GET请求) **支持参数**: - `question` (必填): 用户问题 - `user_id` (可选): 用户ID,默认为"guest"或从thread_id推断 - `thread_id` (可选): 会话ID,不传则自动生成新会话 **响应格式**: `text/event-stream` (SSE格式) ### 2.1 同步版本API (推荐用于复杂数据库查询) **端点**: `GET /api/v0/ask_react_agent_stream_sync` **设计目的**: 解决Vector搜索异步冲突问题,专门用于复杂数据库查询 **参数**: 与原API完全相同 - `question` (必填): 用户问题 - `user_id` (必填): 用户ID - `thread_id` (可选): 会话ID,不传则自动生成新会话 - `routing_mode` (可选): 路由模式,默认'agent' - `continue_conversation` (可选): 是否继续对话,默认false **技术架构**: - 使用同步LangGraph (`invoke()` 而不是 `ainvoke()`) - 同步LLM配置 (`streaming=False`, `enable_thinking=False`) - 完全避免异步依赖冲突 **适用场景**: - ✅ 复杂数据库查询 - ✅ 需要Vector搜索的问题 - ✅ 对稳定性要求高的场景 **使用示例**: ```bash curl -X GET "http://localhost:8084/api/v0/ask_react_agent_stream_sync?question=请问当前系统中哪个服务区档口最多?&user_id=test_user" -H "Accept: text/event-stream" ``` ### 2. 实现架构 ```mermaid sequenceDiagram participant Client as 客户端 participant API as ask_react_agent_stream participant ReactAgent as React Agent实例 participant LangGraph as LangGraph执行器 Client->>API: GET /ask_react_agent_stream?question=...&user_id=... API->>API: 参数验证和数据清洗 API->>ReactAgent: 调用chat_stream()方法 ReactAgent->>LangGraph: astream执行 loop 每个节点执行 LangGraph->>ReactAgent: 节点状态更新 ReactAgent->>API: yield进度信息 API->>Client: SSE推送进度状态 end LangGraph->>ReactAgent: 执行完成 ReactAgent->>API: yield最终结果 API->>Client: SSE推送完整结果 Client->>Client: 关闭EventSource连接 ``` ### 3. 核心实现逻辑 #### 3.1 API端点实现 ```python @app.route('/api/v0/ask_react_agent_stream', methods=['GET']) async def ask_react_agent_stream(): """React Agent 流式API - 支持实时进度显示 功能与ask_react_agent完全相同,除了采用流式输出 """ def generate(): try: # 1. 参数获取和验证(从URL参数) question = request.args.get('question') user_id_input = request.args.get('user_id') thread_id_input = request.args.get('thread_id') # 参数验证(复用现有validate_request_data逻辑) if not question: yield format_sse_error("缺少必需参数:question") return # 2. 数据预处理(与ask_react_agent相同) validated_data = validate_request_data({ 'question': question, 'user_id': user_id_input, 'thread_id': thread_id_input }) # 3. Agent实例检查 if not await ensure_agent_ready(): yield format_sse_error("React Agent 初始化失败") return # 4. 流式执行 async for chunk in _react_agent_instance.chat_stream( message=validated_data['question'], user_id=validated_data['user_id'], thread_id=validated_data['thread_id'] ): if chunk["type"] == "progress": yield format_sse_react_progress(chunk) elif chunk["type"] == "completed": yield format_sse_react_completed(chunk) elif chunk["type"] == "error": yield format_sse_error(chunk.get("error", "处理失败")) except Exception as e: yield format_sse_error(f"服务异常: {str(e)}") return Response(stream_with_context(generate()), mimetype='text/event-stream') ``` #### 3.2 React Agent流式方法 需要在React Agent类中新增 `chat_stream()` 方法: ```python async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None): """ 流式处理用户聊天请求 - 复用chat()方法的所有逻辑 """ # 1. 复用现有的初始化逻辑(thread_id生成、配置检查等) if not thread_id: now = pd.Timestamp.now() milliseconds = int(now.microsecond / 1000) thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}" # 2. 复用现有的配置和错误处理 self._recursion_count = 0 run_config = { "configurable": {"thread_id": thread_id}, "recursion_limit": config.RECURSION_LIMIT } # 3. 复用checkpointer检查逻辑 if self.checkpointer: # ... checkpointer连接检查和重新初始化 ... # 4. 使用astream流式执行 final_state = None async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"): for node_name, node_data in chunk.items(): yield { "type": "progress", "node": node_name, "data": node_data, "thread_id": thread_id } final_state = node_data # 5. 复用现有的结果处理逻辑 if final_state and "messages" in final_state: api_data = await self._async_generate_api_data(final_state) yield { "type": "completed", "result": {"api_data": api_data, "thread_id": thread_id} } ``` ### 4. SSE响应格式设计 #### 4.1 进度状态消息 ```json { "code": 200, "success": true, "message": "正在执行: AI思考中", "data": { "type": "progress", "node": "agent", "display_name": "AI思考中", "thread_id": "wang1:20250131103000001", "timestamp": "2025-01-31T10:30:00.123Z" } } ``` #### 4.2 最终结果消息 ```json { "code": 200, "success": true, "message": "处理完成", "data": { "type": "completed", "response": "根据销售数据分析...", "conversation_id": "wang1:20250131103000001", "user_id": "wang1", "react_agent_meta": { "thread_id": "wang1:20250131103000001", "agent_version": "custom_react_v1_async" }, "sql": "SELECT * FROM sales...", "records": [...], "timestamp": "2025-01-31T10:32:15.456Z" } } ``` #### 4.3 错误消息 ```json { "code": 500, "success": false, "message": "处理失败", "data": { "type": "error", "error": "具体错误信息", "timestamp": "2025-01-31T10:30:05.789Z" } } ``` ### 5. 节点状态映射 基于LangGraph的节点执行,提供用户友好的状态显示: | 节点名称 | 显示名称 | 说明 | |----------|----------|------| | `__start__` | 开始处理 | 流程启动 | | `trim_messages` | 准备消息 | 消息预处理 | | `agent` | AI思考中 | LLM推理决策 | | `prepare_tool_input` | 准备工具 | 工具输入准备 | | `tools` | 执行查询 | SQL工具执行 | | `update_state_after_tool` | 处理结果 | 结果后处理 | | `format_final_response` | 生成回答 | 最终响应格式化 | | `__end__` | 完成 | 流程结束 | ### 6. SSE格式化函数 #### 6.1 进度格式化 ```python def format_sse_react_progress(chunk: dict) -> str: """格式化React Agent进度事件为SSE格式""" node = chunk.get("node") thread_id = chunk.get("thread_id") # 节点显示名称映射 node_display_map = { "__start__": "开始处理", "trim_messages": "准备消息", "agent": "AI思考中", "prepare_tool_input": "准备工具", "tools": "执行查询", "update_state_after_tool": "处理结果", "format_final_response": "生成回答", "__end__": "完成" } display_name = node_display_map.get(node, "处理中") data = { "code": 200, "success": True, "message": f"正在执行: {display_name}", "data": { "type": "progress", "node": node, "display_name": display_name, "thread_id": thread_id, "timestamp": datetime.now().isoformat() } } import json return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" ``` #### 6.2 完成结果格式化 ```python def format_sse_react_completed(chunk: dict) -> str: """格式化React Agent完成事件为SSE格式""" result = chunk.get("result", {}) api_data = result.get("api_data", {}) thread_id = result.get("thread_id") # 构建与ask_react_agent相同的响应格式 response_data = { "response": api_data.get("response", ""), "conversation_id": thread_id, "react_agent_meta": api_data.get("react_agent_meta", {}), "timestamp": datetime.now().isoformat() } # 可选字段 if "sql" in api_data: response_data["sql"] = api_data["sql"] if "records" in api_data: response_data["records"] = api_data["records"] data = { "code": 200, "success": True, "message": "处理完成", "data": { "type": "completed", **response_data } } import json return f"data: {json.dumps(data, ensure_ascii=False)}\n\n" ``` ## 前端集成 ### 现有API(保持不变) ```javascript // 同步方式 - 现有代码无需修改 const response = await fetch('/api/v0/ask_react_agent', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({ question: "帮我查询销售数据", user_id: "wang1" }) }); const result = await response.json(); displayResult(result.data); ``` ### 流式API ```javascript // 流式方式 - 新增功能 const params = new URLSearchParams({ question: "帮我查询销售数据", user_id: "wang1" }); const eventSource = new EventSource(`/api/v0/ask_react_agent_stream?${params}`); let conversationId = null; eventSource.onmessage = function(event) { const data = JSON.parse(event.data); switch(data.data.type) { case 'progress': updateProgress(data.data.display_name); if (!conversationId) conversationId = data.data.thread_id; break; case 'completed': hideProgress(); displayResult(data.data); // 格式与现有API完全一致 eventSource.close(); break; case 'error': hideProgress(); showError(data.data.error); eventSource.close(); break; } }; eventSource.onerror = function(error) { console.error('EventSource failed:', error); eventSource.close(); }; ``` ### 完整前端集成示例 #### HTML页面结构 ```html
{result.response}
{result.sql && ({result.sql}
共 {result.records.row_count} 条记录
{/* 表格渲染逻辑 */}