参考方案.md 6.3 KB

明白了,我将为你准备一个基于 StateGraph 重构的方案,保留 ReAct 模型结构(即 LangChain Agent 使用 tools 推理的能力),同时确保:

  • 使用 RedisSaver 进行异步 checkpoint 和历史追踪
  • 每个节点都能访问上下文(如 user_id、thread_id)
  • 明确 thread_id 作为会话隔离标识
  • 保留工具调用逻辑和错误处理能力
  • 结构上为向生产过渡做准备,如拆出 config.py 和增强日志输出

我将整理一份详细的改造建议和新的代码框架提案,请稍等,我整理好后马上给你。

重构整体架构

  • 模块划分:引入 config.py 管理全局配置(模型名称、redis_url、日志等级等),主逻辑放在如 qwen3_agent.py 的模块中,将 Graph 定义、Agent 类等拆分。client_api.pyshell.py 保持调用接口不变(get_conversation_historyget_user_conversations 等),只是内部调用新 Agent 类即可。
  • 状态定义:为 StateGraph 定义自定义状态(如 ChatState),包括至少 messages(保存对话列表)、thread_iduser_id 等字段;可使用 typing.Annotatedadd_messages 注解以维护消息列表。例如:class ChatState(TypedDict): messages: Annotated[List[BaseMessage], add_messages]; thread_id: str; user_id: str。。这样所有节点(包括工具函数)都可访问上下文信息;必要时可在工具函数参数中使用 Annotated[..., InjectedState] 注解注入状态字段。
  • 配置文件config.py 示例:
  MODEL_NAME = "qwen3-235b-a22b"
  REDIS_URL = "redis://localhost:6379"
  REDIS_ENABLED = True
  LOG_LEVEL = logging.INFO

主文件中读取这些常量来初始化模型、Redis、日志等配置。

Graph 定义流程示意

重构后使用 LangGraph 的 StateGraph 明确描述 ReAct 流程。基本流程为:用户输入→LLM(助手)思考→根据需要调用工具→工具返回结果→LLM 继续思考→…→最终输出答案。可参考如下伪代码流程:

from langgraph.graph import StateGraph, START
from langgraph.prebuilt import ToolNode, tools_condition

# 创建 StateGraph,指定状态类型 ChatState
builder = StateGraph(ChatState)

# 节点:assistant 调用 LLM(绑定工具)
def assistant_node(state: ChatState) -> dict:
    # 调用绑定工具的模型,输入当前消息列表
    response = llm.bind_tools(tools).invoke(state["messages"])
    return {"messages": response}

builder.add_node("assistant", assistant_node)
builder.add_node("tools", ToolNode(tools))  # 工具节点

# 边:开始进入 assistant 节点
builder.add_edge(START, "assistant")
# 如果 assistant 输出包含工具调用,则流转到 tools 节点,否则结束
builder.add_conditional_edges(
    "assistant",
    tools_condition  # 有工具调用则进 tools,否则结束
)
# tools 处理后回到 assistant 节点(形成循环)
builder.add_edge("tools", "assistant")

# 编译 StateGraph(稍后传入 checkpointer)
graph = builder.compile()

如示例所示,ReAct 图 有两个核心节点:“assistant” 节点用于调用模型并产生 ToolCall;“tools” 节点用于并行执行这些工具调用。通过 add_conditional_edgesassistant→toolsassistant→END 的流转条件化(tools_condition 判断最新 AIMessage 中是否有工具调用)。整体流程为:用户消息进 assistant,若有工具调用则进入 tools 执行后再回 assistant 继续,直至无工具调用后结束并返回最终答案。

RedisSaver 持久化生命周期

采用 langgraph-checkpoint-redis 提供的 AsyncRedisSaver 进行短期(线程级)持久化,以便跨会话保持对话历史。初始化时,用 Redis URL 创建 AsyncRedisSaver 实例并 await saver.asetup() 建立所需索引,如:

self._exit_stack = AsyncExitStack()
saver_mgr = AsyncRedisSaver.from_conn_string(config.REDIS_URL)
self.checkpointer = await self._exit_stack.enter_async_context(saver_mgr)
await self.checkpointer.asetup()

参照官方示例,可在编译图时将 checkpointer 传入 StateGraph.compile(checkpointer=...)。这样,图的每次执行都会自动保存状态到 Redis。关闭时,通过 await self._exit_stack.aclose() 释放 Redis 连接(或使用 async with AsyncRedisSaver.from_conn_string(...) 上下文管理器)。

thread_id 用作对话流水号:首次对话时自动生成(如 userID:timestamp),并在后续调用时传入图的 config 部分({"configurable":{"thread_id": thread_id}}),以检索或续接该会话的历史。通过 checkpointer.get(config) 可异步取回当前线程的全部消息列表,以实现 get_conversation_history 等功能(可参考原代码的取值逻辑)。

日志输出与追踪

  • 日志框架:使用 Python 内置的 logging 模块,设置基本配置输出到控制台。例如:
  import logging
  logging.basicConfig(
      level=config.LOG_LEVEL,
      format="%(asctime)s %(levelname)s: %(message)s"
  )
  logger = logging.getLogger(__name__)

在关键步骤(如初始化模型/Redis、节点执行前后、工具调用等)使用 logger.info()/debug()/warning() 记录状态和统计信息,以便实时追踪流程。

  • 控制台追踪:避免过于复杂的日志管理,简单的 printlogger 输出即可。建议在 assistant 节点前后输出提示(如“调用模型,Thread=xxx”),在工具函数开始时输出工具名和参数,在异常时使用 logger.error() 打印堆栈信息。这样可在终端实时观察 Agent 的运行轨迹,而无需额外工具监控。
  • 日志等级:通过 config.py 中的 LOG_LEVEL 配置调试信息输出级别(如 DEBUG, INFO)。开发时可设为 DEBUG 以观察细节,生产时切换为 INFO 以减少冗余输出。

以上方案在保留原有 LangChain Agent/工具调用风格的同时,采用 StateGraph 明确化流程,各节点可访问共享的上下文状态。使用 AsyncRedisSaver 实现对话历史的持久化,利用 thread_id 管理不同会话;日志输出则通过标准 logging 模块实现可控的实时跟踪输出。

参考资料: LangGraph ReAct 架构示例;RedisSaver 用法指南;状态注入示例。