# Custom React Agent 迁移和整合方案 ## 📋 项目背景 本文档描述了将 `test/custom_react_agent` 模块迁移到项目主体,并与现有 `agent` 目录下的API进行整合的完整方案。 ### 🎯 整合目标 1. **模块迁移**:将 `test/custom_react_agent` 从测试目录迁移到项目根目录 2. **API整合**:将 `citu_app.py` 中的API与 `custom_react_agent/api.py` 进行合并 3. **异步统一**:确保所有API在异步环境下正常运行 4. **双版本并存**:保持原有 `ask_agent` 和新的 `ask_react_agent` 同时可用 ### 🏗️ 技术架构 - **Framework**: Python + Flask v3.1.1 + LangGraph + LangChain - **异步支持**: Flask 3.1.1 + ASGI (asgiref.wsgi.WsgiToAsgi) - **启动方式**: `asgi_app.py` + `api.py` --- ## 📋 一、API兼容性分析 ### ✅ 完全兼容的API (可直接迁移) | API类别 | 数量 | API列表 | 兼容性 | |---------|------|---------|--------| | **QA反馈系统** | 6个 | `/qa_feedback/query`
`/qa_feedback/add`
`/qa_feedback/delete/{id}`
`/qa_feedback/update/{id}`
`/qa_feedback/add_to_training`
`/qa_feedback/stats` | ✅ 直接迁移 | | **Redis对话管理** | 8个 | `/user/{user_id}/conversations`
`/conversation/{conv_id}/messages`
`/conversation_stats`
`/conversation_cleanup`
`/embedding_cache_stats`
`/embedding_cache_cleanup`
`/qa_cache_stats`
`/qa_cache_cleanup` | ✅ 直接迁移 | | **训练数据管理** | 4个 | `/training_data/stats`
`/training_data/query`
`/training_data/create`
`/training_data/delete` | ✅ 直接迁移 | | **Data Pipeline** | 10+个 | `/data_pipeline/tasks`
`/data_pipeline/tasks/{id}/execute`
`/database/tables`
`/database/table/ddl`等 | ✅ 直接迁移 | ### ⚙️ 需要异步改造的API | API | 改造需求 | 解决方案 | |-----|----------|----------| | `/api/v0/ask_agent` | **异步包装** | 使用 `asyncio.run()` 包装 agent 调用 | ### 📊 迁移工作量评估 - **可直接迁移**: 28+ API (90%) - **需要改造**: 1个 API (10%) - **预计工作量**: 3-4天 --- ## 📋 二、API命名方案 ### 🎯 推荐方案:使用不同的名字 (方案B) ``` 原始API: /api/v0/ask_agent # 简单场景,消耗token较少 新React API: /api/v0/ask_react_agent # 智能场景,不介意token消耗高 ``` ### 🌟 方案优势 | 优势 | 说明 | |------|------| | **语义清晰** | 名字直接体现技术架构差异 | | **向后兼容** | 现有客户端无需修改 | | **维护简单** | 每个API有独立的代码路径 | | **扩展性好** | 未来可以增加更多agent类型 | | **并行开发** | 两个团队可以独立维护不同版本 | ### 🚫 不推荐的方案 - **方案A**: 版本号区分 (`/api/v0/ask_agent`, `/api/v1/ask_agent`) - 容易产生版本管理混乱 - **方案C**: 配置控制统一入口 - 增加配置复杂度,调试困难 --- ## 📋 三、目录迁移规划 ### 🏗️ 推荐目录结构 ``` 项目根目录/ ├── agent/ # 保留原有agent (v0版本) │ ├── __init__.py │ ├── citu_agent.py │ ├── classifier.py │ ├── config.py │ ├── state.py │ └── tools/ ├── react_agent/ # 迁移custom_react_agent到这里 (v1版本) │ ├── __init__.py │ ├── agent.py # 从test/custom_react_agent/agent.py │ ├── config.py # 合并配置到统一配置文件 │ ├── state.py # 从test/custom_react_agent/state.py │ ├── sql_tools.py # 从test/custom_react_agent/sql_tools.py │ └── requirements.txt # 依赖清单 ├── config/ # 统一配置目录 │ ├── agent_config.py # 新增:Agent配置管理 │ └── logging_config.yaml # 现有:日志配置 ├── api.py # 统一的API入口 (基于custom_react_agent/api.py改造) ├── asgi_app.py # ASGI启动器 (从custom_react_agent迁移) ├── citu_app.py # 逐步废弃,保留作为过渡 └── test/ └── custom_react_agent/ # 迁移完成后删除 ``` ### 📁 迁移策略 1. **并行共存**:两个agent版本暂时并存 2. **渐进迁移**:分阶段迁移用户到新API 3. **最终清理**:稳定后删除旧版本代码 --- ## 📋 四、详细迁移步骤 ### 🚀 Phase 1: 目录结构迁移 (1天) #### Step 1.1: 创建新目录结构 ```bash # 创建新目录 mkdir -p react_agent mkdir -p config # 迁移核心文件 cp test/custom_react_agent/agent.py react_agent/ cp test/custom_react_agent/state.py react_agent/ cp test/custom_react_agent/sql_tools.py react_agent/ cp test/custom_react_agent/requirements.txt react_agent/ # 迁移API和启动文件到根目录 cp test/custom_react_agent/api.py ./ cp test/custom_react_agent/asgi_app.py ./ # 创建__init__.py文件 touch react_agent/__init__.py ``` #### Step 1.2: 路径修正 修改所有导入路径,从 `test.custom_react_agent` 改为 `react_agent` ### 🔧 Phase 2: 日志服务统一 (0.5天) #### Step 2.1: 修改React Agent配置 ```python # react_agent/config.py (修改后) import os from core.logging import get_agent_logger, initialize_logging # 移除自定义日志配置,使用项目统一日志 logger = get_agent_logger("ReactAgent") # 保留其他配置,但与app_config.py保持一致 from app_config import ( LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG, REDIS_URL ) # React Agent特定配置 REACT_AGENT_CONFIG = { "default_user_id": "guest", "max_retries": 3, "network_timeout": 60, "debug_mode": True } ``` #### Step 2.2: 更新Agent实现 修改 `react_agent/agent.py` 中的日志调用: ```python # 替换现有的日志导入 from core.logging import get_agent_logger class CustomReactAgent: def __init__(self): self.logger = get_agent_logger("ReactAgent") # ... 其他初始化代码 ``` ### 🔗 Phase 3: API整合 (2天) #### Step 3.1: 整合API结构 修改根目录的 `api.py`: ```python # api.py (整合后的结构) """ 统一API服务入口 整合原有agent API和React Agent API """ import asyncio import logging from flask import Flask, request, jsonify from datetime import datetime # 统一日志和响应格式 from core.logging import get_app_logger, initialize_logging from common.result import ( success_response, bad_request_response, agent_success_response, agent_error_response, internal_error_response ) # 初始化日志 initialize_logging() logger = get_app_logger("UnifiedAPI") # Agent实例导入 from agent.citu_agent import get_citu_langraph_agent from react_agent.agent import CustomReactAgent # 公共模块导入 from common.redis_conversation_manager import RedisConversationManager from common.qa_feedback_manager import QAFeedbackManager # 创建Flask应用 app = Flask(__name__) # === 健康检查 === @app.route("/") def root(): return jsonify({"message": "统一API服务正在运行", "version": "v1.0"}) @app.route('/health', methods=['GET']) def health_check(): """健康检查端点""" try: health_status = { "status": "healthy", "services": { "original_agent": "available", "react_agent": "available", "redis": "checking", "database": "checking" }, "timestamp": datetime.now().isoformat() } return jsonify(health_status), 200 except Exception as e: logger.error(f"健康检查失败: {e}") return jsonify({"status": "unhealthy", "error": str(e)}), 500 # === React Agent API (新版本) === @app.route('/api/v0/ask_react_agent', methods=['POST']) async def ask_react_agent(): """React Agent API - 智能场景,高token消耗""" # 保持现有custom_react_agent的实现 # ... (从原api.py迁移代码) # === 原始Agent API (兼容版本) === @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """原始Agent API - 简单场景,低token消耗 (异步改造版)""" try: # ... 参数处理逻辑从citu_app.py迁移 ... # 关键改造点:异步调用包装 agent = get_citu_langraph_agent() agent_result = asyncio.run(agent.process_question( question=enhanced_question, session_id=browser_session_id, context_type=context_type, routing_mode=effective_routing_mode )) # ... 结果处理逻辑 ... except Exception as e: logger.error(f"ask_agent执行失败: {str(e)}") return jsonify(agent_error_response( response_text="查询处理失败,请稍后重试", error_type="agent_processing_failed" )), 500 # === QA反馈系统API (直接迁移) === @app.route('/api/v0/qa_feedback/query', methods=['POST']) def qa_feedback_query(): """查询反馈记录API""" # 从citu_app.py直接迁移代码 # ... @app.route('/api/v0/qa_feedback/add', methods=['POST']) def qa_feedback_add(): """添加反馈记录API""" # 从citu_app.py直接迁移代码 # ... # === Redis对话管理API (直接迁移) === @app.route('/api/v0/user//conversations', methods=['GET']) def get_user_conversations(user_id): """获取用户对话列表""" # 从citu_app.py直接迁移代码 # ... # === 训练数据管理API (直接迁移) === @app.route('/api/v0/training_data/stats', methods=['GET']) def training_data_stats(): """获取训练数据统计信息""" # 从citu_app.py直接迁移代码 # ... # === Data Pipeline API (直接迁移) === @app.route('/api/v0/data_pipeline/tasks', methods=['POST']) def create_data_pipeline_task(): """创建数据管道任务""" # 从citu_app.py直接迁移代码 # ... # Flask应用启动配置 if __name__ == '__main__': app.run(host='0.0.0.0', port=8084, debug=True) ``` #### Step 3.2: 修改ASGI启动器 ```python # asgi_app.py """ ASGI应用启动文件 - 统一版本 支持异步操作的生产环境启动 """ from asgiref.wsgi import WsgiToAsgi from api import app # 将Flask WSGI应用转换为ASGI应用 asgi_app = WsgiToAsgi(app) # 启动命令: # uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --reload ``` ### ⚙️ Phase 4: 异步改造重点 (1天) #### 核心改造:ask_agent异步包装 ```python @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """支持对话上下文的ask_agent API - 异步改造版本""" req = request.get_json(force=True) question = req.get("question", None) # ... 其他参数处理 ... if not question: return jsonify(bad_request_response( response_text="缺少必需参数:question", missing_params=["question"] )), 400 try: # 1. 上下文处理 (同步部分) user_id = redis_conversation_manager.resolve_user_id(...) conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(...) context = redis_conversation_manager.get_context(conversation_id) # 2. 检查缓存 (同步部分) cached_answer = redis_conversation_manager.get_cached_answer(question, context) if cached_answer: return jsonify(agent_success_response(...)) # 3. 关键改造:异步Agent调用 agent = get_citu_langraph_agent() # 创建异步包装函数 async def process_with_agent(): return await agent.process_question( question=enhanced_question, session_id=browser_session_id, context_type=context_type, routing_mode=effective_routing_mode ) # 在同步上下文中执行异步操作 agent_result = asyncio.run(process_with_agent()) # 4. 结果处理 (同步部分) if agent_result.get("success", False): # 保存消息到Redis redis_conversation_manager.save_message(...) # 缓存结果 redis_conversation_manager.cache_answer(...) return jsonify(agent_success_response(...)) else: return jsonify(agent_error_response(...)) except Exception as e: logger.error(f"ask_agent执行失败: {str(e)}") return jsonify(internal_error_response( response_text="查询处理失败,请稍后重试" )), 500 ``` ### 📊 Phase 5: 配置统一 (0.5天) #### 创建统一Agent配置 ```python # config/agent_config.py """ Agent配置统一管理 """ from app_config import * # 继承主配置 # Agent版本配置 AGENT_VERSIONS = { "v0": { "name": "Original LangGraph Agent", "type": "langgraph", "class_path": "agent.citu_agent.CituLangGraphAgent", "description": "简单场景,低token消耗", "features": ["database_query", "basic_chat", "context_aware"] }, "v1": { "name": "React Agent", "type": "react_agent", "class_path": "react_agent.agent.CustomReactAgent", "description": "智能场景,高token消耗", "features": ["advanced_reasoning", "tool_calling", "multi_step_planning"] } } # API路由配置 API_ROUTES = { "ask_agent": "v0", # 映射到原始版本 "ask_react_agent": "v1" # 映射到React版本 } # 性能配置 PERFORMANCE_CONFIG = { "v0": { "timeout": 30, "max_tokens": 2000, "cache_enabled": True }, "v1": { "timeout": 60, "max_tokens": 4000, "cache_enabled": True } } ``` --- ## 📋 五、启动方案调整 ### 🚀 新的启动方式 #### 开发环境启动 ```bash # 方式1:直接启动Flask (开发调试) python api.py # 方式2:使用Flask命令 export FLASK_APP=api.py flask run --host=0.0.0.0 --port=8084 --debug ``` #### 生产环境启动 ```bash # 方式1:使用uvicorn (推荐) uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --workers 1 # 方式2:使用Gunicorn + uvicorn worker (高并发) gunicorn asgi_app:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084 # 方式3:Docker部署 docker run -p 8084:8084 -e PORT=8084 your-app:latest ``` ### 📊 启动配置对比 | 启动方式 | 适用场景 | 性能 | 异步支持 | 推荐度 | |----------|----------|------|----------|--------| | `python api.py` | 开发调试 | 低 | ✅ | 开发环境 ⭐⭐⭐ | | `uvicorn` | 生产环境 | 高 | ✅ | 生产环境 ⭐⭐⭐⭐⭐ | | `gunicorn+uvicorn` | 高并发生产 | 最高 | ✅ | 大规模部署 ⭐⭐⭐⭐ | --- ## 📋 六、迁移时间表 ### 📅 详细时间规划 | 阶段 | 任务内容 | 预估时间 | 关键交付物 | 验收标准 | |------|----------|----------|------------|----------| | **Phase 1** | 目录迁移 + 路径修正 | 1天 | 新目录结构
路径修正完成 | ✅ 无导入错误
✅ 文件结构清晰 | | **Phase 2** | 日志服务统一 | 0.5天 | 统一日志配置 | ✅ 日志格式一致
✅ 日志级别正确 | | **Phase 3** | API整合 (非异步) | 2天 | 80%+ API可用
统一响应格式 | ✅ QA/Redis/Training API正常
✅ 响应格式标准化 | | **Phase 4** | ask_agent异步改造 | 1天 | 100% API可用 | ✅ ask_agent正常工作
✅ 异步调用无阻塞 | | **Phase 5** | 配置统一 + 测试 | 1天 | 完整迁移 | ✅ 全功能测试通过
✅ 性能无明显下降 | ### 📊 里程碑检查点 - **M1 (Day 1)**: 目录结构迁移完成,无编译错误 - **M2 (Day 2)**: 非异步API全部正常工作 - **M3 (Day 4)**: ask_agent异步版本正常工作 - **M4 (Day 5)**: 完整功能验证,性能测试通过 --- ## 📋 七、风险评估与缓解措施 ### ⚠️ 高风险项 | 风险项 | 风险等级 | 影响范围 | 缓解措施 | |--------|----------|----------|----------| | **异步兼容性问题** | 🔴 高 | ask_agent API | 1. 充分的异步测试
2. 渐进式部署
3. 快速回滚机制 | | **依赖冲突** | 🟡 中 | 整个应用 | 1. 虚拟环境隔离
2. 依赖版本锁定
3. 逐步验证依赖 | | **性能影响** | 🟡 中 | 系统性能 | 1. 性能基准测试
2. 监控指标设置
3. 负载测试 | ### 🟢 低风险项 | 风险项 | 风险等级 | 缓解措施 | |--------|----------|----------| | **配置管理复杂度** | 🟢 低 | 统一配置文件,清晰文档 | | **开发团队学习成本** | 🟢 低 | 详细文档,代码注释 | | **测试覆盖不足** | 🟢 低 | 分阶段测试,自动化测试 | ### 🛡️ 风险缓解策略 #### 技术风险缓解 1. **异步测试环境**:单独搭建异步测试环境 2. **性能监控**:部署前后性能对比 3. **灰度发布**:先发布到测试环境,再到生产环境 #### 业务风险缓解 1. **向后兼容**:保持所有现有API不变 2. **快速回滚**:保留 `citu_app.py` 作为备用启动方案 3. **用户通知**:提前通知用户API变更计划 --- ## 📋 八、测试验证计划 ### 🧪 测试范围 #### 功能测试 ```bash # API功能测试 python -m pytest tests/test_api_migration.py -v # Agent功能测试 python -m pytest tests/test_agent_compatibility.py -v # 异步功能测试 python -m pytest tests/test_async_operations.py -v ``` #### 性能测试 ```bash # 并发测试 ab -n 1000 -c 10 http://localhost:8084/api/v0/ask_agent # 压力测试 locust -f tests/locust_test.py --host=http://localhost:8084 ``` #### 兼容性测试 - ✅ 所有现有API调用方式保持不变 - ✅ 响应格式完全一致 - ✅ 错误处理机制一致 ### 📊 验收标准 | 测试项 | 通过标准 | 备注 | |--------|----------|------| | **功能测试** | 100%通过 | 所有API正常响应 | | **性能测试** | 响应时间不超过原版本20% | 可接受的性能损失 | | **并发测试** | 支持与原版本相同的并发量 | 无明显性能下降 | | **错误处理** | 错误响应格式一致 | 保持用户体验 | --- ## 📋 九、部署和监控 ### 🚀 部署策略 #### 阶段性部署 1. **开发环境部署** (Day 1-3) 2. **测试环境部署** (Day 4) 3. **预生产环境部署** (Day 5) 4. **生产环境部署** (Day 6+) #### 部署检查清单 - [ ] 依赖安装完成 - [ ] 配置文件正确 - [ ] 数据库连接正常 - [ ] Redis连接正常 - [ ] 日志输出正常 - [ ] 健康检查通过 - [ ] API功能验证 - [ ] 性能指标正常 ### 📊 监控指标 #### 关键指标 ```python # 监控配置示例 MONITORING_METRICS = { "api_response_time": { "ask_agent": "< 5s", "ask_react_agent": "< 10s", "other_apis": "< 2s" }, "error_rate": "< 1%", "concurrent_users": "> 50", "memory_usage": "< 2GB", "cpu_usage": "< 80%" } ``` #### 告警设置 - 🚨 API响应时间超过阈值 - 🚨 错误率超过1% - 🚨 内存使用率超过90% - 🚨 异步任务堆积 --- ## 📋 十、维护和后续规划 ### 🔧 维护计划 #### 短期维护 (1-3个月) - **监控优化**:根据实际使用情况调整监控指标 - **性能调优**:根据性能数据进行优化 - **bug修复**:处理迁移过程中发现的问题 #### 中期规划 (3-6个月) - **功能增强**:基于用户反馈增加新功能 - **代码重构**:优化代码结构和性能 - **测试完善**:增加自动化测试覆盖率 #### 长期规划 (6个月+) - **架构优化**:考虑微服务拆分 - **云原生改造**:支持容器化部署 - **版本整合**:逐步淘汰旧版本API ### 📈 后续优化方向 1. **性能优化** - 异步操作优化 - 缓存策略改进 - 数据库查询优化 2. **功能增强** - 更多Agent类型支持 - 高级路由策略 - 智能负载均衡 3. **运维改进** - 自动化部署 - 容器化支持 - 监控告警完善 --- ## 📋 十一、总结和建议 ### ✅ 方案可行性评估 | 评估维度 | 评分 | 说明 | |----------|------|------| | **技术可行性** | ⭐⭐⭐⭐⭐ | Flask 3.1.1完全支持异步,技术方案成熟 | | **实施复杂度** | ⭐⭐⭐⭐ | 大部分API可直接迁移,复杂度可控 | | **风险控制** | ⭐⭐⭐⭐ | 风险识别充分,缓解措施明确 | | **维护成本** | ⭐⭐⭐⭐ | 架构清晰,维护成本合理 | | **扩展性** | ⭐⭐⭐⭐⭐ | 支持多版本Agent,扩展性强 | ### 🎯 核心优势 1. **技术先进性**:基于Flask 3.1.1 + ASGI的现代化架构 2. **向后兼容性**:所有现有API保持不变 3. **架构清晰性**:两个Agent版本职责分明 4. **扩展性**:支持未来增加更多Agent类型 5. **维护性**:模块化设计,便于独立维护 ### 💡 实施建议 #### 优先级建议 1. **第一优先级**:完成基础迁移,确保现有功能不受影响 2. **第二优先级**:优化异步性能,提升用户体验 3. **第三优先级**:增强监控和告警,确保系统稳定性 #### 团队协作建议 1. **分工明确**:前端团队关注API兼容性,后端团队关注性能优化 2. **沟通机制**:建立日常同步机制,及时发现和解决问题 3. **文档维护**:保持文档与代码同步更新 ### ⚡ 快速开始 如果您认可这个方案,建议按以下顺序开始实施: 1. **立即开始**:Phase 1 目录迁移 (风险最低) 2. **并行进行**:Phase 2 日志统一 (可与Phase 1并行) 3. **重点关注**:Phase 4 异步改造 (核心技术难点) 4. **全面测试**:Phase 5 测试验证 (确保质量) --- ## 📞 联系和支持 如果在实施过程中遇到问题,建议: 1. **参考文档**:优先查阅本文档和相关技术文档 2. **代码注释**:查看迁移后的代码注释和说明 3. **测试用例**:参考测试用例了解预期行为 4. **性能监控**:关注监控指标,及时发现问题 --- **文档版本**: v1.0 **创建日期**: 2025-01-15 **最后更新**: 2025-01-15 **文档状态**: 待审核