|
@@ -0,0 +1,418 @@
|
|
|
+# React Agent状态监控API设计文档
|
|
|
+
|
|
|
+## 1. 需求背景
|
|
|
+
|
|
|
+在React Agent执行过程中,用户需要实时了解当前的执行状态,类似ChatGPT的"AI正在思考"效果。
|
|
|
+
|
|
|
+**核心需求**:
|
|
|
+- 提供状态查询API:`GET /api/v0/react/status/{thread_id}`
|
|
|
+- 对应现有聊天接口:`POST /api/v0/ask_react_agent`
|
|
|
+- 支持并发访问,不阻塞主要业务流程
|
|
|
+- 显示具体的执行步骤和工具调用状态
|
|
|
+
|
|
|
+## 2. 问题分析
|
|
|
+
|
|
|
+### 2.1 并发阻塞问题
|
|
|
+
|
|
|
+**问题现象**:
|
|
|
+- 执行`/api/v0/ask_react_agent`时,无法同时调用`/api/v0/react/status/{thread_id}`
|
|
|
+- 状态API必须等待聊天API完成后才能执行
|
|
|
+
|
|
|
+**根因分析**:
|
|
|
+```python
|
|
|
+# 全局单例Agent实例
|
|
|
+_react_agent_instance: Optional[Any] = None
|
|
|
+
|
|
|
+# ask_react_agent API调用
|
|
|
+agent_result = await _react_agent_instance.chat(...)
|
|
|
+→ agent_executor.ainvoke() # 长时间执行,锁定Agent实例
|
|
|
+
|
|
|
+# status API同时调用
|
|
|
+checkpoint_tuple = await _react_agent_instance.checkpointer.aget_tuple(read_config)
|
|
|
+→ 访问同一个checkpointer实例 → 被阻塞
|
|
|
+```
|
|
|
+
|
|
|
+**核心原因**:
|
|
|
+- 全局Agent实例的资源竞争
|
|
|
+- StateGraph执行期间持续使用checkpointer
|
|
|
+- AsyncRedisSaver可能存在内部锁定机制
|
|
|
+
|
|
|
+### 2.2 AsyncRedisSaver并发性调研
|
|
|
+
|
|
|
+通过LangGraph官方文档调研发现:
|
|
|
+
|
|
|
+**支持并发的证据**:
|
|
|
+```python
|
|
|
+# 官方文档显示不同thread_id可以并发
|
|
|
+config1 = {"configurable": {"thread_id": "1"}}
|
|
|
+config2 = {"configurable": {"thread_id": "2"}}
|
|
|
+# 理论上可以并发运行
|
|
|
+
|
|
|
+# 异步API设计
|
|
|
+async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
|
|
|
+ await checkpointer.aput(write_config, checkpoint, {}, {})
|
|
|
+ loaded_checkpoint = await checkpointer.aget(read_config)
|
|
|
+```
|
|
|
+
|
|
|
+**结论**:AsyncRedisSaver本身支持并发,问题在于Agent实例的资源竞争。
|
|
|
+
|
|
|
+### 2.3 状态判断问题
|
|
|
+
|
|
|
+**`suggested_next_step`字段不可靠**:
|
|
|
+- 测试发现:即使LangGraph执行完成,该字段仍显示执行中
|
|
|
+- 原因:该字段表示"下一步要执行的节点",不是"当前执行状态"
|
|
|
+
|
|
|
+## 3. 解决方案设计
|
|
|
+
|
|
|
+### 3.1 方案对比
|
|
|
+
|
|
|
+| 方案 | 优点 | 缺点 | 可行性 |
|
|
|
+|------|------|------|---------|
|
|
|
+| 通过Agent API | 复用现有逻辑 | 资源竞争,并发阻塞 | ❌ |
|
|
|
+| 直接Redis访问 | 完全独立,无阻塞 | 需要解析原始数据 | ✅ |
|
|
|
+| 独立checkpointer | 避免竞争 | 复杂度高 | 🔶 |
|
|
|
+
|
|
|
+**最终选择**:直接Redis访问方案
|
|
|
+
|
|
|
+### 3.2 技术方案
|
|
|
+
|
|
|
+**核心思路**:
|
|
|
+1. 使用独立的Redis连接,绕过Agent实例
|
|
|
+2. 直接读取Redis中的checkpoint原始数据
|
|
|
+3. 通过分析messages列表判断真实执行状态
|
|
|
+4. 根据工具调用情况提供详细状态信息
|
|
|
+
|
|
|
+## 4. 实现设计
|
|
|
+
|
|
|
+### 4.1 Redis数据结构分析
|
|
|
+
|
|
|
+**Redis Key格式**:
|
|
|
+```
|
|
|
+checkpoint:wang1:20250729235038043:__empty__:1f06c944-6250-64c7-8021-00e2694c5546
|
|
|
+```
|
|
|
+
|
|
|
+**Redis Value结构**:
|
|
|
+```json
|
|
|
+{
|
|
|
+ "checkpoint": {
|
|
|
+ "channel_values": {
|
|
|
+ "messages": [
|
|
|
+ {
|
|
|
+ "kwargs": {
|
|
|
+ "type": "human|ai|tool",
|
|
|
+ "content": "...",
|
|
|
+ "tool_calls": [...],
|
|
|
+ "name": "generate_sql|valid_sql|run_sql"
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "suggested_next_step": "..." // 不可靠,不使用
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+### 4.2 状态判断逻辑
|
|
|
+
|
|
|
+**通过messages分析执行状态**:
|
|
|
+
|
|
|
+1. **已完成**:最后一条AIMessage有完整内容且无tool_calls
|
|
|
+2. **执行中**:
|
|
|
+ - 最后一条AIMessage有tool_calls → 显示具体工具调用
|
|
|
+ - 最后一条ToolMessage → 显示工具执行状态
|
|
|
+ - 最后一条HumanMessage → AI思考中
|
|
|
+
|
|
|
+### 4.3 工具状态映射
|
|
|
+
|
|
|
+```python
|
|
|
+TOOL_STATUS_MAPPING = {
|
|
|
+ "generate_sql": {"name": "生成SQL中", "icon": "🔍"},
|
|
|
+ "valid_sql": {"name": "验证SQL中", "icon": "✅"},
|
|
|
+ "run_sql": {"name": "执行查询中", "icon": "⚡"},
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+## 5. 完整实现代码
|
|
|
+
|
|
|
+### 5.1 API接口实现
|
|
|
+
|
|
|
+```python
|
|
|
+@app.route('/api/v0/react/status/<thread_id>', methods=['GET'])
|
|
|
+async def get_react_agent_status_direct(thread_id: str):
|
|
|
+ """直接访问Redis获取React Agent执行状态,绕过Agent实例资源竞争"""
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 工具状态映射
|
|
|
+ TOOL_STATUS_MAPPING = {
|
|
|
+ "generate_sql": {"name": "生成SQL中", "icon": "🔍"},
|
|
|
+ "valid_sql": {"name": "验证SQL中", "icon": "✅"},
|
|
|
+ "run_sql": {"name": "执行查询中", "icon": "⚡"},
|
|
|
+ }
|
|
|
+
|
|
|
+ # 创建独立的Redis连接,不使用Agent的连接
|
|
|
+ redis_client = redis.from_url("redis://localhost:6379", decode_responses=True)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 1. 查找该thread_id的所有checkpoint键
|
|
|
+ pattern = f"checkpoint:{thread_id}:*"
|
|
|
+ keys = await redis_client.keys(pattern)
|
|
|
+
|
|
|
+ if not keys:
|
|
|
+ from common.result import failed
|
|
|
+ return jsonify(failed(message="未找到执行线程", code=404)), 404
|
|
|
+
|
|
|
+ # 2. 获取最新的checkpoint键
|
|
|
+ latest_key = sorted(keys)[-1]
|
|
|
+
|
|
|
+ # 3. 直接从Redis获取原始JSON数据
|
|
|
+ raw_checkpoint_data = await redis_client.get(latest_key)
|
|
|
+
|
|
|
+ if not raw_checkpoint_data:
|
|
|
+ from common.result import failed
|
|
|
+ return jsonify(failed(message="无法读取checkpoint数据", code=500)), 500
|
|
|
+
|
|
|
+ # 4. 解析JSON
|
|
|
+ checkpoint = json.loads(raw_checkpoint_data)
|
|
|
+
|
|
|
+ # 5. 提取messages
|
|
|
+ messages = checkpoint.get("checkpoint", {}).get("channel_values", {}).get("messages", [])
|
|
|
+
|
|
|
+ if not messages:
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": "初始化中",
|
|
|
+ "icon": "🚀",
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+ from common.result import success
|
|
|
+ return jsonify(success(data=status_data, message="获取状态成功")), 200
|
|
|
+
|
|
|
+ # 6. 分析最后一条消息
|
|
|
+ last_message = messages[-1]
|
|
|
+ last_msg_type = last_message.get("kwargs", {}).get("type", "")
|
|
|
+
|
|
|
+ # 7. 判断执行状态
|
|
|
+ if (last_msg_type == "ai" and
|
|
|
+ not last_message.get("kwargs", {}).get("tool_calls", []) and
|
|
|
+ last_message.get("kwargs", {}).get("content", "").strip()):
|
|
|
+
|
|
|
+ # 完成状态:AIMessage有完整回答且无tool_calls
|
|
|
+ status_data = {
|
|
|
+ "status": "completed",
|
|
|
+ "name": "完成",
|
|
|
+ "icon": "✅",
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ elif (last_msg_type == "ai" and
|
|
|
+ last_message.get("kwargs", {}).get("tool_calls", [])):
|
|
|
+
|
|
|
+ # AI正在调用工具
|
|
|
+ tool_calls = last_message.get("kwargs", {}).get("tool_calls", [])
|
|
|
+ tool_name = tool_calls[0].get("name", "") if tool_calls else ""
|
|
|
+
|
|
|
+ tool_info = TOOL_STATUS_MAPPING.get(tool_name, {
|
|
|
+ "name": f"调用{tool_name}中" if tool_name else "调用工具中",
|
|
|
+ "icon": "🔧"
|
|
|
+ })
|
|
|
+
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": tool_info["name"],
|
|
|
+ "icon": tool_info["icon"],
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ elif last_msg_type == "tool":
|
|
|
+ # 工具执行完成,等待AI处理
|
|
|
+ tool_name = last_message.get("kwargs", {}).get("name", "")
|
|
|
+ tool_status = last_message.get("kwargs", {}).get("status", "")
|
|
|
+
|
|
|
+ if tool_status == "success":
|
|
|
+ tool_info = TOOL_STATUS_MAPPING.get(tool_name, {"name": "处理中", "icon": "🔄"})
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": f"{tool_info['name'].replace('中', '')}完成,AI处理中",
|
|
|
+ "icon": "🤖",
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ tool_info = TOOL_STATUS_MAPPING.get(tool_name, {
|
|
|
+ "name": f"执行{tool_name}中",
|
|
|
+ "icon": "⚙️"
|
|
|
+ })
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": tool_info["name"],
|
|
|
+ "icon": tool_info["icon"],
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ elif last_msg_type == "human":
|
|
|
+ # 用户刚提问,AI开始思考
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": "AI思考中",
|
|
|
+ "icon": "🤖",
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ else:
|
|
|
+ # 默认执行中状态
|
|
|
+ status_data = {
|
|
|
+ "status": "running",
|
|
|
+ "name": "执行中",
|
|
|
+ "icon": "⚙️",
|
|
|
+ "timestamp": datetime.now().isoformat()
|
|
|
+ }
|
|
|
+
|
|
|
+ from common.result import success
|
|
|
+ return jsonify(success(data=status_data, message="获取状态成功")), 200
|
|
|
+
|
|
|
+ finally:
|
|
|
+ await redis_client.close()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"获取React Agent状态失败: {e}")
|
|
|
+ from common.result import failed
|
|
|
+ return jsonify(failed(message=f"获取状态失败: {str(e)}", code=500)), 500
|
|
|
+```
|
|
|
+
|
|
|
+### 5.2 响应格式示例
|
|
|
+
|
|
|
+**执行中示例**:
|
|
|
+```json
|
|
|
+{
|
|
|
+ "code": 200,
|
|
|
+ "success": true,
|
|
|
+ "message": "获取状态成功",
|
|
|
+ "data": {
|
|
|
+ "status": "running",
|
|
|
+ "name": "生成SQL中",
|
|
|
+ "icon": "🔍",
|
|
|
+ "timestamp": "2025-01-31T12:34:56.789Z"
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+**完成示例**:
|
|
|
+```json
|
|
|
+{
|
|
|
+ "code": 200,
|
|
|
+ "success": true,
|
|
|
+ "message": "获取状态成功",
|
|
|
+ "data": {
|
|
|
+ "status": "completed",
|
|
|
+ "name": "完成",
|
|
|
+ "icon": "✅",
|
|
|
+ "timestamp": "2025-01-31T12:35:20.123Z"
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+## 6. 状态流转示例
|
|
|
+
|
|
|
+基于实际的LangGraph执行流程:
|
|
|
+
|
|
|
+1. **用户提问** → `{"status": "running", "name": "AI思考中", "icon": "🤖"}`
|
|
|
+
|
|
|
+2. **AI调用generate_sql** → `{"status": "running", "name": "生成SQL中", "icon": "🔍"}`
|
|
|
+
|
|
|
+3. **generate_sql完成** → `{"status": "running", "name": "生成SQL完成,AI处理中", "icon": "🤖"}`
|
|
|
+
|
|
|
+4. **AI调用valid_sql** → `{"status": "running", "name": "验证SQL中", "icon": "✅"}`
|
|
|
+
|
|
|
+5. **AI调用run_sql** → `{"status": "running", "name": "执行查询中", "icon": "⚡"}`
|
|
|
+
|
|
|
+6. **AI生成最终回答** → `{"status": "completed", "name": "完成", "icon": "✅"}`
|
|
|
+
|
|
|
+## 7. 使用方式
|
|
|
+
|
|
|
+### 7.1 客户端轮询
|
|
|
+
|
|
|
+```bash
|
|
|
+# 开始对话
|
|
|
+curl -X POST http://localhost:8084/api/v0/ask_react_agent \
|
|
|
+ -H "Content-Type: application/json" \
|
|
|
+ -d '{"question": "查询服务区信息", "user_id": "test"}'
|
|
|
+
|
|
|
+# 轮询状态 (使用返回的thread_id)
|
|
|
+curl http://localhost:8084/api/v0/react/status/test:20250131123456789
|
|
|
+```
|
|
|
+
|
|
|
+### 7.2 JavaScript轮询示例
|
|
|
+
|
|
|
+```javascript
|
|
|
+async function pollStatus(threadId) {
|
|
|
+ const pollInterval = 1000; // 1秒轮询
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ const response = await fetch(`/api/v0/react/status/${threadId}`);
|
|
|
+ const result = await response.json();
|
|
|
+
|
|
|
+ if (result.success) {
|
|
|
+ console.log(`${result.data.icon} ${result.data.name}`);
|
|
|
+
|
|
|
+ if (result.data.status === 'completed') {
|
|
|
+ console.log('✅ 执行完成');
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ await new Promise(resolve => setTimeout(resolve, pollInterval));
|
|
|
+ } catch (error) {
|
|
|
+ console.error('状态查询失败:', error);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+```
|
|
|
+
|
|
|
+## 8. 技术优势
|
|
|
+
|
|
|
+1. **完全并发**:独立Redis连接,无资源竞争
|
|
|
+2. **状态准确**:基于messages分析,比suggested_next_step可靠
|
|
|
+3. **信息丰富**:显示具体工具调用状态,用户体验更好
|
|
|
+4. **性能优秀**:直接Redis访问,跳过LangGraph封装层
|
|
|
+5. **架构清晰**:不影响现有Agent实现,纯新增功能
|
|
|
+
|
|
|
+## 9. 注意事项
|
|
|
+
|
|
|
+1. **Redis连接管理**:每次请求创建独立连接,避免连接池竞争
|
|
|
+2. **错误处理**:完善的异常处理,避免Redis连接泄露
|
|
|
+3. **工具映射扩展**:新增工具时需更新TOOL_STATUS_MAPPING
|
|
|
+4. **轮询频率**:建议1秒轮询,避免过度查询Redis
|
|
|
+
|
|
|
+## 10. 后续优化
|
|
|
+
|
|
|
+1. **缓存机制**:适当缓存状态,减少Redis查询频率
|
|
|
+2. **WebSocket推送**:实现服务端主动推送状态变更
|
|
|
+3. **状态历史**:记录状态变更历史,便于调试分析
|
|
|
+4. **监控告警**:添加状态API的性能监控和异常告警
|
|
|
+
|
|
|
+## 并发问题解决方案
|
|
|
+
|
|
|
+### 问题根源
|
|
|
+经测试确认,**WsgiToAsgi转换器是并发阻塞的根本原因**:
|
|
|
+- WsgiToAsgi虽然能让WSGI应用在ASGI服务器运行,但不是真正的原生异步
|
|
|
+- 内部可能使用线程池或其他机制导致请求串行化
|
|
|
+- 即使使用独立Redis连接,仍然无法解决框架层面的并发限制
|
|
|
+
|
|
|
+### 解决方案
|
|
|
+使用**原生Flask多线程模式**:
|
|
|
+```python
|
|
|
+USE_WSGI_TO_ASGI = False # 禁用WsgiToAsgi
|
|
|
+app.run(host="0.0.0.0", port=8084, debug=False, threaded=True)
|
|
|
+```
|
|
|
+
|
|
|
+### 验证结果
|
|
|
+- ✅ 状态API可以在ask_react_agent执行过程中立即响应
|
|
|
+- ✅ 实现真正的并发访问,不再阻塞
|
|
|
+- ✅ 完美支持实时状态监控功能
|
|
|
+
|
|
|
+---
|
|
|
+
|
|
|
+**文档版本**: v1.1 (新增并发解决方案)
|
|
|
+**创建时间**: 2025-01-31
|
|
|
+**适用范围**: unified_api.py React Agent状态监控功能
|