本文档描述了React Agent流式状态监控API的设计方案,解决当前同步API无法显示执行进度的问题。通过新增流式API,在保持现有API不变的前提下,为客户端提供实时的执行状态监控能力。
当前的 ask_react_agent
API存在以下问题:
thread_id
用于标识对话ask_react_agent
API保持完全不变端点:POST /api/v0/ask_react_agent_stream
技术栈:
astream
功能sequenceDiagram
participant Client as 客户端
participant API as 流式API
participant Agent as React Agent
participant LangGraph as LangGraph执行器
Client->>API: POST /ask_react_agent_stream
API->>API: 生成thread_id
API->>Client: 推送thread_id
API->>Agent: 调用chat_stream()
Agent->>LangGraph: astream执行
loop 每个节点执行
LangGraph->>Agent: 节点状态更新
Agent->>API: yield进度信息
API->>Client: 推送进度状态
end
LangGraph->>Agent: 执行完成
Agent->>API: yield最终结果
API->>Client: 推送完整结果
API->>Client: 推送结束标记
react_agent/agent.py
新增方法:chat_stream()
async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None):
"""
流式处理用户聊天请求 - 复用chat()方法的所有逻辑
"""
# 1. 复用现有的初始化逻辑
if not thread_id:
now = pd.Timestamp.now()
milliseconds = int(now.microsecond / 1000)
thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
# 2. 复用现有的配置和错误处理
self._recursion_count = 0
run_config = {
"configurable": {"thread_id": thread_id},
"recursion_limit": config.RECURSION_LIMIT
}
# 3. 复用checkpointer检查逻辑
if self.checkpointer:
# ... checkpointer连接检查和重新初始化 ...
# 4. 使用astream流式执行
final_state = None
async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"):
for node_name, node_data in chunk.items():
yield {
"type": "node_progress",
"node": node_name,
"data": node_data,
"thread_id": thread_id
}
final_state = node_data
# 5. 复用现有的结果处理逻辑
if final_state and "messages" in final_state:
api_data = await self._async_generate_api_data(final_state)
yield {
"type": "final_result",
"result": {"api_data": api_data, "thread_id": thread_id}
}
关键特性:
chat()
方法的初始化和错误处理逻辑astream
而非 ainvoke
获取每个节点状态unified_api.py
新增API端点:ask_react_agent_stream()
@app.route("/api/v0/ask_react_agent_stream", methods=["POST"])
async def ask_react_agent_stream():
"""React Agent 流式API"""
async def generate():
try:
# 复用现有的数据验证逻辑
data = request.get_json(force=True)
validated_data = validate_request_data(data)
# 复用现有的Agent初始化检查
if not await ensure_agent_ready():
yield error_response()
return
# 流式执行
async for chunk in _react_agent_instance.chat_stream(
message=validated_data['question'],
user_id=validated_data['user_id'],
thread_id=validated_data.get('thread_id')
):
if chunk["type"] == "node_progress":
yield format_progress_message(chunk)
elif chunk["type"] == "final_result":
yield format_final_result(chunk, validated_data)
elif chunk["type"] == "error":
yield format_error_message(chunk)
except Exception as e:
yield format_exception(e)
return Response(generate(), mimetype='text/event-stream')
{
"code": 200,
"success": true,
"message": "正在执行: AI思考中",
"data": {
"type": "progress",
"node": "agent",
"display_name": "AI思考中",
"icon": "🤖",
"thread_id": "wang1:20250131103000001",
"timestamp": "2025-01-31T10:30:00"
}
}
{
"code": 200,
"success": true,
"message": "处理成功",
"data": {
"type": "completed",
"response": "根据销售数据分析...",
"conversation_id": "wang1:20250131103000001",
"user_id": "wang1",
"react_agent_meta": {...},
"sql": "SELECT * FROM sales...",
"records": [...],
"timestamp": "2025-01-31T10:32:15"
}
}
节点名称 | 显示名称 | 图标 | 说明 |
---|---|---|---|
__start__ |
开始 | 🚀 | 流程启动 |
trim_messages |
准备中 | 📝 | 消息裁剪 |
agent |
AI思考中 | 🤖 | LLM推理决策 |
prepare_tool_input |
准备工具 | 🔧 | 工具输入准备 |
tools |
执行查询 | ⚙️ | SQL工具执行 |
update_state_after_tool |
处理结果 | 🔄 | 结果处理 |
format_final_response |
生成回答 | 📝 | 最终响应格式化 |
__end__ |
完成 | ✅ | 流程结束 |
// 同步方式 - 适合不需要显示进度的场景
const response = await fetch('/api/v0/ask_react_agent', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question, user_id})
});
const result = await response.json();
displayResult(result.data);
// 流式方式 - 适合需要显示进度的场景
const eventSource = new EventSource('/api/v0/ask_react_agent_stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question, user_id})
});
let threadId = null;
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
switch(data.data.type) {
case 'progress':
updateProgress(`${data.data.icon} ${data.data.display_name}`);
if (!threadId) threadId = data.data.thread_id;
break;
case 'completed':
hideProgress();
displayResult(data.data); // 格式与现有API完全一致
eventSource.close();
break;
case 'error':
hideProgress();
showError(data.message);
eventSource.close();
break;
}
};
Postman完全支持测试SSE流式API:
POST
http://localhost:8084/api/v0/ask_react_agent_stream
Headers:
Content-Type: application/json
Accept: text/event-stream
Body (JSON):
{
"question": "帮我查询销售数据",
"user_id": "test_user"
}
Postman会逐步显示流式响应:
```
data: {"code":200,"success":true,"message":"任务已启动","data":{"type":"started","thread_id":"test_user:20250131..."}}
data: {"code":200,"success":true,"message":"正在执行: AI思考中","data":{"type":"progress","display_name":"AI思考中","icon":"🤖"}}
data: {"code":200,"success":true,"message":"正在执行: 执行查询","data":{"type":"progress","display_name":"执行查询","icon":"⚙️"}}
data: {"code":200,"success":true,"message":"处理成功","data":{"type":"completed","response":"...","conversation_id":"test_user:20250131..."}} ```
适合测试:
局限性:
ask_react_agent
保持所有现有功能astream
获取真实的节点执行状态thread_id
react_agent/agent.py
中新增 chat_stream()
方法unified_api.py
中新增流式API端点本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为React Agent提供了实时状态监控能力。方案具有以下特点:
该方案为React Agent的用户体验升级提供了完整的技术解决方案。
本文档描述了React Agent流式API的完整设计方案,为实际开发提供详细的技术规范。