migration_and_integration_plan.md 22 KB

Custom React Agent 迁移和整合方案

📋 项目背景

本文档描述了将 test/custom_react_agent 模块迁移到项目主体,并与现有 agent 目录下的API进行整合的完整方案。

🎯 整合目标

  1. 模块迁移:将 test/custom_react_agent 从测试目录迁移到项目根目录
  2. API整合:将 citu_app.py 中的API与 custom_react_agent/api.py 进行合并
  3. 异步统一:确保所有API在异步环境下正常运行
  4. 双版本并存:保持原有 ask_agent 和新的 ask_react_agent 同时可用

🏗️ 技术架构

  • Framework: Python + Flask v3.1.1 + LangGraph + LangChain
  • 异步支持: Flask 3.1.1 + ASGI (asgiref.wsgi.WsgiToAsgi)
  • 启动方式: asgi_app.py + api.py

📋 一、API兼容性分析

✅ 完全兼容的API (可直接迁移)

API类别 数量 API列表 兼容性
QA反馈系统 6个 /qa_feedback/query
/qa_feedback/add
/qa_feedback/delete/{id}
/qa_feedback/update/{id}
/qa_feedback/add_to_training
/qa_feedback/stats
✅ 直接迁移
Redis对话管理 8个 /user/{user_id}/conversations
/conversation/{conv_id}/messages
/conversation_stats
/conversation_cleanup
/embedding_cache_stats
/embedding_cache_cleanup
/qa_cache_stats
/qa_cache_cleanup
✅ 直接迁移
训练数据管理 4个 /training_data/stats
/training_data/query
/training_data/create
/training_data/delete
✅ 直接迁移
Data Pipeline 10+个 /data_pipeline/tasks
/data_pipeline/tasks/{id}/execute
/database/tables
/database/table/ddl
✅ 直接迁移

⚙️ 需要异步改造的API

API 改造需求 解决方案
/api/v0/ask_agent 异步包装 使用 asyncio.run() 包装 agent 调用

📊 迁移工作量评估

  • 可直接迁移: 28+ API (90%)
  • 需要改造: 1个 API (10%)
  • 预计工作量: 3-4天

📋 二、API命名方案

🎯 推荐方案:使用不同的名字 (方案B)

原始API:     /api/v0/ask_agent          # 简单场景,消耗token较少
新React API: /api/v0/ask_react_agent    # 智能场景,不介意token消耗高

🌟 方案优势

优势 说明
语义清晰 名字直接体现技术架构差异
向后兼容 现有客户端无需修改
维护简单 每个API有独立的代码路径
扩展性好 未来可以增加更多agent类型
并行开发 两个团队可以独立维护不同版本

🚫 不推荐的方案

  • 方案A: 版本号区分 (/api/v0/ask_agent, /api/v1/ask_agent) - 容易产生版本管理混乱
  • 方案C: 配置控制统一入口 - 增加配置复杂度,调试困难

📋 三、目录迁移规划

🏗️ 推荐目录结构

项目根目录/
├── agent/                    # 保留原有agent (v0版本)
│   ├── __init__.py
│   ├── citu_agent.py
│   ├── classifier.py
│   ├── config.py
│   ├── state.py
│   └── tools/
├── react_agent/             # 迁移custom_react_agent到这里 (v1版本)
│   ├── __init__.py
│   ├── agent.py            # 从test/custom_react_agent/agent.py
│   ├── config.py           # 合并配置到统一配置文件
│   ├── state.py            # 从test/custom_react_agent/state.py
│   ├── sql_tools.py        # 从test/custom_react_agent/sql_tools.py
│   └── requirements.txt    # 依赖清单
├── config/                  # 统一配置目录
│   ├── agent_config.py     # 新增:Agent配置管理
│   └── logging_config.yaml # 现有:日志配置
├── api.py                   # 统一的API入口 (基于custom_react_agent/api.py改造)
├── asgi_app.py             # ASGI启动器 (从custom_react_agent迁移)
├── citu_app.py             # 逐步废弃,保留作为过渡
└── test/
    └── custom_react_agent/ # 迁移完成后删除

📁 迁移策略

  1. 并行共存:两个agent版本暂时并存
  2. 渐进迁移:分阶段迁移用户到新API
  3. 最终清理:稳定后删除旧版本代码

