基于对Flask + LangGraph + Redis技术栈的深入分析,本指南提供了将React Agent对话机器人项目完全异步化的详细方案,解决事件循环管理复杂性,实现python api.py
直接启动的目标。
主要问题区域:
run_async_safely
、ensure_agent_ready_sync
等复杂包装典型问题代码模式:
# 问题1:同步路由调用异步Agent
@app.route("/chat")
def chat():
result = run_async_safely(agent.ainvoke(input_data)) # 复杂事件循环管理
return jsonify(result)
# 问题2:复杂的异步包装函数
def run_async_safely(coro):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
# 问题3:Redis同步/异步混用
redis_client = redis.Redis() # 同步客户端
async def some_async_function():
await async_redis_client.set("key", "value") # 异步客户端
原有问题:
修改后的api.py:
import asyncio
import json
from contextlib import asynccontextmanager
from flask import Flask, request, jsonify
from asgiref.wsgi import WsgiToAsgi
import redis.asyncio as redis
from agent import AsyncReactAgent
from enhanced_redis_api import AsyncRedisAPI
from config import Config
app = Flask(__name__)
app.config.from_object(Config)
# 全局异步资源管理
class AsyncResourceManager:
def __init__(self):
self.redis_client = None
self.redis_api = None
self.agent = None
async def initialize(self):
"""初始化所有异步资源"""
# Redis客户端
self.redis_client = redis.from_url(app.config['REDIS_URL'])
# Redis API
self.redis_api = AsyncRedisAPI(self.redis_client)
# Agent初始化
self.agent = AsyncReactAgent(
redis_client=self.redis_client,
config=app.config
)
await self.agent.initialize()
async def cleanup(self):
"""清理所有异步资源"""
if self.agent:
await self.agent.cleanup()
if self.redis_client:
await self.redis_client.aclose()
# 全局资源管理器
resource_manager = AsyncResourceManager()
@asynccontextmanager
async def get_agent():
"""获取Agent实例的上下文管理器"""
if not resource_manager.agent:
await resource_manager.initialize()
yield resource_manager.agent
# 异步路由实现
@app.route("/chat", methods=["POST"])
async def chat():
"""异步聊天接口"""
try:
data = request.get_json()
message = data.get("message", "")
thread_id = data.get("thread_id", "default")
async with get_agent() as agent:
result = await agent.process_message(message, thread_id)
return jsonify({
"status": "success",
"response": result.get("response", ""),
"thread_id": thread_id
})
except Exception as e:
return jsonify({
"status": "error",
"message": str(e)
}), 500
@app.route("/health", methods=["GET"])
async def health_check():
"""异步健康检查"""
try:
async with get_agent() as agent:
health_status = await agent.health_check()
return jsonify({
"status": "healthy",
"agent_status": health_status
})
except Exception as e:
return jsonify({
"status": "unhealthy",
"error": str(e)
}), 503
@app.route("/redis/direct", methods=["POST"])
async def redis_direct():
"""直接Redis操作接口"""
try:
data = request.get_json()
operation = data.get("operation")
key = data.get("key")
value = data.get("value")
async with resource_manager.redis_client as client:
if operation == "set":
await client.set(key, value)
return jsonify({"status": "success", "message": "Key set"})
elif operation == "get":
result = await client.get(key)
return jsonify({"status": "success", "value": result})
else:
return jsonify({"status": "error", "message": "Invalid operation"}), 400
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
# 应用启动和清理
async def startup():
"""应用启动时的异步初始化"""
await resource_manager.initialize()
print("Async resources initialized successfully")
async def cleanup():
"""应用关闭时的异步清理"""
await resource_manager.cleanup()
print("Async resources cleaned up successfully")
# 将Flask转换为ASGI应用
asgi_app = WsgiToAsgi(app)
# 启动函数
async def main():
"""主异步函数"""
await startup()
try:
# 使用uvicorn启动ASGI应用
import uvicorn
config = uvicorn.Config(
app=asgi_app,
host="0.0.0.0",
port=5000,
log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
except KeyboardInterrupt:
print("Shutting down...")
finally:
await cleanup()
if __name__ == "__main__":
asyncio.run(main())
原有问题:
修改后的agent.py:
import asyncio
import json
from typing import Dict, Any, Optional
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.store.redis.aio import AsyncRedisStore
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.runnables import RunnableConfig
import redis.asyncio as redis
from state import AgentState
from sql_tools import AsyncSQLTools
from config import Config
class AsyncReactAgent:
def __init__(self, redis_client: redis.Redis, config: Config):
self.redis_client = redis_client
self.config = config
self.graph = None
self.checkpointer = None
self.store = None
self.sql_tools = None
self._initialized = False
async def initialize(self):
"""异步初始化Agent"""
if self._initialized:
return
# 初始化Redis检查点
self.checkpointer = AsyncRedisSaver(self.redis_client)
await self.checkpointer.asetup()
# 初始化Redis存储
self.store = AsyncRedisStore(self.redis_client)
await self.store.asetup()
# 初始化SQL工具
self.sql_tools = AsyncSQLTools(self.config.DATABASE_URL)
await self.sql_tools.initialize()
# 构建状态图
self._build_graph()
self._initialized = True
def _build_graph(self):
"""构建异步状态图"""
builder = StateGraph(AgentState)
# 添加异步节点
builder.add_node("think", self._think_node)
builder.add_node("act", self._act_node)
builder.add_node("observe", self._observe_node)
# 添加边
builder.add_edge(START, "think")
builder.add_conditional_edges(
"think",
self._should_continue,
{
"continue": "act",
"end": END
}
)
builder.add_edge("act", "observe")
builder.add_edge("observe", "think")
# 编译图
self.graph = builder.compile(
checkpointer=self.checkpointer,
store=self.store
)
async def _think_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
"""思考节点 - 异步LLM调用"""
messages = state.get("messages", [])
# 异步调用LLM
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4", temperature=0)
# 使用异步调用
response = await llm.ainvoke(messages)
# 更新状态
return {
"messages": messages + [response],
"next_action": self._parse_action(response.content)
}
async def _act_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
"""行动节点 - 异步工具调用"""
action = state.get("next_action")
if not action:
return {"tool_results": "No action specified"}
# 异步执行工具
if action["tool"] == "sql_query":
result = await self.sql_tools.execute_query(action["query"])
elif action["tool"] == "redis_search":
result = await self._redis_search(action["query"])
else:
result = "Unknown tool"
return {
"tool_results": result,
"action_history": state.get("action_history", []) + [action]
}
async def _observe_node(self, state: AgentState, config: RunnableConfig) -> Dict[str, Any]:
"""观察节点 - 异步状态更新"""
# 异步更新观察结果
observation = await self._process_observation(state.get("tool_results"))
return {
"observations": state.get("observations", []) + [observation],
"iteration_count": state.get("iteration_count", 0) + 1
}
async def _should_continue(self, state: AgentState) -> str:
"""条件判断 - 是否继续"""
if state.get("iteration_count", 0) >= 5:
return "end"
messages = state.get("messages", [])
if messages and "FINAL_ANSWER" in messages[-1].content:
return "end"
return "continue"
async def _redis_search(self, query: str) -> str:
"""异步Redis搜索"""
try:
# 使用store进行向量搜索
results = await self.store.asearch(
namespace=("conversations",),
query=query,
limit=5
)
return f"Found {len(results)} relevant conversations"
except Exception as e:
return f"Redis search error: {str(e)}"
async def _process_observation(self, tool_result: str) -> str:
"""异步处理观察结果"""
# 模拟异步处理
await asyncio.sleep(0.1)
return f"Processed: {tool_result}"
def _parse_action(self, content: str) -> Optional[Dict[str, Any]]:
"""解析行动"""
# 简单的行动解析逻辑
if "SQL:" in content:
query = content.split("SQL:")[-1].strip()
return {"tool": "sql_query", "query": query}
elif "SEARCH:" in content:
query = content.split("SEARCH:")[-1].strip()
return {"tool": "redis_search", "query": query}
return None
async def process_message(self, message: str, thread_id: str) -> Dict[str, Any]:
"""异步处理消息"""
if not self._initialized:
await self.initialize()
# 构建输入
input_data = {
"messages": [HumanMessage(content=message)],
"thread_id": thread_id
}
# 异步执行图
config = {"configurable": {"thread_id": thread_id}}
result = await self.graph.ainvoke(input_data, config)
# 提取响应
messages = result.get("messages", [])
response = messages[-1].content if messages else "No response"
return {
"response": response,
"thread_id": thread_id,
"iterations": result.get("iteration_count", 0)
}
async def health_check(self) -> Dict[str, Any]:
"""异步健康检查"""
try:
# 检查Redis连接
await self.redis_client.ping()
# 检查SQL连接
sql_health = await self.sql_tools.health_check()
return {
"status": "healthy",
"redis": "connected",
"sql": sql_health,
"graph": "compiled" if self.graph else "not_compiled"
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
}
async def cleanup(self):
"""异步清理资源"""
if self.sql_tools:
await self.sql_tools.cleanup()
if self.checkpointer:
await self.checkpointer.aclose()
if self.store:
await self.store.aclose()
原有问题:
修改后的sql_tools.py:
import asyncio
from typing import List, Dict, Any, Optional
import asyncpg
from contextlib import asynccontextmanager
from config import Config
class AsyncSQLTools:
def __init__(self, database_url: str):
self.database_url = database_url
self.connection_pool = None
self._initialized = False
async def initialize(self):
"""初始化异步连接池"""
if self._initialized:
return
self.connection_pool = await asyncpg.create_pool(
self.database_url,
min_size=2,
max_size=10,
command_timeout=30
)
self._initialized = True
@asynccontextmanager
async def get_connection(self):
"""获取数据库连接的上下文管理器"""
if not self.connection_pool:
await self.initialize()
async with self.connection_pool.acquire() as connection:
yield connection
async def execute_query(self, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
"""异步执行SQL查询"""
try:
async with self.get_connection() as conn:
if params:
result = await conn.fetch(query, *params)
else:
result = await conn.fetch(query)
# 转换为字典列表
return [dict(record) for record in result]
except Exception as e:
return [{"error": str(e)}]
async def execute_non_query(self, query: str, params: Optional[tuple] = None) -> Dict[str, Any]:
"""异步执行非查询SQL(INSERT, UPDATE, DELETE)"""
try:
async with self.get_connection() as conn:
if params:
result = await conn.execute(query, *params)
else:
result = await conn.execute(query)
return {"success": True, "rows_affected": result}
except Exception as e:
return {"success": False, "error": str(e)}
async def health_check(self) -> str:
"""异步健康检查"""
try:
async with self.get_connection() as conn:
result = await conn.fetchval("SELECT 1")
return "connected" if result == 1 else "error"
except Exception as e:
return f"disconnected: {str(e)}"
async def cleanup(self):
"""异步清理连接池"""
if self.connection_pool:
await self.connection_pool.close()
原有问题:
修改后的enhanced_redis_api.py:
import json
import asyncio
from typing import Any, Dict, List, Optional
import redis.asyncio as redis
from contextlib import asynccontextmanager
class AsyncRedisAPI:
def __init__(self, redis_client: redis.Redis):
self.redis_client = redis_client
async def set_data(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
"""异步设置数据"""
try:
serialized_value = json.dumps(value, default=str)
if ttl:
await self.redis_client.setex(key, ttl, serialized_value)
else:
await self.redis_client.set(key, serialized_value)
return True
except Exception as e:
print(f"Redis set error: {e}")
return False
async def get_data(self, key: str) -> Optional[Any]:
"""异步获取数据"""
try:
value = await self.redis_client.get(key)
if value:
return json.loads(value)
return None
except Exception as e:
print(f"Redis get error: {e}")
return None
async def delete_data(self, key: str) -> bool:
"""异步删除数据"""
try:
result = await self.redis_client.delete(key)
return result > 0
except Exception as e:
print(f"Redis delete error: {e}")
return False
async def search_keys(self, pattern: str) -> List[str]:
"""异步搜索键"""
try:
keys = await self.redis_client.keys(pattern)
return [key.decode() if isinstance(key, bytes) else key for key in keys]
except Exception as e:
print(f"Redis search error: {e}")
return []
async def get_all_data(self, pattern: str = "*") -> Dict[str, Any]:
"""异步获取所有匹配的数据"""
try:
keys = await self.search_keys(pattern)
if not keys:
return {}
# 批量获取数据
pipeline = self.redis_client.pipeline()
for key in keys:
pipeline.get(key)
values = await pipeline.execute()
result = {}
for key, value in zip(keys, values):
if value:
try:
result[key] = json.loads(value)
except json.JSONDecodeError:
result[key] = value.decode() if isinstance(value, bytes) else value
return result
except Exception as e:
print(f"Redis get_all error: {e}")
return {}
async def batch_set(self, data: Dict[str, Any], ttl: Optional[int] = None) -> bool:
"""异步批量设置数据"""
try:
pipeline = self.redis_client.pipeline()
for key, value in data.items():
serialized_value = json.dumps(value, default=str)
if ttl:
pipeline.setex(key, ttl, serialized_value)
else:
pipeline.set(key, serialized_value)
await pipeline.execute()
return True
except Exception as e:
print(f"Redis batch_set error: {e}")
return False
async def health_check(self) -> Dict[str, Any]:
"""异步健康检查"""
try:
# 测试连接
await self.redis_client.ping()
# 获取信息
info = await self.redis_client.info()
return {
"status": "healthy",
"connected_clients": info.get("connected_clients", 0),
"used_memory": info.get("used_memory_human", "unknown"),
"redis_version": info.get("redis_version", "unknown")
}
except Exception as e:
return {
"status": "unhealthy",
"error": str(e)
}
修改后的state.py:
from typing import List, Dict, Any, Optional
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
"""Agent状态定义"""
messages: List[BaseMessage] # 消息历史
next_action: Optional[Dict[str, Any]] # 下一步行动
tool_results: Optional[str] # 工具执行结果
observations: List[str] # 观察结果
action_history: List[Dict[str, Any]] # 行动历史
iteration_count: int # 迭代次数
thread_id: str # 线程ID
error: Optional[str] # 错误信息
修改后的config.py:
import os
from typing import Optional
class Config:
# 基础配置
DEBUG = os.getenv("DEBUG", "False").lower() == "true"
# 数据库配置
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/dbname")
# Redis配置
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
# LLM配置
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# 异步配置
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10"))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
# 连接池配置
DB_POOL_SIZE = int(os.getenv("DB_POOL_SIZE", "10"))
REDIS_POOL_SIZE = int(os.getenv("REDIS_POOL_SIZE", "20"))
# Agent配置
MAX_ITERATIONS = int(os.getenv("MAX_ITERATIONS", "5"))
ENABLE_MEMORY = os.getenv("ENABLE_MEMORY", "True").lower() == "true"
@classmethod
def validate(cls) -> bool:
"""验证配置"""
required_vars = [
"DATABASE_URL",
"REDIS_URL",
"OPENAI_API_KEY"
]
missing_vars = [var for var in required_vars if not getattr(cls, var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {missing_vars}")
return True
requirements.txt:
Flask>=2.0.0
asgiref>=3.5.0
uvicorn>=0.20.0
redis>=4.5.0
asyncpg>=0.27.0
langgraph>=0.1.0
langgraph-checkpoint-redis>=0.1.0
langchain>=0.1.0
langchain-openai>=0.1.0
tenacity>=8.0.0
# 直接启动
python api.py
# 或者使用uvicorn
uvicorn api:asgi_app --host 0.0.0.0 --port 5000 --reload
.env:
DEBUG=False
DATABASE_URL=postgresql://user:password@localhost/dbname
REDIS_URL=redis://localhost:6379
OPENAI_API_KEY=your_openai_key
MAX_WORKERS=10
REQUEST_TIMEOUT=30
DB_POOL_SIZE=10
REDIS_POOL_SIZE=20
MAX_ITERATIONS=5
ENABLE_MEMORY=True
run_async_safely
、ensure_agent_ready_sync
等函数# 健康检查
curl http://localhost:5000/health
# 聊天测试
curl -X POST http://localhost:5000/chat \
-H "Content-Type: application/json" \
-d '{"message": "Hello", "thread_id": "test123"}'
# Redis直接访问
curl -X POST http://localhost:5000/redis/direct \
-H "Content-Type: application/json" \
-d '{"operation": "set", "key": "test", "value": "hello"}'
import asyncio
import aiohttp
import time
async def test_load():
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(100):
task = session.post(
"http://localhost:5000/chat",
json={"message": f"Test {i}", "thread_id": f"thread_{i}"}
)
tasks.append(task)
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"100 requests completed in {end_time - start_time:.2f} seconds")
asyncio.run(test_load())
这个完整的异步迁移方案解决了所有核心问题:
python api.py
直接启动通过这种架构,你的React Agent项目将具备真正的异步能力,性能显著提升,代码更加清晰和可维护。