本文档描述了Citu LangGraph Agent流式状态监控API的设计方案,解决当前同步API无法显示执行进度的问题。通过新增流式API,在保持现有API不变的前提下,为客户端提供实时的业务查询处理状态监控能力。
当前的 ask_agent
API存在以下问题:
ask_agent
API保持完全不变端点:POST /api/v0/ask_agent_stream
技术栈:
astream
功能sequenceDiagram
participant Client as 客户端
participant API as 流式API
participant Agent as Citu Agent
participant LangGraph as LangGraph执行器
Client->>API: POST /ask_agent_stream
API->>API: 生成conversation_id
API->>Client: 推送开始状态
API->>Agent: 调用process_question_stream()
Agent->>LangGraph: astream执行
Note over LangGraph: 问题分类节点
LangGraph->>Agent: classify_question状态
Agent->>API: yield分类进度
API->>Client: 推送"正在分析问题类型"
Note over LangGraph: 条件路由决策
LangGraph->>Agent: 路由决策结果
Agent->>API: yield路由信息
API->>Client: 推送问题类型结果
alt DATABASE类型
Note over LangGraph: SQL生成验证节点
LangGraph->>Agent: sql_generation状态
Agent->>API: yield SQL生成进度
API->>Client: 推送"正在生成SQL"
Note over LangGraph: SQL执行节点
LangGraph->>Agent: sql_execution状态
Agent->>API: yield执行进度
API->>Client: 推送"正在执行查询"
else CHAT类型
Note over LangGraph: 聊天处理节点
LangGraph->>Agent: chat状态
Agent->>API: yield聊天进度
API->>Client: 推送"正在思考回答"
end
Note over LangGraph: 响应格式化节点
LangGraph->>Agent: format_response状态
Agent->>API: yield最终结果
API->>Client: 推送完整结果
API->>Client: 推送结束标记
agent/citu_agent.py
新增方法:process_question_stream()
async def process_question_stream(self, question: str, conversation_id: str = None,
context_type: str = None, routing_mode: str = None):
"""
流式处理用户问题 - 复用process_question()的所有逻辑
"""
try:
# 1. 复用现有的初始化逻辑
if not conversation_id:
conversation_id = self._generate_conversation_id()
# 2. 动态创建workflow(复用现有逻辑)
workflow = self._create_workflow(routing_mode)
# 3. 创建初始状态(复用现有逻辑)
initial_state = self._create_initial_state(question, conversation_id, context_type, routing_mode)
# 4. 使用astream流式执行
async for chunk in workflow.astream(
initial_state,
config={"configurable": {"conversation_id": conversation_id}} if conversation_id else None
):
for node_name, node_data in chunk.items():
# 映射节点状态为用户友好的进度信息
progress_info = self._map_node_to_progress(node_name, node_data)
if progress_info:
yield {
"type": "progress",
"node": node_name,
"progress": progress_info,
"state_data": self._extract_relevant_state(node_data),
"conversation_id": conversation_id
}
# 5. 最终结果处理(复用现有的结果提取逻辑)
final_result = node_data.get("final_response", {})
yield {
"type": "completed",
"result": final_result,
"conversation_id": conversation_id
}
except Exception as e:
yield {
"type": "error",
"error": str(e),
"conversation_id": conversation_id
}
关键特性:
astream
而非 ainvoke
获取每个节点状态unified_api.py
新增API端点:ask_agent_stream()
@app.route("/api/v0/ask_agent_stream", methods=["POST"])
async def ask_agent_stream():
"""Citu Agent 流式API"""
async def generate():
try:
# 复用现有的数据验证逻辑
data = request.get_json(force=True)
validated_data = validate_agent_request_data(data)
# 复用现有的Agent初始化检查
if not await ensure_agent_ready():
yield format_error_event("Agent服务不可用")
return
# 流式执行
async for chunk in _agent_instance.process_question_stream(
question=validated_data['question'],
conversation_id=validated_data.get('conversation_id'),
context_type=validated_data.get('context_type'),
routing_mode=validated_data.get('routing_mode')
):
if chunk["type"] == "progress":
yield format_progress_event(chunk)
elif chunk["type"] == "completed":
yield format_completion_event(chunk, validated_data)
elif chunk["type"] == "error":
yield format_error_event(chunk["error"])
except Exception as e:
yield format_exception_event(e)
return Response(generate(), mimetype='text/event-stream')
节点名称 | 显示名称 | 图标 | 业务价值 | 监控重点 |
---|---|---|---|---|
classify_question |
分析问题类型 | 🤔 | ⭐⭐⭐ | 分类结果、置信度、路由模式 |
agent_sql_generation |
生成SQL查询 | 🔧 | ⭐⭐⭐⭐⭐ | SQL生成成功/失败、验证结果 |
agent_sql_execution |
执行数据查询 | ⚙️ | ⭐⭐⭐⭐⭐ | 查询进度、结果行数、摘要生成 |
agent_chat |
思考回答 | 💭 | ⭐⭐⭐ | 聊天处理进度 |
format_response |
整理结果 | 📝 | ⭐⭐ | 最终格式化进度 |
def _map_node_to_progress(self, node_name: str, node_data: dict) -> dict:
"""将节点执行状态映射为用户友好的进度信息"""
if node_name == "classify_question":
question_type = node_data.get("question_type", "UNCERTAIN")
confidence = node_data.get("classification_confidence", 0)
return {
"display_name": "分析问题类型",
"icon": "🤔",
"details": f"问题类型: {question_type} (置信度: {confidence:.2f})",
"sub_status": f"使用{node_data.get('classification_method', '未知')}方法分类"
}
elif node_name == "agent_sql_generation":
if node_data.get("sql_generation_success"):
return {
"display_name": "SQL生成成功",
"icon": "✅",
"details": f"生成SQL: {node_data.get('sql', '')[:50]}...",
"sub_status": "验证通过,准备执行"
}
else:
error_type = node_data.get("validation_error_type", "unknown")
return {
"display_name": "SQL生成处理中",
"icon": "🔧",
"details": f"验证状态: {error_type}",
"sub_status": node_data.get("user_prompt", "正在处理")
}
elif node_name == "agent_sql_execution":
query_result = node_data.get("query_result", {})
row_count = query_result.get("row_count", 0)
return {
"display_name": "执行数据查询",
"icon": "⚙️",
"details": f"查询完成,返回 {row_count} 行数据",
"sub_status": "正在生成摘要" if row_count > 0 else "查询执行完成"
}
elif node_name == "agent_chat":
return {
"display_name": "思考回答",
"icon": "💭",
"details": "正在处理您的问题",
"sub_status": "使用智能对话模式"
}
elif node_name == "format_response":
return {
"display_name": "整理结果",
"icon": "📝",
"details": "正在格式化响应结果",
"sub_status": "即将完成"
}
return None
{
"code": 200,
"success": true,
"message": "正在执行: 分析问题类型",
"data": {
"type": "progress",
"node": "classify_question",
"display_name": "分析问题类型",
"icon": "🤔",
"details": "问题类型: DATABASE (置信度: 0.85)",
"sub_status": "使用hybrid方法分类",
"conversation_id": "citu_20250131103000001",
"timestamp": "2025-01-31T10:30:05",
"state_summary": {
"current_step": "classified",
"execution_path": ["start", "classify"],
"routing_mode": "hybrid"
}
}
}
{
"code": 200,
"success": true,
"message": "处理完成",
"data": {
"type": "completed",
"response": "根据查询结果,服务区营业收入...",
"type": "DATABASE",
"sql": "SELECT service_area, SUM(revenue) FROM ...",
"query_result": {
"columns": [...],
"rows": [...],
"row_count": 25
},
"summary": "查询到25个服务区的营业数据...",
"conversation_id": "citu_20250131103000001",
"execution_path": ["start", "classify", "agent_sql_generation", "agent_sql_execution", "format_response"],
"classification_info": {
"question_type": "DATABASE",
"confidence": 0.85,
"method": "rule_based_strong_business"
},
"timestamp": "2025-01-31T10:30:15"
}
}
{
"code": 500,
"success": false,
"message": "SQL验证失败",
"data": {
"type": "error",
"error": "不允许的操作: DELETE。本系统只支持查询操作(SELECT)。",
"error_type": "forbidden_keywords",
"conversation_id": "citu_20250131103000001",
"node": "agent_sql_generation",
"timestamp": "2025-01-31T10:30:10",
"state_summary": {
"sql": "DELETE FROM sales WHERE...",
"validation_error_type": "forbidden_keywords"
}
}
}
// 同步方式 - 适合简单查询场景
const response = await fetch('/api/v0/ask_agent', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question, user_id})
});
const result = await response.json();
displayResult(result.data);
// 流式方式 - 适合复杂查询和需要进度显示的场景
const eventSource = new EventSource('/api/v0/ask_agent_stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({question, user_id})
});
let conversationId = null;
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
switch(data.data.type) {
case 'progress':
// 显示执行进度
updateProgress(
`${data.data.icon} ${data.data.display_name}`,
data.data.details,
data.data.sub_status
);
// 特殊处理关键节点
if (data.data.node === 'classify_question') {
showClassificationResult(data.data.state_summary);
} else if (data.data.node === 'agent_sql_generation') {
if (data.data.details.includes('SQL生成成功')) {
showSQL(data.data.state_summary.sql);
}
}
if (!conversationId) conversationId = data.data.conversation_id;
break;
case 'completed':
hideProgress();
displayResult(data.data); // 格式与现有API完全一致
eventSource.close();
break;
case 'error':
hideProgress();
showError(data.message, data.data.error_type);
eventSource.close();
break;
}
};
// 错误处理
eventSource.onerror = function(event) {
hideProgress();
showError("连接异常,请重试");
eventSource.close();
};
Postman完全支持测试SSE流式API:
POST
http://localhost:8084/api/v0/ask_agent_stream
Headers:
Content-Type: application/json
Accept: text/event-stream
Body (JSON):
{
"question": "查询各服务区的营业收入排行",
"user_id": "test_user",
"routing_mode": "hybrid"
}
Postman会逐步显示流式响应:
```
data: {"code":200,"success":true,"message":"正在执行: 分析问题类型","data":{"type":"progress","display_name":"分析问题类型","icon":"🤔"}}
data: {"code":200,"success":true,"message":"正在执行: 生成SQL查询","data":{"type":"progress","display_name":"生成SQL查询","icon":"🔧"}}
data: {"code":200,"success":true,"message":"正在执行: 执行数据查询","data":{"type":"progress","display_name":"执行数据查询","icon":"⚙️"}}
data: {"code":200,"success":true,"message":"处理完成","data":{"type":"completed","response":"...","conversation_id":"citu_20250131..."}} ```
适合测试:
特殊测试场景:
AgentState
天然适合流式监控CituLangGraphAgent
中新增 process_question_stream()
方法unified_api.py
中新增流式API端点本方案通过新增流式API的方式,在保持现有系统稳定性的前提下,为Citu LangGraph Agent提供了实时状态监控能力。方案具有以下特点:
该方案为Agent系统的用户体验升级提供了完整的技术解决方案,特别适合处理复杂业务查询的场景。
本文档描述了Citu LangGraph Agent流式API的完整设计方案,为实际开发提供详细的技术规范。