本文档描述了为React Agent添加流式API ask_react_agent_stream
的设计方案,该API与现有的 ask_react_agent
功能和参数完全相同,但采用流式输出模式。设计参考了现有的 ask_agent_stream
实现模式,确保系统的一致性和稳定性。
当前的 ask_react_agent
API 存在以下问题:
ask_react_agent
相同的功能和参数ask_react_agent
API的功能端点: GET /api/v0/ask_react_agent_stream
参数获取方式: URL查询参数(因为EventSource只支持GET请求)
支持参数:
question
(必填): 用户问题user_id
(可选): 用户ID,默认为"guest"或从thread_id推断thread_id
(可选): 会话ID,不传则自动生成新会话响应格式: text/event-stream
(SSE格式)
端点: GET /api/v0/ask_react_agent_stream_sync
设计目的: 解决Vector搜索异步冲突问题,专门用于复杂数据库查询
参数: 与原API完全相同
question
(必填): 用户问题user_id
(必填): 用户IDthread_id
(可选): 会话ID,不传则自动生成新会话routing_mode
(可选): 路由模式,默认'agent'continue_conversation
(可选): 是否继续对话,默认false技术架构:
invoke()
而不是 ainvoke()
)streaming=False
, enable_thinking=False
)适用场景:
使用示例:
curl -X GET "http://localhost:8084/api/v0/ask_react_agent_stream_sync?question=请问当前系统中哪个服务区档口最多?&user_id=test_user" -H "Accept: text/event-stream"
sequenceDiagram
participant Client as 客户端
participant API as ask_react_agent_stream
participant ReactAgent as React Agent实例
participant LangGraph as LangGraph执行器
Client->>API: GET /ask_react_agent_stream?question=...&user_id=...
API->>API: 参数验证和数据清洗
API->>ReactAgent: 调用chat_stream()方法
ReactAgent->>LangGraph: astream执行
loop 每个节点执行
LangGraph->>ReactAgent: 节点状态更新
ReactAgent->>API: yield进度信息
API->>Client: SSE推送进度状态
end
LangGraph->>ReactAgent: 执行完成
ReactAgent->>API: yield最终结果
API->>Client: SSE推送完整结果
Client->>Client: 关闭EventSource连接
@app.route('/api/v0/ask_react_agent_stream', methods=['GET'])
async def ask_react_agent_stream():
"""React Agent 流式API - 支持实时进度显示
功能与ask_react_agent完全相同,除了采用流式输出
"""
def generate():
try:
# 1. 参数获取和验证(从URL参数)
question = request.args.get('question')
user_id_input = request.args.get('user_id')
thread_id_input = request.args.get('thread_id')
# 参数验证(复用现有validate_request_data逻辑)
if not question:
yield format_sse_error("缺少必需参数:question")
return
# 2. 数据预处理(与ask_react_agent相同)
validated_data = validate_request_data({
'question': question,
'user_id': user_id_input,
'thread_id': thread_id_input
})
# 3. Agent实例检查
if not await ensure_agent_ready():
yield format_sse_error("React Agent 初始化失败")
return
# 4. 流式执行
async for chunk in _react_agent_instance.chat_stream(
message=validated_data['question'],
user_id=validated_data['user_id'],
thread_id=validated_data['thread_id']
):
if chunk["type"] == "progress":
yield format_sse_react_progress(chunk)
elif chunk["type"] == "completed":
yield format_sse_react_completed(chunk)
elif chunk["type"] == "error":
yield format_sse_error(chunk.get("error", "处理失败"))
except Exception as e:
yield format_sse_error(f"服务异常: {str(e)}")
return Response(stream_with_context(generate()), mimetype='text/event-stream')
需要在React Agent类中新增 chat_stream()
方法:
async def chat_stream(self, message: str, user_id: str, thread_id: Optional[str] = None):
"""
流式处理用户聊天请求 - 复用chat()方法的所有逻辑
"""
# 1. 复用现有的初始化逻辑(thread_id生成、配置检查等)
if not thread_id:
now = pd.Timestamp.now()
milliseconds = int(now.microsecond / 1000)
thread_id = f"{user_id}:{now.strftime('%Y%m%d%H%M%S')}{milliseconds:03d}"
# 2. 复用现有的配置和错误处理
self._recursion_count = 0
run_config = {
"configurable": {"thread_id": thread_id},
"recursion_limit": config.RECURSION_LIMIT
}
# 3. 复用checkpointer检查逻辑
if self.checkpointer:
# ... checkpointer连接检查和重新初始化 ...
# 4. 使用astream流式执行
final_state = None
async for chunk in self.agent_executor.astream(inputs, run_config, stream_mode="updates"):
for node_name, node_data in chunk.items():
yield {
"type": "progress",
"node": node_name,
"data": node_data,
"thread_id": thread_id
}
final_state = node_data
# 5. 复用现有的结果处理逻辑
if final_state and "messages" in final_state:
api_data = await self._async_generate_api_data(final_state)
yield {
"type": "completed",
"result": {"api_data": api_data, "thread_id": thread_id}
}
{
"code": 200,
"success": true,
"message": "正在执行: AI思考中",
"data": {
"type": "progress",
"node": "agent",
"display_name": "AI思考中",
"thread_id": "wang1:20250131103000001",
"timestamp": "2025-01-31T10:30:00.123Z"
}
}
{
"code": 200,
"success": true,
"message": "处理完成",
"data": {
"type": "completed",
"response": "根据销售数据分析...",
"conversation_id": "wang1:20250131103000001",
"user_id": "wang1",
"react_agent_meta": {
"thread_id": "wang1:20250131103000001",
"agent_version": "custom_react_v1_async"
},
"sql": "SELECT * FROM sales...",
"records": [...],
"timestamp": "2025-01-31T10:32:15.456Z"
}
}
{
"code": 500,
"success": false,
"message": "处理失败",
"data": {
"type": "error",
"error": "具体错误信息",
"timestamp": "2025-01-31T10:30:05.789Z"
}
}
基于LangGraph的节点执行,提供用户友好的状态显示:
节点名称 | 显示名称 | 说明 |
---|---|---|
__start__ |
开始处理 | 流程启动 |
trim_messages |
准备消息 | 消息预处理 |
agent |
AI思考中 | LLM推理决策 |
prepare_tool_input |
准备工具 | 工具输入准备 |
tools |
执行查询 | SQL工具执行 |
update_state_after_tool |
处理结果 | 结果后处理 |
format_final_response |
生成回答 | 最终响应格式化 |
__end__ |
完成 | 流程结束 |
def format_sse_react_progress(chunk: dict) -> str:
"""格式化React Agent进度事件为SSE格式"""
node = chunk.get("node")
thread_id = chunk.get("thread_id")
# 节点显示名称映射
node_display_map = {
"__start__": "开始处理",
"trim_messages": "准备消息",
"agent": "AI思考中",
"prepare_tool_input": "准备工具",
"tools": "执行查询",
"update_state_after_tool": "处理结果",
"format_final_response": "生成回答",
"__end__": "完成"
}
display_name = node_display_map.get(node, "处理中")
data = {
"code": 200,
"success": True,
"message": f"正在执行: {display_name}",
"data": {
"type": "progress",
"node": node,
"display_name": display_name,
"thread_id": thread_id,
"timestamp": datetime.now().isoformat()
}
}
import json
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
def format_sse_react_completed(chunk: dict) -> str:
"""格式化React Agent完成事件为SSE格式"""
result = chunk.get("result", {})
api_data = result.get("api_data", {})
thread_id = result.get("thread_id")
# 构建与ask_react_agent相同的响应格式
response_data = {
"response": api_data.get("response", ""),
"conversation_id": thread_id,
"react_agent_meta": api_data.get("react_agent_meta", {}),
"timestamp": datetime.now().isoformat()
}
# 可选字段
if "sql" in api_data:
response_data["sql"] = api_data["sql"]
if "records" in api_data:
response_data["records"] = api_data["records"]
data = {
"code": 200,
"success": True,
"message": "处理完成",
"data": {
"type": "completed",
**response_data
}
}
import json
return f"data: {json.dumps(data, ensure_ascii=False)}\n\n"
// 同步方式 - 现有代码无需修改
const response = await fetch('/api/v0/ask_react_agent', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
question: "帮我查询销售数据",
user_id: "wang1"
})
});
const result = await response.json();
displayResult(result.data);
// 流式方式 - 新增功能
const params = new URLSearchParams({
question: "帮我查询销售数据",
user_id: "wang1"
});
const eventSource = new EventSource(`/api/v0/ask_react_agent_stream?${params}`);
let conversationId = null;
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
switch(data.data.type) {
case 'progress':
updateProgress(data.data.display_name);
if (!conversationId) conversationId = data.data.thread_id;
break;
case 'completed':
hideProgress();
displayResult(data.data); // 格式与现有API完全一致
eventSource.close();
break;
case 'error':
hideProgress();
showError(data.data.error);
eventSource.close();
break;
}
};
eventSource.onerror = function(error) {
console.error('EventSource failed:', error);
eventSource.close();
};
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>React Agent 流式API测试</title>
<style>
.container { max-width: 800px; margin: 0 auto; padding: 20px; }
.input-group { margin-bottom: 20px; }
.input-group label { display: block; margin-bottom: 5px; font-weight: bold; }
.input-group input, .input-group textarea { width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }
.input-group textarea { min-height: 100px; resize: vertical; }
.btn { padding: 10px 20px; background: #007bff; color: white; border: none; border-radius: 4px; cursor: pointer; }
.btn:hover { background: #0056b3; }
.btn:disabled { background: #ccc; cursor: not-allowed; }
.progress { display: none; margin: 20px 0; padding: 15px; background: #f8f9fa; border-radius: 4px; border-left: 4px solid #007bff; }
.result { margin: 20px 0; padding: 15px; background: #d4edda; border-radius: 4px; border-left: 4px solid #28a745; }
.error { margin: 20px 0; padding: 15px; background: #f8d7da; border-radius: 4px; border-left: 4px solid #dc3545; }
.sql-code { background: #f8f9fa; padding: 10px; border-radius: 4px; font-family: monospace; margin: 10px 0; }
.loading-spinner { display: inline-block; width: 20px; height: 20px; border: 3px solid #f3f3f3; border-top: 3px solid #007bff; border-radius: 50%; animation: spin 1s linear infinite; margin-right: 10px; }
@keyframes spin { 0% { transform: rotate(0deg); } 100% { transform: rotate(360deg); } }
</style>
</head>
<body>
<div class="container">
<h1>React Agent 流式API测试</h1>
<div class="input-group">
<label for="question">问题:</label>
<textarea id="question" placeholder="请输入您的问题,例如:帮我查询销售数据">帮我查询销售数据</textarea>
</div>
<div class="input-group">
<label for="userId">用户ID:</label>
<input type="text" id="userId" placeholder="用户ID(可选)" value="test_user">
</div>
<div class="input-group">
<label for="threadId">会话ID:</label>
<input type="text" id="threadId" placeholder="会话ID(可选,留空自动生成)">
</div>
<button id="askBtn" class="btn" onclick="askReactAgent()">发送请求</button>
<button id="stopBtn" class="btn" onclick="stopRequest()" style="display:none; background:#dc3545;">停止请求</button>
<div id="progress" class="progress">
<div class="loading-spinner"></div>
<span id="progressText">准备中...</span>
</div>
<div id="result" class="result" style="display:none;">
<h3>响应结果</h3>
<div id="responseText"></div>
<div id="conversationInfo"></div>
<div id="sqlSection" style="display:none;">
<h4>执行的SQL:</h4>
<div id="sqlCode" class="sql-code"></div>
</div>
<div id="recordsSection" style="display:none;">
<h4>查询结果:</h4>
<div id="recordsData"></div>
</div>
</div>
<div id="error" class="error" style="display:none;">
<h3>错误信息</h3>
<div id="errorText"></div>
</div>
</div>
<script>
let currentEventSource = null;
let currentConversationId = null;
function askReactAgent() {
const question = document.getElementById('question').value.trim();
const userId = document.getElementById('userId').value.trim() || 'guest';
const threadId = document.getElementById('threadId').value.trim();
if (!question) {
showError('请输入问题');
return;
}
// 重置界面
hideError();
hideResult();
showProgress('准备发送请求...');
// 禁用发送按钮,显示停止按钮
document.getElementById('askBtn').disabled = true;
document.getElementById('stopBtn').style.display = 'inline-block';
// 构建请求参数
const params = new URLSearchParams({
question: question,
user_id: userId
});
if (threadId) {
params.append('thread_id', threadId);
}
// 创建EventSource连接
const url = `/api/v0/ask_react_agent_stream?${params}`;
currentEventSource = new EventSource(url);
currentEventSource.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
handleStreamMessage(data);
} catch (e) {
console.error('解析SSE消息失败:', e, event.data);
showError('响应数据格式错误');
stopRequest();
}
};
currentEventSource.onerror = function(error) {
console.error('EventSource连接错误:', error);
showError('连接失败,请检查网络或服务状态');
stopRequest();
};
}
function handleStreamMessage(data) {
if (!data.success) {
showError(data.message || '处理失败');
stopRequest();
return;
}
const messageData = data.data;
switch(messageData.type) {
case 'progress':
showProgress(messageData.display_name || '处理中...');
// 记录会话ID
if (messageData.thread_id && !currentConversationId) {
currentConversationId = messageData.thread_id;
document.getElementById('threadId').value = currentConversationId;
}
break;
case 'completed':
hideProgress();
showResult(messageData);
stopRequest();
break;
case 'error':
hideProgress();
showError(messageData.error || '处理失败');
stopRequest();
break;
default:
console.warn('未知的消息类型:', messageData.type);
}
}
function showProgress(text) {
document.getElementById('progressText').textContent = text;
document.getElementById('progress').style.display = 'block';
}
function hideProgress() {
document.getElementById('progress').style.display = 'none';
}
function showResult(data) {
// 显示响应文本
document.getElementById('responseText').textContent = data.response || '无响应内容';
// 显示会话信息
const conversationInfo = `会话ID: ${data.conversation_id || '未知'}`;
document.getElementById('conversationInfo').textContent = conversationInfo;
// 显示SQL(如果有)
if (data.sql) {
document.getElementById('sqlCode').textContent = data.sql;
document.getElementById('sqlSection').style.display = 'block';
} else {
document.getElementById('sqlSection').style.display = 'none';
}
// 显示查询结果(如果有)
if (data.records && data.records.rows && data.records.rows.length > 0) {
const recordsHtml = `
<p>共 ${data.records.row_count || data.records.rows.length} 条记录</p>
<table border="1" style="width:100%; border-collapse:collapse;">
<thead>
<tr>
${data.records.columns.map(col => `<th style="padding:8px; background:#f8f9fa;">${col}</th>`).join('')}
</tr>
</thead>
<tbody>
${data.records.rows.slice(0, 10).map(row =>
`<tr>${row.map(cell => `<td style="padding:8px;">${cell || ''}</td>`).join('')}</tr>`
).join('')}
</tbody>
</table>
${data.records.rows.length > 10 ? '<p>(仅显示前10条记录)</p>' : ''}
`;
document.getElementById('recordsData').innerHTML = recordsHtml;
document.getElementById('recordsSection').style.display = 'block';
} else {
document.getElementById('recordsSection').style.display = 'none';
}
document.getElementById('result').style.display = 'block';
}
function showError(message) {
document.getElementById('errorText').textContent = message;
document.getElementById('error').style.display = 'block';
}
function hideError() {
document.getElementById('error').style.display = 'none';
}
function hideResult() {
document.getElementById('result').style.display = 'none';
}
function stopRequest() {
if (currentEventSource) {
currentEventSource.close();
currentEventSource = null;
}
// 恢复按钮状态
document.getElementById('askBtn').disabled = false;
document.getElementById('stopBtn').style.display = 'none';
hideProgress();
}
// 页面卸载时关闭连接
window.addEventListener('beforeunload', function() {
stopRequest();
});
// 回车键发送请求
document.getElementById('question').addEventListener('keydown', function(e) {
if (e.ctrlKey && e.key === 'Enter') {
askReactAgent();
}
});
</script>
</body>
</html>
1. 错误处理和重连机制
class ReactAgentStreamClient {
constructor(baseUrl = '') {
this.baseUrl = baseUrl;
this.eventSource = null;
this.retryCount = 0;
this.maxRetries = 3;
this.retryDelay = 1000; // 1秒
}
async ask(question, userId = 'guest', threadId = null, callbacks = {}) {
const {
onProgress = () => {},
onCompleted = () => {},
onError = () => {},
onRetry = () => {}
} = callbacks;
const params = new URLSearchParams({
question: question,
user_id: userId
});
if (threadId) {
params.append('thread_id', threadId);
}
const url = `${this.baseUrl}/api/v0/ask_react_agent_stream?${params}`;
return new Promise((resolve, reject) => {
this.eventSource = new EventSource(url);
this.eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (!data.success) {
onError(data.message || '处理失败');
reject(new Error(data.message || '处理失败'));
return;
}
const messageData = data.data;
switch(messageData.type) {
case 'progress':
onProgress(messageData);
break;
case 'completed':
onCompleted(messageData);
resolve(messageData);
this.close();
break;
case 'error':
onError(messageData.error || '处理失败');
reject(new Error(messageData.error || '处理失败'));
this.close();
break;
}
} catch (e) {
console.error('解析SSE消息失败:', e);
onError('响应数据格式错误');
reject(e);
this.close();
}
};
this.eventSource.onerror = (error) => {
console.error('EventSource错误:', error);
if (this.retryCount < this.maxRetries) {
this.retryCount++;
onRetry(this.retryCount, this.maxRetries);
setTimeout(() => {
this.close();
this.ask(question, userId, threadId, callbacks)
.then(resolve)
.catch(reject);
}, this.retryDelay * this.retryCount);
} else {
onError('连接失败,已达到最大重试次数');
reject(new Error('连接失败'));
this.close();
}
};
});
}
close() {
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
this.retryCount = 0;
}
}
// 使用示例
const client = new ReactAgentStreamClient();
client.ask('帮我查询销售数据', 'user123', null, {
onProgress: (data) => {
console.log('进度:', data.display_name);
updateProgressUI(data.display_name);
},
onCompleted: (data) => {
console.log('完成:', data);
showResults(data);
},
onError: (error) => {
console.error('错误:', error);
showError(error);
},
onRetry: (attempt, maxRetries) => {
console.log(`重试 ${attempt}/${maxRetries}...`);
showRetryMessage(attempt, maxRetries);
}
}).catch(error => {
console.error('最终失败:', error);
});
2. React组件集成示例
import React, { useState, useCallback, useEffect } from 'react';
const ReactAgentChat = () => {
const [question, setQuestion] = useState('');
const [isLoading, setIsLoading] = useState(false);
const [progress, setProgress] = useState('');
const [result, setResult] = useState(null);
const [error, setError] = useState(null);
const [eventSource, setEventSource] = useState(null);
const askAgent = useCallback(async () => {
if (!question.trim()) return;
setIsLoading(true);
setError(null);
setResult(null);
setProgress('准备中...');
const params = new URLSearchParams({
question: question,
user_id: 'react_user'
});
const source = new EventSource(`/api/v0/ask_react_agent_stream?${params}`);
setEventSource(source);
source.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (!data.success) {
setError(data.message || '处理失败');
setIsLoading(false);
return;
}
const messageData = data.data;
switch(messageData.type) {
case 'progress':
setProgress(messageData.display_name || '处理中...');
break;
case 'completed':
setResult(messageData);
setProgress('');
setIsLoading(false);
source.close();
break;
case 'error':
setError(messageData.error || '处理失败');
setIsLoading(false);
source.close();
break;
}
} catch (e) {
setError('响应数据格式错误');
setIsLoading(false);
source.close();
}
};
source.onerror = () => {
setError('连接失败');
setIsLoading(false);
source.close();
};
}, [question]);
const stopRequest = useCallback(() => {
if (eventSource) {
eventSource.close();
setEventSource(null);
}
setIsLoading(false);
setProgress('');
}, [eventSource]);
useEffect(() => {
return () => {
if (eventSource) {
eventSource.close();
}
};
}, [eventSource]);
return (
<div className="react-agent-chat">
<div className="input-section">
<textarea
value={question}
onChange={(e) => setQuestion(e.target.value)}
placeholder="请输入您的问题..."
disabled={isLoading}
/>
<button onClick={askAgent} disabled={isLoading || !question.trim()}>
{isLoading ? '处理中...' : '发送'}
</button>
{isLoading && (
<button onClick={stopRequest} className="stop-btn">
停止
</button>
)}
</div>
{progress && (
<div className="progress">
<div className="spinner" />
<span>{progress}</span>
</div>
)}
{error && (
<div className="error">
<strong>错误:</strong>{error}
</div>
)}
{result && (
<div className="result">
<h3>回答:</h3>
<p>{result.response}</p>
{result.sql && (
<div className="sql-section">
<h4>执行的SQL:</h4>
<pre><code>{result.sql}</code></pre>
</div>
)}
{result.records && result.records.rows && (
<div className="records-section">
<h4>查询结果:</h4>
<p>共 {result.records.row_count} 条记录</p>
{/* 表格渲染逻辑 */}
</div>
)}
</div>
)}
</div>
);
};
export default ReactAgentChat;
配置:
GET
http://localhost:8084/api/v0/ask_react_agent_stream?question=帮我查询销售数据&user_id=test_user
Accept: text/event-stream
预期响应:
data: {"code":200,"success":true,"message":"正在执行: 开始处理","data":{"type":"progress","display_name":"开始处理"}}
data: {"code":200,"success":true,"message":"正在执行: AI思考中","data":{"type":"progress","display_name":"AI思考中"}}
data: {"code":200,"success":true,"message":"正在执行: 执行查询","data":{"type":"progress","display_name":"执行查询"}}
data: {"code":200,"success":true,"message":"处理完成","data":{"type":"completed","response":"...","conversation_id":"test_user:20250131..."}}
curl -N "http://localhost:8084/api/v0/ask_react_agent_stream?question=查询用户数量&user_id=test_user" \
-H "Accept: text/event-stream"
ask_react_agent
保持所有现有功能validate_request_data()
函数ensure_agent_ready()
函数_react_agent_instance
chat_stream()
方法ask_react_agent_stream
端点本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为React Agent提供了实时状态监控能力。方案具有以下特点:
该方案为React Agent的用户体验升级提供了完整的技术解决方案。
本文档基于现有ask_agent_stream实现模式,为React Agent流式API提供详细的技术规范。