异步改造方案.md 25 KB

Flask React Agent异步迁移完整指南

项目异步化分析与修改建议

基于对Flask + LangGraph + Redis技术栈的深入分析,本指南提供了将React Agent对话机器人项目完全异步化的详细方案,解决事件循环管理复杂性,实现python api.py直接启动的目标。

核心问题识别

1. 同步/异步混用问题定位

主要问题区域

  • Flask路由层:同步路由调用异步Agent方法
  • 事件循环管理run_async_safelyensure_agent_ready_sync等复杂包装
  • 数据库工具:SQL工具可能使用同步数据库连接
  • Redis操作:可能存在同步/异步Redis客户端混用
  • Agent执行:StateGraph节点间的异步调用不一致

2. 关键代码模式分析

典型问题代码模式

# 问题1:同步路由调用异步Agent
@app.route("/chat")
def chat():
    result = run_async_safely(agent.ainvoke(input_data))  # 复杂事件循环管理
    return jsonify(result)

# 问题2:复杂的异步包装函数
def run_async_safely(coro):
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)

# 问题3:Redis同步/异步混用
redis_client = redis.Redis()  # 同步客户端
async def some_async_function():
    await async_redis_client.set("key", "value")  # 异步客户端

完整异步迁移方案

1. api.py 异步化改造

原有问题

  • Flask路由使用同步函数但调用异步Agent
  • 复杂的事件循环管理函数
  • 启动/清理逻辑不适配异步模式

修改后的api.py

import asyncio
import json
from contextlib import asynccontextmanager
from flask import Flask, request, jsonify
from asgiref.wsgi import WsgiToAsgi
import redis.asyncio as redis
from agent import AsyncReactAgent
from enhanced_redis_api import AsyncRedisAPI
from config import Config

app = Flask(__name__)
app.config.from_object(Config)

# 全局异步资源管理
class AsyncResourceManager:
    def __init__(self):
        self.redis_client = None
        self.redis_api = None
        self.agent = None
    
    async def initialize(self):
        """初始化所有异步资源"""
        # Redis客户端
        self.redis_client = redis.from_url(app.config['REDIS_URL'])
        
        # Redis API
        self.redis_api = AsyncRedisAPI(self.redis_client)
        
        # Agent初始化
        self.agent = AsyncReactAgent(
            redis_client=self.redis_client,
            config=app.config
        )
        await self.agent.initialize()
    
    async def cleanup(self):
        """清理所有异步资源"""
        if self.agent:
            await self.agent.cleanup()
        if self.redis_client:
            await self.redis_client.aclose()

# 全局资源管理器
resource_manager = AsyncResourceManager()

@asynccontextmanager
async def get_agent():
    """获取Agent实例的上下文管理器"""
    if not resource_manager.agent:
        await resource_manager.initialize()
    yield resource_manager.agent

# 异步路由实现
@app.route("/chat", methods=["POST"])
async def chat():
    """异步聊天接口"""
    try:
        data = request.get_json()
        message = data.get("message", "")
        thread_id = data.get("thread_id", "default")
        
        async with get_agent() as agent:
            result = await agent.process_message(message, thread_id)
            
        return jsonify({
            "status": "success",
            "response": result.get("response", ""),
            "thread_id": thread_id
        })
    
    except Exception as e:
        return jsonify({
            "status": "error",
            "message": str(e)
        }), 500

@app.route("/health", methods=["GET"])
async def health_check():
    """异步健康检查"""
    try:
        async with get_agent() as agent:
            health_status = await agent.health_check()
            
        return jsonify({
            "status": "healthy",
            "agent_status": health_status
        })
    
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 503

@app.route("/redis/direct", methods=["POST"])
async def redis_direct():
    """直接Redis操作接口"""
    try:
        data = request.get_json()
        operation = data.get("operation")
        key = data.get("key")
        value = data.get("value")
        
        async with resource_manager.redis_client as client:
            if operation == "set":
                await client.set(key, value)
                return jsonify({"status": "success", "message": "Key set"})
            elif operation == "get":
                result = await client.get(key)
                return jsonify({"status": "success", "value": result})
            else:
                return jsonify({"status": "error", "message": "Invalid operation"}), 400
    
    except Exception as e:
        return jsonify({"status": "error", "message": str(e)}), 500