📋 四、详细迁移步骤

🚀 Phase 1: 目录结构迁移 (1天)

Step 1.1: 创建新目录结构

# 创建新目录
mkdir -p react_agent
mkdir -p config

# 迁移核心文件
cp test/custom_react_agent/agent.py react_agent/
cp test/custom_react_agent/state.py react_agent/
cp test/custom_react_agent/sql_tools.py react_agent/
cp test/custom_react_agent/requirements.txt react_agent/

# 迁移API和启动文件到根目录
cp test/custom_react_agent/api.py ./
cp test/custom_react_agent/asgi_app.py ./

# 创建__init__.py文件
touch react_agent/__init__.py

Step 1.2: 路径修正

修改所有导入路径,从 test.custom_react_agent 改为 react_agent

🔧 Phase 2: 日志服务统一 (0.5天)

Step 2.1: 修改React Agent配置

# react_agent/config.py (修改后)
import os
from core.logging import get_agent_logger, initialize_logging

# 移除自定义日志配置,使用项目统一日志
logger = get_agent_logger("ReactAgent")

# 保留其他配置,但与app_config.py保持一致
from app_config import (
    LLM_MODEL_TYPE,
    API_LLM_MODEL, 
    API_QIANWEN_CONFIG,
    REDIS_URL
)

# React Agent特定配置
REACT_AGENT_CONFIG = {
    "default_user_id": "guest",
    "max_retries": 3,
    "network_timeout": 60,
    "debug_mode": True
}

Step 2.2: 更新Agent实现

修改 react_agent/agent.py 中的日志调用:

# 替换现有的日志导入
from core.logging import get_agent_logger

class CustomReactAgent:
    def __init__(self):
        self.logger = get_agent_logger("ReactAgent")
        # ... 其他初始化代码

🔗 Phase 3: API整合 (2天)

Step 3.1: 整合API结构

修改根目录的 api.py

# api.py (整合后的结构)
"""
统一API服务入口
整合原有agent API和React Agent API
"""
import asyncio
import logging
from flask import Flask, request, jsonify
from datetime import datetime

# 统一日志和响应格式
from core.logging import get_app_logger, initialize_logging
from common.result import (
    success_response, bad_request_response, 
    agent_success_response, agent_error_response,
    internal_error_response
)

# 初始化日志
initialize_logging()
logger = get_app_logger("UnifiedAPI")

# Agent实例导入
from agent.citu_agent import get_citu_langraph_agent
from react_agent.agent import CustomReactAgent

# 公共模块导入
from common.redis_conversation_manager import RedisConversationManager
from common.qa_feedback_manager import QAFeedbackManager

# 创建Flask应用
app = Flask(__name__)

# === 健康检查 ===
@app.route("/")
def root():
    return jsonify({"message": "统一API服务正在运行", "version": "v1.0"})

@app.route('/health', methods=['GET'])
def health_check():
    """健康检查端点"""
    try:
        health_status = {
            "status": "healthy",
            "services": {
                "original_agent": "available",
                "react_agent": "available",
                "redis": "checking",
                "database": "checking"
            },
            "timestamp": datetime.now().isoformat()
        }
        return jsonify(health_status), 200
    except Exception as e:
        logger.error(f"健康检查失败: {e}")
        return jsonify({"status": "unhealthy", "error": str(e)}), 500

# === React Agent API (新版本) ===
@app.route('/api/v0/ask_react_agent', methods=['POST'])
async def ask_react_agent():
    """React Agent API - 智能场景,高token消耗"""
    # 保持现有custom_react_agent的实现
    # ... (从原api.py迁移代码)

# === 原始Agent API (兼容版本) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    """原始Agent API - 简单场景,低token消耗 (异步改造版)"""
    try:
        # ... 参数处理逻辑从citu_app.py迁移 ...
        
        # 关键改造点:异步调用包装
        agent = get_citu_langraph_agent()
        agent_result = asyncio.run(agent.process_question(
            question=enhanced_question,
            session_id=browser_session_id,
            context_type=context_type,
            routing_mode=effective_routing_mode
        ))
        
        # ... 结果处理逻辑 ...
        
    except Exception as e:
        logger.error(f"ask_agent执行失败: {str(e)}")
        return jsonify(agent_error_response(
            response_text="查询处理失败,请稍后重试",
            error_type="agent_processing_failed"
        )), 500

