像ChatGPT一样,只告诉用户"AI正在思考",不显示百分比进度,专注于当前执行状态。
接口: GET /api/status/{thread_id}
响应格式:
{
"status": "running|completed",
"name": "AI思考中",
"icon": "🤖",
"timestamp": "2025-07-30T15:30:45.123Z"
}
基于您的LangGraph流程图节点:
节点 | 显示名称 | 图标 |
---|---|---|
__start__ |
开始 | 🚀 |
agent |
AI思考中 | 🤖 |
prepare_tool_input |
准备工具 | 🔧 |
tools |
执行查询 | ⚙️ |
update_state_after_tool |
处理结果 | 🔄 |
format_final_response |
生成回答 | 📝 |
__end__ |
完成 | ✅ |
suggested_next_step
字段__end__
→ 返回completed客户端轮询:
# Postman测试
GET http://localhost:5000/api/status/wang1:20250729235038043
# 返回示例1 (执行中)
{
"status": "running",
"name": "AI思考中",
"icon": "🤖",
"timestamp": "2025-07-30T15:30:45.123Z"
}
# 返回示例2 (完成)
{
"status": "completed",
"name": "完成",
"icon": "✅",
"timestamp": "2025-07-30T15:31:20.456Z"
}
客户端实现:
// 每秒轮询,直到status=completed就停止
setInterval(() => {
fetch('/api/status/thread_id')
.then(r => r.json())
.then(data => {
console.log(`${data.icon} ${data.name}`);
if (data.status === 'completed') {
// 停止轮询
}
});
}, 1000);
这就是您要的最简化版本 - 一个API,几个字段,专注核心功能!
简化LangGraph状态API 的代码:
from flask import Flask, jsonify
import redis
import json
from datetime import datetime
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 简单的节点状态映射
NODE_STATUS = {
"__start__": {"name": "开始", "icon": "🚀"},
"agent": {"name": "AI思考中", "icon": "🤖"},
"prepare_tool_input": {"name": "准备工具", "icon": "🔧"},
"tools": {"name": "执行查询", "icon": "⚙️"},
"update_state_after_tool": {"name": "处理结果", "icon": "🔄"},
"format_final_response": {"name": "生成回答", "icon": "📝"},
"__end__": {"name": "完成", "icon": "✅"}
}
@app.route('/api/status/<thread_id>')
def get_status(thread_id):
try:
# 查找最新checkpoint
pattern = f"checkpoint:{thread_id}:*"
keys = redis_client.keys(pattern)
if not keys:
return jsonify({"error": "未找到执行线程"}), 404
# 获取最新checkpoint
latest_key = sorted(keys)[-1]
checkpoint_data = redis_client.get(latest_key)
if not checkpoint_data:
return jsonify({"error": "无法读取数据"}), 500
checkpoint = json.loads(checkpoint_data)
suggested_next_step = checkpoint.get('suggested_next_step', '')
# 判断状态
if not suggested_next_step or suggested_next_step == "__end__":
# 完成了
return jsonify({
"status": "completed",
"name": "完成",
"icon": "✅",
"timestamp": datetime.now().isoformat()
})
else:
# 正在执行
node_info = NODE_STATUS.get(suggested_next_step, {
"name": "执行中",
"icon": "⚙️"
})
return jsonify({
"status": "running",
"name": node_info["name"],
"icon": node_info["icon"],
"timestamp": datetime.now().isoformat()
})
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
就是我们之前使用的方法,直接查询Redis键:
# 直接Redis查询
pattern = f"checkpoint:{thread_id}:*"
keys = redis_client.keys(pattern)
latest_key = sorted(keys)[-1]
checkpoint_data = redis_client.get(latest_key)
使用官方的Redis Checkpoint Saver API:## 📊 两种方法对比
✅ 优点:
❌ 缺点:
需要了解Redis键名格式
checkpoint:wang1:20250729235038043:__empty__:1f06c943-fd28-6539-801f-41a94c25bd43
硬编码依赖具体实现
升级时可能出问题
✅ 优点:
❌ 缺点:
langgraph-checkpoint-redis
包基于搜索结果,官方Redis Checkpointer的核心方法:
# 同步版本
from langgraph.checkpoint.redis import RedisSaver
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
checkpointer.setup() # 重要:创建索引
# 获取最新checkpoint
read_config = {"configurable": {"thread_id": "your_thread_id"}}
checkpoint_tuple = checkpointer.get_tuple(read_config)
# 异步版本
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
await checkpointer.asetup()
checkpoint_tuple = await checkpointer.aget_tuple(read_config)
我建议使用方法二(LangGraph API),因为:
这是官方推荐方式,更可靠
自动处理版本兼容性问题
提供标准化的数据结构
您的代码会更加健壮和可维护
### 方法二参考代码
from flask import Flask, jsonify
from langgraph.checkpoint.redis import RedisSaver
from datetime import datetime
import asyncio
app = Flask(__name__)
# 初始化Redis Checkpointer
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
checkpointer.setup() # 创建必要的索引
# 简单的节点状态映射
NODE_STATUS = {
"__start__": {"name": "开始", "icon": "🚀"},
"agent": {"name": "AI思考中", "icon": "🤖"},
"prepare_tool_input": {"name": "准备工具", "icon": "🔧"},
"tools": {"name": "执行查询", "icon": "⚙️"},
"update_state_after_tool": {"name": "处理结果", "icon": "🔄"},
"format_final_response": {"name": "生成回答", "icon": "📝"},
"__end__": {"name": "完成", "icon": "✅"}
}
@app.route('/api/status/<thread_id>')
def get_status(thread_id):
try:
# 使用LangGraph API获取最新checkpoint
read_config = {"configurable": {"thread_id": thread_id}}
# 获取最新的checkpoint tuple
checkpoint_tuple = checkpointer.get_tuple(read_config)
if not checkpoint_tuple:
return jsonify({"error": "未找到执行线程"}), 404
# 从checkpoint中提取状态信息
checkpoint = checkpoint_tuple.checkpoint
# 获取suggested_next_step (这里需要根据实际的checkpoint结构调整)
# 通常在channel_values中或者通过分析当前状态推断
channel_values = checkpoint.get('channel_values', {})
# 根据您的实际LangGraph实现,suggested_next_step可能在不同位置
# 这里提供几种可能的获取方式:
# 方式1: 直接从channel_values获取
suggested_next_step = channel_values.get('suggested_next_step', '')
# 方式2: 从其他可能的位置获取
if not suggested_next_step:
# 可能在metadata中
metadata = checkpoint_tuple.metadata or {}
suggested_next_step = metadata.get('suggested_next_step', '')
# 方式3: 根据当前节点状态推断下一步
if not suggested_next_step:
current_node = channel_values.get('current_node', '')
# 根据业务逻辑推断下一步...
# 判断是否完成
is_completed = (not suggested_next_step or
suggested_next_step == "__end__" or
channel_values.get('status') == 'completed')
if is_completed:
return jsonify({
"status": "completed",
"name": "完成",
"icon": "✅",
"timestamp": datetime.now().isoformat()
})
else:
# 获取当前步骤信息
node_info = NODE_STATUS.get(suggested_next_step, {
"name": "执行中",
"icon": "⚙️"
})
return jsonify({
"status": "running",
"name": node_info["name"],
"icon": node_info["icon"],
"timestamp": datetime.now().isoformat()
})
except Exception as e:
return jsonify({"error": f"获取状态失败: {str(e)}"}), 500
# 异步版本 (如果需要)
@app.route('/api/status-async/<thread_id>')
def get_status_async(thread_id):
try:
# 使用异步API
async def _get_async_status():
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as async_checkpointer:
await async_checkpointer.asetup()
read_config = {"configurable": {"thread_id": thread_id}}
checkpoint_tuple = await async_checkpointer.aget_tuple(read_config)
if not checkpoint_tuple:
return None
return checkpoint_tuple.checkpoint
# 在Flask中运行异步代码
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
checkpoint = loop.run_until_complete(_get_async_status())
if not checkpoint:
return jsonify({"error": "未找到执行线程"}), 404
# 处理checkpoint数据...
# (相同的逻辑)
finally:
loop.close()
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route('/api/health')
def health_check():
"""健康检查"""
try:
# 测试checkpointer连接
test_config = {"configurable": {"thread_id": "health_check"}}
checkpointer.get_tuple(test_config) # 这不会报错,即使没有数据
return jsonify({
"status": "healthy",
"checkpointer": "connected",
"timestamp": datetime.now().isoformat()
})
except Exception as e:
return jsonify({
"status": "unhealthy",
"error": str(e)
}), 500
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
checkpoint:wang1:20250729235038043:__empty__:1f06c943-fd28-6539-801f-41a94c25bd43
其中“wang1:20250729235038043”就是thread_id,它是由 {user_id}:{时间戳}组成的。
{
"type": "ai",
"content": "系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。"
}
],
"question": "请问这个服务区有几个餐饮档口?"
},
"id": "call_498bcec43e2c4b3499e6e1"
},
{
"name": "",
"args": {
},
"id": "call_f71ffe8603ed42da879a17",
"type": "tool_call"
}
],
"invalid_tool_calls": []
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"ToolMessage"
],
"kwargs": {
"content": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL \nAND sa.delete_ts IS NULL;",
"type": "tool",
"name": "generate_sql",
"id": "3dfacea1-93af-444c-bfb4-2995f336f274",
"tool_call_id": "call_498bcec43e2c4b3499e6e1",
"status": "success"
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"ToolMessage"
],
"kwargs": {
"content": "Error: is not a valid tool, try one of [generate_sql, valid_sql, run_sql].",
"type": "tool",
"name": "",
"id": "1763e2eb-7be4-4e7e-bae3-19f9e0b1b4b5",
"tool_call_id": "call_f71ffe8603ed42da879a17",
"status": "error"
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"AIMessage"
],
"kwargs": {
"content": "",
"additional_kwargs": {
"tool_calls": [
{
"index": 0,
"id": "call_09fe3a57143947e5b85766",
"function": {
"arguments": "{\"history_messages\": [{\"type\": \"human\", \"content\": \"请问系统中哪个服务区档口最多?\"}, {\"type\": \"ai\", \"content\": \"系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。\"}], \"question\": \"请问这个服务区有几个餐饮档口?\"}",
"name": "generate_sql"
},
"type": "function"
}
]
},
"response_metadata": {
"finish_reason": "tool_calls",
"model_name": "qwen3-235b-a22b"
},
"type": "ai",
"id": "run--d61425aa-94cf-4e0c-837a-9ac7c48f4930-0",
"tool_calls": [
{
"name": "generate_sql",
"args": {
"history_messages": [
{
"type": "human",
"content": "请问系统中哪个服务区档口最多?"
},
{
"type": "ai",
"content": "系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。"
}
],
"question": "请问这个服务区有几个餐饮档口?"
},
"id": "call_09fe3a57143947e5b85766"
}
],
"invalid_tool_calls": []
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"ToolMessage"
],
"kwargs": {
"content": "SELECT COUNT(*) AS 餐饮档口数量 FROM bss_branch b JOIN bss_service_area sa ON b.service_area_id = sa.id WHERE sa.service_area_name = '南城服务区' AND b.classify = '餐饮' AND b.delete_ts IS NULL;",
"type": "tool",
"name": "generate_sql",
"id": "8016bd5b-9ad7-43b3-90f2-1994b798b06b",
"tool_call_id": "call_09fe3a57143947e5b85766",
"status": "success"
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"AIMessage"
],
"kwargs": {
"content": "",
"additional_kwargs": {
"tool_calls": [
{
"index": 0,
"id": "call_0753617fb1174fab9074eb",
"function": {
"arguments": "{\"sql\": \"SELECT COUNT(*) AS \\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf \\nFROM bss_branch b \\nJOIN bss_service_area sa ON b.service_area_id = sa.id \\nWHERE sa.service_area_name = '南城服务区' \\nAND b.classify = '餐饮' \\nAND b.delete_ts IS NULL;\", \"question\": \"\\u8bf7\\u95ee\\u8fd9\\u4e2a\\u670d\\u52a1\\u533a\\u6709\\u51e0\\u4e2a\\u9910\\u996e\\u6863\\u53e3\\uff1f\"}",
"name": "valid_sql"
},
"type": "function"
}
]
},
"response_metadata": {
"finish_reason": "tool_calls",
"model_name": "qwen3-235b-a22b"
},
"type": "ai",
"id": "run--4c0c02fa-efbb-4cd6-96c0-d80392c99aa9-0",
"tool_calls": [
{
"name": "valid_sql",
"args": {
"sql": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL;",
"question": "请问这个服务区有几个餐饮档口?"
},
"id": "call_0753617fb1174fab9074eb",
"type": "tool_call"
}
],
"invalid_tool_calls": []
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"ToolMessage"
],
"kwargs": {
"content": "SQL验证通过:语法正确且字段存在",
"type": "tool",
"name": "valid_sql",
"id": "62bde32c-02a9-4e36-aa76-5387cb03ce65",
"tool_call_id": "call_0753617fb1174fab9074eb",
"status": "success"
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"AIMessage"
],
"kwargs": {
"content": "",
"additional_kwargs": {
"tool_calls": [
{
"index": 0,
"id": "call_b0600de998f5467d96a3f2",
"function": {
"arguments": "{\"sql\": \"SELECT COUNT(*) AS \\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf \\nFROM bss_branch b \\nJOIN bss_service_area sa ON b.service_area_id = sa.id \\nWHERE sa.service_area_name = '\\u5357\\u57ce\\u670d\\u52a1\\u533a' \\nAND b.classify = '\\u9910\\u996e' \\nAND b.delete_ts IS NULL;\", \"question\": \"\\u8bf7\\u95ee\\u8fd9\\u4e2a\\u670d\\u52a1\\u533a\\u6709\\u51e0\\u4e2a\\u9910\\u996e\\u6863\\u53e3\\uff1f\"}",
"name": "run_sql"
},
"type": "function"
}
]
},
"response_metadata": {
"finish_reason": "tool_calls",
"model_name": "qwen3-235b-a22b"
},
"type": "ai",
"id": "run--6144e643-450a-4ec2-932a-230f666dd90c-0",
"tool_calls": [
{
"name": "run_sql",
"args": {
"sql": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL;",
"question": "请问这个服务区有几个餐饮档口?"
},
"id": "call_b0600de998f5467d96a3f2",
"type": "tool_call"
}
],
"invalid_tool_calls": []
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"ToolMessage"
],
"kwargs": {
"content": "[{\"\\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf\":6}]",
"type": "tool",
"name": "run_sql",
"id": "488da67c-ea4f-46f4-a201-d1bb40c25931",
"tool_call_id": "call_b0600de998f5467d96a3f2",
"status": "success"
}
}
],
"user_id": "wang1",
"thread_id": "wang1:20250729235038043",
"suggested_next_step": "summarize_final_answer",
"branch:to:agent": null
},
"channel_versions": {
"__start__": "00000000000000000000000000000018.0.2372340570136553",
"messages": "00000000000000000000000000000033.0.3402020632542543",
"user_id": "00000000000000000000000000000018.0.6157289048059329",
"thread_id": "00000000000000000000000000000018.0.3438965440425772",
"suggested_next_step": "00000000000000000000000000000034.0.4358036384739292",
"branch:to:agent": "00000000000000000000000000000034.0.30470240061347376",
"branch:to:prepare_tool_input": "00000000000000000000000000000032.0.683834676177241",
"branch:to:tools": "00000000000000000000000000000033.0.4617350256378949",
"branch:to:update_state_after_tool": "00000000000000000000000000000034.0.7301276046246093",
"branch:to:format_final_response": "00000000000000000000000000000016.0.4685210381789543"
},
"