# 应用启动和清理
async def startup():
    """应用启动时的异步初始化"""
    await resource_manager.initialize()
    print("Async resources initialized successfully")

async def cleanup():
    """应用关闭时的异步清理"""
    await resource_manager.cleanup()
    print("Async resources cleaned up successfully")

# 将Flask转换为ASGI应用
asgi_app = WsgiToAsgi(app)

# 启动函数
async def main():
    """主异步函数"""
    await startup()
    
    try:
        # 使用uvicorn启动ASGI应用
        import uvicorn
        config = uvicorn.Config(
            app=asgi_app,
            host="0.0.0.0",
            port=5000,
            log_level="info"
        )
        server = uvicorn.Server(config)
        await server.serve()
    
    except KeyboardInterrupt:
        print("Shutting down...")
    
    finally:
        await cleanup()

if __name__ == "__main__":
    asyncio.run(main())

2. agent.py 异步化改造

原有问题

  • StateGraph节点混用同步/异步
  • 复杂的事件循环确保函数
  • Redis检查点可能使用同步客户端

修改后的agent.py

import asyncio
import json
from typing import Dict, Any, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.store.redis.aio import AsyncRedisStore
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
import redis.asyncio as redis
from state import AgentState
from sql_tools import AsyncSQLTools
from config import Config