# === QA反馈系统API (直接迁移) ===
@app.route('/api/v0/qa_feedback/query', methods=['POST'])
def qa_feedback_query():
    """查询反馈记录API"""
    # 从citu_app.py直接迁移代码
    # ... 

@app.route('/api/v0/qa_feedback/add', methods=['POST'])
def qa_feedback_add():
    """添加反馈记录API"""
    # 从citu_app.py直接迁移代码
    # ...

# === Redis对话管理API (直接迁移) ===
@app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id):
    """获取用户对话列表"""
    # 从citu_app.py直接迁移代码
    # ...

# === 训练数据管理API (直接迁移) ===
@app.route('/api/v0/training_data/stats', methods=['GET'])
def training_data_stats():
    """获取训练数据统计信息"""
    # 从citu_app.py直接迁移代码
    # ...

# === Data Pipeline API (直接迁移) ===
@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
    """创建数据管道任务"""
    # 从citu_app.py直接迁移代码
    # ...

# Flask应用启动配置
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8084, debug=True)

Step 3.2: 修改ASGI启动器

# asgi_app.py
"""
ASGI应用启动文件 - 统一版本
支持异步操作的生产环境启动
"""
from asgiref.wsgi import WsgiToAsgi
from api import app

# 将Flask WSGI应用转换为ASGI应用
asgi_app = WsgiToAsgi(app)

# 启动命令:
# uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --reload

⚙️ Phase 4: 异步改造重点 (1天)

核心改造:ask_agent异步包装

@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    """支持对话上下文的ask_agent API - 异步改造版本"""
    req = request.get_json(force=True)
    question = req.get("question", None)
    # ... 其他参数处理 ...
    
    if not question:
        return jsonify(bad_request_response(
            response_text="缺少必需参数:question",
            missing_params=["question"]
        )), 400
    
    try:
        # 1. 上下文处理 (同步部分)
        user_id = redis_conversation_manager.resolve_user_id(...)
        conversation_id, conversation_status = redis_conversation_manager.resolve_conversation_id(...)
        context = redis_conversation_manager.get_context(conversation_id)
        
        # 2. 检查缓存 (同步部分)
        cached_answer = redis_conversation_manager.get_cached_answer(question, context)
        if cached_answer:
            return jsonify(agent_success_response(...))
        
        # 3. 关键改造:异步Agent调用
        agent = get_citu_langraph_agent()
        
        # 创建异步包装函数
        async def process_with_agent():
            return await agent.process_question(
                question=enhanced_question,
                session_id=browser_session_id, 
                context_type=context_type,
                routing_mode=effective_routing_mode
            )
        
        # 在同步上下文中执行异步操作
        agent_result = asyncio.run(process_with_agent())
        
        # 4. 结果处理 (同步部分)
        if agent_result.get("success", False):
            # 保存消息到Redis
            redis_conversation_manager.save_message(...)
            # 缓存结果
            redis_conversation_manager.cache_answer(...)
            
            return jsonify(agent_success_response(...))
        else:
            return jsonify(agent_error_response(...))
            
    except Exception as e:
        logger.error(f"ask_agent执行失败: {str(e)}")
        return jsonify(internal_error_response(
            response_text="查询处理失败,请稍后重试"
        )), 500

📊 Phase 5: 配置统一 (0.5天)

创建统一Agent配置

# config/agent_config.py
"""
Agent配置统一管理
"""
from app_config import *  # 继承主配置

# Agent版本配置
AGENT_VERSIONS = {
    "v0": {
        "name": "Original LangGraph Agent",
        "type": "langgraph",
        "class_path": "agent.citu_agent.CituLangGraphAgent",
        "description": "简单场景,低token消耗",
        "features": ["database_query", "basic_chat", "context_aware"]
    },
    "v1": {
        "name": "React Agent",
        "type": "react_agent",
        "class_path": "react_agent.agent.CustomReactAgent", 
        "description": "智能场景,高token消耗",
        "features": ["advanced_reasoning", "tool_calling", "multi_step_planning"]
    }
}

# API路由配置
API_ROUTES = {
    "ask_agent": "v0",           # 映射到原始版本
    "ask_react_agent": "v1"      # 映射到React版本
}

