简化版LangGraph状态监控方案.md 22 KB

简化版LangGraph状态监控方案

🎯 核心理念

像ChatGPT一样,只告诉用户"AI正在思考",不显示百分比进度,专注于当前执行状态。

📡 单一API设计

接口: GET /api/status/{thread_id}

响应格式:

{
  "status": "running|completed",
  "name": "AI思考中",
  "icon": "🤖",
  "timestamp": "2025-07-30T15:30:45.123Z"
}

🔄 状态映射

基于您的LangGraph流程图节点:

节点 显示名称 图标
__start__ 开始 🚀
agent AI思考中 🤖
prepare_tool_input 准备工具 🔧
tools 执行查询 ⚙️
update_state_after_tool 处理结果 🔄
format_final_response 生成回答 📝
__end__ 完成

🏗️ 实现逻辑

  1. 从Redis获取最新checkpoint
  2. 读取suggested_next_step字段
  3. 如果为空或__end__ → 返回completed
  4. 否则 → 返回running + 对应状态名称

💡 使用方式

客户端轮询:

# Postman测试
GET http://localhost:5000/api/status/wang1:20250729235038043

# 返回示例1 (执行中)
{
  "status": "running",
  "name": "AI思考中", 
  "icon": "🤖",
  "timestamp": "2025-07-30T15:30:45.123Z"
}

# 返回示例2 (完成)
{
  "status": "completed",
  "name": "完成",
  "icon": "✅", 
  "timestamp": "2025-07-30T15:31:20.456Z"
}

客户端实现:

// 每秒轮询,直到status=completed就停止
setInterval(() => {
  fetch('/api/status/thread_id')
    .then(r => r.json())
    .then(data => {
      console.log(`${data.icon} ${data.name}`);
      if (data.status === 'completed') {
        // 停止轮询
      }
    });
}, 1000);

方案优势

  • 极简: 只有4个字段,一个API
  • 准确: 基于真实的checkpoint状态
  • 直观: 像ChatGPT一样的用户体验
  • 轻量: 最小化网络传输和服务器负载
  • 可靠: 直接读取LangGraph的执行状态

这就是您要的最简化版本 - 一个API,几个字段,专注核心功能!

简化LangGraph状态API 的代码:


from flask import Flask, jsonify
import redis
import json
from datetime import datetime

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)


# 简单的节点状态映射

NODE_STATUS = {
    "__start__": {"name": "开始", "icon": "🚀"},
    "agent": {"name": "AI思考中", "icon": "🤖"},
    "prepare_tool_input": {"name": "准备工具", "icon": "🔧"},
    "tools": {"name": "执行查询", "icon": "⚙️"},
    "update_state_after_tool": {"name": "处理结果", "icon": "🔄"},
    "format_final_response": {"name": "生成回答", "icon": "📝"},
    "__end__": {"name": "完成", "icon": "✅"}
}

@app.route('/api/status/<thread_id>')
def get_status(thread_id):
    try:
        # 查找最新checkpoint
        pattern = f"checkpoint:{thread_id}:*"
        keys = redis_client.keys(pattern)
    
    if not keys:
        return jsonify({"error": "未找到执行线程"}), 404
    
    # 获取最新checkpoint
    latest_key = sorted(keys)[-1]
    checkpoint_data = redis_client.get(latest_key)
    
    if not checkpoint_data:
        return jsonify({"error": "无法读取数据"}), 500
    
    checkpoint = json.loads(checkpoint_data)
    suggested_next_step = checkpoint.get('suggested_next_step', '')
    
    # 判断状态
    if not suggested_next_step or suggested_next_step == "__end__":
        # 完成了
        return jsonify({
            "status": "completed",
            "name": "完成",
            "icon": "✅",
            "timestamp": datetime.now().isoformat()
        })
    else:
        # 正在执行
        node_info = NODE_STATUS.get(suggested_next_step, {
            "name": "执行中",
            "icon": "⚙️"
        })
        
        return jsonify({
            "status": "running",
            "name": node_info["name"],
            "icon": node_info["icon"],
            "timestamp": datetime.now().isoformat()
        })
        