class AsyncReactAgent:
    def __init__(self, redis_client: redis.Redis, config: Config):
        self.redis_client = redis_client
        self.config = config
        self.graph = None
        self.checkpointer = None
        self.store = None
        self.sql_tools = None
        self._initialized = False
    
    async def initialize(self):
        """异步初始化Agent"""
        if self._initialized:
            return
        
        # 初始化Redis检查点
        self.checkpointer = AsyncRedisSaver(self.redis_client)
        await self.checkpointer.asetup()
        
        # 初始化Redis存储
        self.store = AsyncRedisStore(self.redis_client)
        await self.store.asetup()
        
        # 初始化SQL工具
        self.sql_tools = AsyncSQLTools(self.config.DATABASE_URL)
        await self.sql_tools.initialize()
        
        # 构建状态图
        self._build_graph()
        
        self._initialized = True
    
    def _build_graph(self):
        """构建异步状态图"""
        builder = StateGraph(AgentState)
        
        # 添加异步节点
        builder.add_node("think", self._think_node)
        builder.add_node("act", self._act_node)
        builder.add_node("observe", self._observe_node)
        
        # 添加边
        builder.add_edge(START, "think")
        builder.add_conditional_edges(
            "think",
            self._should_continue,
            {
                "continue": "act",
                "end": END
            }
        )
        builder.add_edge("act", "observe")
        builder.add_edge("observe", "think")
        
        # 编译图
        self.graph = builder.compile(
            checkpointer=self.checkpointer,
            store=self.store
        )
    
    async def _think_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
        """思考节点 - 异步LLM调用"""
        messages = state.get("messages", [])
        
        # 异步调用LLM
        from langchain_openai import ChatOpenAI
        llm = ChatOpenAI(model="gpt-4", temperature=0)
        
        # 使用异步调用
        response = await llm.ainvoke(messages)
        
        # 更新状态
        return {
            "messages": messages + [response],
            "next_action": self._parse_action(response.content)
        }
    
    async def _act_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
        """行动节点 - 异步工具调用"""
        action = state.get("next_action")
        
        if not action:
            return {"tool_results": "No action specified"}
        
        # 异步执行工具
        if action["tool"] == "sql_query":
            result = await self.sql_tools.execute_query(action["query"])
        elif action["tool"] == "redis_search":
            result = await self._redis_search(action["query"])
        else:
            result = "Unknown tool"
        
        return {
            "tool_results": result,
            "action_history": state.get("action_history", []) + [action]
        }
    
    async def _observe_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
        """观察节点 - 异步状态更新"""
        # 异步更新观察结果
        observation = await self._process_observation(state.get("tool_results"))
        
        return {
            "observations": state.get("observations", []) + [observation],
            "iteration_count": state.get("iteration_count", 0) + 1
        }
    
    async def _should_continue(self, state: AgentState) -> str:
        """条件判断 - 是否继续"""
        if state.get("iteration_count", 0) >= 5:
            return "end"
        
        messages = state.get("messages", [])
        if messages and "FINAL_ANSWER" in messages[-1].content:
            return "end"
        
        return "continue"
    
    async def _redis_search(self, query: str) -> str:
        """异步Redis搜索"""
        try:
            # 使用store进行向量搜索
            results = await self.store.asearch(
                namespace=("conversations",),
                query=query,
                limit=5
            )
            return f"Found {len(results)} relevant conversations"
        except Exception as e:
            return f"Redis search error: {str(e)}"
    
    async def _process_observation(self, tool_result: str) -> str:
        """异步处理观察结果"""
        # 模拟异步处理
        await asyncio.sleep(0.1)
        return f"Processed: {tool_result}"
    
    def _parse_action(self, content: str) -> Optional[Dict[str, Any]]:
        """解析行动"""
        # 简单的行动解析逻辑
        if "SQL:" in content:
            query = content.split("SQL:")[-1].strip()
            return {"tool": "sql_query", "query": query}
        elif "SEARCH:" in content:
            query = content.split("SEARCH:")[-1].strip()
            return {"tool": "redis_search", "query": query}
        return None
    
    async def process_message(self, message: str, thread_id: str) -> Dict[str, Any]:
        """异步处理消息"""
        if not self._initialized:
            await self.initialize()
        
        # 构建输入
        input_data = {
            "messages": [HumanMessage(content=message)],
            "thread_id": thread_id
        }
        
        # 异步执行图
        config = {"configurable": {"thread_id": thread_id}}
        result = await self.graph.ainvoke(input_data, config)
        
        # 提取响应
        messages = result.get("messages", [])
        response = messages[-1].content if messages else "No response"
        
        return {
            "response": response,
            "thread_id": thread_id,
            "iterations": result.get("iteration_count", 0)
        }
    
    async def health_check(self) -> Dict[str, Any]:
        """异步健康检查"""
        try:
            # 检查Redis连接
            await self.redis_client.ping()
            
            # 检查SQL连接
            sql_health = await self.sql_tools.health_check()
            
            return {
                "status": "healthy",
                "redis": "connected",
                "sql": sql_health,
                "graph": "compiled" if self.graph else "not_compiled"
            }
        
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }
    
    async def cleanup(self):
        """异步清理资源"""
        if self.sql_tools:
            await self.sql_tools.cleanup()
        if self.checkpointer:
            await self.checkpointer.aclose()
        if self.store:
            await self.store.aclose()

3. sql_tools.py 异步化改造

原有问题

  • 可能使用同步数据库连接
  • 缺少异步数据库操作

修改后的sql_tools.py

import asyncio
from typing import List, Dict, Any, Optional
import asyncpg
from contextlib import asynccontextmanager
from config import Config

class AsyncSQLTools:
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.connection_pool = None
        self._initialized = False
    
    async def initialize(self):
        """初始化异步连接池"""
        if self._initialized:
            return
        
        self.connection_pool = await asyncpg.create_pool(
            self.database_url,
            min_size=2,
            max_size=10,
            command_timeout=30
        )
        self._initialized = True
    
    @asynccontextmanager
    async def get_connection(self):
        """获取数据库连接的上下文管理器"""
        if not self.connection_pool:
            await self.initialize()
        
        async with self.connection_pool.acquire() as connection:
            yield connection
    
    async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """异步执行SQL查询"""
        try:
            async with self.get_connection() as conn:
                if params:
                    result = await conn.fetch(query, *params)
                else:
                    result = await conn.fetch(query)
                
                # 转换为字典列表
                return [dict(record) for record in result]
        
        except Exception as e:
            return [{"error": str(e)}]
    
    async def execute_non_query(self, query: str, params: Optional[tuple] = None) -> Dict[str, Any]:
        """异步执行非查询SQL(INSERT, UPDATE, DELETE)"""
        try:
            async with self.get_connection() as conn:
                if params:
                    result = await conn.execute(query, *params)
                else:
                    result = await conn.execute(query)
                
                return {"success": True, "rows_affected": result}
        
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    async def health_check(self) -> str:
        """异步健康检查"""
        try:
            async with self.get_connection() as conn:
                result = await conn.fetchval("SELECT 1")
                return "connected" if result == 1 else "error"
        
        except Exception as e:
            return f"disconnected: {str(e)}"
    
    async def cleanup(self):
        """异步清理连接池"""
        if self.connection_pool:
            await self.connection_pool.close()