# 性能配置
PERFORMANCE_CONFIG = {
    "v0": {
        "timeout": 30,
        "max_tokens": 2000,
        "cache_enabled": True
    },
    "v1": {
        "timeout": 60,
        "max_tokens": 4000, 
        "cache_enabled": True
    }
}

📋 五、启动方案调整

🚀 新的启动方式

开发环境启动

# 方式1:直接启动Flask (开发调试)
python api.py

# 方式2:使用Flask命令
export FLASK_APP=api.py
flask run --host=0.0.0.0 --port=8084 --debug

生产环境启动

# 方式1:使用uvicorn (推荐)
uvicorn asgi_app:asgi_app --host 0.0.0.0 --port 8084 --workers 1

# 方式2:使用Gunicorn + uvicorn worker (高并发)
gunicorn asgi_app:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084

# 方式3:Docker部署
docker run -p 8084:8084 -e PORT=8084 your-app:latest

📊 启动配置对比

启动方式 适用场景 性能 异步支持 推荐度
python api.py 开发调试 开发环境 ⭐⭐⭐
uvicorn 生产环境 生产环境 ⭐⭐⭐⭐⭐
gunicorn+uvicorn 高并发生产 最高 大规模部署 ⭐⭐⭐⭐

📋 六、迁移时间表

📅 详细时间规划

阶段 任务内容 预估时间 关键交付物 验收标准
Phase 1 目录迁移 + 路径修正 1天 新目录结构
路径修正完成
✅ 无导入错误
✅ 文件结构清晰
Phase 2 日志服务统一 0.5天 统一日志配置 ✅ 日志格式一致
✅ 日志级别正确
Phase 3 API整合 (非异步) 2天 80%+ API可用
统一响应格式
✅ QA/Redis/Training API正常
✅ 响应格式标准化
Phase 4 ask_agent异步改造 1天 100% API可用 ✅ ask_agent正常工作
✅ 异步调用无阻塞
Phase 5 配置统一 + 测试 1天 完整迁移 ✅ 全功能测试通过
✅ 性能无明显下降

📊 里程碑检查点

  • M1 (Day 1): 目录结构迁移完成,无编译错误
  • M2 (Day 2): 非异步API全部正常工作
  • M3 (Day 4): ask_agent异步版本正常工作
  • M4 (Day 5): 完整功能验证,性能测试通过

📋 七、风险评估与缓解措施

⚠️ 高风险项

风险项 风险等级 影响范围 缓解措施
异步兼容性问题 🔴 高 ask_agent API 1. 充分的异步测试
2. 渐进式部署
3. 快速回滚机制
依赖冲突 🟡 中 整个应用 1. 虚拟环境隔离
2. 依赖版本锁定
3. 逐步验证依赖
性能影响 🟡 中 系统性能 1. 性能基准测试
2. 监控指标设置
3. 负载测试

🟢 低风险项

风险项 风险等级 缓解措施
配置管理复杂度 🟢 低 统一配置文件,清晰文档
开发团队学习成本 🟢 低 详细文档,代码注释
测试覆盖不足 🟢 低 分阶段测试,自动化测试

🛡️ 风险缓解策略

技术风险缓解

  1. 异步测试环境:单独搭建异步测试环境
  2. 性能监控:部署前后性能对比
  3. 灰度发布:先发布到测试环境,再到生产环境

业务风险缓解

  1. 向后兼容:保持所有现有API不变
  2. 快速回滚:保留 citu_app.py 作为备用启动方案
  3. 用户通知:提前通知用户API变更计划

📋 八、测试验证计划

🧪 测试范围

功能测试

# API功能测试
python -m pytest tests/test_api_migration.py -v

# Agent功能测试  
python -m pytest tests/test_agent_compatibility.py -v

# 异步功能测试
python -m pytest tests/test_async_operations.py -v

性能测试

# 并发测试
ab -n 1000 -c 10 http://localhost:8084/api/v0/ask_agent

# 压力测试
locust -f tests/locust_test.py --host=http://localhost:8084

兼容性测试

  • ✅ 所有现有API调用方式保持不变
  • ✅ 响应格式完全一致
  • ✅ 错误处理机制一致

📊 验收标准

