本文档描述了将 test/custom_react_agent
模块迁移到项目主体,并与现有 agent
目录下的API进行整合的完整方案。
test/custom_react_agent
从测试目录迁移到项目根目录citu_app.py
中的API与 custom_react_agent/api.py
进行合并ask_agent
和新的 ask_react_agent
同时可用asgi_app.py
+ api.py
API类别 | 数量 | API列表 | 兼容性 |
---|---|---|---|
QA反馈系统 | 6个 | /qa_feedback/query /qa_feedback/add /qa_feedback/delete/{id} /qa_feedback/update/{id} /qa_feedback/add_to_training /qa_feedback/stats |
✅ 直接迁移 |
Redis对话管理 | 8个 | /user/{user_id}/conversations /conversation/{conv_id}/messages /conversation_stats /conversation_cleanup /embedding_cache_stats /embedding_cache_cleanup /qa_cache_stats /qa_cache_cleanup |
✅ 直接迁移 |
训练数据管理 | 4个 | /training_data/stats /training_data/query /training_data/create /training_data/delete |
✅ 直接迁移 |
Data Pipeline | 10+个 | /data_pipeline/tasks /data_pipeline/tasks/{id}/execute /database/tables /database/table/ddl 等 |
✅ 直接迁移 |
API | 改造需求 | 解决方案 |
---|---|---|
/api/v0/ask_agent |
异步包装 | 使用 asyncio.run() 包装 agent 调用 |
原始API: /api/v0/ask_agent # 简单场景,消耗token较少
新React API: /api/v0/ask_react_agent # 智能场景,不介意token消耗高
优势 | 说明 |
---|---|
语义清晰 | 名字直接体现技术架构差异 |
向后兼容 | 现有客户端无需修改 |
维护简单 | 每个API有独立的代码路径 |
扩展性好 | 未来可以增加更多agent类型 |
并行开发 | 两个团队可以独立维护不同版本 |
/api/v0/ask_agent
, /api/v1/ask_agent
) - 容易产生版本管理混乱项目根目录/
├── agent/ # 保留原有agent (v0版本)
│ ├── __init__.py
│ ├── citu_agent.py
│ ├── classifier.py
│ ├── config.py
│ ├── state.py
│ └── tools/
├── react_agent/ # 迁移custom_react_agent到这里 (v1版本)
│ ├── __init__.py
│ ├── agent.py # 从test/custom_react_agent/agent.py
│ ├── config.py # 合并配置到统一配置文件
│ ├── state.py # 从test/custom_react_agent/state.py
│ ├── sql_tools.py # 从test/custom_react_agent/sql_tools.py
│ └── requirements.txt # 依赖清单
├── config/ # 统一配置目录
│ ├── agent_config.py # 新增:Agent配置管理
│ └── logging_config.yaml # 现有:日志配置
├── api.py # 统一的API入口 (基于custom_react_agent/api.py改造)
├── asgi_app.py # ASGI启动器 (从custom_react_agent迁移)
├── citu_app.py # 逐步废弃,保留作为过渡
└── test/
└── custom_react_agent/ # 迁移完成后删除
# 创建新目录
mkdir -p react_agent
mkdir -p config
# 迁移核心文件
cp test/custom_react_agent/agent.py react_agent/
cp test/custom_react_agent/state.py react_agent/
cp test/custom_react_agent/sql_tools.py react_agent/
cp test/custom_react_agent/requirements.txt react_agent/
# 迁移API和启动文件到根目录
cp test/custom_react_agent/api.py ./
cp test/custom_react_agent/asgi_app.py ./
# 创建__init__.py文件
touch react_agent/__init__.py
修改所有导入路径,从 test.custom_react_agent
改为 react_agent
# react_agent/config.py (修改后)
import os
from core.logging import get_agent_logger, initialize_logging
# 移除自定义日志配置,使用项目统一日志
logger = get_agent_logger("ReactAgent")
# 保留其他配置,但与app_config.py保持一致
from app_config import (
LLM_MODEL_TYPE,
API_LLM_MODEL,
API_QIANWEN_CONFIG,
REDIS_URL
)
# React Agent特定配置
REACT_AGENT_CONFIG = {
"default_user_id": "guest",
"max_retries": 3,
"network_timeout": 60,
"debug_mode": True
}
修改 react_agent/agent.py
中的日志调用:
# 替换现有的日志导入
from core.logging import get_agent_logger
class CustomReactAgent:
def __init__(self):
self.logger = get_agent_logger("ReactAgent")
# ... 其他初始化代码
修改根目录的 api.py
:
# api.py (整合后的结构)
"""
统一API服务入口
整合原有agent API和React Agent API
"""
import asyncio
import logging
from flask import Flask, request, jsonify
from datetime import datetime
# 统一日志和响应格式
from core.logging import get_app_logger, initialize_logging
from common.result import (
success_response, bad_request_response,
agent_success_response, agent_error_response,
internal_error_response
)
# 初始化日志
initialize_logging()
logger = get_app_logger("UnifiedAPI")
# Agent实例导入
from agent.citu_agent import get_citu_langraph_agent
from react_agent.agent import CustomReactAgent
# 公共模块导入
from common.redis_conversation_manager import RedisConversationManager
from common.qa_feedback_manager import QAFeedbackManager
# 创建Flask应用
app = Flask(__name__)
# === 健康检查 ===
@app.route("/")
def root():
return jsonify({"message": "统一API服务正在运行", "version": "v1.0"})
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查端点"""
try:
health_status = {
"status": "healthy",
"services": {
"original_agent": "available",
"react_agent": "available",
"redis": "checking",
"database": "checking"
},
"timestamp": datetime.now().isoformat()
}
return jsonify(health_status), 200
except Exception as e:
logger.error(f"健康检查失败: {e}")
return jsonify({"status": "unhealthy", "error": str(e)}), 500
# === React Agent API (新版本) ===
@app.route('/api/v0/ask_react_agent', methods=['POST'])
async def ask_react_agent():
"""React Agent API - 智能场景,高token消耗"""
# 保持现有custom_react_agent的实现
# ... (从原api.py迁移代码)
# === 原始Agent API (兼容版本) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""原始Agent API - 简单场景,低token消耗 (异步改造版)"""
try:
# ... 参数处理逻辑从citu_app.py迁移 ...
# 关键改造点:异步调用包装
agent = get_citu_langraph_agent()
agent_result = asyncio.run(agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
))
# ... 结果处理逻辑 ...
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(agent_error_response(
response_text="查询处理失败,请稍后重试",
error_type="agent_processing_failed"
)), 500
# === QA反馈系统API (直接迁移) ===
@app.route('/api/v0/qa_feedback/query', methods=['POST'])
def qa_feedback_query():
"""查询反馈记录API"""
# 从citu_app.py直接迁移代码
# ...
@app.route('/api/v0/qa_feedback/add', methods=['POST'])
def qa_feedback_add():
"""添加反馈记录API"""
# 从citu_app.py直接迁移代码
# ...
# === Redis对话管理API (直接迁移) ===
@app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id):
"""获取用户对话列表"""
# 从citu_app.py直接迁移代码
# ...
# === 训练数据管理API (直接迁移) ===
@app.route('/api/v0/training_data/stats', methods=['GET'])
def training_data_stats():
"""获取训练数据统计信息"""
# 从citu_app.py直接迁移代码
# ...
# === Data Pipeline API (直接迁移) ===
@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
"""创建数据管道任务"""
# 从citu_app.py直接迁移代码
# ...
# Flask应用启动配置
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8084, debug=True)
# asgi_app.py
"""
ASGI应用启动文件 - 统一版本
支持异步操作的生产环境启动
"""
from asgiref.wsgi import WsgiToAsgi
from api import app
# 将Flask WSGI应用转换为ASGI应用
asgi_app = WsgiToAsgi(app)
# 启动命令:
# uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --reload
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""支持对话上下文的ask_agent API - 异步改造版本"""
req = request.get_json(force=True)
question = req.get("question", None)
# ... 其他参数处理 ...
if not question:
return jsonify(bad_request_response(
response_text="缺少必需参数:question",
missing_params=["question"]
)), 400
try:
# 1. 上下文处理 (同步部分)
user_id = redis_conversation_manager.resolve_user_id(...)
conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(...)
context = redis_conversation_manager.get_context(conversation_id)
# 2. 检查缓存 (同步部分)
cached_answer = redis_conversation_manager.get_cached_answer(question, context)
if cached_answer:
return jsonify(agent_success_response(...))
# 3. 关键改造:异步Agent调用
agent = get_citu_langraph_agent()
# 创建异步包装函数
async def process_with_agent():
return await agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
)
# 在同步上下文中执行异步操作
agent_result = asyncio.run(process_with_agent())
# 4. 结果处理 (同步部分)
if agent_result.get("success", False):
# 保存消息到Redis
redis_conversation_manager.save_message(...)
# 缓存结果
redis_conversation_manager.cache_answer(...)
return jsonify(agent_success_response(...))
else:
return jsonify(agent_error_response(...))
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(internal_error_response(
response_text="查询处理失败,请稍后重试"
)), 500
# config/agent_config.py
"""
Agent配置统一管理
"""
from app_config import * # 继承主配置
# Agent版本配置
AGENT_VERSIONS = {
"v0": {
"name": "Original LangGraph Agent",
"type": "langgraph",
"class_path": "agent.citu_agent.CituLangGraphAgent",
"description": "简单场景,低token消耗",
"features": ["database_query", "basic_chat", "context_aware"]
},
"v1": {
"name": "React Agent",
"type": "react_agent",
"class_path": "react_agent.agent.CustomReactAgent",
"description": "智能场景,高token消耗",
"features": ["advanced_reasoning", "tool_calling", "multi_step_planning"]
}
}
# API路由配置
API_ROUTES = {
"ask_agent": "v0", # 映射到原始版本
"ask_react_agent": "v1" # 映射到React版本
}
# 性能配置
PERFORMANCE_CONFIG = {
"v0": {
"timeout": 30,
"max_tokens": 2000,
"cache_enabled": True
},
"v1": {
"timeout": 60,
"max_tokens": 4000,
"cache_enabled": True
}
}
# 方式1:直接启动Flask (开发调试)
python api.py
# 方式2:使用Flask命令
export FLASK_APP=api.py
flask run --host=0.0.0.0 --port=8084 --debug
# 方式1:使用uvicorn (推荐)
uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --workers 1
# 方式2:使用Gunicorn + uvicorn worker (高并发)
gunicorn asgi_app:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084
# 方式3:Docker部署
docker run -p 8084:8084 -e PORT=8084 your-app:latest
启动方式 | 适用场景 | 性能 | 异步支持 | 推荐度 |
---|---|---|---|---|
python api.py |
开发调试 | 低 | ✅ | 开发环境 ⭐⭐⭐ |
uvicorn |
生产环境 | 高 | ✅ | 生产环境 ⭐⭐⭐⭐⭐ |
gunicorn+uvicorn |
高并发生产 | 最高 | ✅ | 大规模部署 ⭐⭐⭐⭐ |
阶段 | 任务内容 | 预估时间 | 关键交付物 | 验收标准 |
---|---|---|---|---|
Phase 1 | 目录迁移 + 路径修正 | 1天 | 新目录结构 路径修正完成 |
✅ 无导入错误 ✅ 文件结构清晰 |
Phase 2 | 日志服务统一 | 0.5天 | 统一日志配置 | ✅ 日志格式一致 ✅ 日志级别正确 |
Phase 3 | API整合 (非异步) | 2天 | 80%+ API可用 统一响应格式 |
✅ QA/Redis/Training API正常 ✅ 响应格式标准化 |
Phase 4 | ask_agent异步改造 | 1天 | 100% API可用 | ✅ ask_agent正常工作 ✅ 异步调用无阻塞 |
Phase 5 | 配置统一 + 测试 | 1天 | 完整迁移 | ✅ 全功能测试通过 ✅ 性能无明显下降 |
风险项 | 风险等级 | 影响范围 | 缓解措施 |
---|---|---|---|
异步兼容性问题 | 🔴 高 | ask_agent API | 1. 充分的异步测试 2. 渐进式部署 3. 快速回滚机制 |
依赖冲突 | 🟡 中 | 整个应用 | 1. 虚拟环境隔离 2. 依赖版本锁定 3. 逐步验证依赖 |
性能影响 | 🟡 中 | 系统性能 | 1. 性能基准测试 2. 监控指标设置 3. 负载测试 |
风险项 | 风险等级 | 缓解措施 |
---|---|---|
配置管理复杂度 | 🟢 低 | 统一配置文件,清晰文档 |
开发团队学习成本 | 🟢 低 | 详细文档,代码注释 |
测试覆盖不足 | 🟢 低 | 分阶段测试,自动化测试 |
citu_app.py
作为备用启动方案# API功能测试
python -m pytest tests/test_api_migration.py -v
# Agent功能测试
python -m pytest tests/test_agent_compatibility.py -v
# 异步功能测试
python -m pytest tests/test_async_operations.py -v
# 并发测试
ab -n 1000 -c 10 http://localhost:8084/api/v0/ask_agent
# 压力测试
locust -f tests/locust_test.py --host=http://localhost:8084
测试项 | 通过标准 | 备注 |
---|---|---|
功能测试 | 100%通过 | 所有API正常响应 |
性能测试 | 响应时间不超过原版本20% | 可接受的性能损失 |
并发测试 | 支持与原版本相同的并发量 | 无明显性能下降 |
错误处理 | 错误响应格式一致 | 保持用户体验 |
# 监控配置示例
MONITORING_METRICS = {
"api_response_time": {
"ask_agent": "< 5s",
"ask_react_agent": "< 10s",
"other_apis": "< 2s"
},
"error_rate": "< 1%",
"concurrent_users": "> 50",
"memory_usage": "< 2GB",
"cpu_usage": "< 80%"
}
性能优化
功能增强
运维改进
评估维度 | 评分 | 说明 |
---|---|---|
技术可行性 | ⭐⭐⭐⭐⭐ | Flask 3.1.1完全支持异步,技术方案成熟 |
实施复杂度 | ⭐⭐⭐⭐ | 大部分API可直接迁移,复杂度可控 |
风险控制 | ⭐⭐⭐⭐ | 风险识别充分,缓解措施明确 |
维护成本 | ⭐⭐⭐⭐ | 架构清晰,维护成本合理 |
扩展性 | ⭐⭐⭐⭐⭐ | 支持多版本Agent,扩展性强 |
如果您认可这个方案,建议按以下顺序开始实施:
如果在实施过程中遇到问题,建议:
文档版本: v1.0
创建日期: 2025-01-15
最后更新: 2025-01-15
文档状态: 待审核