except Exception as e:
    return jsonify({"error": str(e)}), 500
    
if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

两种获取Checkpoint的方法

🔥 方法一:直接Redis访问

就是我们之前使用的方法,直接查询Redis键:

# 直接Redis查询
pattern = f"checkpoint:{thread_id}:*"
keys = redis_client.keys(pattern)
latest_key = sorted(keys)[-1]
checkpoint_data = redis_client.get(latest_key)

🚀 方法二:使用LangGraph API

使用官方的Redis Checkpoint Saver API:## 📊 两种方法对比

方法一:直接Redis访问

优点

  • 更直接,性能稍快
  • 完全控制数据格式
  • 不依赖LangGraph版本

缺点

  • 需要了解Redis键名格式

    • checkpoint:wang1:20250729235038043:__empty__:1f06c943-fd28-6539-801f-41a94c25bd43
      
      • 硬编码依赖具体实现

      • 升级时可能出问题

      方法二:LangGraph API推荐

      优点

      • 官方API,更稳定可靠
      • 自动处理索引和版本兼容
      • 标准化的CheckpointTuple对象
      • 支持异步操作

      缺点

      • 需要安装langgraph-checkpoint-redis
      • 稍微多一点的API调用开销

      🔧 关键API使用

      基于搜索结果,官方Redis Checkpointer的核心方法:

      # 同步版本
      from langgraph.checkpoint.redis import RedisSaver
      
      checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
      checkpointer.setup()  # 重要:创建索引
      
      # 获取最新checkpoint
      read_config = {"configurable": {"thread_id": "your_thread_id"}}
      checkpoint_tuple = checkpointer.get_tuple(read_config)
      
      # 异步版本  
      from langgraph.checkpoint.redis.aio import AsyncRedisSaver
      
      async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as checkpointer:
      await checkpointer.asetup()
      checkpoint_tuple = await checkpointer.aget_tuple(read_config)
      

💡 建议

我建议使用方法二(LangGraph API),因为:

  1. 这是官方推荐方式,更可靠

  2. 自动处理版本兼容性问题

  3. 提供标准化的数据结构

  4. 您的代码会更加健壮和可维护

### 方法二参考代码

from flask import Flask, jsonify
from langgraph.checkpoint.redis import RedisSaver
from datetime import datetime
import asyncio

app = Flask(__name__)

# 初始化Redis Checkpointer
checkpointer = RedisSaver.from_conn_string("redis://localhost:6379")
checkpointer.setup()  # 创建必要的索引

# 简单的节点状态映射
NODE_STATUS = {
    "__start__": {"name": "开始", "icon": "🚀"},
    "agent": {"name": "AI思考中", "icon": "🤖"},
    "prepare_tool_input": {"name": "准备工具", "icon": "🔧"},
    "tools": {"name": "执行查询", "icon": "⚙️"},
    "update_state_after_tool": {"name": "处理结果", "icon": "🔄"},  
    "format_final_response": {"name": "生成回答", "icon": "📝"},
    "__end__": {"name": "完成", "icon": "✅"}
}

