# Custom React Agent 迁移和整合方案
## 📋 项目背景
本文档描述了将 `test/custom_react_agent` 模块迁移到项目主体,并与现有 `agent` 目录下的API进行整合的完整方案。
### 🎯 整合目标
1. **模块迁移**:将 `test/custom_react_agent` 从测试目录迁移到项目根目录
2. **API整合**:将 `citu_app.py` 中的API与 `custom_react_agent/api.py` 进行合并
3. **异步统一**:确保所有API在异步环境下正常运行
4. **双版本并存**:保持原有 `ask_agent` 和新的 `ask_react_agent` 同时可用
### 🏗️ 技术架构
- **Framework**: Python + Flask v3.1.1 + LangGraph + LangChain
- **异步支持**: Flask 3.1.1 + ASGI (asgiref.wsgi.WsgiToAsgi)
- **启动方式**: `asgi_app.py` + `api.py`
---
## 📋 一、API兼容性分析
### ✅ 完全兼容的API (可直接迁移)
| API类别 | 数量 | API列表 | 兼容性 |
|---------|------|---------|--------|
| **QA反馈系统** | 6个 | `/qa_feedback/query`
`/qa_feedback/add`
`/qa_feedback/delete/{id}`
`/qa_feedback/update/{id}`
`/qa_feedback/add_to_training`
`/qa_feedback/stats` | ✅ 直接迁移 |
| **Redis对话管理** | 8个 | `/user/{user_id}/conversations`
`/conversation/{conv_id}/messages`
`/conversation_stats`
`/conversation_cleanup`
`/embedding_cache_stats`
`/embedding_cache_cleanup`
`/qa_cache_stats`
`/qa_cache_cleanup` | ✅ 直接迁移 |
| **训练数据管理** | 4个 | `/training_data/stats`
`/training_data/query`
`/training_data/create`
`/training_data/delete` | ✅ 直接迁移 |
| **Data Pipeline** | 10+个 | `/data_pipeline/tasks`
`/data_pipeline/tasks/{id}/execute`
`/database/tables`
`/database/table/ddl`等 | ✅ 直接迁移 |
### ⚙️ 需要异步改造的API
| API | 改造需求 | 解决方案 |
|-----|----------|----------|
| `/api/v0/ask_agent` | **异步包装** | 使用 `asyncio.run()` 包装 agent 调用 |
### 📊 迁移工作量评估
- **可直接迁移**: 28+ API (90%)
- **需要改造**: 1个 API (10%)
- **预计工作量**: 3-4天
---
## 📋 二、API命名方案
### 🎯 推荐方案:使用不同的名字 (方案B)
```
原始API: /api/v0/ask_agent # 简单场景,消耗token较少
新React API: /api/v0/ask_react_agent # 智能场景,不介意token消耗高
```
### 🌟 方案优势
| 优势 | 说明 |
|------|------|
| **语义清晰** | 名字直接体现技术架构差异 |
| **向后兼容** | 现有客户端无需修改 |
| **维护简单** | 每个API有独立的代码路径 |
| **扩展性好** | 未来可以增加更多agent类型 |
| **并行开发** | 两个团队可以独立维护不同版本 |
### 🚫 不推荐的方案
- **方案A**: 版本号区分 (`/api/v0/ask_agent`, `/api/v1/ask_agent`) - 容易产生版本管理混乱
- **方案C**: 配置控制统一入口 - 增加配置复杂度,调试困难
---
## 📋 三、目录迁移规划
### 🏗️ 推荐目录结构
```
项目根目录/
├── agent/ # 保留原有agent (v0版本)
│ ├── __init__.py
│ ├── citu_agent.py
│ ├── classifier.py
│ ├── config.py
│ ├── state.py
│ └── tools/
├── react_agent/ # 迁移custom_react_agent到这里 (v1版本)
│ ├── __init__.py
│ ├── agent.py # 从test/custom_react_agent/agent.py
│ ├── config.py # 合并配置到统一配置文件
│ ├── state.py # 从test/custom_react_agent/state.py
│ ├── sql_tools.py # 从test/custom_react_agent/sql_tools.py
│ └── requirements.txt # 依赖清单
├── config/ # 统一配置目录
│ ├── agent_config.py # 新增:Agent配置管理
│ └── logging_config.yaml # 现有:日志配置
├── api.py # 统一的API入口 (基于custom_react_agent/api.py改造)
├── asgi_app.py # ASGI启动器 (从custom_react_agent迁移)
├── citu_app.py # 逐步废弃,保留作为过渡
└── test/
└── custom_react_agent/ # 迁移完成后删除
```
### 📁 迁移策略
1. **并行共存**:两个agent版本暂时并存
2. **渐进迁移**:分阶段迁移用户到新API
3. **最终清理**:稳定后删除旧版本代码
---
## 📋 四、详细迁移步骤
### 🚀 Phase 1: 目录结构迁移 (1天)
#### Step 1.1: 创建新目录结构
```bash
# 创建新目录
mkdir -p react_agent
mkdir -p config
# 迁移核心文件
cp test/custom_react_agent/agent.py react_agent/
cp test/custom_react_agent/state.py react_agent/
cp test/custom_react_agent/sql_tools.py react_agent/
cp test/custom_react_agent/requirements.txt react_agent/
# 迁移API和启动文件到根目录
cp test/custom_react_agent/api.py ./
cp test/custom_react_agent/asgi_app.py ./
# 创建__init__.py文件
touch react_agent/__init__.py
```
#### Step 1.2: 路径修正
修改所有导入路径,从 `test.custom_react_agent` 改为 `react_agent`
### 🔧 Phase 2: 日志服务统一 (0.5天)
#### Step 2.1: 修改React Agent配置
```python
# react_agent/config.py (修改后)
import os
from core.logging import get_agent_logger, initialize_logging
# 移除自定义日志配置,使用项目统一日志
logger = get_agent_logger("ReactAgent")
# 保留其他配置,但与app_config.py保持一致
from app_config import (
LLM_MODEL_TYPE,
API_LLM_MODEL,
API_QIANWEN_CONFIG,
REDIS_URL
)
# React Agent特定配置
REACT_AGENT_CONFIG = {
"default_user_id": "guest",
"max_retries": 3,
"network_timeout": 60,
"debug_mode": True
}
```
#### Step 2.2: 更新Agent实现
修改 `react_agent/agent.py` 中的日志调用:
```python
# 替换现有的日志导入
from core.logging import get_agent_logger
class CustomReactAgent:
def __init__(self):
self.logger = get_agent_logger("ReactAgent")
# ... 其他初始化代码
```
### 🔗 Phase 3: API整合 (2天)
#### Step 3.1: 整合API结构
修改根目录的 `api.py`:
```python
# api.py (整合后的结构)
"""
统一API服务入口
整合原有agent API和React Agent API
"""
import asyncio
import logging
from flask import Flask, request, jsonify
from datetime import datetime
# 统一日志和响应格式
from core.logging import get_app_logger, initialize_logging
from common.result import (
success_response, bad_request_response,
agent_success_response, agent_error_response,
internal_error_response
)
# 初始化日志
initialize_logging()
logger = get_app_logger("UnifiedAPI")
# Agent实例导入
from agent.citu_agent import get_citu_langraph_agent
from react_agent.agent import CustomReactAgent
# 公共模块导入
from common.redis_conversation_manager import RedisConversationManager
from common.qa_feedback_manager import QAFeedbackManager
# 创建Flask应用
app = Flask(__name__)
# === 健康检查 ===
@app.route("/")
def root():
return jsonify({"message": "统一API服务正在运行", "version": "v1.0"})
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查端点"""
try:
health_status = {
"status": "healthy",
"services": {
"original_agent": "available",
"react_agent": "available",
"redis": "checking",
"database": "checking"
},
"timestamp": datetime.now().isoformat()
}
return jsonify(health_status), 200
except Exception as e:
logger.error(f"健康检查失败: {e}")
return jsonify({"status": "unhealthy", "error": str(e)}), 500
# === React Agent API (新版本) ===
@app.route('/api/v0/ask_react_agent', methods=['POST'])
async def ask_react_agent():
"""React Agent API - 智能场景,高token消耗"""
# 保持现有custom_react_agent的实现
# ... (从原api.py迁移代码)
# === 原始Agent API (兼容版本) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""原始Agent API - 简单场景,低token消耗 (异步改造版)"""
try:
# ... 参数处理逻辑从citu_app.py迁移 ...
# 关键改造点:异步调用包装
agent = get_citu_langraph_agent()
agent_result = asyncio.run(agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
))
# ... 结果处理逻辑 ...
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(agent_error_response(
response_text="查询处理失败,请稍后重试",
error_type="agent_processing_failed"
)), 500
# === QA反馈系统API (直接迁移) ===
@app.route('/api/v0/qa_feedback/query', methods=['POST'])
def qa_feedback_query():
"""查询反馈记录API"""
# 从citu_app.py直接迁移代码
# ...
@app.route('/api/v0/qa_feedback/add', methods=['POST'])
def qa_feedback_add():
"""添加反馈记录API"""
# 从citu_app.py直接迁移代码
# ...
# === Redis对话管理API (直接迁移) ===
@app.route('/api/v0/user//conversations', methods=['GET'])
def get_user_conversations(user_id):
"""获取用户对话列表"""
# 从citu_app.py直接迁移代码
# ...
# === 训练数据管理API (直接迁移) ===
@app.route('/api/v0/training_data/stats', methods=['GET'])
def training_data_stats():
"""获取训练数据统计信息"""
# 从citu_app.py直接迁移代码
# ...
# === Data Pipeline API (直接迁移) ===
@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
"""创建数据管道任务"""
# 从citu_app.py直接迁移代码
# ...
# Flask应用启动配置
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8084, debug=True)
```
#### Step 3.2: 修改ASGI启动器
```python
# asgi_app.py
"""
ASGI应用启动文件 - 统一版本
支持异步操作的生产环境启动
"""
from asgiref.wsgi import WsgiToAsgi
from api import app
# 将Flask WSGI应用转换为ASGI应用
asgi_app = WsgiToAsgi(app)
# 启动命令:
# uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --reload
```
### ⚙️ Phase 4: 异步改造重点 (1天)
#### 核心改造:ask_agent异步包装
```python
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""支持对话上下文的ask_agent API - 异步改造版本"""
req = request.get_json(force=True)
question = req.get("question", None)
# ... 其他参数处理 ...
if not question:
return jsonify(bad_request_response(
response_text="缺少必需参数:question",
missing_params=["question"]
)), 400
try:
# 1. 上下文处理 (同步部分)
user_id = redis_conversation_manager.resolve_user_id(...)
conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(...)
context = redis_conversation_manager.get_context(conversation_id)
# 2. 检查缓存 (同步部分)
cached_answer = redis_conversation_manager.get_cached_answer(question, context)
if cached_answer:
return jsonify(agent_success_response(...))
# 3. 关键改造:异步Agent调用
agent = get_citu_langraph_agent()
# 创建异步包装函数
async def process_with_agent():
return await agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
)
# 在同步上下文中执行异步操作
agent_result = asyncio.run(process_with_agent())
# 4. 结果处理 (同步部分)
if agent_result.get("success", False):
# 保存消息到Redis
redis_conversation_manager.save_message(...)
# 缓存结果
redis_conversation_manager.cache_answer(...)
return jsonify(agent_success_response(...))
else:
return jsonify(agent_error_response(...))
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(internal_error_response(
response_text="查询处理失败,请稍后重试"
)), 500
```
### 📊 Phase 5: 配置统一 (0.5天)
#### 创建统一Agent配置
```python
# config/agent_config.py
"""
Agent配置统一管理
"""
from app_config import * # 继承主配置
# Agent版本配置
AGENT_VERSIONS = {
"v0": {
"name": "Original LangGraph Agent",
"type": "langgraph",
"class_path": "agent.citu_agent.CituLangGraphAgent",
"description": "简单场景,低token消耗",
"features": ["database_query", "basic_chat", "context_aware"]
},
"v1": {
"name": "React Agent",
"type": "react_agent",
"class_path": "react_agent.agent.CustomReactAgent",
"description": "智能场景,高token消耗",
"features": ["advanced_reasoning", "tool_calling", "multi_step_planning"]
}
}
# API路由配置
API_ROUTES = {
"ask_agent": "v0", # 映射到原始版本
"ask_react_agent": "v1" # 映射到React版本
}
# 性能配置
PERFORMANCE_CONFIG = {
"v0": {
"timeout": 30,
"max_tokens": 2000,
"cache_enabled": True
},
"v1": {
"timeout": 60,
"max_tokens": 4000,
"cache_enabled": True
}
}
```
---
## 📋 五、启动方案调整
### 🚀 新的启动方式
#### 开发环境启动
```bash
# 方式1:直接启动Flask (开发调试)
python api.py
# 方式2:使用Flask命令
export FLASK_APP=api.py
flask run --host=0.0.0.0 --port=8084 --debug
```
#### 生产环境启动
```bash
# 方式1:使用uvicorn (推荐)
uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --workers 1
# 方式2:使用Gunicorn + uvicorn worker (高并发)
gunicorn asgi_app:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084
# 方式3:Docker部署
docker run -p 8084:8084 -e PORT=8084 your-app:latest
```
### 📊 启动配置对比
| 启动方式 | 适用场景 | 性能 | 异步支持 | 推荐度 |
|----------|----------|------|----------|--------|
| `python api.py` | 开发调试 | 低 | ✅ | 开发环境 ⭐⭐⭐ |
| `uvicorn` | 生产环境 | 高 | ✅ | 生产环境 ⭐⭐⭐⭐⭐ |
| `gunicorn+uvicorn` | 高并发生产 | 最高 | ✅ | 大规模部署 ⭐⭐⭐⭐ |
---
## 📋 六、迁移时间表
### 📅 详细时间规划
| 阶段 | 任务内容 | 预估时间 | 关键交付物 | 验收标准 |
|------|----------|----------|------------|----------|
| **Phase 1** | 目录迁移 + 路径修正 | 1天 | 新目录结构
路径修正完成 | ✅ 无导入错误
✅ 文件结构清晰 |
| **Phase 2** | 日志服务统一 | 0.5天 | 统一日志配置 | ✅ 日志格式一致
✅ 日志级别正确 |
| **Phase 3** | API整合 (非异步) | 2天 | 80%+ API可用
统一响应格式 | ✅ QA/Redis/Training API正常
✅ 响应格式标准化 |
| **Phase 4** | ask_agent异步改造 | 1天 | 100% API可用 | ✅ ask_agent正常工作
✅ 异步调用无阻塞 |
| **Phase 5** | 配置统一 + 测试 | 1天 | 完整迁移 | ✅ 全功能测试通过
✅ 性能无明显下降 |
### 📊 里程碑检查点
- **M1 (Day 1)**: 目录结构迁移完成,无编译错误
- **M2 (Day 2)**: 非异步API全部正常工作
- **M3 (Day 4)**: ask_agent异步版本正常工作
- **M4 (Day 5)**: 完整功能验证,性能测试通过
---
## 📋 七、风险评估与缓解措施
### ⚠️ 高风险项
| 风险项 | 风险等级 | 影响范围 | 缓解措施 |
|--------|----------|----------|----------|
| **异步兼容性问题** | 🔴 高 | ask_agent API | 1. 充分的异步测试
2. 渐进式部署
3. 快速回滚机制 |
| **依赖冲突** | 🟡 中 | 整个应用 | 1. 虚拟环境隔离
2. 依赖版本锁定
3. 逐步验证依赖 |
| **性能影响** | 🟡 中 | 系统性能 | 1. 性能基准测试
2. 监控指标设置
3. 负载测试 |
### 🟢 低风险项
| 风险项 | 风险等级 | 缓解措施 |
|--------|----------|----------|
| **配置管理复杂度** | 🟢 低 | 统一配置文件,清晰文档 |
| **开发团队学习成本** | 🟢 低 | 详细文档,代码注释 |
| **测试覆盖不足** | 🟢 低 | 分阶段测试,自动化测试 |
### 🛡️ 风险缓解策略
#### 技术风险缓解
1. **异步测试环境**:单独搭建异步测试环境
2. **性能监控**:部署前后性能对比
3. **灰度发布**:先发布到测试环境,再到生产环境
#### 业务风险缓解
1. **向后兼容**:保持所有现有API不变
2. **快速回滚**:保留 `citu_app.py` 作为备用启动方案
3. **用户通知**:提前通知用户API变更计划
---
## 📋 八、测试验证计划
### 🧪 测试范围
#### 功能测试
```bash
# API功能测试
python -m pytest tests/test_api_migration.py -v
# Agent功能测试
python -m pytest tests/test_agent_compatibility.py -v
# 异步功能测试
python -m pytest tests/test_async_operations.py -v
```
#### 性能测试
```bash
# 并发测试
ab -n 1000 -c 10 http://localhost:8084/api/v0/ask_agent
# 压力测试
locust -f tests/locust_test.py --host=http://localhost:8084
```
#### 兼容性测试
- ✅ 所有现有API调用方式保持不变
- ✅ 响应格式完全一致
- ✅ 错误处理机制一致
### 📊 验收标准
| 测试项 | 通过标准 | 备注 |
|--------|----------|------|
| **功能测试** | 100%通过 | 所有API正常响应 |
| **性能测试** | 响应时间不超过原版本20% | 可接受的性能损失 |
| **并发测试** | 支持与原版本相同的并发量 | 无明显性能下降 |
| **错误处理** | 错误响应格式一致 | 保持用户体验 |
---
## 📋 九、部署和监控
### 🚀 部署策略
#### 阶段性部署
1. **开发环境部署** (Day 1-3)
2. **测试环境部署** (Day 4)
3. **预生产环境部署** (Day 5)
4. **生产环境部署** (Day 6+)
#### 部署检查清单
- [ ] 依赖安装完成
- [ ] 配置文件正确
- [ ] 数据库连接正常
- [ ] Redis连接正常
- [ ] 日志输出正常
- [ ] 健康检查通过
- [ ] API功能验证
- [ ] 性能指标正常
### 📊 监控指标
#### 关键指标
```python
# 监控配置示例
MONITORING_METRICS = {
"api_response_time": {
"ask_agent": "< 5s",
"ask_react_agent": "< 10s",
"other_apis": "< 2s"
},
"error_rate": "< 1%",
"concurrent_users": "> 50",
"memory_usage": "< 2GB",
"cpu_usage": "< 80%"
}
```
#### 告警设置
- 🚨 API响应时间超过阈值
- 🚨 错误率超过1%
- 🚨 内存使用率超过90%
- 🚨 异步任务堆积
---
## 📋 十、维护和后续规划
### 🔧 维护计划
#### 短期维护 (1-3个月)
- **监控优化**:根据实际使用情况调整监控指标
- **性能调优**:根据性能数据进行优化
- **bug修复**:处理迁移过程中发现的问题
#### 中期规划 (3-6个月)
- **功能增强**:基于用户反馈增加新功能
- **代码重构**:优化代码结构和性能
- **测试完善**:增加自动化测试覆盖率
#### 长期规划 (6个月+)
- **架构优化**:考虑微服务拆分
- **云原生改造**:支持容器化部署
- **版本整合**:逐步淘汰旧版本API
### 📈 后续优化方向
1. **性能优化**
- 异步操作优化
- 缓存策略改进
- 数据库查询优化
2. **功能增强**
- 更多Agent类型支持
- 高级路由策略
- 智能负载均衡
3. **运维改进**
- 自动化部署
- 容器化支持
- 监控告警完善
---
## 📋 十一、总结和建议
### ✅ 方案可行性评估
| 评估维度 | 评分 | 说明 |
|----------|------|------|
| **技术可行性** | ⭐⭐⭐⭐⭐ | Flask 3.1.1完全支持异步,技术方案成熟 |
| **实施复杂度** | ⭐⭐⭐⭐ | 大部分API可直接迁移,复杂度可控 |
| **风险控制** | ⭐⭐⭐⭐ | 风险识别充分,缓解措施明确 |
| **维护成本** | ⭐⭐⭐⭐ | 架构清晰,维护成本合理 |
| **扩展性** | ⭐⭐⭐⭐⭐ | 支持多版本Agent,扩展性强 |
### 🎯 核心优势
1. **技术先进性**:基于Flask 3.1.1 + ASGI的现代化架构
2. **向后兼容性**:所有现有API保持不变
3. **架构清晰性**:两个Agent版本职责分明
4. **扩展性**:支持未来增加更多Agent类型
5. **维护性**:模块化设计,便于独立维护
### 💡 实施建议
#### 优先级建议
1. **第一优先级**:完成基础迁移,确保现有功能不受影响
2. **第二优先级**:优化异步性能,提升用户体验
3. **第三优先级**:增强监控和告警,确保系统稳定性
#### 团队协作建议
1. **分工明确**:前端团队关注API兼容性,后端团队关注性能优化
2. **沟通机制**:建立日常同步机制,及时发现和解决问题
3. **文档维护**:保持文档与代码同步更新
### ⚡ 快速开始
如果您认可这个方案,建议按以下顺序开始实施:
1. **立即开始**:Phase 1 目录迁移 (风险最低)
2. **并行进行**:Phase 2 日志统一 (可与Phase 1并行)
3. **重点关注**:Phase 4 异步改造 (核心技术难点)
4. **全面测试**:Phase 5 测试验证 (确保质量)
---
## 📞 联系和支持
如果在实施过程中遇到问题,建议:
1. **参考文档**:优先查阅本文档和相关技术文档
2. **代码注释**:查看迁移后的代码注释和说明
3. **测试用例**:参考测试用例了解预期行为
4. **性能监控**:关注监控指标,及时发现问题
---
**文档版本**: v1.0
**创建日期**: 2025-01-15
**最后更新**: 2025-01-15
**文档状态**: 待审核