明白了,我将为你准备一个基于 StateGraph 重构的方案,保留 ReAct 模型结构(即 LangChain Agent 使用 tools 推理的能力),同时确保:
我将整理一份详细的改造建议和新的代码框架提案,请稍等,我整理好后马上给你。
config.py
管理全局配置(模型名称、redis_url
、日志等级等),主逻辑放在如 qwen3_agent.py
的模块中,将 Graph 定义、Agent 类等拆分。client_api.py
与 shell.py
保持调用接口不变(get_conversation_history
、get_user_conversations
等),只是内部调用新 Agent 类即可。ChatState
),包括至少 messages
(保存对话列表)、thread_id
、user_id
等字段;可使用 typing.Annotated
和 add_messages
注解以维护消息列表。例如:class ChatState(TypedDict): messages: Annotated[List[BaseMessage], add_messages]; thread_id: str; user_id: str
。。这样所有节点(包括工具函数)都可访问上下文信息;必要时可在工具函数参数中使用 Annotated[..., InjectedState]
注解注入状态字段。config.py
示例: MODEL_NAME = "qwen3-235b-a22b"
REDIS_URL = "redis://localhost:6379"
REDIS_ENABLED = True
LOG_LEVEL = logging.INFO
主文件中读取这些常量来初始化模型、Redis、日志等配置。
重构后使用 LangGraph 的 StateGraph
明确描述 ReAct 流程。基本流程为:用户输入→LLM(助手)思考→根据需要调用工具→工具返回结果→LLM 继续思考→…→最终输出答案。可参考如下伪代码流程:
from langgraph.graph import StateGraph, START
from langgraph.prebuilt import ToolNode, tools_condition
# 创建 StateGraph,指定状态类型 ChatState
builder = StateGraph(ChatState)
# 节点:assistant 调用 LLM(绑定工具)
def assistant_node(state: ChatState) -> dict:
# 调用绑定工具的模型,输入当前消息列表
response = llm.bind_tools(tools).invoke(state["messages"])
return {"messages": response}
builder.add_node("assistant", assistant_node)
builder.add_node("tools", ToolNode(tools)) # 工具节点
# 边:开始进入 assistant 节点
builder.add_edge(START, "assistant")
# 如果 assistant 输出包含工具调用,则流转到 tools 节点,否则结束
builder.add_conditional_edges(
"assistant",
tools_condition # 有工具调用则进 tools,否则结束
)
# tools 处理后回到 assistant 节点(形成循环)
builder.add_edge("tools", "assistant")
# 编译 StateGraph(稍后传入 checkpointer)
graph = builder.compile()
如示例所示,ReAct 图 有两个核心节点:“assistant” 节点用于调用模型并产生 ToolCall
;“tools” 节点用于并行执行这些工具调用。通过 add_conditional_edges
将 assistant→tools 或 assistant→END 的流转条件化(tools_condition
判断最新 AIMessage 中是否有工具调用)。整体流程为:用户消息进 assistant
,若有工具调用则进入 tools
执行后再回 assistant
继续,直至无工具调用后结束并返回最终答案。
采用 langgraph-checkpoint-redis
提供的 AsyncRedisSaver 进行短期(线程级)持久化,以便跨会话保持对话历史。初始化时,用 Redis URL 创建 AsyncRedisSaver 实例并 await saver.asetup()
建立所需索引,如:
self._exit_stack = AsyncExitStack()
saver_mgr = AsyncRedisSaver.from_conn_string(config.REDIS_URL)
self.checkpointer = await self._exit_stack.enter_async_context(saver_mgr)
await self.checkpointer.asetup()
参照官方示例,可在编译图时将 checkpointer
传入 StateGraph.compile(checkpointer=...)
。这样,图的每次执行都会自动保存状态到 Redis。关闭时,通过 await self._exit_stack.aclose()
释放 Redis 连接(或使用 async with AsyncRedisSaver.from_conn_string(...)
上下文管理器)。
thread_id
用作对话流水号:首次对话时自动生成(如 userID:timestamp
),并在后续调用时传入图的 config
部分({"configurable":{"thread_id": thread_id}}
),以检索或续接该会话的历史。通过 checkpointer.get(config)
可异步取回当前线程的全部消息列表,以实现 get_conversation_history
等功能(可参考原代码的取值逻辑)。
logging
模块,设置基本配置输出到控制台。例如: import logging
logging.basicConfig(
level=config.LOG_LEVEL,
format="%(asctime)s %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
在关键步骤(如初始化模型/Redis、节点执行前后、工具调用等)使用 logger.info()/debug()/warning()
记录状态和统计信息,以便实时追踪流程。
print
或 logger
输出即可。建议在 assistant
节点前后输出提示(如“调用模型,Thread=xxx”),在工具函数开始时输出工具名和参数,在异常时使用 logger.error()
打印堆栈信息。这样可在终端实时观察 Agent 的运行轨迹,而无需额外工具监控。config.py
中的 LOG_LEVEL
配置调试信息输出级别(如 DEBUG, INFO)。开发时可设为 DEBUG 以观察细节,生产时切换为 INFO 以减少冗余输出。以上方案在保留原有 LangChain Agent/工具调用风格的同时,采用 StateGraph 明确化流程,各节点可访问共享的上下文状态。使用 AsyncRedisSaver 实现对话历史的持久化,利用 thread_id
管理不同会话;日志输出则通过标准 logging
模块实现可控的实时跟踪输出。
参考资料: LangGraph ReAct 架构示例;RedisSaver 用法指南;状态注入示例。