测试项 通过标准 备注
功能测试 100%通过 所有API正常响应
性能测试 响应时间不超过原版本20% 可接受的性能损失
并发测试 支持与原版本相同的并发量 无明显性能下降
错误处理 错误响应格式一致 保持用户体验

📋 九、部署和监控

🚀 部署策略

阶段性部署

  1. 开发环境部署 (Day 1-3)
  2. 测试环境部署 (Day 4)
  3. 预生产环境部署 (Day 5)
  4. 生产环境部署 (Day 6+)

部署检查清单

  • 依赖安装完成
  • 配置文件正确
  • 数据库连接正常
  • Redis连接正常
  • 日志输出正常
  • 健康检查通过
  • API功能验证
  • 性能指标正常

📊 监控指标

关键指标

# 监控配置示例
MONITORING_METRICS = {
    "api_response_time": {
        "ask_agent": "< 5s",
        "ask_react_agent": "< 10s", 
        "other_apis": "< 2s"
    },
    "error_rate": "< 1%",
    "concurrent_users": "> 50",
    "memory_usage": "< 2GB",
    "cpu_usage": "< 80%"
}

告警设置

  • 🚨 API响应时间超过阈值
  • 🚨 错误率超过1%
  • 🚨 内存使用率超过90%
  • 🚨 异步任务堆积

📋 十、维护和后续规划

🔧 维护计划

短期维护 (1-3个月)

  • 监控优化:根据实际使用情况调整监控指标
  • 性能调优:根据性能数据进行优化
  • bug修复:处理迁移过程中发现的问题

中期规划 (3-6个月)

  • 功能增强:基于用户反馈增加新功能
  • 代码重构:优化代码结构和性能
  • 测试完善:增加自动化测试覆盖率

长期规划 (6个月+)

  • 架构优化:考虑微服务拆分
  • 云原生改造:支持容器化部署
  • 版本整合:逐步淘汰旧版本API

📈 后续优化方向

  1. 性能优化

    • 异步操作优化
    • 缓存策略改进
    • 数据库查询优化
  2. 功能增强

    • 更多Agent类型支持
    • 高级路由策略
    • 智能负载均衡
  3. 运维改进

    • 自动化部署
    • 容器化支持
    • 监控告警完善

📋 十一、总结和建议

✅ 方案可行性评估

评估维度 评分 说明
技术可行性 ⭐⭐⭐⭐⭐ Flask 3.1.1完全支持异步,技术方案成熟
实施复杂度 ⭐⭐⭐⭐ 大部分API可直接迁移,复杂度可控
风险控制 ⭐⭐⭐⭐ 风险识别充分,缓解措施明确
维护成本 ⭐⭐⭐⭐ 架构清晰,维护成本合理
扩展性 ⭐⭐⭐⭐⭐ 支持多版本Agent,扩展性强

🎯 核心优势

  1. 技术先进性:基于Flask 3.1.1 + ASGI的现代化架构
  2. 向后兼容性:所有现有API保持不变
  3. 架构清晰性:两个Agent版本职责分明
  4. 扩展性:支持未来增加更多Agent类型
  5. 维护性:模块化设计,便于独立维护

💡 实施建议

优先级建议

  1. 第一优先级:完成基础迁移,确保现有功能不受影响
  2. 第二优先级:优化异步性能,提升用户体验
  3. 第三优先级:增强监控和告警,确保系统稳定性

团队协作建议

  1. 分工明确:前端团队关注API兼容性,后端团队关注性能优化
  2. 沟通机制:建立日常同步机制,及时发现和解决问题
  3. 文档维护:保持文档与代码同步更新

⚡ 快速开始

如果您认可这个方案,建议按以下顺序开始实施:

  1. 立即开始:Phase 1 目录迁移 (风险最低)
  2. 并行进行:Phase 2 日志统一 (可与Phase 1并行)
  3. 重点关注:Phase 4 异步改造 (核心技术难点)
  4. 全面测试:Phase 5 测试验证 (确保质量)

📞 联系和支持

如果在实施过程中遇到问题,建议:

  1. 参考文档:优先查阅本文档和相关技术文档
  2. 代码注释:查看迁移后的代码注释和说明
  3. 测试用例:参考测试用例了解预期行为
  4. 性能监控:关注监控指标,及时发现问题

文档版本: v1.0
创建日期: 2025-01-15
最后更新: 2025-01-15
文档状态: 待审核