react_agent_streaming_api_design.md 11 KB

React Agent 流式API设计方案

概述

本文档描述了React Agent流式状态监控API的设计方案,解决当前同步API无法显示执行进度的问题。通过新增流式API,在保持现有API不变的前提下,为客户端提供实时的执行状态监控能力。

背景问题

当前API的局限性

当前的 ask_react_agent API存在以下问题:

  1. 同步执行:客户端必须等待整个Agent执行完成
  2. 无进度显示:客户端无法了解当前执行状态
  3. 用户体验差:长时间等待没有任何反馈

用户需求

  • 立即获得 thread_id 用于标识对话
  • 实时显示Agent执行进度(如"AI思考中"、"执行查询"等)
  • 保持现有API的兼容性

解决方案

设计原则

  1. 向后兼容:现有 ask_react_agent API保持完全不变
  2. 最小侵入:尽可能复用现有代码逻辑
  3. 统一格式:新API与现有API保持相同的响应格式
  4. 实时性:基于LangGraph原生的流式能力

技术方案

1. 新增流式API

端点POST /api/v0/ask_react_agent_stream

技术栈

  • 基于 LangGraph 的 astream 功能
  • 使用 Server-Sent Events (SSE) 推送状态
  • 保持统一的标准API响应格式

2. 执行流程

sequenceDiagram
    participant Client as 客户端
    participant API as 流式API
    participant Agent as React Agent
    participant LangGraph as LangGraph执行器

    Client->>API: POST /ask_react_agent_stream
    API->>API: 生成thread_id
    API->>Client: 推送thread_id
    API->>Agent: 调用chat_stream()
    Agent->>LangGraph: astream执行
    
    loop 每个节点执行
        LangGraph->>Agent: 节点状态更新
        Agent->>API: yield进度信息
        API->>Client: 推送进度状态
    end
    
    LangGraph->>Agent: 执行完成
    Agent->>API: yield最终结果
    API->>Client: 推送完整结果
    API->>Client: 推送结束标记

实现方案

1. 修改 react_agent/agent.py

新增方法chat_stream()

async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None):
    """
    流式处理用户聊天请求 - 复用chat()方法的所有逻辑
    """
    # 1. 复用现有的初始化逻辑
    if not thread_id:
        now = pd.Timestamp.now()
        milliseconds = int(now.microsecond / 1000)
        thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
    
    # 2. 复用现有的配置和错误处理
    self._recursion_count = 0
    run_config = {
        "configurable": {"thread_id": thread_id},
        "recursion_limit": config.RECURSION_LIMIT
    }
    
    # 3. 复用checkpointer检查逻辑
    if self.checkpointer:
        # ... checkpointer连接检查和重新初始化 ...
    
    # 4. 使用astream流式执行
    final_state = None
    async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"):
        for node_name, node_data in chunk.items():
            yield {
                "type": "node_progress",
                "node": node_name,
                "data": node_data,
                "thread_id": thread_id
            }
            final_state = node_data
    
    # 5. 复用现有的结果处理逻辑
    if final_state and "messages" in final_state:
        api_data = await self._async_generate_api_data(final_state)
        yield {
            "type": "final_result",
            "result": {"api_data": api_data, "thread_id": thread_id}
        }

关键特性

  • 完全复用 chat() 方法的初始化和错误处理逻辑
  • 使用 astream 而非 ainvoke 获取每个节点状态
  • 通过生成器模式实时yield状态信息

2. 修改 unified_api.py

新增API端点ask_react_agent_stream()

@app.route("/api/v0/ask_react_agent_stream", methods=["POST"])
async def ask_react_agent_stream():
    """React Agent 流式API"""
    
    async def generate():
        try:
            # 复用现有的数据验证逻辑
            data = request.get_json(force=True)
            validated_data = validate_request_data(data)
            
            # 复用现有的Agent初始化检查
            if not await ensure_agent_ready():
                yield error_response()
                return
            
            # 流式执行
            async for chunk in _react_agent_instance.chat_stream(
                message=validated_data['question'],
                user_id=validated_data['user_id'],
                thread_id=validated_data.get('thread_id')
            ):
                if chunk["type"] == "node_progress":
                    yield format_progress_message(chunk)
                elif chunk["type"] == "final_result":
                    yield format_final_result(chunk, validated_data)
                elif chunk["type"] == "error":
                    yield format_error_message(chunk)
            
        except Exception as e:
            yield format_exception(e)
    
    return Response(generate(), mimetype='text/event-stream')

3. 响应格式设计

进度状态消息

{
  "code": 200,
  "success": true,
  "message": "正在执行: AI思考中",
  "data": {
    "type": "progress",
    "node": "agent",
    "display_name": "AI思考中",
    "icon": "🤖",
    "thread_id": "wang1:20250131103000001",
    "timestamp": "2025-01-31T10:30:00"
  }
}

最终结果消息

