# Custom React Agent 完整迁移实施指南 ## 📋 文档说明 本文档是对现有 `migration_and_integration_plan.md` 的详细补充和具体实施指南,提供完整的代码迁移步骤、配置方案和测试验证计划。 **文档层次关系:** - `migration_and_integration_plan.md` - 总体方案概述 - `complete_migration_implementation_guide.md` (本文档) - 详细实施指南 ### 🔧 用户需求澄清 (2025-01-15更新) 根据用户反馈,明确以下关键要点: #### 1. API整合方式澄清 - ✅ 在项目根目录创建新的 `unified_api.py`(推荐命名) - ✅ 从 `citu_app.py` **复制**所需API到新文件(保留原文件) - ✅ 包含 `custom_react_agent/api.py` 的**全部内容** - ✅ **保留**原有的 `citu_app.py` 和 `test/custom_react_agent/api.py` 不变 #### 2. 配置文件策略调整 - ✅ `agent/` 目录**保持独立**的 `config.py` 文件 - ✅ `react_agent/` 目录**也保持独立**的 `config.py` 文件 - ❌ **不需要**创建统一的 `config/agent_config.py` - 📝 **理由**:每个模块保持独立配置更清晰,维护性更好 #### 3. 日志管理策略确认 - ✅ 使用项目**统一的日志管理服务**(`core.logging`) - ✅ 为 `react_agent` 设置**独立的日志文件**(仿照 `data_pipeline` 模式) - ✅ **经验证**:`agent/` 使用 `get_agent_logger("CituAgent")`,`data_pipeline` 有独立日志文件 - 📁 **日志文件位置**:`logs/react_agent_YYYYMMDD.log` --- ## 📋 一、API兼容性分析详细报告 ### ✅ 完全兼容API清单 (可直接迁移) #### 1. QA反馈系统API (6个) | API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 | |---------|------|----------|----------|----------| | `/api/v0/qa_feedback/query` | POST | 同步 | ⭐ 简单 | 30分钟 | | `/api/v0/qa_feedback/add` | POST | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/qa_feedback/delete/{feedback_id}` | DELETE | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/qa_feedback/update/{feedback_id}` | PUT | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/qa_feedback/add_to_training` | POST | 同步 | ⭐ 简单 | 30分钟 | | `/api/v0/qa_feedback/stats` | GET | 同步 | ⭐ 简单 | 15分钟 | #### 2. Redis对话管理API (8个) | API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 | |---------|------|----------|----------|----------| | `/api/v0/user/{user_id}/conversations` | GET | 同步 | ⭐ 简单 | 20分钟 | | `/api/v0/conversation/{conv_id}/messages` | GET | 同步 | ⭐ 简单 | 20分钟 | | `/api/v0/conversation_stats` | GET | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/conversation_cleanup` | POST | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/embedding_cache_stats` | GET | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/embedding_cache_cleanup` | POST | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/qa_cache_stats` | GET | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/qa_cache_cleanup` | POST | 同步 | ⭐ 简单 | 15分钟 | #### 3. 训练数据管理API (4个) | API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 | |---------|------|----------|----------|----------| | `/api/v0/training_data/stats` | GET | 同步 | ⭐ 简单 | 15分钟 | | `/api/v0/training_data/query` | POST | 同步 | ⭐ 简单 | 30分钟 | | `/api/v0/training_data/create` | POST | 同步 | ⭐ 简单 | 45分钟 | | `/api/v0/training_data/delete` | POST | 同步 | ⭐ 简单 | 30分钟 | #### 4. Data Pipeline API (10+个) | API类别 | 端点数量 | 迁移难度 | 预计时间 | |---------|----------|----------|----------| | 任务管理 | 5个 | ⭐⭐ 中等 | 2小时 | | 文件管理 | 3个 | ⭐ 简单 | 1小时 | | 数据库操作 | 2个 | ⭐ 简单 | 30分钟 | | 监控日志 | 2个 | ⭐ 简单 | 30分钟 | ### ⚙️ 需要异步改造的API #### 核心改造API | API端点 | 改造类型 | 技术难度 | 预计时间 | 风险等级 | |---------|----------|----------|----------|----------| | `/api/v0/ask_agent` | 异步包装 | ⭐⭐⭐ 复杂 | 4小时 | 🔴 高 | **改造技术方案:** ```python # 改造前 (citu_app.py) agent_result = asyncio.run(agent.process_question(...)) # 改造后 (新api.py) @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """异步包装版本""" try: # 同步部分:参数处理和缓存检查 # ... # 异步部分:Agent调用 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: agent_result = loop.run_until_complete( agent.process_question(...) ) finally: loop.close() # 同步部分:结果处理 # ... except Exception as e: # 错误处理 ``` --- ## 📋 二、API命名方案最终确认 ### 🎯 采用方案B:不同名字策略 ``` 原始API: /api/v0/ask_agent # 保持不变,简单场景 新React API: /api/v0/ask_react_agent # 新增,智能场景 ``` #### 命名规范详细说明 | API类型 | 命名格式 | 示例 | 适用场景 | |---------|----------|------|----------| | **原始Agent** | `/api/v0/{action}_agent` | `/api/v0/ask_agent` | 简单查询,低token消耗 | | **React Agent** | `/api/v0/{action}_react_agent` | `/api/v0/ask_react_agent` | 复杂推理,高token消耗 | | **其他API** | 保持不变 | `/api/v0/qa_feedback/query` | 所有其他功能API | #### 版本兼容性保证 ```python # 兼容性映射配置 API_COMPATIBILITY_MAP = { # 原有API保持不变 "/api/v0/ask_agent": { "handler": "ask_agent_v0", "agent_type": "langgraph", "deprecated": False }, # 新增React Agent API "/api/v0/ask_react_agent": { "handler": "ask_react_agent_v1", "agent_type": "react", "deprecated": False }, # 未来可能的扩展 "/api/v0/ask_advanced_agent": { "handler": "ask_advanced_agent_v2", "agent_type": "future", "deprecated": False } } ``` --- ## 📋 三、详细目录迁移操作步骤 ### 🏗️ Step-by-Step 迁移操作 #### Step 1: 创建新目录结构(已调整) ```bash # 1. 创建react_agent目录(不创建config目录) mkdir -p react_agent mkdir -p logs # 确保日志目录存在 # 2. 复制核心文件 (保留原文件,保持配置文件独立) cp test/custom_react_agent/agent.py react_agent/ cp test/custom_react_agent/state.py react_agent/ cp test/custom_react_agent/sql_tools.py react_agent/ cp test/custom_react_agent/shell.py react_agent/ cp test/custom_react_agent/enhanced_redis_api.py react_agent/ cp test/custom_react_agent/config.py react_agent/ # 保持原名,不重命名 # 3. 复制API文件到根目录(使用推荐命名) cp test/custom_react_agent/api.py ./unified_api.py # 使用推荐的文件名 cp test/custom_react_agent/asgi_app.py ./ # 保持原名,后续会修改导入 # 4. 创建初始化文件 echo "# React Agent Module" > react_agent/__init__.py # 5. 复制依赖文件 cp test/custom_react_agent/requirements.txt react_agent/ echo "✅ 目录结构创建完成" echo "📁 react_agent/ - React Agent模块" echo "📄 unified_api.py - 统一API入口" echo "📄 asgi_app.py - ASGI启动器" ``` #### Step 2: 路径修正脚本 ```python # scripts/fix_imports.py """ 自动修正导入路径的脚本 """ import os import re def fix_imports_in_file(file_path): """修正单个文件的导入路径""" with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # 修正规则 replacements = [ (r'from test\.custom_react_agent', 'from react_agent'), (r'import test\.custom_react_agent', 'import react_agent'), (r'from \.agent import', 'from react_agent.agent import'), (r'from \.config import', 'from react_agent.config_react import'), (r'from \.state import', 'from react_agent.state import'), (r'from \.sql_tools import', 'from react_agent.sql_tools import'), ] for pattern, replacement in replacements: content = re.sub(pattern, replacement, content) with open(file_path, 'w', encoding='utf-8') as f: f.write(content) def fix_all_imports(): """批量修正所有文件的导入路径""" react_agent_files = [ 'react_agent/agent.py', 'react_agent/state.py', 'react_agent/sql_tools.py', 'react_agent/shell.py', 'react_agent/enhanced_redis_api.py', 'unified_api.py', # 使用正确的文件名 'asgi_app.py' # 使用正确的文件名 ] for file_path in react_agent_files: if os.path.exists(file_path): fix_imports_in_file(file_path) print(f"✅ 已修正: {file_path}") else: print(f"❌ 文件不存在: {file_path}") if __name__ == "__main__": fix_all_imports() ``` #### Step 3: 验证迁移结果 ```bash # 运行路径修正脚本 python scripts/fix_imports.py # 验证Python语法 python -m py_compile react_agent/agent.py python -m py_compile react_agent/state.py python -m py_compile react_agent/sql_tools.py python -m py_compile api_unified.py # 输出验证结果 echo "✅ 目录迁移完成,语法检查通过" ``` --- ## 📋 四、日志服务统一详细方案 ### 🔧 React Agent独立日志配置 基于用户需求和现有实践,为React Agent设置独立日志文件,仿照`data_pipeline`模式。 #### 📊 现有日志系统分析 经验证,项目中的日志使用情况: - **`agent/`**: 使用 `get_agent_logger("CituAgent")` 统一日志系统 - **`data_pipeline/`**: 使用独立日志文件 `./data_pipeline/training_data/{task_id}/data_pipeline.log` - **方案**: React Agent使用统一日志系统但输出到独立文件 #### Step 1: 创建React Agent日志管理器 ```python # react_agent/logger.py (新建) """ React Agent 独立日志管理器 仿照data_pipeline模式,使用统一日志系统但输出到独立文件 """ import os from pathlib import Path from datetime import datetime from core.logging import get_agent_logger class ReactAgentLogManager: """React Agent 日志管理器""" _logger_instance = None _file_handler = None @classmethod def get_logger(cls, name: str = "ReactAgent"): """ 获取React Agent专用logger 使用统一日志系统但输出到独立文件 """ if cls._logger_instance is None: cls._logger_instance = cls._create_logger(name) return cls._logger_instance @classmethod def _create_logger(cls, name: str): """创建独立文件的logger""" # 使用统一日志系统获取logger logger = get_agent_logger(name) # 添加独立的文件处理器 cls._add_file_handler(logger) return logger @classmethod def _add_file_handler(cls, logger): """添加独立的文件处理器""" try: # 确保日志目录存在 project_root = Path(__file__).parent.parent log_dir = project_root / "logs" log_dir.mkdir(exist_ok=True) # 按日期创建日志文件 today = datetime.now().strftime("%Y%m%d") log_file = log_dir / f"react_agent_{today}.log" # 创建文件处理器 import logging file_handler = logging.FileHandler(log_file, encoding='utf-8') file_handler.setLevel(logging.DEBUG) # 设置格式 formatter = logging.Formatter( '%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) file_handler.setFormatter(formatter) # 添加到logger(不影响原有的控制台输出) logger.addHandler(file_handler) cls._file_handler = file_handler logger.info(f"✅ React Agent独立日志文件已创建: {log_file}") except Exception as e: logger.warning(f"⚠️ 创建React Agent独立日志文件失败: {e}") @classmethod def cleanup(cls): """清理资源""" if cls._file_handler: cls._file_handler.close() cls._file_handler = None # 对外接口 def get_react_agent_logger(name: str = "ReactAgent"): """获取React Agent专用logger""" return ReactAgentLogManager.get_logger(name) ``` #### Step 2: 修改React Agent配置 ```python # react_agent/config.py (修改后) """ React Agent 独立配置 保持独立配置,但使用统一日志系统和独立日志文件 """ import os from .logger import get_react_agent_logger # 使用React Agent专用logger logger = get_react_agent_logger("ReactAgentConfig") # 继承主配置 try: from app_config import ( LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG, REDIS_URL, VECTOR_DB_TYPE ) logger.info("✅ 成功加载主配置文件") except ImportError as e: logger.warning(f"⚠️ 主配置加载失败,使用默认配置: {e}") # 默认配置 REDIS_URL = "redis://localhost:6379" LLM_MODEL_TYPE = "api" # React Agent 特定配置 REACT_AGENT_CONFIG = { "default_user_id": "guest", "max_retries": 3, "retry_base_delay": 3, "network_timeout": 60, "debug_mode": True, "max_log_length": 1000 } # HTTP连接配置 HTTP_CONFIG = { "max_connections": 10, "max_keepalive_connections": 5, "keepalive_expiry": 30.0, "connect_timeout": 10.0, "pool_timeout": 5.0 } logger.info("✅ React Agent配置初始化完成") ``` #### Step 3: 更新Agent实现类 ```python # react_agent/agent.py (关键修改部分) """ Custom React Agent 实现 使用统一日志系统和独立日志文件 """ from .logger import get_react_agent_logger from .config import REACT_AGENT_CONFIG class CustomReactAgent: def __init__(self): # 使用React Agent专用logger self.logger = get_react_agent_logger("ReactAgent.Core") self.config = REACT_AGENT_CONFIG self.logger.info("🚀 CustomReactAgent 初始化开始") # 其他初始化逻辑... self.logger.info("✅ CustomReactAgent 初始化完成") async def process_question(self, question: str, **kwargs): """处理问题的主要方法""" self.logger.info(f"📝 开始处理问题: {question[:100]}...") try: # 处理逻辑... result = await self._internal_process(question, **kwargs) self.logger.info("✅ 问题处理完成") return result except Exception as e: self.logger.error(f"❌ 问题处理失败: {str(e)}") raise def cleanup(self): """清理资源""" self.logger.info("🧹 开始清理React Agent资源") # 清理逻辑... # 清理日志资源 from .logger import ReactAgentLogManager ReactAgentLogManager.cleanup() ``` #### Step 4: 日志文件组织结构 ``` logs/ ├── app.log # 主应用日志(原有) ├── react_agent_20250115.log # React Agent独立日志(新增) ├── react_agent_20250116.log # 按日期轮换 └── data_pipeline/ # Data Pipeline日志目录(原有) └── task_20250115_143052/ └── data_pipeline.log ``` #### Step 5: 验证日志配置 ```python # scripts/test_react_agent_logging.py """ 验证React Agent日志配置 """ def test_react_agent_logging(): """测试React Agent日志功能""" # 测试日志系统 from react_agent.logger import get_react_agent_logger logger = get_react_agent_logger("TestLogger") logger.info("测试 React Agent 日志系统") logger.warning("测试警告日志") logger.error("测试错误日志") print("✅ React Agent日志系统测试完成") print("📁 请检查 logs/react_agent_YYYYMMDD.log 文件") if __name__ == "__main__": test_react_agent_logging() from core.logging import get_agent_logger, initialize_logging # 使用项目统一日志系统 logger = get_agent_logger("ReactAgent") # 继承主配置 try: from app_config import ( LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG, REDIS_URL, VECTOR_DB_TYPE ) logger.info("✅ 成功加载主配置文件") except ImportError as e: logger.warning(f"⚠️ 主配置加载失败,使用默认配置: {e}") # 默认配置 REDIS_URL = "redis://localhost:6379" LLM_MODEL_TYPE = "api" # React Agent 特定配置 REACT_AGENT_CONFIG = { "default_user_id": "guest", "max_retries": 3, "retry_base_delay": 3, "network_timeout": 60, "debug_mode": True, "max_log_length": 1000 } # HTTP连接配置 HTTP_CONFIG = { "max_connections": 10, "max_keepalive_connections": 5, "keepalive_expiry": 30.0, "connect_timeout": 10.0, "pool_timeout": 5.0 } logger.info("✅ React Agent配置初始化完成") ``` #### Step 2: 修改Agent实现类 ```python # react_agent/agent.py (关键修改部分) """ Custom React Agent 实现 统一使用项目日志系统 """ from core.logging import get_agent_logger from .config_react import REACT_AGENT_CONFIG, logger as config_logger class CustomReactAgent: def __init__(self): # 使用统一日志系统 self.logger = get_agent_logger("ReactAgent.Core") self.config = REACT_AGENT_CONFIG self.logger.info("🚀 CustomReactAgent 初始化开始") # 其他初始化逻辑... self.logger.info("✅ CustomReactAgent 初始化完成") async def process_question(self, question: str, **kwargs): """处理问题的主要方法""" self.logger.info(f"📝 开始处理问题: {question[:100]}...") try: # 处理逻辑... result = await self._internal_process(question, **kwargs) self.logger.info("✅ 问题处理完成") return result except Exception as e: self.logger.error(f"❌ 问题处理失败: {str(e)}") raise ``` #### Step 3: 日志格式统一验证 ```python # scripts/verify_logging.py """ 验证日志格式统一性 """ import logging from core.logging import get_agent_logger def test_logging_consistency(): """测试日志格式一致性""" # 测试不同模块的日志格式 loggers = { "CituApp": get_agent_logger("CituApp"), "ReactAgent": get_agent_logger("ReactAgent"), "UnifiedAPI": get_agent_logger("UnifiedAPI") } for name, logger in loggers.items(): logger.info(f"测试 {name} 模块日志格式") logger.warning(f"测试 {name} 模块警告日志") logger.error(f"测试 {name} 模块错误日志") print("✅ 日志格式统一性测试完成") if __name__ == "__main__": test_logging_consistency() ``` --- ## 📋 五、API整合详细实施方案(已调整) ### 📋 API整合策略说明 基于用户澄清,API整合采用**复制策略**而非合并策略: 1. **保留原文件**:`citu_app.py` 和 `test/custom_react_agent/api.py` 保持不变 2. **创建新文件**:在根目录创建 `unified_api.py` 3. **复制内容**: - 从 `citu_app.py` **复制**需要的API到 `unified_api.py` - 包含 `custom_react_agent/api.py` 的**全部内容** 4. **独立运行**:新的 `unified_api.py` 可以独立提供所有服务 ### 🔗 统一API文件结构 ```python # unified_api.py (完整结构) """ 统一API服务入口 复制原有agent API、包含React Agent API的全部内容和所有管理API 注意:这是一个独立的API文件,不影响原有的citu_app.py和test/custom_react_agent/api.py """ import asyncio import logging import atexit from datetime import datetime from typing import Optional, Dict, Any from flask import Flask, request, jsonify from asgiref.wsgi import WsgiToAsgi # === 核心导入 === from core.logging import get_app_logger, initialize_logging from common.result import ( success_response, bad_request_response, not_found_response, internal_error_response, agent_success_response, agent_error_response, validation_failed_response, service_unavailable_response ) # === Agent导入 === try: from agent.citu_agent import get_citu_langraph_agent ORIGINAL_AGENT_AVAILABLE = True except ImportError as e: print(f"⚠️ 原始Agent不可用: {e}") ORIGINAL_AGENT_AVAILABLE = False try: from react_agent.agent import CustomReactAgent REACT_AGENT_AVAILABLE = True except ImportError as e: print(f"⚠️ React Agent不可用: {e}") REACT_AGENT_AVAILABLE = False # === 公共服务导入 === from common.redis_conversation_manager import RedisConversationManager from common.qa_feedback_manager import QAFeedbackManager # === 初始化 === initialize_logging() logger = get_app_logger("UnifiedAPI") # 创建Flask应用 app = Flask(__name__) # 全局实例 _original_agent = None _react_agent = None _redis_manager = RedisConversationManager() _qa_manager = QAFeedbackManager() # === 应用生命周期管理 === def initialize_agents(): """初始化Agent实例""" global _original_agent, _react_agent if ORIGINAL_AGENT_AVAILABLE and _original_agent is None: try: _original_agent = get_citu_langraph_agent() logger.info("✅ 原始Agent初始化成功") except Exception as e: logger.error(f"❌ 原始Agent初始化失败: {e}") if REACT_AGENT_AVAILABLE and _react_agent is None: try: _react_agent = CustomReactAgent() logger.info("✅ React Agent初始化成功") except Exception as e: logger.error(f"❌ React Agent初始化失败: {e}") def cleanup_resources(): """清理资源""" global _original_agent, _react_agent logger.info("🧹 开始清理资源...") if _react_agent: try: # 如果React Agent有清理方法 if hasattr(_react_agent, 'cleanup'): _react_agent.cleanup() except Exception as e: logger.error(f"React Agent清理失败: {e}") _original_agent = None _react_agent = None logger.info("✅ 资源清理完成") atexit.register(cleanup_resources) # === 健康检查 === @app.route("/") def root(): """根路径健康检查""" return jsonify({ "message": "统一API服务正在运行", "version": "v1.0", "services": { "original_agent": ORIGINAL_AGENT_AVAILABLE, "react_agent": REACT_AGENT_AVAILABLE, "redis": _redis_manager.is_available(), }, "timestamp": datetime.now().isoformat() }) @app.route('/health', methods=['GET']) def health_check(): """详细健康检查""" try: # 检查各个组件状态 health_status = { "status": "healthy", "components": { "original_agent": { "available": ORIGINAL_AGENT_AVAILABLE, "initialized": _original_agent is not None }, "react_agent": { "available": REACT_AGENT_AVAILABLE, "initialized": _react_agent is not None }, "redis": { "available": _redis_manager.is_available(), "connection": "ok" if _redis_manager.is_available() else "failed" }, "qa_feedback": { "available": True, "status": "ok" } }, "timestamp": datetime.now().isoformat() } # 判断整体健康状态 all_critical_healthy = ( health_status["components"]["redis"]["available"] and (ORIGINAL_AGENT_AVAILABLE or REACT_AGENT_AVAILABLE) ) if not all_critical_healthy: health_status["status"] = "degraded" return jsonify(health_status), 503 return jsonify(health_status), 200 except Exception as e: logger.error(f"健康检查失败: {e}") return jsonify({ "status": "unhealthy", "error": str(e), "timestamp": datetime.now().isoformat() }), 500 # === React Agent API (新版本) === @app.route('/api/v0/ask_react_agent', methods=['POST']) def ask_react_agent(): """React Agent API - 智能场景,高token消耗""" if not REACT_AGENT_AVAILABLE: return jsonify(service_unavailable_response( response_text="React Agent服务不可用" )), 503 # 确保Agent已初始化 if _react_agent is None: initialize_agents() if _react_agent is None: return jsonify(service_unavailable_response( response_text="React Agent初始化失败" )), 503 try: data = request.get_json(force=True) question = data.get('question', '').strip() user_id = data.get('user_id', 'guest') thread_id = data.get('thread_id') if not question: return jsonify(bad_request_response( response_text="问题不能为空", missing_params=["question"] )), 400 # 异步调用React Agent loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: result = loop.run_until_complete( _react_agent.process_question( question=question, user_id=user_id, thread_id=thread_id ) ) finally: loop.close() if result.get('success', False): return jsonify(success_response( response_text="React Agent处理成功", data=result )) else: return jsonify(agent_error_response( response_text=result.get('error', 'React Agent处理失败'), error_type="react_agent_error" )), 500 except Exception as e: logger.error(f"React Agent API错误: {str(e)}") return jsonify(internal_error_response( response_text="React Agent处理失败,请稍后重试" )), 500 # === 原始Agent API (兼容版本) === @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """原始Agent API - 简单场景,低token消耗""" if not ORIGINAL_AGENT_AVAILABLE: return jsonify(service_unavailable_response( response_text="原始Agent服务不可用" )), 503 # 确保Agent已初始化 if _original_agent is None: initialize_agents() if _original_agent is None: return jsonify(service_unavailable_response( response_text="原始Agent初始化失败" )), 503 # 这里会包含从citu_app.py迁移的完整ask_agent逻辑 # 包括Redis上下文管理、缓存检查、异步Agent调用等 # ... (从citu_app.py复制完整实现,添加适当的异步包装) # === QA反馈系统API === @app.route('/api/v0/qa_feedback/query', methods=['POST']) def qa_feedback_query(): """查询反馈记录API""" # 从citu_app.py完整迁移 # ... @app.route('/api/v0/qa_feedback/add', methods=['POST']) def qa_feedback_add(): """添加反馈记录API""" # 从citu_app.py完整迁移 # ... # === Redis对话管理API === @app.route('/api/v0/user//conversations', methods=['GET']) def get_user_conversations(user_id): """获取用户对话列表""" # 从citu_app.py完整迁移 # ... # === 训练数据管理API === @app.route('/api/v0/training_data/stats', methods=['GET']) def training_data_stats(): """获取训练数据统计信息""" # 从citu_app.py完整迁移 # ... # === Data Pipeline API === @app.route('/api/v0/data_pipeline/tasks', methods=['POST']) def create_data_pipeline_task(): """创建数据管道任务""" # 从citu_app.py完整迁移 # ... # === 应用启动配置 === @app.before_first_request def before_first_request(): """首次请求前的初始化""" logger.info("🚀 统一API服务启动,开始初始化...") initialize_agents() logger.info("✅ 统一API服务初始化完成") if __name__ == '__main__': logger.info("🚀 以开发模式启动统一API服务...") app.run(host='0.0.0.0', port=8084, debug=True) ``` ### 📊 API迁移检查清单 | API类别 | 迁移状态 | 测试状态 | 备注 | |---------|----------|----------|------| | Health Check | ✅ 完成 | ✅ 通过 | 新增组件状态检查 | | React Agent | ✅ 完成 | ⏳ 待测试 | 异步包装完成 | | Original Agent | ⏳ 进行中 | ⏳ 待测试 | 需要异步改造 | | QA Feedback (6个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 | | Redis管理 (8个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 | | 训练数据 (4个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 | | Data Pipeline (10+个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 | --- ## 📋 六、异步改造核心技术方案 ### ⚙️ ask_agent异步改造详细方案 #### 改造前后对比 ```python # === 改造前 (citu_app.py) === @app.flask_app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): # ... 参数处理 ... # 直接异步调用 (在Flask-WSGI中可能有问题) agent_result = asyncio.run(agent.process_question(...)) # ... 结果处理 ... # === 改造后 (api_unified.py) === @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): # ... 参数处理 ... # 安全的异步包装 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: agent_result = loop.run_until_complete( agent.process_question(...) ) finally: loop.close() # ... 结果处理 ... ``` #### 完整异步改造实现 ```python @app.route('/api/v0/ask_agent', methods=['POST']) def ask_agent(): """ 支持对话上下文的ask_agent API - 异步改造版本 从citu_app.py完整迁移并添加异步安全包装 """ req = request.get_json(force=True) question = req.get("question", None) browser_session_id = req.get("session_id", None) # 参数解析 (从citu_app.py复制) user_id_input = req.get("user_id", None) conversation_id_input = req.get("conversation_id", None) continue_conversation = req.get("continue_conversation", False) api_routing_mode = req.get("routing_mode", None) VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"] # 参数验证 if not question: return jsonify(bad_request_response( response_text="缺少必需参数:question", missing_params=["question"] )), 400 if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES: return jsonify(bad_request_response( response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}", invalid_params=["routing_mode"] )), 400 try: # 1. ID解析 (同步操作) from flask import session login_user_id = session.get('user_id') if 'user_id' in session else None user_id = _redis_manager.resolve_user_id( user_id_input, browser_session_id, request.remote_addr, login_user_id ) conversation_id, conversation_status = _redis_manager.resolve_conversation_id( user_id, conversation_id_input, continue_conversation ) # 2. 上下文获取 (同步操作) context = _redis_manager.get_context(conversation_id) # 3. 上下文类型检测 context_type = None if context: try: messages = _redis_manager.get_messages(conversation_id, limit=10) for message in reversed(messages): if message.get("role") == "assistant": metadata = message.get("metadata", {}) context_type = metadata.get("type") if context_type: logger.info(f"检测到上下文类型: {context_type}") break except Exception as e: logger.warning(f"获取上下文类型失败: {str(e)}") # 4. 缓存检查 (同步操作) cached_answer = _redis_manager.get_cached_answer(question, context) if cached_answer: logger.info("使用缓存答案") return jsonify(agent_success_response( response_type=cached_answer.get("type", "UNKNOWN"), response=cached_answer.get("response", ""), sql=cached_answer.get("sql"), records=cached_answer.get("query_result"), summary=cached_answer.get("summary"), session_id=browser_session_id, execution_path=cached_answer.get("execution_path", []), classification_info=cached_answer.get("classification_info", {}), conversation_id=conversation_id, user_id=user_id, is_guest_user=(user_id == "guest"), context_used=bool(context), from_cache=True, conversation_status=conversation_status["status"], conversation_message=conversation_status["message"], requested_conversation_id=conversation_status.get("requested_id") )) # 5. 保存用户消息 (同步操作) _redis_manager.save_message(conversation_id, "user", question) # 6. 构建带上下文的问题 if context: enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}" logger.info(f"使用上下文,长度: {len(context)}字符") else: enhanced_question = question logger.info("新对话,无上下文") # 7. 确定路由模式 if api_routing_mode: effective_routing_mode = api_routing_mode logger.info(f"使用API指定的路由模式: {effective_routing_mode}") else: try: from app_config import QUESTION_ROUTING_MODE effective_routing_mode = QUESTION_ROUTING_MODE logger.info(f"使用配置文件路由模式: {effective_routing_mode}") except ImportError: effective_routing_mode = "hybrid" logger.info(f"使用默认路由模式: {effective_routing_mode}") # 8. 关键异步改造:Agent调用 if _original_agent is None: initialize_agents() if _original_agent is None: return jsonify(service_unavailable_response( response_text="AI服务暂时不可用,请稍后重试", can_retry=True )), 503 # 异步安全包装 async def process_with_agent(): """异步处理函数""" return await _original_agent.process_question( question=enhanced_question, session_id=browser_session_id, context_type=context_type, routing_mode=effective_routing_mode ) # 在新的事件循环中执行异步操作 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: agent_result = loop.run_until_complete(process_with_agent()) finally: loop.close() asyncio.set_event_loop(None) # 清理事件循环 # 9. 处理Agent结果 (同步操作) if agent_result.get("success", False): response_type = agent_result.get("type", "UNKNOWN") response_text = agent_result.get("response", "") sql = agent_result.get("sql") query_result = agent_result.get("query_result") summary = agent_result.get("summary") execution_path = agent_result.get("execution_path", []) classification_info = agent_result.get("classification_info", {}) # 确定助手回复内容 if response_type == "DATABASE": if response_text: assistant_response = response_text elif summary: assistant_response = summary elif query_result: row_count = query_result.get("row_count", 0) assistant_response = f"查询执行完成,共返回 {row_count} 条记录。" else: assistant_response = "数据库查询已处理。" else: assistant_response = response_text # 保存助手回复 _redis_manager.save_message( conversation_id, "assistant", assistant_response, metadata={ "type": response_type, "sql": sql, "execution_path": execution_path } ) # 缓存答案 _redis_manager.cache_answer(question, agent_result, context) return jsonify(agent_success_response( response_type=response_type, response=response_text, sql=sql, records=query_result, summary=summary, session_id=browser_session_id, execution_path=execution_path, classification_info=classification_info, conversation_id=conversation_id, user_id=user_id, is_guest_user=(user_id == "guest"), context_used=bool(context), from_cache=False, conversation_status=conversation_status["status"], conversation_message=conversation_status["message"], requested_conversation_id=conversation_status.get("requested_id"), routing_mode_used=effective_routing_mode, routing_mode_source="api" if api_routing_mode else "config" )) else: error_message = agent_result.get("error", "Agent处理失败") error_code = agent_result.get("error_code", 500) return jsonify(agent_error_response( response_text=error_message, error_type="agent_processing_failed", code=error_code, session_id=browser_session_id, conversation_id=conversation_id, user_id=user_id )), error_code except Exception as e: logger.error(f"ask_agent执行失败: {str(e)}") return jsonify(internal_error_response( response_text="查询处理失败,请稍后重试" )), 500 ``` #### 异步安全性检查 ```python # scripts/test_async_safety.py """ 异步安全性测试脚本 """ import asyncio import threading import time from concurrent.futures import ThreadPoolExecutor def test_async_event_loop_isolation(): """测试异步事件循环隔离""" def sync_function_with_async(): """模拟同步函数中的异步调用""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: async def async_task(): await asyncio.sleep(0.1) return "async_result" result = loop.run_until_complete(async_task()) return result finally: loop.close() # 并发测试 with ThreadPoolExecutor(max_workers=5) as executor: futures = [ executor.submit(sync_function_with_async) for _ in range(10) ] results = [future.result() for future in futures] assert all(r == "async_result" for r in results) print("✅ 异步事件循环隔离测试通过") if __name__ == "__main__": test_async_event_loop_isolation() ``` --- ## 📋 七、测试验证详细计划 ### 🧪 分阶段测试方案 #### Phase 1: 基础功能测试 ```python # tests/test_migration_basic.py """ 基础迁移功能测试 """ import pytest import requests from unittest.mock import patch class TestBasicMigration: @pytest.fixture def api_base_url(self): return "http://localhost:8084" def test_health_check(self, api_base_url): """测试健康检查接口""" response = requests.get(f"{api_base_url}/health") assert response.status_code == 200 data = response.json() assert data["status"] in ["healthy", "degraded"] assert "components" in data def test_root_endpoint(self, api_base_url): """测试根路径""" response = requests.get(f"{api_base_url}/") assert response.status_code == 200 data = response.json() assert "message" in data assert "version" in data def test_react_agent_api_availability(self, api_base_url): """测试React Agent API可用性""" payload = { "question": "测试问题", "user_id": "test_user" } response = requests.post( f"{api_base_url}/api/v0/ask_react_agent", json=payload ) # 应该返回有效响应 (可能是错误,但不应该是404) assert response.status_code != 404 ``` #### Phase 2: API兼容性测试 ```python # tests/test_api_compatibility.py """ API兼容性测试 确保迁移后API行为与原版本一致 """ import pytest import requests class TestAPICompatibility: @pytest.fixture def api_base_url(self): return "http://localhost:8084" def test_ask_agent_parameter_validation(self, api_base_url): """测试ask_agent参数验证""" # 测试缺少question参数 response = requests.post( f"{api_base_url}/api/v0/ask_agent", json={} ) assert response.status_code == 400 data = response.json() assert "question" in data.get("missing_params", []) # 测试无效routing_mode response = requests.post( f"{api_base_url}/api/v0/ask_agent", json={ "question": "测试", "routing_mode": "invalid_mode" } ) assert response.status_code == 400 def test_response_format_consistency(self, api_base_url): """测试响应格式一致性""" payload = { "question": "简单测试问题" } response = requests.post( f"{api_base_url}/api/v0/ask_agent", json=payload ) data = response.json() # 检查标准响应字段 required_fields = ["code", "success", "message"] for field in required_fields: assert field in data, f"响应缺少必需字段: {field}" ``` #### Phase 3: 异步性能测试 ```python # tests/test_async_performance.py """ 异步性能测试 """ import asyncio import aiohttp import time import pytest from concurrent.futures import ThreadPoolExecutor class TestAsyncPerformance: @pytest.fixture def api_base_url(self): return "http://localhost:8084" async def test_concurrent_requests(self, api_base_url): """测试并发请求处理""" async with aiohttp.ClientSession() as session: tasks = [] for i in range(10): payload = { "question": f"并发测试问题 {i}", "user_id": f"test_user_{i}" } task = session.post( f"{api_base_url}/api/v0/ask_react_agent", json=payload ) tasks.append(task) start_time = time.time() responses = await asyncio.gather(*tasks, return_exceptions=True) end_time = time.time() # 检查响应 valid_responses = [ r for r in responses if not isinstance(r, Exception) and r.status in [200, 400, 500] ] assert len(valid_responses) >= 8 # 至少80%成功 assert end_time - start_time < 30 # 30秒内完成 def test_sync_async_isolation(self, api_base_url): """测试同步异步隔离""" def make_request(): """发起请求的同步函数""" import requests response = requests.post( f"{api_base_url}/api/v0/ask_agent", json={"question": "隔离测试"} ) return response.status_code # 多线程并发测试 with ThreadPoolExecutor(max_workers=5) as executor: futures = [ executor.submit(make_request) for _ in range(10) ] results = [future.result() for future in futures] # 检查是否有异步冲突 valid_status_codes = [200, 400, 500, 503] assert all(code in valid_status_codes for code in results) ``` #### Phase 4: 压力测试 ```bash # scripts/stress_test.sh #!/bin/bash echo "🚀 开始压力测试..." # 1. 基础负载测试 echo "1. 基础负载测试..." ab -n 100 -c 5 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_react_agent # 2. 持续负载测试 echo "2. 持续负载测试..." ab -n 1000 -c 10 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_agent # 3. 内存泄漏检测 echo "3. 内存使用监控..." python scripts/monitor_memory.py & MONITOR_PID=$! # 运行一段时间的负载 ab -n 500 -c 8 -T application/json -p tests/test_payload.json http://localhost:8084/health # 停止监控 kill $MONITOR_PID echo "✅ 压力测试完成" ``` --- ## 📋 八、部署和监控详细方案 ### 🚀 ASGI部署配置 ```python # asgi_app_new.py (更新版) """ ASGI应用启动文件 - 生产环境配置 支持异步操作和性能优化 """ import os import logging from asgiref.wsgi import WsgiToAsgi # 导入统一API应用 from api_unified import app # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 性能优化配置 ASGI_CONFIG = { "max_workers": int(os.getenv("MAX_WORKERS", "4")), "timeout": int(os.getenv("TIMEOUT", "60")), "keepalive": int(os.getenv("KEEPALIVE", "30")), } # 将Flask WSGI应用转换为ASGI应用 asgi_app = WsgiToAsgi(app) logger.info(f"✅ ASGI应用配置完成: {ASGI_CONFIG}") # 生产环境启动命令: # uvicorn asgi_app_new:asgi_app --host 0.0.0.0 --port 8084 --workers 4 # 或 # gunicorn asgi_app_new:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084 ``` ### 📊 生产环境部署配置 ```yaml # deploy/docker-compose.yml version: '3.8' services: unified-api: build: context: . dockerfile: deploy/Dockerfile ports: - "8084:8084" environment: - PORT=8084 - MAX_WORKERS=4 - TIMEOUT=60 - REDIS_URL=redis://redis:6379 - DATABASE_URL=postgresql://user:pass@postgres:5432/db depends_on: - redis - postgres volumes: - ./logs:/app/logs - ./data:/app/data restart: unless-stopped redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data restart: unless-stopped postgres: image: postgres:15-alpine environment: - POSTGRES_DB=vanna_db - POSTGRES_USER=vanna_user - POSTGRES_PASSWORD=vanna_pass ports: - "5432:5432" volumes: - postgres_data:/var/lib/postgresql/data restart: unless-stopped volumes: redis_data: postgres_data: ``` ```dockerfile # deploy/Dockerfile FROM python:3.10-slim WORKDIR /app # 安装系统依赖 RUN apt-get update && apt-get install -y \ gcc \ g++ \ && rm -rf /var/lib/apt/lists/* # 复制依赖文件 COPY requirements.txt . COPY react_agent/requirements.txt ./react_agent/ # 安装Python依赖 RUN pip install --no-cache-dir -r requirements.txt RUN pip install --no-cache-dir -r react_agent/requirements.txt # 复制应用代码 COPY . . # 创建日志目录 RUN mkdir -p logs data # 设置环境变量 ENV PYTHONPATH=/app ENV FLASK_APP=api_unified.py # 暴露端口 EXPOSE 8084 # 启动命令 CMD ["uvicorn", "asgi_app_new:asgi_app", "--host", "0.0.0.0", "--port", "8084", "--workers", "1"] ``` ### 📈 监控配置 ```python # monitoring/metrics.py """ 应用监控指标收集 """ import time import psutil import logging from functools import wraps from flask import request, g from datetime import datetime logger = logging.getLogger(__name__) class MetricsCollector: def __init__(self): self.request_count = 0 self.error_count = 0 self.response_times = [] def record_request(self, endpoint, method, status_code, response_time): """记录请求指标""" self.request_count += 1 if status_code >= 400: self.error_count += 1 self.response_times.append(response_time) # 保持最近1000条记录 if len(self.response_times) > 1000: self.response_times = self.response_times[-1000:] logger.info(f"📊 {method} {endpoint} - {status_code} - {response_time:.3f}s") def get_stats(self): """获取统计信息""" if not self.response_times: return { "request_count": self.request_count, "error_count": self.error_count, "error_rate": 0, "avg_response_time": 0, "system_stats": self._get_system_stats() } return { "request_count": self.request_count, "error_count": self.error_count, "error_rate": (self.error_count / self.request_count) * 100, "avg_response_time": sum(self.response_times) / len(self.response_times), "max_response_time": max(self.response_times), "min_response_time": min(self.response_times), "system_stats": self._get_system_stats() } def _get_system_stats(self): """获取系统统计信息""" return { "cpu_percent": psutil.cpu_percent(), "memory_percent": psutil.virtual_memory().percent, "disk_percent": psutil.disk_usage('/').percent, "timestamp": datetime.now().isoformat() } # 全局监控实例 metrics = MetricsCollector() def monitor_requests(app): """为Flask应用添加请求监控""" @app.before_request def before_request(): g.start_time = time.time() @app.after_request def after_request(response): response_time = time.time() - g.start_time metrics.record_request( endpoint=request.endpoint or 'unknown', method=request.method, status_code=response.status_code, response_time=response_time ) return response # 添加监控端点 @app.route('/api/v0/metrics', methods=['GET']) def get_metrics(): """获取应用监控指标""" return metrics.get_stats() ``` ### 🚨 告警配置 ```python # monitoring/alerting.py """ 告警系统 """ import smtplib import logging from email.mime.text import MIMEText from datetime import datetime, timedelta logger = logging.getLogger(__name__) class AlertManager: def __init__(self): self.alert_rules = { "high_error_rate": { "threshold": 5.0, # 5%错误率 "window": 300, # 5分钟窗口 "cooldown": 900 # 15分钟冷却期 }, "slow_response": { "threshold": 10.0, # 10秒响应时间 "window": 180, # 3分钟窗口 "cooldown": 600 # 10分钟冷却期 }, "high_memory": { "threshold": 85.0, # 85%内存使用率 "window": 120, # 2分钟窗口 "cooldown": 1800 # 30分钟冷却期 } } self.last_alerts = {} def check_alerts(self, metrics): """检查告警条件""" current_time = datetime.now() # 检查错误率 if metrics["error_rate"] > self.alert_rules["high_error_rate"]["threshold"]: self._trigger_alert( "high_error_rate", f"错误率过高: {metrics['error_rate']:.2f}%", current_time ) # 检查响应时间 if metrics.get("avg_response_time", 0) > self.alert_rules["slow_response"]["threshold"]: self._trigger_alert( "slow_response", f"响应时间过慢: {metrics['avg_response_time']:.2f}s", current_time ) # 检查内存使用率 memory_percent = metrics["system_stats"]["memory_percent"] if memory_percent > self.alert_rules["high_memory"]["threshold"]: self._trigger_alert( "high_memory", f"内存使用率过高: {memory_percent:.2f}%", current_time ) def _trigger_alert(self, alert_type, message, current_time): """触发告警""" # 检查冷却期 if alert_type in self.last_alerts: last_alert_time = self.last_alerts[alert_type] cooldown_seconds = self.alert_rules[alert_type]["cooldown"] if (current_time - last_alert_time).seconds < cooldown_seconds: return # 还在冷却期内 # 记录告警时间 self.last_alerts[alert_type] = current_time # 发送告警 logger.error(f"🚨 告警: {alert_type} - {message}") # 这里可以添加更多告警方式:邮件、短信、Slack等 self._send_email_alert(alert_type, message) def _send_email_alert(self, alert_type, message): """发送邮件告警 (示例)""" try: # 邮件配置 (需要根据实际情况配置) smtp_server = "smtp.example.com" smtp_port = 587 username = "alerts@example.com" password = "password" msg = MIMEText(f"时间: {datetime.now()}\n类型: {alert_type}\n详情: {message}") msg['Subject'] = f"[统一API服务] {alert_type}告警" msg['From'] = username msg['To'] = "admin@example.com" with smtplib.SMTP(smtp_server, smtp_port) as server: server.starttls() server.login(username, password) server.send_message(msg) logger.info(f"✅ 告警邮件已发送: {alert_type}") except Exception as e: logger.error(f"❌ 告警邮件发送失败: {e}") # 全局告警管理器 alert_manager = AlertManager() ``` --- ## 📋 九、实施时间表和检查点 ### 📅 详细实施计划 #### Week 1: 基础迁移 | 日期 | 任务 | 负责人 | 交付物 | 验收标准 | |------|------|--------|--------|----------| | **Day 1** | 目录结构迁移 | 后端 | 新目录结构、路径修正 | ✅ 无导入错误 | | **Day 2** | 日志服务统一 | 后端 | 统一日志配置 | ✅ 日志格式一致 | | **Day 3** | React Agent API整合 | 后端 | ask_react_agent可用 | ✅ API正常响应 | | **Day 4** | 基础API迁移(一) | 后端 | QA反馈、Redis管理API | ✅ 6+8个API可用 | | **Day 5** | 基础API迁移(二) | 后端 | 训练数据、Data Pipeline API | ✅ 4+10个API可用 | #### Week 2: 核心改造和测试 | 日期 | 任务 | 负责人 | 交付物 | 验收标准 | |------|------|--------|--------|----------| | **Day 6** | ask_agent异步改造 | 后端 | 异步版ask_agent | ✅ 异步调用正常 | | **Day 7** | 集成测试(一) | QA | 功能测试报告 | ✅ 核心功能正常 | | **Day 8** | 性能测试 | QA | 性能测试报告 | ✅ 性能无明显下降 | | **Day 9** | 兼容性测试 | QA | 兼容性测试报告 | ✅ API兼容性100% | | **Day 10** | Bug修复和优化 | 后端 | 修复报告 | ✅ 关键bug已修复 | #### Week 3: 部署和监控 | 日期 | 任务 | 负责人 | 交付物 | 验收标准 | |------|------|--------|--------|----------| | **Day 11** | 部署配置准备 | 运维 | Docker/K8s配置 | ✅ 部署脚本可用 | | **Day 12** | 监控系统搭建 | 运维 | 监控配置 | ✅ 监控指标正常 | | **Day 13** | 预生产部署 | 运维 | 预生产环境 | ✅ 预生产环境稳定 | | **Day 14** | 生产环境部署 | 运维 | 生产环境 | ✅ 生产环境稳定 | | **Day 15** | 上线后监控 | 全体 | 监控报告 | ✅ 无重大故障 | ### 🎯 关键检查点 #### 检查点1: 基础迁移完成 (Day 5) - [ ] 所有文件迁移完成,无导入错误 - [ ] 日志格式统一,输出正常 - [ ] React Agent API可正常调用 - [ ] 80%+ 管理API可正常调用 **如果失败**: 延期2天,重新评估技术方案 #### 检查点2: 核心功能完成 (Day 10) - [ ] ask_agent异步改造完成 - [ ] 功能测试100%通过 - [ ] 性能测试达标 - [ ] 兼容性测试通过 **如果失败**: 评估回滚方案,或延期1周 #### 检查点3: 生产就绪 (Day 15) - [ ] 部署配置完成 - [ ] 监控系统运行正常 - [ ] 生产环境稳定运行24小时 - [ ] 用户反馈良好 **如果失败**: 执行回滚计划 --- ## 📋 十、风险管控和应急预案 ### ⚠️ 风险识别矩阵 | 风险项 | 概率 | 影响 | 风险等级 | 缓解措施 | |--------|------|------|----------|----------| | **异步兼容性问题** | 高 | 高 | 🔴 **极高** | 充分测试、渐进部署、快速回滚 | | **性能显著下降** | 中 | 高 | 🟡 **高** | 性能基准、监控告警、优化方案 | | **数据丢失/损坏** | 低 | 极高 | 🟡 **高** | 数据备份、事务保护、验证机制 | | **第三方依赖冲突** | 中 | 中 | 🟡 **中** | 依赖版本锁定、虚拟环境隔离 | | **部署失败** | 中 | 中 | 🟡 **中** | 自动化部署、回滚脚本、蓝绿部署 | ### 🛡️ 应急预案 #### 预案A: 异步兼容性严重问题 **触发条件**: ask_agent异步改造后出现严重错误,影响核心功能 **应急措施**: 1. **立即回滚** (5分钟内) ```bash # 快速切换到备用启动方式 pkill -f "api_unified" python citu_app.py & ``` 2. **问题定位** (30分钟内) ```bash # 收集错误日志 tail -1000 logs/app.log > emergency_logs.txt # 检查异步调用堆栈 python scripts/debug_async_issues.py ``` 3. **修复方案** (2小时内) - 如果是事件循环冲突:改用线程池方案 - 如果是资源泄漏:添加资源清理机制 - 如果是死锁问题:重新设计异步调用流程 #### 预案B: 性能严重下降 **触发条件**: 响应时间超过原版本50%,或并发能力下降明显 **应急措施**: 1. **资源扩容** (10分钟内) ```bash # 增加worker进程 gunicorn asgi_app_new:asgi_app -w 8 -k uvicorn.workers.UvicornWorker # 或增加内存限制 docker update --memory=4g unified-api ``` 2. **性能分析** (1小时内) ```bash # 使用profiler分析性能瓶颈 python -m cProfile -o profile.out api_unified.py # 分析内存使用 python scripts/memory_profiler.py ``` 3. **优化措施** (4小时内) - 异步调用优化:使用连接池、减少事件循环创建 - 缓存优化:增加Redis缓存命中率 - 代码优化:移除性能热点 #### 预案C: 数据完整性问题 **触发条件**: 发现数据丢失、损坏或不一致 **应急措施**: 1. **立即停止写操作** (1分钟内) ```bash # 设置只读模式 curl -X POST http://localhost:8084/api/v0/maintenance/readonly ``` 2. **数据备份和恢复** (30分钟内) ```bash # 创建紧急备份 pg_dump vanna_db > emergency_backup_$(date +%Y%m%d_%H%M%S).sql # 如需恢复备份 psql vanna_db < backup_file.sql ``` 3. **数据验证** (1小时内) ```bash # 运行数据一致性检查 python scripts/data_integrity_check.py # 对比迁移前后数据 python scripts/compare_data.py ``` --- ## 📋 十一、成功标准和验收清单 ### ✅ 迁移成功标准 #### 功能完整性 (100%要求) - [ ] 所有原有API功能保持不变 - [ ] 新增React Agent API正常工作 - [ ] 错误处理机制与原版本一致 - [ ] 日志输出格式统一且完整 #### 性能标准 (95%要求) - [ ] API响应时间不超过原版本120% - [ ] 并发处理能力不低于原版本90% - [ ] 内存使用不超过原版本150% - [ ] CPU使用在正常负载下不超过80% #### 稳定性标准 (99%要求) - [ ] 连续运行24小时无重大故障 - [ ] 错误率低于1% - [ ] 异步调用无死锁或资源泄漏 - [ ] 各组件间无明显冲突 #### 兼容性标准 (100%要求) - [ ] 所有现有客户端无需修改 - [ ] API路径和参数完全兼容 - [ ] 响应格式完全一致 - [ ] 错误码映射正确 ### 📋 最终验收清单 #### 技术验收 - [ ] **代码质量**: 通过代码审查,符合项目规范 - [ ] **单元测试**: 测试覆盖率达到80%以上 - [ ] **集成测试**: 所有API端到端测试通过 - [ ] **性能测试**: 达到性能标准要求 - [ ] **安全测试**: 无明显安全漏洞 #### 部署验收 - [ ] **开发环境**: 功能完整,可供开发调试 - [ ] **测试环境**: 与生产环境一致,测试通过 - [ ] **预生产环境**: 生产级配置,稳定运行 - [ ] **生产环境**: 正式上线,监控正常 #### 文档验收 - [ ] **API文档**: 更新完整,示例清晰 - [ ] **部署文档**: 部署步骤明确,可执行 - [ ] **运维文档**: 监控、告警、故障处理流程完整 - [ ] **用户指南**: 迁移对用户的影响说明清楚 #### 团队验收 - [ ] **知识转移**: 相关团队成员了解新架构 - [ ] **培训完成**: 运维团队具备维护能力 - [ ] **文档交付**: 完整的技术文档和操作手册 - [ ] **支持准备**: 技术支持团队准备就绪 --- ## 📞 联系方式和后续支持 ### 📧 实施团队联系方式 | 角色 | 负责范围 | 联系方式 | |------|----------|----------| | **技术负责人** | 整体架构、关键技术决策 | tech-lead@example.com | | **后端开发** | API迁移、异步改造 | backend-dev@example.com | | **测试工程师** | 功能测试、性能测试 | qa-engineer@example.com | | **运维工程师** | 部署配置、监控告警 | devops@example.com | ### 📚 参考资料 1. **技术文档** - Flask 3.1.1 异步支持文档 - ASGI 部署最佳实践 - LangGraph Agent 开发指南 2. **项目相关文档** - `migration_and_integration_plan.md` - 总体方案 - `api_compatibility_matrix.xlsx` - API兼容性矩阵 - `performance_benchmark.md` - 性能基准报告 3. **应急联系** - 技术支持热线: (紧急情况) - 项目Slack频道: #unified-api-migration - 紧急邮件列表: emergency-team@example.com --- **文档版本**: v1.0 **创建日期**: 2025-01-15 **最后更新**: 2025-01-15 **文档状态**: 详细实施指南 - 待执行 **适用范围**: Custom React Agent 完整迁移项目 **依赖文档**: migration_and_integration_plan.md