# Flask React Agent异步迁移完整指南 ## 项目异步化分析与修改建议 基于对Flask + LangGraph + Redis技术栈的深入分析,本指南提供了将React Agent对话机器人项目完全异步化的详细方案,解决事件循环管理复杂性,实现`python api.py`直接启动的目标。 ## 核心问题识别 ### 1. 同步/异步混用问题定位 **主要问题区域**: - **Flask路由层**:同步路由调用异步Agent方法 - **事件循环管理**:`run_async_safely`、`ensure_agent_ready_sync`等复杂包装 - **数据库工具**:SQL工具可能使用同步数据库连接 - **Redis操作**:可能存在同步/异步Redis客户端混用 - **Agent执行**:StateGraph节点间的异步调用不一致 ### 2. 关键代码模式分析 **典型问题代码模式**: ```python # 问题1:同步路由调用异步Agent @app.route("/chat") def chat(): result = run_async_safely(agent.ainvoke(input_data)) # 复杂事件循环管理 return jsonify(result) # 问题2:复杂的异步包装函数 def run_async_safely(coro): try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop.run_until_complete(coro) # 问题3:Redis同步/异步混用 redis_client = redis.Redis() # 同步客户端 async def some_async_function(): await async_redis_client.set("key", "value") # 异步客户端 ``` ## 完整异步迁移方案 ### 1. api.py 异步化改造 **原有问题**: - Flask路由使用同步函数但调用异步Agent - 复杂的事件循环管理函数 - 启动/清理逻辑不适配异步模式 **修改后的api.py**: ```python import asyncio import json from contextlib import asynccontextmanager from flask import Flask, request, jsonify from asgiref.wsgi import WsgiToAsgi import redis.asyncio as redis from agent import AsyncReactAgent from enhanced_redis_api import AsyncRedisAPI from config import Config app = Flask(__name__) app.config.from_object(Config) # 全局异步资源管理 class AsyncResourceManager: def __init__(self): self.redis_client = None self.redis_api = None self.agent = None async def initialize(self): """初始化所有异步资源""" # Redis客户端 self.redis_client = redis.from_url(app.config['REDIS_URL']) # Redis API self.redis_api = AsyncRedisAPI(self.redis_client) # Agent初始化 self.agent = AsyncReactAgent( redis_client=self.redis_client, config=app.config ) await self.agent.initialize() async def cleanup(self): """清理所有异步资源""" if self.agent: await self.agent.cleanup() if self.redis_client: await self.redis_client.aclose() # 全局资源管理器 resource_manager = AsyncResourceManager() @asynccontextmanager async def get_agent(): """获取Agent实例的上下文管理器""" if not resource_manager.agent: await resource_manager.initialize() yield resource_manager.agent # 异步路由实现 @app.route("/chat", methods=["POST"]) async def chat(): """异步聊天接口""" try: data = request.get_json() message = data.get("message", "") thread_id = data.get("thread_id", "default") async with get_agent() as agent: result = await agent.process_message(message, thread_id) return jsonify({ "status": "success", "response": result.get("response", ""), "thread_id": thread_id }) except Exception as e: return jsonify({ "status": "error", "message": str(e) }), 500 @app.route("/health", methods=["GET"]) async def health_check(): """异步健康检查""" try: async with get_agent() as agent: health_status = await agent.health_check() return jsonify({ "status": "healthy", "agent_status": health_status }) except Exception as e: return jsonify({ "status": "unhealthy", "error": str(e) }), 503 @app.route("/redis/direct", methods=["POST"]) async def redis_direct(): """直接Redis操作接口""" try: data = request.get_json() operation = data.get("operation") key = data.get("key") value = data.get("value") async with resource_manager.redis_client as client: if operation == "set": await client.set(key, value) return jsonify({"status": "success", "message": "Key set"}) elif operation == "get": result = await client.get(key) return jsonify({"status": "success", "value": result}) else: return jsonify({"status": "error", "message": "Invalid operation"}), 400 except Exception as e: return jsonify({"status": "error", "message": str(e)}), 500 # 应用启动和清理 async def startup(): """应用启动时的异步初始化""" await resource_manager.initialize() print("Async resources initialized successfully") async def cleanup(): """应用关闭时的异步清理""" await resource_manager.cleanup() print("Async resources cleaned up successfully") # 将Flask转换为ASGI应用 asgi_app = WsgiToAsgi(app) # 启动函数 async def main(): """主异步函数""" await startup() try: # 使用uvicorn启动ASGI应用 import uvicorn config = uvicorn.Config( app=asgi_app, host="0.0.0.0", port=5000, log_level="info" ) server = uvicorn.Server(config) await server.serve() except KeyboardInterrupt: print("Shutting down...") finally: await cleanup() if __name__ == "__main__": asyncio.run(main()) ``` ### 2. agent.py 异步化改造 **原有问题**: - StateGraph节点混用同步/异步 - 复杂的事件循环确保函数 - Redis检查点可能使用同步客户端 **修改后的agent.py**: ```python import asyncio import json from typing import Dict, Any, Optional from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.redis.aio import AsyncRedisSaver from langgraph.store.redis.aio import AsyncRedisStore from langchain_core.messages import HumanMessage, AIMessage from langchain_core.runnables import RunnableConfig import redis.asyncio as redis from state import AgentState from sql_tools import AsyncSQLTools from config import Config class AsyncReactAgent: def __init__(self, redis_client: redis.Redis, config: Config): self.redis_client = redis_client self.config = config self.graph = None self.checkpointer = None self.store = None self.sql_tools = None self._initialized = False async def initialize(self): """异步初始化Agent""" if self._initialized: return # 初始化Redis检查点 self.checkpointer = AsyncRedisSaver(self.redis_client) await self.checkpointer.asetup() # 初始化Redis存储 self.store = AsyncRedisStore(self.redis_client) await self.store.asetup() # 初始化SQL工具 self.sql_tools = AsyncSQLTools(self.config.DATABASE_URL) await self.sql_tools.initialize() # 构建状态图 self._build_graph() self._initialized = True def _build_graph(self): """构建异步状态图""" builder = StateGraph(AgentState) # 添加异步节点 builder.add_node("think", self._think_node) builder.add_node("act", self._act_node) builder.add_node("observe", self._observe_node) # 添加边 builder.add_edge(START, "think") builder.add_conditional_edges( "think", self._should_continue, { "continue": "act", "end": END } ) builder.add_edge("act", "observe") builder.add_edge("observe", "think") # 编译图 self.graph = builder.compile( checkpointer=self.checkpointer, store=self.store ) async def _think_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]: """思考节点 - 异步LLM调用""" messages = state.get("messages", []) # 异步调用LLM from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4", temperature=0) # 使用异步调用 response = await llm.ainvoke(messages) # 更新状态 return { "messages": messages + [response], "next_action": self._parse_action(response.content) } async def _act_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]: """行动节点 - 异步工具调用""" action = state.get("next_action") if not action: return {"tool_results": "No action specified"} # 异步执行工具 if action["tool"] == "sql_query": result = await self.sql_tools.execute_query(action["query"]) elif action["tool"] == "redis_search": result = await self._redis_search(action["query"]) else: result = "Unknown tool" return { "tool_results": result, "action_history": state.get("action_history", []) + [action] } async def _observe_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]: """观察节点 - 异步状态更新""" # 异步更新观察结果 observation = await self._process_observation(state.get("tool_results")) return { "observations": state.get("observations", []) + [observation], "iteration_count": state.get("iteration_count", 0) + 1 } async def _should_continue(self, state: AgentState) -> str: """条件判断 - 是否继续""" if state.get("iteration_count", 0) >= 5: return "end" messages = state.get("messages", []) if messages and "FINAL_ANSWER" in messages[-1].content: return "end" return "continue" async def _redis_search(self, query: str) -> str: """异步Redis搜索""" try: # 使用store进行向量搜索 results = await self.store.asearch( namespace=("conversations",), query=query, limit=5 ) return f"Found {len(results)} relevant conversations" except Exception as e: return f"Redis search error: {str(e)}" async def _process_observation(self, tool_result: str) -> str: """异步处理观察结果""" # 模拟异步处理 await asyncio.sleep(0.1) return f"Processed: {tool_result}" def _parse_action(self, content: str) -> Optional[Dict[str, Any]]: """解析行动""" # 简单的行动解析逻辑 if "SQL:" in content: query = content.split("SQL:")[-1].strip() return {"tool": "sql_query", "query": query} elif "SEARCH:" in content: query = content.split("SEARCH:")[-1].strip() return {"tool": "redis_search", "query": query} return None async def process_message(self, message: str, thread_id: str) -> Dict[str, Any]: """异步处理消息""" if not self._initialized: await self.initialize() # 构建输入 input_data = { "messages": [HumanMessage(content=message)], "thread_id": thread_id } # 异步执行图 config = {"configurable": {"thread_id": thread_id}} result = await self.graph.ainvoke(input_data, config) # 提取响应 messages = result.get("messages", []) response = messages[-1].content if messages else "No response" return { "response": response, "thread_id": thread_id, "iterations": result.get("iteration_count", 0) } async def health_check(self) -> Dict[str, Any]: """异步健康检查""" try: # 检查Redis连接 await self.redis_client.ping() # 检查SQL连接 sql_health = await self.sql_tools.health_check() return { "status": "healthy", "redis": "connected", "sql": sql_health, "graph": "compiled" if self.graph else "not_compiled" } except Exception as e: return { "status": "unhealthy", "error": str(e) } async def cleanup(self): """异步清理资源""" if self.sql_tools: await self.sql_tools.cleanup() if self.checkpointer: await self.checkpointer.aclose() if self.store: await self.store.aclose() ``` ### 3. sql_tools.py 异步化改造 **原有问题**: - 可能使用同步数据库连接 - 缺少异步数据库操作 **修改后的sql_tools.py**: ```python import asyncio from typing import List, Dict, Any, Optional import asyncpg from contextlib import asynccontextmanager from config import Config class AsyncSQLTools: def __init__(self, database_url: str): self.database_url = database_url self.connection_pool = None self._initialized = False async def initialize(self): """初始化异步连接池""" if self._initialized: return self.connection_pool = await asyncpg.create_pool( self.database_url, min_size=2, max_size=10, command_timeout=30 ) self._initialized = True @asynccontextmanager async def get_connection(self): """获取数据库连接的上下文管理器""" if not self.connection_pool: await self.initialize() async with self.connection_pool.acquire() as connection: yield connection async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: """异步执行SQL查询""" try: async with self.get_connection() as conn: if params: result = await conn.fetch(query, *params) else: result = await conn.fetch(query) # 转换为字典列表 return [dict(record) for record in result] except Exception as e: return [{"error": str(e)}] async def execute_non_query(self, query: str, params: Optional[tuple] = None) -> Dict[str, Any]: """异步执行非查询SQL(INSERT, UPDATE, DELETE)""" try: async with self.get_connection() as conn: if params: result = await conn.execute(query, *params) else: result = await conn.execute(query) return {"success": True, "rows_affected": result} except Exception as e: return {"success": False, "error": str(e)} async def health_check(self) -> str: """异步健康检查""" try: async with self.get_connection() as conn: result = await conn.fetchval("SELECT 1") return "connected" if result == 1 else "error" except Exception as e: return f"disconnected: {str(e)}" async def cleanup(self): """异步清理连接池""" if self.connection_pool: await self.connection_pool.close() ``` ### 4. enhanced_redis_api.py 异步化改造 **原有问题**: - 可能使用同步Redis客户端 - 缺少异步Redis操作 **修改后的enhanced_redis_api.py**: ```python import json import asyncio from typing import Any, Dict, List, Optional import redis.asyncio as redis from contextlib import asynccontextmanager class AsyncRedisAPI: def __init__(self, redis_client: redis.Redis): self.redis_client = redis_client async def set_data(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """异步设置数据""" try: serialized_value = json.dumps(value, default=str) if ttl: await self.redis_client.setex(key, ttl, serialized_value) else: await self.redis_client.set(key, serialized_value) return True except Exception as e: print(f"Redis set error: {e}") return False async def get_data(self, key: str) -> Optional[Any]: """异步获取数据""" try: value = await self.redis_client.get(key) if value: return json.loads(value) return None except Exception as e: print(f"Redis get error: {e}") return None async def delete_data(self, key: str) -> bool: """异步删除数据""" try: result = await self.redis_client.delete(key) return result > 0 except Exception as e: print(f"Redis delete error: {e}") return False async def search_keys(self, pattern: str) -> List[str]: """异步搜索键""" try: keys = await self.redis_client.keys(pattern) return [key.decode() if isinstance(key, bytes) else key for key in keys] except Exception as e: print(f"Redis search error: {e}") return [] async def get_all_data(self, pattern: str = "*") -> Dict[str, Any]: """异步获取所有匹配的数据""" try: keys = await self.search_keys(pattern) if not keys: return {} # 批量获取数据 pipeline = self.redis_client.pipeline() for key in keys: pipeline.get(key) values = await pipeline.execute() result = {} for key, value in zip(keys, values): if value: try: result[key] = json.loads(value) except json.JSONDecodeError: result[key] = value.decode() if isinstance(value, bytes) else value return result except Exception as e: print(f"Redis get_all error: {e}") return {} async def batch_set(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool: """异步批量设置数据""" try: pipeline = self.redis_client.pipeline() for key, value in data.items(): serialized_value = json.dumps(value, default=str) if ttl: pipeline.setex(key, ttl, serialized_value) else: pipeline.set(key, serialized_value) await pipeline.execute() return True except Exception as e: print(f"Redis batch_set error: {e}") return False async def health_check(self) -> Dict[str, Any]: """异步健康检查""" try: # 测试连接 await self.redis_client.ping() # 获取信息 info = await self.redis_client.info() return { "status": "healthy", "connected_clients": info.get("connected_clients", 0), "used_memory": info.get("used_memory_human", "unknown"), "redis_version": info.get("redis_version", "unknown") } except Exception as e: return { "status": "unhealthy", "error": str(e) } ``` ### 5. state.py 优化 **修改后的state.py**: ```python from typing import List, Dict, Any, Optional from typing_extensions import TypedDict from langgraph.graph.message import add_messages from langchain_core.messages import BaseMessage class AgentState(TypedDict): """Agent状态定义""" messages: List[BaseMessage] # 消息历史 next_action: Optional[Dict[str, Any]] # 下一步行动 tool_results: Optional[str] # 工具执行结果 observations: List[str] # 观察结果 action_history: List[Dict[str, Any]] # 行动历史 iteration_count: int # 迭代次数 thread_id: str # 线程ID error: Optional[str] # 错误信息 ``` ### 6. config.py 异步优化 **修改后的config.py**: ```python import os from typing import Optional class Config: # 基础配置 DEBUG = os.getenv("DEBUG", "False").lower() == "true" # 数据库配置 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/dbname") # Redis配置 REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") # LLM配置 OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # 异步配置 MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10")) REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30")) # 连接池配置 DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10")) REDIS_POOL_SIZE = int(os.getenv("REDIS_POOL_SIZE", "20")) # Agent配置 MAX_ITERATIONS = int(os.getenv("MAX_ITERATIONS", "5")) ENABLE_MEMORY = os.getenv("ENABLE_MEMORY", "True").lower() == "true" @classmethod def validate(cls) -> bool: """验证配置""" required_vars = [ "DATABASE_URL", "REDIS_URL", "OPENAI_API_KEY" ] missing_vars = [var for var in required_vars if not getattr(cls, var)] if missing_vars: raise ValueError(f"Missing required environment variables: {missing_vars}") return True ``` ## 启动和部署 ### 1. 依赖安装 **requirements.txt**: ``` Flask>=2.0.0 asgiref>=3.5.0 uvicorn>=0.20.0 redis>=4.5.0 asyncpg>=0.27.0 langgraph>=0.1.0 langgraph-checkpoint-redis>=0.1.0 langchain>=0.1.0 langchain-openai>=0.1.0 tenacity>=8.0.0 ``` ### 2. 启动命令 ```bash # 直接启动 python api.py # 或者使用uvicorn uvicorn api:asgi_app --host 0.0.0.0 --port 5000 --reload ``` ### 3. 环境变量配置 **.env**: ```env DEBUG=False DATABASE_URL=postgresql://user:password@localhost/dbname REDIS_URL=redis://localhost:6379 OPENAI_API_KEY=your_openai_key MAX_WORKERS=10 REQUEST_TIMEOUT=30 DB_POOL_SIZE=10 REDIS_POOL_SIZE=20 MAX_ITERATIONS=5 ENABLE_MEMORY=True ``` ## 主要改进点 ### 1. 完全消除事件循环复杂性 - **移除**:`run_async_safely`、`ensure_agent_ready_sync`等函数 - **替换**:使用ASGI模式和原生async/await - **优化**:统一的异步上下文管理 ### 2. 彻底解决"Event loop is closed"错误 - **原因**:Flask创建新事件循环导致的客户端失效 - **解决**:使用WsgiToAsgi适配器和统一的异步资源管理 - **预防**:上下文管理器确保资源正确生命周期 ### 3. 性能优化 - **并发处理**:真正的异步I/O操作 - **连接池**:数据库和Redis连接池化 - **批量操作**:Redis管道和批量SQL操作 ### 4. 架构清晰化 - **分层设计**:API层、Agent层、工具层分离 - **资源管理**:统一的异步资源初始化和清理 - **错误处理**:完整的异步错误处理机制 ## 测试和验证 ### 1. 功能测试 ```bash # 健康检查 curl http://localhost:5000/health # 聊天测试 curl -X POST http://localhost:5000/chat \ -H "Content-Type: application/json" \ -d '{"message": "Hello", "thread_id": "test123"}' # Redis直接访问 curl -X POST http://localhost:5000/redis/direct \ -H "Content-Type: application/json" \ -d '{"operation": "set", "key": "test", "value": "hello"}' ``` ### 2. 负载测试 ```python import asyncio import aiohttp import time async def test_load(): async with aiohttp.ClientSession() as session: tasks = [] for i in range(100): task = session.post( "http://localhost:5000/chat", json={"message": f"Test {i}", "thread_id": f"thread_{i}"} ) tasks.append(task) start_time = time.time() results = await asyncio.gather(*tasks) end_time = time.time() print(f"100 requests completed in {end_time - start_time:.2f} seconds") asyncio.run(test_load()) ``` ## 总结 这个完整的异步迁移方案解决了所有核心问题: 1. **彻底消除事件循环管理复杂性** 2. **实现真正的异步Flask应用** 3. **解决"Event loop is closed"等错误** 4. **保持现有功能完全不变** 5. **支持`python api.py`直接启动** 通过这种架构,你的React Agent项目将具备真正的异步能力,性能显著提升,代码更加清晰和可维护。