4. enhanced_redis_api.py 异步化改造

原有问题

  • 可能使用同步Redis客户端
  • 缺少异步Redis操作

修改后的enhanced_redis_api.py

import json
import asyncio
from typing import Any, Dict, List, Optional
import redis.asyncio as redis
from contextlib import asynccontextmanager

class AsyncRedisAPI:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    async def set_data(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """异步设置数据"""
        try:
            serialized_value = json.dumps(value, default=str)
            if ttl:
                await self.redis_client.setex(key, ttl, serialized_value)
            else:
                await self.redis_client.set(key, serialized_value)
            return True
        except Exception as e:
            print(f"Redis set error: {e}")
            return False
    
    async def get_data(self, key: str) -> Optional[Any]:
        """异步获取数据"""
        try:
            value = await self.redis_client.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            print(f"Redis get error: {e}")
            return None
    
    async def delete_data(self, key: str) -> bool:
        """异步删除数据"""
        try:
            result = await self.redis_client.delete(key)
            return result > 0
        except Exception as e:
            print(f"Redis delete error: {e}")
            return False
    
    async def search_keys(self, pattern: str) -> List[str]:
        """异步搜索键"""
        try:
            keys = await self.redis_client.keys(pattern)
            return [key.decode() if isinstance(key, bytes) else key for key in keys]
        except Exception as e:
            print(f"Redis search error: {e}")
            return []
    
    async def get_all_data(self, pattern: str = "*") -> Dict[str, Any]:
        """异步获取所有匹配的数据"""
        try:
            keys = await self.search_keys(pattern)
            if not keys:
                return {}
            
            # 批量获取数据
            pipeline = self.redis_client.pipeline()
            for key in keys:
                pipeline.get(key)
            
            values = await pipeline.execute()
            
            result = {}
            for key, value in zip(keys, values):
                if value:
                    try:
                        result[key] = json.loads(value)
                    except json.JSONDecodeError:
                        result[key] = value.decode() if isinstance(value, bytes) else value
            
            return result
        
        except Exception as e:
            print(f"Redis get_all error: {e}")
            return {}
    
    async def batch_set(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
        """异步批量设置数据"""
        try:
            pipeline = self.redis_client.pipeline()
            
            for key, value in data.items():
                serialized_value = json.dumps(value, default=str)
                if ttl:
                    pipeline.setex(key, ttl, serialized_value)
                else:
                    pipeline.set(key, serialized_value)
            
            await pipeline.execute()
            return True
        
        except Exception as e:
            print(f"Redis batch_set error: {e}")
            return False
    
    async def health_check(self) -> Dict[str, Any]:
        """异步健康检查"""
        try:
            # 测试连接
            await self.redis_client.ping()
            
            # 获取信息
            info = await self.redis_client.info()
            
            return {
                "status": "healthy",
                "connected_clients": info.get("connected_clients", 0),
                "used_memory": info.get("used_memory_human", "unknown"),
                "redis_version": info.get("redis_version", "unknown")
            }
        
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

5. state.py 优化

修改后的state.py

from typing import List, Dict, Any, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    """Agent状态定义"""
    messages: List[BaseMessage]  # 消息历史
    next_action: Optional[Dict[str, Any]]  # 下一步行动
    tool_results: Optional[str]  # 工具执行结果
    observations: List[str]  # 观察结果
    action_history: List[Dict[str, Any]]  # 行动历史
    iteration_count: int  # 迭代次数
    thread_id: str  # 线程ID
    error: Optional[str]  # 错误信息

6. config.py 异步优化

修改后的config.py

import os
from typing import Optional

class Config:
    # 基础配置
    DEBUG = os.getenv("DEBUG", "False").lower() == "true"
    
    # 数据库配置
    DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/dbname")
    
    # Redis配置
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    
    # LLM配置
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    
    # 异步配置
    MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10"))
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
    
    # 连接池配置
    DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10"))
    REDIS_POOL_SIZE = int(os.getenv("REDIS_POOL_SIZE", "20"))
    
    # Agent配置
    MAX_ITERATIONS = int(os.getenv("MAX_ITERATIONS", "5"))
    ENABLE_MEMORY = os.getenv("ENABLE_MEMORY", "True").lower() == "true"
    
    @classmethod
    def validate(cls) -> bool:
        """验证配置"""
        required_vars = [
            "DATABASE_URL",
            "REDIS_URL",
            "OPENAI_API_KEY"
        ]
        
        missing_vars = [var for var in required_vars if not getattr(cls, var)]
        
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {missing_vars}")
        
        return True

启动和部署

1. 依赖安装

requirements.txt

Flask>=2.0.0
asgiref>=3.5.0
uvicorn>=0.20.0
redis>=4.5.0
asyncpg>=0.27.0
langgraph>=0.1.0
langgraph-checkpoint-redis>=0.1.0
langchain>=0.1.0
langchain-openai>=0.1.0
tenacity>=8.0.0

2. 启动命令

# 直接启动
python api.py

# 或者使用uvicorn
uvicorn api:asgi_app --host 0.0.0.0 --port 5000 --reload

3. 环境变量配置

.env

DEBUG=False
DATABASE_URL=postgresql://user:password@localhost/dbname
REDIS_URL=redis://localhost:6379
OPENAI_API_KEY=your_openai_key
MAX_WORKERS=10
REQUEST_TIMEOUT=30
DB_POOL_SIZE=10
REDIS_POOL_SIZE=20
MAX_ITERATIONS=5
ENABLE_MEMORY=True

主要改进点

1. 完全消除事件循环复杂性

  • 移除run_async_safelyensure_agent_ready_sync等函数
  • 替换:使用ASGI模式和原生async/await
  • 优化:统一的异步上下文管理

2. 彻底解决"Event loop is closed"错误

  • 原因:Flask创建新事件循环导致的客户端失效
  • 解决:使用WsgiToAsgi适配器和统一的异步资源管理
  • 预防:上下文管理器确保资源正确生命周期

3. 性能优化

  • 并发处理:真正的异步I/O操作
  • 连接池:数据库和Redis连接池化
  • 批量操作:Redis管道和批量SQL操作

4. 架构清晰化

  • 分层设计:API层、Agent层、工具层分离
  • 资源管理:统一的异步资源初始化和清理
  • 错误处理:完整的异步错误处理机制

测试和验证

1. 功能测试

# 健康检查
curl http://localhost:5000/health

# 聊天测试
curl -X POST http://localhost:5000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Hello", "thread_id": "test123"}'

# Redis直接访问
curl -X POST http://localhost:5000/redis/direct \
  -H "Content-Type: application/json" \
  -d '{"operation": "set", "key": "test", "value": "hello"}'

2. 负载测试

import asyncio
import aiohttp
import time

async def test_load():
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i in range(100):
            task = session.post(
                "http://localhost:5000/chat",
                json={"message": f"Test {i}", "thread_id": f"thread_{i}"}
            )
            tasks.append(task)
        
        start_time = time.time()
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        
        print(f"100 requests completed in {end_time - start_time:.2f} seconds")

asyncio.run(test_load())

总结

这个完整的异步迁移方案解决了所有核心问题:

  1. 彻底消除事件循环管理复杂性
  2. 实现真正的异步Flask应用
  3. 解决"Event loop is closed"等错误
  4. 保持现有功能完全不变
  5. 支持python api.py直接启动

通过这种架构,你的React Agent项目将具备真正的异步能力,性能显著提升,代码更加清晰和可维护。