@app.route('/api/status/<thread_id>')
def get_status(thread_id):
    try:
        # 使用LangGraph API获取最新checkpoint
        read_config = {"configurable": {"thread_id": thread_id}}
        
        # 获取最新的checkpoint tuple
        checkpoint_tuple = checkpointer.get_tuple(read_config)
        
        if not checkpoint_tuple:
            return jsonify({"error": "未找到执行线程"}), 404
        
        # 从checkpoint中提取状态信息
        checkpoint = checkpoint_tuple.checkpoint
        
        # 获取suggested_next_step (这里需要根据实际的checkpoint结构调整)
        # 通常在channel_values中或者通过分析当前状态推断
        channel_values = checkpoint.get('channel_values', {})
        
        # 根据您的实际LangGraph实现,suggested_next_step可能在不同位置
        # 这里提供几种可能的获取方式:
        
        # 方式1: 直接从channel_values获取
        suggested_next_step = channel_values.get('suggested_next_step', '')
        
        # 方式2: 从其他可能的位置获取
        if not suggested_next_step:
            # 可能在metadata中
            metadata = checkpoint_tuple.metadata or {}
            suggested_next_step = metadata.get('suggested_next_step', '')
        
        # 方式3: 根据当前节点状态推断下一步
        if not suggested_next_step:
            current_node = channel_values.get('current_node', '')
            # 根据业务逻辑推断下一步...
        
        # 判断是否完成
        is_completed = (not suggested_next_step or 
                       suggested_next_step == "__end__" or
                       channel_values.get('status') == 'completed')
        
        if is_completed:
            return jsonify({
                "status": "completed", 
                "name": "完成",
                "icon": "✅",
                "timestamp": datetime.now().isoformat()
            })
        else:
            # 获取当前步骤信息
            node_info = NODE_STATUS.get(suggested_next_step, {
                "name": "执行中",
                "icon": "⚙️"
            })
            
            return jsonify({
                "status": "running",
                "name": node_info["name"],
                "icon": node_info["icon"], 
                "timestamp": datetime.now().isoformat()
            })
            
    except Exception as e:
        return jsonify({"error": f"获取状态失败: {str(e)}"}), 500

# 异步版本 (如果需要)
@app.route('/api/status-async/<thread_id>')
def get_status_async(thread_id):
    try:
        # 使用异步API
        async def _get_async_status():
            from langgraph.checkpoint.redis.aio import AsyncRedisSaver
            
            async with AsyncRedisSaver.from_conn_string("redis://localhost:6379") as async_checkpointer:
                await async_checkpointer.asetup()
                
                read_config = {"configurable": {"thread_id": thread_id}}
                checkpoint_tuple = await async_checkpointer.aget_tuple(read_config)
                
                if not checkpoint_tuple:
                    return None
                    
                return checkpoint_tuple.checkpoint
        
        # 在Flask中运行异步代码
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        
        try:
            checkpoint = loop.run_until_complete(_get_async_status())
            if not checkpoint:
                return jsonify({"error": "未找到执行线程"}), 404
            
            # 处理checkpoint数据...
            # (相同的逻辑)
            
        finally:
            loop.close()
            
    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route('/api/health')
def health_check():
    """健康检查"""
    try:
        # 测试checkpointer连接
        test_config = {"configurable": {"thread_id": "health_check"}}
        checkpointer.get_tuple(test_config)  # 这不会报错,即使没有数据
        
        return jsonify({
            "status": "healthy",
            "checkpointer": "connected",
            "timestamp": datetime.now().isoformat()
        })
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 500

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

Checkpoint在Redis中的存储的值

key:

checkpoint:wang1:20250729235038043:__empty__:1f06c943-fd28-6539-801f-41a94c25bd43

其中“wang1:20250729235038043”就是thread_id,它是由 {user_id}:{时间戳}组成的。

