明白了,我将为你准备一个基于 StateGraph 重构的方案,保留 ReAct 模型结构(即 LangChain Agent 使用 tools 推理的能力),同时确保: * 使用 RedisSaver 进行异步 checkpoint 和历史追踪 * 每个节点都能访问上下文(如 user\_id、thread\_id) * 明确 thread\_id 作为会话隔离标识 * 保留工具调用逻辑和错误处理能力 * 结构上为向生产过渡做准备,如拆出 config.py 和增强日志输出 我将整理一份详细的改造建议和新的代码框架提案,请稍等,我整理好后马上给你。 # 重构整体架构 * **模块划分**:引入 `config.py` 管理全局配置(模型名称、`redis_url`、日志等级等),主逻辑放在如 `qwen3_agent.py` 的模块中,将 Graph 定义、Agent 类等拆分。`client_api.py` 与 `shell.py` 保持调用接口不变(`get_conversation_history`、`get_user_conversations` 等),只是内部调用新 Agent 类即可。 * **状态定义**:为 StateGraph 定义自定义状态(如 `ChatState`),包括至少 `messages`(保存对话列表)、`thread_id`、`user_id` 等字段;可使用 `typing.Annotated` 和 `add_messages` 注解以维护消息列表。例如:`class ChatState(TypedDict): messages: Annotated[List[BaseMessage], add_messages]; thread_id: str; user_id: str`。。这样所有节点(包括工具函数)都可访问上下文信息;必要时可在工具函数参数中使用 `Annotated[..., InjectedState]` 注解注入状态字段。 * **配置文件**:`config.py` 示例: ```python MODEL_NAME = "qwen3-235b-a22b" REDIS_URL = "redis://localhost:6379" REDIS_ENABLED = True LOG_LEVEL = logging.INFO ``` 主文件中读取这些常量来初始化模型、Redis、日志等配置。 ## Graph 定义流程示意 重构后使用 LangGraph 的 `StateGraph` 明确描述 ReAct 流程。基本流程为:**用户输入→LLM(助手)思考→根据需要调用工具→工具返回结果→LLM 继续思考→…→最终输出答案**。可参考如下伪代码流程: ```python from langgraph.graph import StateGraph, START from langgraph.prebuilt import ToolNode, tools_condition # 创建 StateGraph,指定状态类型 ChatState builder = StateGraph(ChatState) # 节点:assistant 调用 LLM(绑定工具) def assistant_node(state: ChatState) -> dict: # 调用绑定工具的模型,输入当前消息列表 response = llm.bind_tools(tools).invoke(state["messages"]) return {"messages": response} builder.add_node("assistant", assistant_node) builder.add_node("tools", ToolNode(tools)) # 工具节点 # 边:开始进入 assistant 节点 builder.add_edge(START, "assistant") # 如果 assistant 输出包含工具调用,则流转到 tools 节点,否则结束 builder.add_conditional_edges( "assistant", tools_condition # 有工具调用则进 tools,否则结束 ) # tools 处理后回到 assistant 节点(形成循环) builder.add_edge("tools", "assistant") # 编译 StateGraph(稍后传入 checkpointer) graph = builder.compile() ``` 如示例所示,**ReAct 图** 有两个核心节点:“assistant” 节点用于调用模型并产生 `ToolCall`;“tools” 节点用于并行执行这些工具调用。通过 `add_conditional_edges` 将 **assistant→tools** 或 **assistant→END** 的流转条件化(`tools_condition` 判断最新 AIMessage 中是否有工具调用)。整体流程为:用户消息进 `assistant`,若有工具调用则进入 `tools` 执行后再回 `assistant` 继续,直至无工具调用后结束并返回最终答案。 ## RedisSaver 持久化生命周期 采用 `langgraph-checkpoint-redis` 提供的 **AsyncRedisSaver** 进行短期(线程级)持久化,以便跨会话保持对话历史。**初始化**时,用 Redis URL 创建 AsyncRedisSaver 实例并 `await saver.asetup()` 建立所需索引,如: ```python self._exit_stack = AsyncExitStack() saver_mgr = AsyncRedisSaver.from_conn_string(config.REDIS_URL) self.checkpointer = await self._exit_stack.enter_async_context(saver_mgr) await self.checkpointer.asetup() ``` 参照官方示例,可在编译图时将 `checkpointer` 传入 `StateGraph.compile(checkpointer=...)`。这样,图的每次执行都会自动保存状态到 Redis。**关闭**时,通过 `await self._exit_stack.aclose()` 释放 Redis 连接(或使用 `async with AsyncRedisSaver.from_conn_string(...)` 上下文管理器)。 `thread_id` 用作对话流水号:首次对话时自动生成(如 `userID:timestamp`),并在后续调用时传入图的 `config` 部分(`{"configurable":{"thread_id": thread_id}}`),以检索或续接该会话的历史。通过 `checkpointer.get(config)` 可异步取回当前线程的全部消息列表,以实现 `get_conversation_history` 等功能(可参考原代码的取值逻辑)。 ## 日志输出与追踪 * **日志框架**:使用 Python 内置的 `logging` 模块,设置基本配置输出到控制台。例如: ```python import logging logging.basicConfig( level=config.LOG_LEVEL, format="%(asctime)s %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) ``` 在关键步骤(如初始化模型/Redis、节点执行前后、工具调用等)使用 `logger.info()/debug()/warning()` 记录状态和统计信息,以便实时追踪流程。 * **控制台追踪**:避免过于复杂的日志管理,简单的 `print` 或 `logger` 输出即可。建议在 `assistant` 节点前后输出提示(如“调用模型,Thread=xxx”),在工具函数开始时输出工具名和参数,在异常时使用 `logger.error()` 打印堆栈信息。这样可在终端实时观察 Agent 的运行轨迹,而无需额外工具监控。 * **日志等级**:通过 `config.py` 中的 `LOG_LEVEL` 配置调试信息输出级别(如 DEBUG, INFO)。开发时可设为 DEBUG 以观察细节,生产时切换为 INFO 以减少冗余输出。 以上方案在保留原有 LangChain Agent/工具调用风格的同时,采用 StateGraph 明确化流程,各节点可访问共享的上下文状态。使用 AsyncRedisSaver 实现对话历史的持久化,利用 `thread_id` 管理不同会话;日志输出则通过标准 `logging` 模块实现可控的实时跟踪输出。 **参考资料:** LangGraph ReAct 架构示例;RedisSaver 用法指南;状态注入示例。