{
  "code": 200,
  "success": true,
  "message": "处理成功",
  "data": {
    "type": "completed",
    "response": "根据销售数据分析...",
    "conversation_id": "wang1:20250131103000001",
    "user_id": "wang1",
    "react_agent_meta": {...},
    "sql": "SELECT * FROM sales...",
    "records": [...],
    "timestamp": "2025-01-31T10:32:15"
  }
}

节点状态映射

节点名称 显示名称 图标 说明
__start__ 开始 🚀 流程启动
trim_messages 准备中 📝 消息裁剪
agent AI思考中 🤖 LLM推理决策
prepare_tool_input 准备工具 🔧 工具输入准备
tools 执行查询 ⚙️ SQL工具执行
update_state_after_tool 处理结果 🔄 结果处理
format_final_response 生成回答 📝 最终响应格式化
__end__ 完成 流程结束

前端集成

现有API(保持不变)

// 同步方式 - 适合不需要显示进度的场景
const response = await fetch('/api/v0/ask_react_agent', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({question, user_id})
});

const result = await response.json();
displayResult(result.data);

流式API

// 流式方式 - 适合需要显示进度的场景
const eventSource = new EventSource('/api/v0/ask_react_agent_stream', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({question, user_id})
});

let threadId = null;

eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);
    
    switch(data.data.type) {
        case 'progress':
            updateProgress(`${data.data.icon} ${data.data.display_name}`);
            if (!threadId) threadId = data.data.thread_id;
            break;
            
        case 'completed':
            hideProgress();
            displayResult(data.data);  // 格式与现有API完全一致
            eventSource.close();
            break;
            
        case 'error':
            hideProgress();
            showError(data.message);
            eventSource.close();
            break;
    }
};

API测试

Postman测试流式API

Postman完全支持测试SSE流式API

测试配置

  1. Method: POST
  2. URL: http://localhost:8084/api/v0/ask_react_agent_stream
  3. Headers:

    Content-Type: application/json
    Accept: text/event-stream
    
  4. Body (JSON):

    {
     "question": "帮我查询销售数据",
     "user_id": "test_user"
    }
    

    预期响应

    Postman会逐步显示流式响应:

    ```

data: {"code":200,"success":true,"message":"任务已启动","data":{"type":"started","thread_id":"test_user:20250131..."}}

data: {"code":200,"success":true,"message":"正在执行: AI思考中","data":{"type":"progress","display_name":"AI思考中","icon":"🤖"}}

data: {"code":200,"success":true,"message":"正在执行: 执行查询","data":{"type":"progress","display_name":"执行查询","icon":"⚙️"}}

data: {"code":200,"success":true,"message":"处理成功","data":{"type":"completed","response":"...","conversation_id":"test_user:20250131..."}} ```

测试要点

适合测试

  • ✅ SSE格式是否正确
  • ✅ JSON结构验证
  • ✅ 错误情况处理
  • ✅ 流式事件序列

局限性

  • ❌ 无法测试前端EventSource逻辑
  • ❌ 显示原始SSE格式,不够直观

技术优势

1. 向后兼容性

  • 现有API完全不变ask_react_agent 保持所有现有功能
  • 响应格式一致:最终结果与现有API格式完全相同
  • 代码复用:复用所有现有的验证、格式化、错误处理逻辑

2. 实时性能

  • 基于LangGraph原生能力:利用 astream 获取真实的节点执行状态
  • 零延迟推送:每个节点执行时立即推送状态
  • 无需轮询:Server-Sent Events 实现实时推送

3. 用户体验

  • 立即响应:客户端立即获得 thread_id
  • 进度可视化:实时显示执行进度和当前步骤
  • 错误及时反馈:执行异常时立即通知客户端

实施计划

阶段1:核心功能开发

  • react_agent/agent.py 中新增 chat_stream() 方法
  • unified_api.py 中新增流式API端点
  • 实现节点状态映射和响应格式化

阶段2:测试验证

  • 单元测试:验证流式方法的正确性
  • 集成测试:验证API端到端功能
  • 性能测试:验证流式推送性能

阶段3:文档和部署

  • 更新API文档
  • 前端集成示例
  • 生产环境部署

风险评估

低风险

  • 现有功能影响:新增功能,现有API完全不变
  • 代码质量:大量复用现有逻辑,风险较低

需要注意

  • 异步处理:确保所有异步调用正确处理
  • 错误传播:确保Agent内部错误正确传递到API层
  • 资源管理:长连接的资源清理和超时处理

总结

本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为React Agent提供了实时状态监控能力。方案具有以下特点:

  1. 最小侵入:只需新增代码,不修改现有逻辑
  2. 用户友好:提供类似ChatGPT的实时反馈体验
  3. 技术先进:基于LangGraph原生流式能力
  4. 易于维护:统一的响应格式和错误处理

该方案为React Agent的用户体验升级提供了完整的技术解决方案。


本文档描述了React Agent流式API的完整设计方案,为实际开发提供详细的技术规范。