value:

              {
                      "type": "ai",
                      "content": "系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。"
                    }
                  ],
                  "question": "请问这个服务区有几个餐饮档口?"
                },
                "id": "call_498bcec43e2c4b3499e6e1"
              },
              {
                "name": "",
                "args": {
                  
                },
                "id": "call_f71ffe8603ed42da879a17",
                "type": "tool_call"
              }
            ],
            "invalid_tool_calls": []
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "ToolMessage"
          ],
          "kwargs": {
            "content": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL \nAND sa.delete_ts IS NULL;",
            "type": "tool",
            "name": "generate_sql",
            "id": "3dfacea1-93af-444c-bfb4-2995f336f274",
            "tool_call_id": "call_498bcec43e2c4b3499e6e1",
            "status": "success"
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "ToolMessage"
          ],
          "kwargs": {
            "content": "Error:  is not a valid tool, try one of [generate_sql, valid_sql, run_sql].",
            "type": "tool",
            "name": "",
            "id": "1763e2eb-7be4-4e7e-bae3-19f9e0b1b4b5",
            "tool_call_id": "call_f71ffe8603ed42da879a17",
            "status": "error"
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "AIMessage"
          ],
          "kwargs": {
            "content": "",
            "additional_kwargs": {
              "tool_calls": [
                {
                  "index": 0,
                  "id": "call_09fe3a57143947e5b85766",
                  "function": {
                    "arguments": "{\"history_messages\": [{\"type\": \"human\", \"content\": \"请问系统中哪个服务区档口最多?\"}, {\"type\": \"ai\", \"content\": \"系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。\"}], \"question\": \"请问这个服务区有几个餐饮档口?\"}",
                    "name": "generate_sql"
                  },
                  "type": "function"
                }
              ]
            },
            "response_metadata": {
              "finish_reason": "tool_calls",
              "model_name": "qwen3-235b-a22b"
            },
            "type": "ai",
            "id": "run--d61425aa-94cf-4e0c-837a-9ac7c48f4930-0",
            "tool_calls": [
              {
                "name": "generate_sql",
                "args": {
                  "history_messages": [
                    {
                      "type": "human",
                      "content": "请问系统中哪个服务区档口最多?"
                    },
                    {
                      "type": "ai",
                      "content": "系统中档口最多的服务区是**南城服务区**,共拥有**39个档口**。该结果通过统计各服务区内未删除档口的数量并按降序排列得出,已通过系统验证确保数据准确性。"
                    }
                  ],
                  "question": "请问这个服务区有几个餐饮档口?"
                },
                "id": "call_09fe3a57143947e5b85766"
              }
            ],
            "invalid_tool_calls": []
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "ToolMessage"
          ],
          "kwargs": {
            "content": "SELECT COUNT(*) AS 餐饮档口数量 FROM bss_branch b JOIN bss_service_area sa ON b.service_area_id = sa.id WHERE sa.service_area_name = '南城服务区' AND b.classify = '餐饮' AND b.delete_ts IS NULL;",
            "type": "tool",
            "name": "generate_sql",
            "id": "8016bd5b-9ad7-43b3-90f2-1994b798b06b",
            "tool_call_id": "call_09fe3a57143947e5b85766",
            "status": "success"
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "AIMessage"
          ],
          "kwargs": {
            "content": "",
            "additional_kwargs": {
              "tool_calls": [
                {
                  "index": 0,
                  "id": "call_0753617fb1174fab9074eb",
                  "function": {
                    "arguments": "{\"sql\": \"SELECT COUNT(*) AS \\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf \\nFROM bss_branch b \\nJOIN bss_service_area sa ON b.service_area_id = sa.id \\nWHERE sa.service_area_name = '南城服务区' \\nAND b.classify = '餐饮' \\nAND b.delete_ts IS NULL;\", \"question\": \"\\u8bf7\\u95ee\\u8fd9\\u4e2a\\u670d\\u52a1\\u533a\\u6709\\u51e0\\u4e2a\\u9910\\u996e\\u6863\\u53e3\\uff1f\"}",
                    "name": "valid_sql"
                  },
                  "type": "function"
                }
              ]
            },
            "response_metadata": {
              "finish_reason": "tool_calls",
              "model_name": "qwen3-235b-a22b"
            },
            "type": "ai",
            "id": "run--4c0c02fa-efbb-4cd6-96c0-d80392c99aa9-0",
            "tool_calls": [
              {
                "name": "valid_sql",
                "args": {
                  "sql": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL;",
                  "question": "请问这个服务区有几个餐饮档口?"
                },
                "id": "call_0753617fb1174fab9074eb",
                "type": "tool_call"
              }
            ],
            "invalid_tool_calls": []
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "ToolMessage"
          ],
          "kwargs": {
            "content": "SQL验证通过:语法正确且字段存在",
            "type": "tool",
            "name": "valid_sql",
            "id": "62bde32c-02a9-4e36-aa76-5387cb03ce65",
            "tool_call_id": "call_0753617fb1174fab9074eb",
            "status": "success"
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "AIMessage"
          ],
          "kwargs": {
            "content": "",
            "additional_kwargs": {
              "tool_calls": [
                {
                  "index": 0,
                  "id": "call_b0600de998f5467d96a3f2",
                  "function": {
                    "arguments": "{\"sql\": \"SELECT COUNT(*) AS \\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf \\nFROM bss_branch b \\nJOIN bss_service_area sa ON b.service_area_id = sa.id \\nWHERE sa.service_area_name = '\\u5357\\u57ce\\u670d\\u52a1\\u533a' \\nAND b.classify = '\\u9910\\u996e' \\nAND b.delete_ts IS NULL;\", \"question\": \"\\u8bf7\\u95ee\\u8fd9\\u4e2a\\u670d\\u52a1\\u533a\\u6709\\u51e0\\u4e2a\\u9910\\u996e\\u6863\\u53e3\\uff1f\"}",
                    "name": "run_sql"
                  },
                  "type": "function"
                }
              ]
            },
            "response_metadata": {
              "finish_reason": "tool_calls",
              "model_name": "qwen3-235b-a22b"
            },
            "type": "ai",
            "id": "run--6144e643-450a-4ec2-932a-230f666dd90c-0",
            "tool_calls": [
              {
                "name": "run_sql",
                "args": {
                  "sql": "SELECT COUNT(*) AS 餐饮档口数量 \nFROM bss_branch b \nJOIN bss_service_area sa ON b.service_area_id = sa.id \nWHERE sa.service_area_name = '南城服务区' \nAND b.classify = '餐饮' \nAND b.delete_ts IS NULL;",
                  "question": "请问这个服务区有几个餐饮档口?"
                },
                "id": "call_b0600de998f5467d96a3f2",
                "type": "tool_call"
              }
            ],
            "invalid_tool_calls": []
          }
        },
        {
          "lc": 1,
          "type": "constructor",
          "id": [
            "langchain",
            "schema",
            "messages",
            "ToolMessage"
          ],
          "kwargs": {
            "content": "[{\"\\u9910\\u996e\\u6863\\u53e3\\u6570\\u91cf\":6}]",
            "type": "tool",
            "name": "run_sql",
            "id": "488da67c-ea4f-46f4-a201-d1bb40c25931",
            "tool_call_id": "call_b0600de998f5467d96a3f2",
            "status": "success"
          }
        }
      ],
      "user_id": "wang1",
      "thread_id": "wang1:20250729235038043",
      "suggested_next_step": "summarize_final_answer",
      "branch:to:agent": null
    },
    "channel_versions": {
      "__start__": "00000000000000000000000000000018.0.2372340570136553",
      "messages": "00000000000000000000000000000033.0.3402020632542543",
      "user_id": "00000000000000000000000000000018.0.6157289048059329",
      "thread_id": "00000000000000000000000000000018.0.3438965440425772",
      "suggested_next_step": "00000000000000000000000000000034.0.4358036384739292",
      "branch:to:agent": "00000000000000000000000000000034.0.30470240061347376",
      "branch:to:prepare_tool_input": "00000000000000000000000000000032.0.683834676177241",
      "branch:to:tools": "00000000000000000000000000000033.0.4617350256378949",
      "branch:to:update_state_after_tool": "00000000000000000000000000000034.0.7301276046246093",
      "branch:to:format_final_response": "00000000000000000000000000000016.0.4685210381789543"
    },
    "