Преглед изворни кода

部分修复了Event loop的错误,简化提示词,

wangxq пре 1 месец
родитељ
комит
e19d1a0975
4 измењених фајлова са 108 додато и 18 уклоњено
  1. 2 2
      app_config.py
  2. 66 10
      react_agent/agent.py
  3. 3 1
      react_agent/config.py
  4. 37 5
      unified_api.py

+ 2 - 2
app_config.py

@@ -37,12 +37,12 @@ API_DEEPSEEK_CONFIG = {
 API_QIANWEN_CONFIG = {
     "api_key": os.getenv("QWEN_API_KEY"),  # 从环境变量读取API密钥
     "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",  # 千问API地址
-    "model": "qwen3-235b-a22b",
+    "model": "qwen-plus",
     "allow_llm_to_see_data": True,
     "temperature": 0.6,
     "n_results": 6,
     "language": "Chinese",
-    "stream": True,  # 是否使用流式模式
+    "stream": False,  # 是否使用流式模式
     "enable_thinking": False  # 是否启用思考功能(要求stream=True)
 }
 #qwen3-30b-a3b

+ 66 - 10
react_agent/agent.py

@@ -92,8 +92,9 @@ class CustomReactAgent:
             temperature=0.1,
             timeout=config.NETWORK_TIMEOUT,  # 添加超时配置
             max_retries=0,  # 禁用OpenAI客户端重试,改用Agent层统一重试
+            streaming=True,
             extra_body={
-                "enable_thinking": False,
+                "enable_thinking": True,  # False by wxq
                 "misc": {
                     "ensure_ascii": False
                 }
@@ -113,7 +114,7 @@ class CustomReactAgent:
                 )
             )
         )
-        logger.info(f"   LLM 已初始化,模型: {config.QWEN_MODEL}")
+        logger.info(f"   ReactAgent LLM 已初始化,模型: {config.QWEN_MODEL}")
 
         # 3. 绑定工具
         self.tools = sql_tools
@@ -141,6 +142,31 @@ class CustomReactAgent:
         logger.info("   StateGraph 已构建并编译。")
         logger.info("✅ CustomReactAgent 初始化完成。")
 
+    async def _reinitialize_checkpointer(self):
+        """重新初始化checkpointer连接"""
+        try:
+            # 清理旧的连接
+            if self._exit_stack:
+                try:
+                    await self._exit_stack.aclose()
+                except:
+                    pass
+                
+            # 重新创建
+            if config.REDIS_ENABLED and AsyncRedisSaver is not None:
+                self._exit_stack = AsyncExitStack()
+                checkpointer_manager = AsyncRedisSaver.from_conn_string(config.REDIS_URL)
+                self.checkpointer = await self._exit_stack.enter_async_context(checkpointer_manager)
+                await self.checkpointer.asetup()
+                logger.info("✅ Checkpointer重新初始化成功")
+            else:
+                self.checkpointer = None
+                logger.warning("⚠️ Redis禁用,checkpointer设为None")
+                
+        except Exception as e:
+            logger.error(f"❌ Checkpointer重新初始化失败: {e}")
+            self.checkpointer = None
+
     async def close(self):
         """清理资源,关闭 Redis 连接。"""
         if self._exit_stack:
@@ -857,6 +883,22 @@ class CustomReactAgent:
         try:
             logger.info(f"🚀 开始处理用户消息: {message[:50]}...")
             
+            # 检查checkpointer状态,如果Redis连接有问题则重新初始化
+            if self.checkpointer:
+                try:
+                    # 简单的连接测试 - 不用aget_tuple因为可能没有数据
+                    # 直接测试Redis连接
+                    if hasattr(self.checkpointer, 'conn') and self.checkpointer.conn:
+                        await self.checkpointer.conn.ping()
+                except Exception as checkpoint_error:
+                    if "Event loop is closed" in str(checkpoint_error) or "closed" in str(checkpoint_error).lower():
+                        logger.warning(f"⚠️ Checkpointer连接异常,尝试重新初始化: {checkpoint_error}")
+                        await self._reinitialize_checkpointer()
+                        # 重新构建graph使用新的checkpointer
+                        self.agent_executor = self._create_graph()
+                    else:
+                        logger.warning(f"⚠️ Checkpointer测试失败,但继续执行: {checkpoint_error}")
+            
             final_state = await self.agent_executor.ainvoke(inputs, config)
             
             # 🔍 调试:打印 final_state 的所有 keys
@@ -889,8 +931,26 @@ class CustomReactAgent:
             return result
             
         except Exception as e:
-            logger.error(f"❌ 处理过程中发生严重错误 - Thread: {thread_id}: {e}", exc_info=True)
-            return {"success": False, "error": str(e), "thread_id": thread_id}
+            # 特殊处理Redis相关的Event loop错误
+            if "Event loop is closed" in str(e):
+                logger.error(f"❌ Redis Event loop已关闭 - Thread: {thread_id}: {e}")
+                # 尝试重新初始化checkpointer
+                try:
+                    await self._reinitialize_checkpointer()
+                    self.agent_executor = self._create_graph()
+                    logger.info("🔄 已重新初始化checkpointer,请重试请求")
+                except Exception as reinit_error:
+                    logger.error(f"❌ 重新初始化失败: {reinit_error}")
+                
+                return {
+                    "success": False, 
+                    "error": "Redis连接问题,请重试", 
+                    "thread_id": thread_id,
+                    "retry_suggested": True
+                }
+            else:
+                logger.error(f"❌ 处理过程中发生严重错误 - Thread: {thread_id}: {e}", exc_info=True)
+                return {"success": False, "error": str(e), "thread_id": thread_id}
     
     async def get_conversation_history(self, thread_id: str) -> List[Dict[str, Any]]:
         """从 checkpointer 获取指定线程的对话历史。"""
@@ -988,8 +1048,8 @@ class CustomReactAgent:
                 if cursor == 0:
                     break
             
-            # 关闭临时Redis连接
-            await redis_client.close()
+            # 注意:不要关闭共享的Redis客户端连接,因为其他操作可能还在使用
+            # await redis_client.close()  # 已注释掉,避免关闭共享连接
             
             # 2. 按时间戳排序(新的在前)
             sorted_threads = sorted(
@@ -1416,15 +1476,11 @@ not on explaining your decision-making process.
 调用工具时的严格要求:
 1. **原样传递原则**:question 参数必须与用户问题完全一致,一字不差
 2. **禁止任何改写**:不得进行同义词替换、语言优化或任何形式的修改
-3. **保持专有名词**:所有人名、地名、专业术语必须保持原始表达
 
 ❌ 错误示例:
 - 用户问"充电桩",不得改为"充电栋"
-- 用户提到"南城服务区",不得改为"南峡服务区"
-
 ✅ 正确做法:
 - 完全复制用户的原始问题作为question参数
-- 保持所有词汇的原始形态
 
 请严格遵守此要求,确保工具调用的准确性。"""
         

+ 3 - 1
react_agent/config.py

@@ -13,7 +13,9 @@ PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(_
 # qwen-plus
 # qwen3-235b-a22b
 # qwen3-30b-a3b
-QWEN_API_KEY = "sk-db68e37f00974031935395315bfe07f0"
+# sk-8f2320dafc9e4076968accdd8eebd8e9
+# my:"sk-db68e37f00974031935395315bfe07f0"
+QWEN_API_KEY = "sk-8f2320dafc9e4076968accdd8eebd8e9"
 QWEN_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
 QWEN_MODEL = "qwen3-235b-a22b"
 

+ 37 - 5
unified_api.py

@@ -453,17 +453,23 @@ async def ask_react_agent():
             error_msg = agent_result.get("error", "React Agent处理失败")
             logger.error(f"❌ React Agent处理失败: {error_msg}")
             
+            # 检查是否建议重试
+            retry_suggested = agent_result.get("retry_suggested", False)
+            error_code = 503 if retry_suggested else 500
+            message = "服务暂时不可用,请稍后重试" if retry_suggested else "处理失败"
+            
             return jsonify({
-                "code": 500,
-                "message": "处理失败",
+                "code": error_code,
+                "message": message,
                 "success": False,
                 "error": error_msg,
+                "retry_suggested": retry_suggested,
                 "data": {
                     "conversation_id": agent_result.get("thread_id"),
                     "user_id": validated_data['user_id'],
                     "timestamp": datetime.now().isoformat()
                 }
-            }), 500
+            }), error_code
         
         # Agent处理成功
         api_data = agent_result.get("api_data", {})
@@ -1936,8 +1942,34 @@ if __name__ == '__main__':
     logger.info("📘 React Agent API: http://localhost:8084/api/v0/ask_react_agent")
     logger.info("📘 LangGraph Agent API: http://localhost:8084/api/v0/ask_agent")
     
-    # 启动标准Flask应用(支持异步路由)
-    app.run(host="0.0.0.0", port=8084, debug=False, threaded=True)
+    try:
+        # 尝试使用ASGI模式启动(推荐)
+        import uvicorn
+        from asgiref.wsgi import WsgiToAsgi
+        
+        logger.info("🚀 使用ASGI模式启动异步Flask应用...")
+        logger.info("   这将解决事件循环冲突问题,支持LangGraph异步checkpoint保存")
+        
+        # 将Flask WSGI应用转换为ASGI应用
+        asgi_app = WsgiToAsgi(app)
+        
+        # 使用uvicorn启动ASGI应用
+        uvicorn.run(
+            asgi_app,
+            host="0.0.0.0",
+            port=8084,
+            log_level="info",
+            access_log=True
+        )
+        
+    except ImportError as e:
+        # 如果缺少ASGI依赖,fallback到传统Flask模式
+        logger.warning("⚠️ ASGI依赖缺失,使用传统Flask模式启动")
+        logger.warning("   建议安装: pip install uvicorn asgiref")
+        logger.warning("   传统模式可能存在异步事件循环冲突问题")
+        
+        # 启动标准Flask应用(支持异步路由)
+        app.run(host="0.0.0.0", port=8084, debug=False, threaded=True)
 
 # Data Pipeline 全局变量 - 从 citu_app.py 迁移
 data_pipeline_manager = None