本文档是对现有 migration_and_integration_plan.md
的详细补充和具体实施指南,提供完整的代码迁移步骤、配置方案和测试验证计划。
文档层次关系:
migration_and_integration_plan.md
- 总体方案概述complete_migration_implementation_guide.md
(本文档) - 详细实施指南根据用户反馈,明确以下关键要点:
unified_api.py
(推荐命名)citu_app.py
复制所需API到新文件(保留原文件)custom_react_agent/api.py
的全部内容citu_app.py
和 test/custom_react_agent/api.py
不变agent/
目录保持独立的 config.py
文件react_agent/
目录也保持独立的 config.py
文件config/agent_config.py
core.logging
)react_agent
设置独立的日志文件(仿照 data_pipeline
模式)agent/
使用 get_agent_logger("CituAgent")
,data_pipeline
有独立日志文件logs/react_agent_YYYYMMDD.log
API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 |
---|---|---|---|---|
/api/v0/qa_feedback/query |
POST | 同步 | ⭐ 简单 | 30分钟 |
/api/v0/qa_feedback/add |
POST | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/qa_feedback/delete/{feedback_id} |
DELETE | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/qa_feedback/update/{feedback_id} |
PUT | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/qa_feedback/add_to_training |
POST | 同步 | ⭐ 简单 | 30分钟 |
/api/v0/qa_feedback/stats |
GET | 同步 | ⭐ 简单 | 15分钟 |
API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 |
---|---|---|---|---|
/api/v0/user/{user_id}/conversations |
GET | 同步 | ⭐ 简单 | 20分钟 |
/api/v0/conversation/{conv_id}/messages |
GET | 同步 | ⭐ 简单 | 20分钟 |
/api/v0/conversation_stats |
GET | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/conversation_cleanup |
POST | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/embedding_cache_stats |
GET | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/embedding_cache_cleanup |
POST | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/qa_cache_stats |
GET | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/qa_cache_cleanup |
POST | 同步 | ⭐ 简单 | 15分钟 |
API端点 | 方法 | 当前状态 | 迁移难度 | 预计时间 |
---|---|---|---|---|
/api/v0/training_data/stats |
GET | 同步 | ⭐ 简单 | 15分钟 |
/api/v0/training_data/query |
POST | 同步 | ⭐ 简单 | 30分钟 |
/api/v0/training_data/create |
POST | 同步 | ⭐ 简单 | 45分钟 |
/api/v0/training_data/delete |
POST | 同步 | ⭐ 简单 | 30分钟 |
API类别 | 端点数量 | 迁移难度 | 预计时间 |
---|---|---|---|
任务管理 | 5个 | ⭐⭐ 中等 | 2小时 |
文件管理 | 3个 | ⭐ 简单 | 1小时 |
数据库操作 | 2个 | ⭐ 简单 | 30分钟 |
监控日志 | 2个 | ⭐ 简单 | 30分钟 |
API端点 | 改造类型 | 技术难度 | 预计时间 | 风险等级 |
---|---|---|---|---|
/api/v0/ask_agent |
异步包装 | ⭐⭐⭐ 复杂 | 4小时 | 🔴 高 |
改造技术方案:
# 改造前 (citu_app.py)
agent_result = asyncio.run(agent.process_question(...))
# 改造后 (新api.py)
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""异步包装版本"""
try:
# 同步部分:参数处理和缓存检查
# ...
# 异步部分:Agent调用
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent_result = loop.run_until_complete(
agent.process_question(...)
)
finally:
loop.close()
# 同步部分:结果处理
# ...
except Exception as e:
# 错误处理
原始API: /api/v0/ask_agent # 保持不变,简单场景
新React API: /api/v0/ask_react_agent # 新增,智能场景
API类型 | 命名格式 | 示例 | 适用场景 |
---|---|---|---|
原始Agent | /api/v0/{action}_agent |
/api/v0/ask_agent |
简单查询,低token消耗 |
React Agent | /api/v0/{action}_react_agent |
/api/v0/ask_react_agent |
复杂推理,高token消耗 |
其他API | 保持不变 | /api/v0/qa_feedback/query |
所有其他功能API |
# 兼容性映射配置
API_COMPATIBILITY_MAP = {
# 原有API保持不变
"/api/v0/ask_agent": {
"handler": "ask_agent_v0",
"agent_type": "langgraph",
"deprecated": False
},
# 新增React Agent API
"/api/v0/ask_react_agent": {
"handler": "ask_react_agent_v1",
"agent_type": "react",
"deprecated": False
},
# 未来可能的扩展
"/api/v0/ask_advanced_agent": {
"handler": "ask_advanced_agent_v2",
"agent_type": "future",
"deprecated": False
}
}
# 1. 创建react_agent目录(不创建config目录)
mkdir -p react_agent
mkdir -p logs # 确保日志目录存在
# 2. 复制核心文件 (保留原文件,保持配置文件独立)
cp test/custom_react_agent/agent.py react_agent/
cp test/custom_react_agent/state.py react_agent/
cp test/custom_react_agent/sql_tools.py react_agent/
cp test/custom_react_agent/shell.py react_agent/
cp test/custom_react_agent/enhanced_redis_api.py react_agent/
cp test/custom_react_agent/config.py react_agent/ # 保持原名,不重命名
# 3. 复制API文件到根目录(使用推荐命名)
cp test/custom_react_agent/api.py ./unified_api.py # 使用推荐的文件名
cp test/custom_react_agent/asgi_app.py ./ # 保持原名,后续会修改导入
# 4. 创建初始化文件
echo "# React Agent Module" > react_agent/__init__.py
# 5. 复制依赖文件
cp test/custom_react_agent/requirements.txt react_agent/
echo "✅ 目录结构创建完成"
echo "📁 react_agent/ - React Agent模块"
echo "📄 unified_api.py - 统一API入口"
echo "📄 asgi_app.py - ASGI启动器"
# scripts/fix_imports.py
"""
自动修正导入路径的脚本
"""
import os
import re
def fix_imports_in_file(file_path):
"""修正单个文件的导入路径"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 修正规则
replacements = [
(r'from test\.custom_react_agent', 'from react_agent'),
(r'import test\.custom_react_agent', 'import react_agent'),
(r'from \.agent import', 'from react_agent.agent import'),
(r'from \.config import', 'from react_agent.config_react import'),
(r'from \.state import', 'from react_agent.state import'),
(r'from \.sql_tools import', 'from react_agent.sql_tools import'),
]
for pattern, replacement in replacements:
content = re.sub(pattern, replacement, content)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
def fix_all_imports():
"""批量修正所有文件的导入路径"""
react_agent_files = [
'react_agent/agent.py',
'react_agent/state.py',
'react_agent/sql_tools.py',
'react_agent/shell.py',
'react_agent/enhanced_redis_api.py',
'unified_api.py', # 使用正确的文件名
'asgi_app.py' # 使用正确的文件名
]
for file_path in react_agent_files:
if os.path.exists(file_path):
fix_imports_in_file(file_path)
print(f"✅ 已修正: {file_path}")
else:
print(f"❌ 文件不存在: {file_path}")
if __name__ == "__main__":
fix_all_imports()
# 运行路径修正脚本
python scripts/fix_imports.py
# 验证Python语法
python -m py_compile react_agent/agent.py
python -m py_compile react_agent/state.py
python -m py_compile react_agent/sql_tools.py
python -m py_compile api_unified.py
# 输出验证结果
echo "✅ 目录迁移完成,语法检查通过"
基于用户需求和现有实践,为React Agent设置独立日志文件,仿照data_pipeline
模式。
经验证,项目中的日志使用情况:
agent/
: 使用 get_agent_logger("CituAgent")
统一日志系统data_pipeline/
: 使用独立日志文件 ./data_pipeline/training_data/{task_id}/data_pipeline.log
# react_agent/logger.py (新建)
"""
React Agent 独立日志管理器
仿照data_pipeline模式,使用统一日志系统但输出到独立文件
"""
import os
from pathlib import Path
from datetime import datetime
from core.logging import get_agent_logger
class ReactAgentLogManager:
"""React Agent 日志管理器"""
_logger_instance = None
_file_handler = None
@classmethod
def get_logger(cls, name: str = "ReactAgent"):
"""
获取React Agent专用logger
使用统一日志系统但输出到独立文件
"""
if cls._logger_instance is None:
cls._logger_instance = cls._create_logger(name)
return cls._logger_instance
@classmethod
def _create_logger(cls, name: str):
"""创建独立文件的logger"""
# 使用统一日志系统获取logger
logger = get_agent_logger(name)
# 添加独立的文件处理器
cls._add_file_handler(logger)
return logger
@classmethod
def _add_file_handler(cls, logger):
"""添加独立的文件处理器"""
try:
# 确保日志目录存在
project_root = Path(__file__).parent.parent
log_dir = project_root / "logs"
log_dir.mkdir(exist_ok=True)
# 按日期创建日志文件
today = datetime.now().strftime("%Y%m%d")
log_file = log_dir / f"react_agent_{today}.log"
# 创建文件处理器
import logging
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# 设置格式
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] [%(name)s] %(filename)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(formatter)
# 添加到logger(不影响原有的控制台输出)
logger.addHandler(file_handler)
cls._file_handler = file_handler
logger.info(f"✅ React Agent独立日志文件已创建: {log_file}")
except Exception as e:
logger.warning(f"⚠️ 创建React Agent独立日志文件失败: {e}")
@classmethod
def cleanup(cls):
"""清理资源"""
if cls._file_handler:
cls._file_handler.close()
cls._file_handler = None
# 对外接口
def get_react_agent_logger(name: str = "ReactAgent"):
"""获取React Agent专用logger"""
return ReactAgentLogManager.get_logger(name)
# react_agent/config.py (修改后)
"""
React Agent 独立配置
保持独立配置,但使用统一日志系统和独立日志文件
"""
import os
from .logger import get_react_agent_logger
# 使用React Agent专用logger
logger = get_react_agent_logger("ReactAgentConfig")
# 继承主配置
try:
from app_config import (
LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG,
REDIS_URL, VECTOR_DB_TYPE
)
logger.info("✅ 成功加载主配置文件")
except ImportError as e:
logger.warning(f"⚠️ 主配置加载失败,使用默认配置: {e}")
# 默认配置
REDIS_URL = "redis://localhost:6379"
LLM_MODEL_TYPE = "api"
# React Agent 特定配置
REACT_AGENT_CONFIG = {
"default_user_id": "guest",
"max_retries": 3,
"retry_base_delay": 3,
"network_timeout": 60,
"debug_mode": True,
"max_log_length": 1000
}
# HTTP连接配置
HTTP_CONFIG = {
"max_connections": 10,
"max_keepalive_connections": 5,
"keepalive_expiry": 30.0,
"connect_timeout": 10.0,
"pool_timeout": 5.0
}
logger.info("✅ React Agent配置初始化完成")
# react_agent/agent.py (关键修改部分)
"""
Custom React Agent 实现
使用统一日志系统和独立日志文件
"""
from .logger import get_react_agent_logger
from .config import REACT_AGENT_CONFIG
class CustomReactAgent:
def __init__(self):
# 使用React Agent专用logger
self.logger = get_react_agent_logger("ReactAgent.Core")
self.config = REACT_AGENT_CONFIG
self.logger.info("🚀 CustomReactAgent 初始化开始")
# 其他初始化逻辑...
self.logger.info("✅ CustomReactAgent 初始化完成")
async def process_question(self, question: str, **kwargs):
"""处理问题的主要方法"""
self.logger.info(f"📝 开始处理问题: {question[:100]}...")
try:
# 处理逻辑...
result = await self._internal_process(question, **kwargs)
self.logger.info("✅ 问题处理完成")
return result
except Exception as e:
self.logger.error(f"❌ 问题处理失败: {str(e)}")
raise
def cleanup(self):
"""清理资源"""
self.logger.info("🧹 开始清理React Agent资源")
# 清理逻辑...
# 清理日志资源
from .logger import ReactAgentLogManager
ReactAgentLogManager.cleanup()
logs/
├── app.log # 主应用日志(原有)
├── react_agent_20250115.log # React Agent独立日志(新增)
├── react_agent_20250116.log # 按日期轮换
└── data_pipeline/ # Data Pipeline日志目录(原有)
└── task_20250115_143052/
└── data_pipeline.log
# scripts/test_react_agent_logging.py
"""
验证React Agent日志配置
"""
def test_react_agent_logging():
"""测试React Agent日志功能"""
# 测试日志系统
from react_agent.logger import get_react_agent_logger
logger = get_react_agent_logger("TestLogger")
logger.info("测试 React Agent 日志系统")
logger.warning("测试警告日志")
logger.error("测试错误日志")
print("✅ React Agent日志系统测试完成")
print("📁 请检查 logs/react_agent_YYYYMMDD.log 文件")
if __name__ == "__main__":
test_react_agent_logging()
from core.logging import get_agent_logger, initialize_logging
# 使用项目统一日志系统
logger = get_agent_logger("ReactAgent")
# 继承主配置
try:
from app_config import (
LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG,
REDIS_URL, VECTOR_DB_TYPE
)
logger.info("✅ 成功加载主配置文件")
except ImportError as e:
logger.warning(f"⚠️ 主配置加载失败,使用默认配置: {e}")
# 默认配置
REDIS_URL = "redis://localhost:6379"
LLM_MODEL_TYPE = "api"
# React Agent 特定配置
REACT_AGENT_CONFIG = {
"default_user_id": "guest",
"max_retries": 3,
"retry_base_delay": 3,
"network_timeout": 60,
"debug_mode": True,
"max_log_length": 1000
}
# HTTP连接配置
HTTP_CONFIG = {
"max_connections": 10,
"max_keepalive_connections": 5,
"keepalive_expiry": 30.0,
"connect_timeout": 10.0,
"pool_timeout": 5.0
}
logger.info("✅ React Agent配置初始化完成")
# react_agent/agent.py (关键修改部分)
"""
Custom React Agent 实现
统一使用项目日志系统
"""
from core.logging import get_agent_logger
from .config_react import REACT_AGENT_CONFIG, logger as config_logger
class CustomReactAgent:
def __init__(self):
# 使用统一日志系统
self.logger = get_agent_logger("ReactAgent.Core")
self.config = REACT_AGENT_CONFIG
self.logger.info("🚀 CustomReactAgent 初始化开始")
# 其他初始化逻辑...
self.logger.info("✅ CustomReactAgent 初始化完成")
async def process_question(self, question: str, **kwargs):
"""处理问题的主要方法"""
self.logger.info(f"📝 开始处理问题: {question[:100]}...")
try:
# 处理逻辑...
result = await self._internal_process(question, **kwargs)
self.logger.info("✅ 问题处理完成")
return result
except Exception as e:
self.logger.error(f"❌ 问题处理失败: {str(e)}")
raise
# scripts/verify_logging.py
"""
验证日志格式统一性
"""
import logging
from core.logging import get_agent_logger
def test_logging_consistency():
"""测试日志格式一致性"""
# 测试不同模块的日志格式
loggers = {
"CituApp": get_agent_logger("CituApp"),
"ReactAgent": get_agent_logger("ReactAgent"),
"UnifiedAPI": get_agent_logger("UnifiedAPI")
}
for name, logger in loggers.items():
logger.info(f"测试 {name} 模块日志格式")
logger.warning(f"测试 {name} 模块警告日志")
logger.error(f"测试 {name} 模块错误日志")
print("✅ 日志格式统一性测试完成")
if __name__ == "__main__":
test_logging_consistency()
基于用户澄清,API整合采用复制策略而非合并策略:
citu_app.py
和 test/custom_react_agent/api.py
保持不变unified_api.py
citu_app.py
复制需要的API到 unified_api.py
custom_react_agent/api.py
的全部内容unified_api.py
可以独立提供所有服务# unified_api.py (完整结构)
"""
统一API服务入口
复制原有agent API、包含React Agent API的全部内容和所有管理API
注意:这是一个独立的API文件,不影响原有的citu_app.py和test/custom_react_agent/api.py
"""
import asyncio
import logging
import atexit
from datetime import datetime
from typing import Optional, Dict, Any
from flask import Flask, request, jsonify
from asgiref.wsgi import WsgiToAsgi
# === 核心导入 ===
from core.logging import get_app_logger, initialize_logging
from common.result import (
success_response, bad_request_response, not_found_response,
internal_error_response, agent_success_response, agent_error_response,
validation_failed_response, service_unavailable_response
)
# === Agent导入 ===
try:
from agent.citu_agent import get_citu_langraph_agent
ORIGINAL_AGENT_AVAILABLE = True
except ImportError as e:
print(f"⚠️ 原始Agent不可用: {e}")
ORIGINAL_AGENT_AVAILABLE = False
try:
from react_agent.agent import CustomReactAgent
REACT_AGENT_AVAILABLE = True
except ImportError as e:
print(f"⚠️ React Agent不可用: {e}")
REACT_AGENT_AVAILABLE = False
# === 公共服务导入 ===
from common.redis_conversation_manager import RedisConversationManager
from common.qa_feedback_manager import QAFeedbackManager
# === 初始化 ===
initialize_logging()
logger = get_app_logger("UnifiedAPI")
# 创建Flask应用
app = Flask(__name__)
# 全局实例
_original_agent = None
_react_agent = None
_redis_manager = RedisConversationManager()
_qa_manager = QAFeedbackManager()
# === 应用生命周期管理 ===
def initialize_agents():
"""初始化Agent实例"""
global _original_agent, _react_agent
if ORIGINAL_AGENT_AVAILABLE and _original_agent is None:
try:
_original_agent = get_citu_langraph_agent()
logger.info("✅ 原始Agent初始化成功")
except Exception as e:
logger.error(f"❌ 原始Agent初始化失败: {e}")
if REACT_AGENT_AVAILABLE and _react_agent is None:
try:
_react_agent = CustomReactAgent()
logger.info("✅ React Agent初始化成功")
except Exception as e:
logger.error(f"❌ React Agent初始化失败: {e}")
def cleanup_resources():
"""清理资源"""
global _original_agent, _react_agent
logger.info("🧹 开始清理资源...")
if _react_agent:
try:
# 如果React Agent有清理方法
if hasattr(_react_agent, 'cleanup'):
_react_agent.cleanup()
except Exception as e:
logger.error(f"React Agent清理失败: {e}")
_original_agent = None
_react_agent = None
logger.info("✅ 资源清理完成")
atexit.register(cleanup_resources)
# === 健康检查 ===
@app.route("/")
def root():
"""根路径健康检查"""
return jsonify({
"message": "统一API服务正在运行",
"version": "v1.0",
"services": {
"original_agent": ORIGINAL_AGENT_AVAILABLE,
"react_agent": REACT_AGENT_AVAILABLE,
"redis": _redis_manager.is_available(),
},
"timestamp": datetime.now().isoformat()
})
@app.route('/health', methods=['GET'])
def health_check():
"""详细健康检查"""
try:
# 检查各个组件状态
health_status = {
"status": "healthy",
"components": {
"original_agent": {
"available": ORIGINAL_AGENT_AVAILABLE,
"initialized": _original_agent is not None
},
"react_agent": {
"available": REACT_AGENT_AVAILABLE,
"initialized": _react_agent is not None
},
"redis": {
"available": _redis_manager.is_available(),
"connection": "ok" if _redis_manager.is_available() else "failed"
},
"qa_feedback": {
"available": True,
"status": "ok"
}
},
"timestamp": datetime.now().isoformat()
}
# 判断整体健康状态
all_critical_healthy = (
health_status["components"]["redis"]["available"] and
(ORIGINAL_AGENT_AVAILABLE or REACT_AGENT_AVAILABLE)
)
if not all_critical_healthy:
health_status["status"] = "degraded"
return jsonify(health_status), 503
return jsonify(health_status), 200
except Exception as e:
logger.error(f"健康检查失败: {e}")
return jsonify({
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now().isoformat()
}), 500
# === React Agent API (新版本) ===
@app.route('/api/v0/ask_react_agent', methods=['POST'])
def ask_react_agent():
"""React Agent API - 智能场景,高token消耗"""
if not REACT_AGENT_AVAILABLE:
return jsonify(service_unavailable_response(
response_text="React Agent服务不可用"
)), 503
# 确保Agent已初始化
if _react_agent is None:
initialize_agents()
if _react_agent is None:
return jsonify(service_unavailable_response(
response_text="React Agent初始化失败"
)), 503
try:
data = request.get_json(force=True)
question = data.get('question', '').strip()
user_id = data.get('user_id', 'guest')
thread_id = data.get('thread_id')
if not question:
return jsonify(bad_request_response(
response_text="问题不能为空",
missing_params=["question"]
)), 400
# 异步调用React Agent
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_react_agent.process_question(
question=question,
user_id=user_id,
thread_id=thread_id
)
)
finally:
loop.close()
if result.get('success', False):
return jsonify(success_response(
response_text="React Agent处理成功",
data=result
))
else:
return jsonify(agent_error_response(
response_text=result.get('error', 'React Agent处理失败'),
error_type="react_agent_error"
)), 500
except Exception as e:
logger.error(f"React Agent API错误: {str(e)}")
return jsonify(internal_error_response(
response_text="React Agent处理失败,请稍后重试"
)), 500
# === 原始Agent API (兼容版本) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""原始Agent API - 简单场景,低token消耗"""
if not ORIGINAL_AGENT_AVAILABLE:
return jsonify(service_unavailable_response(
response_text="原始Agent服务不可用"
)), 503
# 确保Agent已初始化
if _original_agent is None:
initialize_agents()
if _original_agent is None:
return jsonify(service_unavailable_response(
response_text="原始Agent初始化失败"
)), 503
# 这里会包含从citu_app.py迁移的完整ask_agent逻辑
# 包括Redis上下文管理、缓存检查、异步Agent调用等
# ... (从citu_app.py复制完整实现,添加适当的异步包装)
# === QA反馈系统API ===
@app.route('/api/v0/qa_feedback/query', methods=['POST'])
def qa_feedback_query():
"""查询反馈记录API"""
# 从citu_app.py完整迁移
# ...
@app.route('/api/v0/qa_feedback/add', methods=['POST'])
def qa_feedback_add():
"""添加反馈记录API"""
# 从citu_app.py完整迁移
# ...
# === Redis对话管理API ===
@app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id):
"""获取用户对话列表"""
# 从citu_app.py完整迁移
# ...
# === 训练数据管理API ===
@app.route('/api/v0/training_data/stats', methods=['GET'])
def training_data_stats():
"""获取训练数据统计信息"""
# 从citu_app.py完整迁移
# ...
# === Data Pipeline API ===
@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
"""创建数据管道任务"""
# 从citu_app.py完整迁移
# ...
# === 应用启动配置 ===
@app.before_first_request
def before_first_request():
"""首次请求前的初始化"""
logger.info("🚀 统一API服务启动,开始初始化...")
initialize_agents()
logger.info("✅ 统一API服务初始化完成")
if __name__ == '__main__':
logger.info("🚀 以开发模式启动统一API服务...")
app.run(host='0.0.0.0', port=8084, debug=True)
API类别 | 迁移状态 | 测试状态 | 备注 |
---|---|---|---|
Health Check | ✅ 完成 | ✅ 通过 | 新增组件状态检查 |
React Agent | ✅ 完成 | ⏳ 待测试 | 异步包装完成 |
Original Agent | ⏳ 进行中 | ⏳ 待测试 | 需要异步改造 |
QA Feedback (6个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 |
Redis管理 (8个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 |
训练数据 (4个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 |
Data Pipeline (10+个) | ⏳ 待迁移 | ⏳ 待测试 | 直接复制 |
# === 改造前 (citu_app.py) ===
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
# ... 参数处理 ...
# 直接异步调用 (在Flask-WSGI中可能有问题)
agent_result = asyncio.run(agent.process_question(...))
# ... 结果处理 ...
# === 改造后 (api_unified.py) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
# ... 参数处理 ...
# 安全的异步包装
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent_result = loop.run_until_complete(
agent.process_question(...)
)
finally:
loop.close()
# ... 结果处理 ...
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""
支持对话上下文的ask_agent API - 异步改造版本
从citu_app.py完整迁移并添加异步安全包装
"""
req = request.get_json(force=True)
question = req.get("question", None)
browser_session_id = req.get("session_id", None)
# 参数解析 (从citu_app.py复制)
user_id_input = req.get("user_id", None)
conversation_id_input = req.get("conversation_id", None)
continue_conversation = req.get("continue_conversation", False)
api_routing_mode = req.get("routing_mode", None)
VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]
# 参数验证
if not question:
return jsonify(bad_request_response(
response_text="缺少必需参数:question",
missing_params=["question"]
)), 400
if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
return jsonify(bad_request_response(
response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
invalid_params=["routing_mode"]
)), 400
try:
# 1. ID解析 (同步操作)
from flask import session
login_user_id = session.get('user_id') if 'user_id' in session else None
user_id = _redis_manager.resolve_user_id(
user_id_input, browser_session_id, request.remote_addr, login_user_id
)
conversation_id, conversation_status = _redis_manager.resolve_conversation_id(
user_id, conversation_id_input, continue_conversation
)
# 2. 上下文获取 (同步操作)
context = _redis_manager.get_context(conversation_id)
# 3. 上下文类型检测
context_type = None
if context:
try:
messages = _redis_manager.get_messages(conversation_id, limit=10)
for message in reversed(messages):
if message.get("role") == "assistant":
metadata = message.get("metadata", {})
context_type = metadata.get("type")
if context_type:
logger.info(f"检测到上下文类型: {context_type}")
break
except Exception as e:
logger.warning(f"获取上下文类型失败: {str(e)}")
# 4. 缓存检查 (同步操作)
cached_answer = _redis_manager.get_cached_answer(question, context)
if cached_answer:
logger.info("使用缓存答案")
return jsonify(agent_success_response(
response_type=cached_answer.get("type", "UNKNOWN"),
response=cached_answer.get("response", ""),
sql=cached_answer.get("sql"),
records=cached_answer.get("query_result"),
summary=cached_answer.get("summary"),
session_id=browser_session_id,
execution_path=cached_answer.get("execution_path", []),
classification_info=cached_answer.get("classification_info", {}),
conversation_id=conversation_id,
user_id=user_id,
is_guest_user=(user_id == "guest"),
context_used=bool(context),
from_cache=True,
conversation_status=conversation_status["status"],
conversation_message=conversation_status["message"],
requested_conversation_id=conversation_status.get("requested_id")
))
# 5. 保存用户消息 (同步操作)
_redis_manager.save_message(conversation_id, "user", question)
# 6. 构建带上下文的问题
if context:
enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
logger.info(f"使用上下文,长度: {len(context)}字符")
else:
enhanced_question = question
logger.info("新对话,无上下文")
# 7. 确定路由模式
if api_routing_mode:
effective_routing_mode = api_routing_mode
logger.info(f"使用API指定的路由模式: {effective_routing_mode}")
else:
try:
from app_config import QUESTION_ROUTING_MODE
effective_routing_mode = QUESTION_ROUTING_MODE
logger.info(f"使用配置文件路由模式: {effective_routing_mode}")
except ImportError:
effective_routing_mode = "hybrid"
logger.info(f"使用默认路由模式: {effective_routing_mode}")
# 8. 关键异步改造:Agent调用
if _original_agent is None:
initialize_agents()
if _original_agent is None:
return jsonify(service_unavailable_response(
response_text="AI服务暂时不可用,请稍后重试",
can_retry=True
)), 503
# 异步安全包装
async def process_with_agent():
"""异步处理函数"""
return await _original_agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
)
# 在新的事件循环中执行异步操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent_result = loop.run_until_complete(process_with_agent())
finally:
loop.close()
asyncio.set_event_loop(None) # 清理事件循环
# 9. 处理Agent结果 (同步操作)
if agent_result.get("success", False):
response_type = agent_result.get("type", "UNKNOWN")
response_text = agent_result.get("response", "")
sql = agent_result.get("sql")
query_result = agent_result.get("query_result")
summary = agent_result.get("summary")
execution_path = agent_result.get("execution_path", [])
classification_info = agent_result.get("classification_info", {})
# 确定助手回复内容
if response_type == "DATABASE":
if response_text:
assistant_response = response_text
elif summary:
assistant_response = summary
elif query_result:
row_count = query_result.get("row_count", 0)
assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
else:
assistant_response = "数据库查询已处理。"
else:
assistant_response = response_text
# 保存助手回复
_redis_manager.save_message(
conversation_id, "assistant", assistant_response,
metadata={
"type": response_type,
"sql": sql,
"execution_path": execution_path
}
)
# 缓存答案
_redis_manager.cache_answer(question, agent_result, context)
return jsonify(agent_success_response(
response_type=response_type,
response=response_text,
sql=sql,
records=query_result,
summary=summary,
session_id=browser_session_id,
execution_path=execution_path,
classification_info=classification_info,
conversation_id=conversation_id,
user_id=user_id,
is_guest_user=(user_id == "guest"),
context_used=bool(context),
from_cache=False,
conversation_status=conversation_status["status"],
conversation_message=conversation_status["message"],
requested_conversation_id=conversation_status.get("requested_id"),
routing_mode_used=effective_routing_mode,
routing_mode_source="api" if api_routing_mode else "config"
))
else:
error_message = agent_result.get("error", "Agent处理失败")
error_code = agent_result.get("error_code", 500)
return jsonify(agent_error_response(
response_text=error_message,
error_type="agent_processing_failed",
code=error_code,
session_id=browser_session_id,
conversation_id=conversation_id,
user_id=user_id
)), error_code
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(internal_error_response(
response_text="查询处理失败,请稍后重试"
)), 500
# scripts/test_async_safety.py
"""
异步安全性测试脚本
"""
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def test_async_event_loop_isolation():
"""测试异步事件循环隔离"""
def sync_function_with_async():
"""模拟同步函数中的异步调用"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
async def async_task():
await asyncio.sleep(0.1)
return "async_result"
result = loop.run_until_complete(async_task())
return result
finally:
loop.close()
# 并发测试
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(sync_function_with_async)
for _ in range(10)
]
results = [future.result() for future in futures]
assert all(r == "async_result" for r in results)
print("✅ 异步事件循环隔离测试通过")
if __name__ == "__main__":
test_async_event_loop_isolation()
# tests/test_migration_basic.py
"""
基础迁移功能测试
"""
import pytest
import requests
from unittest.mock import patch
class TestBasicMigration:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
def test_health_check(self, api_base_url):
"""测试健康检查接口"""
response = requests.get(f"{api_base_url}/health")
assert response.status_code == 200
data = response.json()
assert data["status"] in ["healthy", "degraded"]
assert "components" in data
def test_root_endpoint(self, api_base_url):
"""测试根路径"""
response = requests.get(f"{api_base_url}/")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "version" in data
def test_react_agent_api_availability(self, api_base_url):
"""测试React Agent API可用性"""
payload = {
"question": "测试问题",
"user_id": "test_user"
}
response = requests.post(
f"{api_base_url}/api/v0/ask_react_agent",
json=payload
)
# 应该返回有效响应 (可能是错误,但不应该是404)
assert response.status_code != 404
# tests/test_api_compatibility.py
"""
API兼容性测试
确保迁移后API行为与原版本一致
"""
import pytest
import requests
class TestAPICompatibility:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
def test_ask_agent_parameter_validation(self, api_base_url):
"""测试ask_agent参数验证"""
# 测试缺少question参数
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={}
)
assert response.status_code == 400
data = response.json()
assert "question" in data.get("missing_params", [])
# 测试无效routing_mode
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={
"question": "测试",
"routing_mode": "invalid_mode"
}
)
assert response.status_code == 400
def test_response_format_consistency(self, api_base_url):
"""测试响应格式一致性"""
payload = {
"question": "简单测试问题"
}
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json=payload
)
data = response.json()
# 检查标准响应字段
required_fields = ["code", "success", "message"]
for field in required_fields:
assert field in data, f"响应缺少必需字段: {field}"
# tests/test_async_performance.py
"""
异步性能测试
"""
import asyncio
import aiohttp
import time
import pytest
from concurrent.futures import ThreadPoolExecutor
class TestAsyncPerformance:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
async def test_concurrent_requests(self, api_base_url):
"""测试并发请求处理"""
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(10):
payload = {
"question": f"并发测试问题 {i}",
"user_id": f"test_user_{i}"
}
task = session.post(
f"{api_base_url}/api/v0/ask_react_agent",
json=payload
)
tasks.append(task)
start_time = time.time()
responses = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
# 检查响应
valid_responses = [
r for r in responses
if not isinstance(r, Exception) and r.status in [200, 400, 500]
]
assert len(valid_responses) >= 8 # 至少80%成功
assert end_time - start_time < 30 # 30秒内完成
def test_sync_async_isolation(self, api_base_url):
"""测试同步异步隔离"""
def make_request():
"""发起请求的同步函数"""
import requests
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={"question": "隔离测试"}
)
return response.status_code
# 多线程并发测试
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(make_request)
for _ in range(10)
]
results = [future.result() for future in futures]
# 检查是否有异步冲突
valid_status_codes = [200, 400, 500, 503]
assert all(code in valid_status_codes for code in results)
# scripts/stress_test.sh
#!/bin/bash
echo "🚀 开始压力测试..."
# 1. 基础负载测试
echo "1. 基础负载测试..."
ab -n 100 -c 5 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_react_agent
# 2. 持续负载测试
echo "2. 持续负载测试..."
ab -n 1000 -c 10 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_agent
# 3. 内存泄漏检测
echo "3. 内存使用监控..."
python scripts/monitor_memory.py &
MONITOR_PID=$!
# 运行一段时间的负载
ab -n 500 -c 8 -T application/json -p tests/test_payload.json http://localhost:8084/health
# 停止监控
kill $MONITOR_PID
echo "✅ 压力测试完成"
# asgi_app_new.py (更新版)
"""
ASGI应用启动文件 - 生产环境配置
支持异步操作和性能优化
"""
import os
import logging
from asgiref.wsgi import WsgiToAsgi
# 导入统一API应用
from api_unified import app
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 性能优化配置
ASGI_CONFIG = {
"max_workers": int(os.getenv("MAX_WORKERS", "4")),
"timeout": int(os.getenv("TIMEOUT", "60")),
"keepalive": int(os.getenv("KEEPALIVE", "30")),
}
# 将Flask WSGI应用转换为ASGI应用
asgi_app = WsgiToAsgi(app)
logger.info(f"✅ ASGI应用配置完成: {ASGI_CONFIG}")
# 生产环境启动命令:
# uvicorn asgi_app_new:asgi_app --host 0.0.0.0 --port 8084 --workers 4
# 或
# gunicorn asgi_app_new:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084
# deploy/docker-compose.yml
version: '3.8'
services:
unified-api:
build:
context: .
dockerfile: deploy/Dockerfile
ports:
- "8084:8084"
environment:
- PORT=8084
- MAX_WORKERS=4
- TIMEOUT=60
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@postgres:5432/db
depends_on:
- redis
- postgres
volumes:
- ./logs:/app/logs
- ./data:/app/data
restart: unless-stopped
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:15-alpine
environment:
- POSTGRES_DB=vanna_db
- POSTGRES_USER=vanna_user
- POSTGRES_PASSWORD=vanna_pass
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
volumes:
redis_data:
postgres_data:
# deploy/Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
COPY react_agent/requirements.txt ./react_agent/
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -r react_agent/requirements.txt
# 复制应用代码
COPY . .
# 创建日志目录
RUN mkdir -p logs data
# 设置环境变量
ENV PYTHONPATH=/app
ENV FLASK_APP=api_unified.py
# 暴露端口
EXPOSE 8084
# 启动命令
CMD ["uvicorn", "asgi_app_new:asgi_app", "--host", "0.0.0.0", "--port", "8084", "--workers", "1"]
# monitoring/metrics.py
"""
应用监控指标收集
"""
import time
import psutil
import logging
from functools import wraps
from flask import request, g
from datetime import datetime
logger = logging.getLogger(__name__)
class MetricsCollector:
def __init__(self):
self.request_count = 0
self.error_count = 0
self.response_times = []
def record_request(self, endpoint, method, status_code, response_time):
"""记录请求指标"""
self.request_count += 1
if status_code >= 400:
self.error_count += 1
self.response_times.append(response_time)
# 保持最近1000条记录
if len(self.response_times) > 1000:
self.response_times = self.response_times[-1000:]
logger.info(f"📊 {method} {endpoint} - {status_code} - {response_time:.3f}s")
def get_stats(self):
"""获取统计信息"""
if not self.response_times:
return {
"request_count": self.request_count,
"error_count": self.error_count,
"error_rate": 0,
"avg_response_time": 0,
"system_stats": self._get_system_stats()
}
return {
"request_count": self.request_count,
"error_count": self.error_count,
"error_rate": (self.error_count / self.request_count) * 100,
"avg_response_time": sum(self.response_times) / len(self.response_times),
"max_response_time": max(self.response_times),
"min_response_time": min(self.response_times),
"system_stats": self._get_system_stats()
}
def _get_system_stats(self):
"""获取系统统计信息"""
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"timestamp": datetime.now().isoformat()
}
# 全局监控实例
metrics = MetricsCollector()
def monitor_requests(app):
"""为Flask应用添加请求监控"""
@app.before_request
def before_request():
g.start_time = time.time()
@app.after_request
def after_request(response):
response_time = time.time() - g.start_time
metrics.record_request(
endpoint=request.endpoint or 'unknown',
method=request.method,
status_code=response.status_code,
response_time=response_time
)
return response
# 添加监控端点
@app.route('/api/v0/metrics', methods=['GET'])
def get_metrics():
"""获取应用监控指标"""
return metrics.get_stats()
# monitoring/alerting.py
"""
告警系统
"""
import smtplib
import logging
from email.mime.text import MIMEText
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class AlertManager:
def __init__(self):
self.alert_rules = {
"high_error_rate": {
"threshold": 5.0, # 5%错误率
"window": 300, # 5分钟窗口
"cooldown": 900 # 15分钟冷却期
},
"slow_response": {
"threshold": 10.0, # 10秒响应时间
"window": 180, # 3分钟窗口
"cooldown": 600 # 10分钟冷却期
},
"high_memory": {
"threshold": 85.0, # 85%内存使用率
"window": 120, # 2分钟窗口
"cooldown": 1800 # 30分钟冷却期
}
}
self.last_alerts = {}
def check_alerts(self, metrics):
"""检查告警条件"""
current_time = datetime.now()
# 检查错误率
if metrics["error_rate"] > self.alert_rules["high_error_rate"]["threshold"]:
self._trigger_alert(
"high_error_rate",
f"错误率过高: {metrics['error_rate']:.2f}%",
current_time
)
# 检查响应时间
if metrics.get("avg_response_time", 0) > self.alert_rules["slow_response"]["threshold"]:
self._trigger_alert(
"slow_response",
f"响应时间过慢: {metrics['avg_response_time']:.2f}s",
current_time
)
# 检查内存使用率
memory_percent = metrics["system_stats"]["memory_percent"]
if memory_percent > self.alert_rules["high_memory"]["threshold"]:
self._trigger_alert(
"high_memory",
f"内存使用率过高: {memory_percent:.2f}%",
current_time
)
def _trigger_alert(self, alert_type, message, current_time):
"""触发告警"""
# 检查冷却期
if alert_type in self.last_alerts:
last_alert_time = self.last_alerts[alert_type]
cooldown_seconds = self.alert_rules[alert_type]["cooldown"]
if (current_time - last_alert_time).seconds < cooldown_seconds:
return # 还在冷却期内
# 记录告警时间
self.last_alerts[alert_type] = current_time
# 发送告警
logger.error(f"🚨 告警: {alert_type} - {message}")
# 这里可以添加更多告警方式:邮件、短信、Slack等
self._send_email_alert(alert_type, message)
def _send_email_alert(self, alert_type, message):
"""发送邮件告警 (示例)"""
try:
# 邮件配置 (需要根据实际情况配置)
smtp_server = "smtp.example.com"
smtp_port = 587
username = "alerts@example.com"
password = "password"
msg = MIMEText(f"时间: {datetime.now()}\n类型: {alert_type}\n详情: {message}")
msg['Subject'] = f"[统一API服务] {alert_type}告警"
msg['From'] = username
msg['To'] = "admin@example.com"
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(username, password)
server.send_message(msg)
logger.info(f"✅ 告警邮件已发送: {alert_type}")
except Exception as e:
logger.error(f"❌ 告警邮件发送失败: {e}")
# 全局告警管理器
alert_manager = AlertManager()
日期 | 任务 | 负责人 | 交付物 | 验收标准 |
---|---|---|---|---|
Day 1 | 目录结构迁移 | 后端 | 新目录结构、路径修正 | ✅ 无导入错误 |
Day 2 | 日志服务统一 | 后端 | 统一日志配置 | ✅ 日志格式一致 |
Day 3 | React Agent API整合 | 后端 | ask_react_agent可用 | ✅ API正常响应 |
Day 4 | 基础API迁移(一) | 后端 | QA反馈、Redis管理API | ✅ 6+8个API可用 |
Day 5 | 基础API迁移(二) | 后端 | 训练数据、Data Pipeline API | ✅ 4+10个API可用 |
日期 | 任务 | 负责人 | 交付物 | 验收标准 |
---|---|---|---|---|
Day 6 | ask_agent异步改造 | 后端 | 异步版ask_agent | ✅ 异步调用正常 |
Day 7 | 集成测试(一) | QA | 功能测试报告 | ✅ 核心功能正常 |
Day 8 | 性能测试 | QA | 性能测试报告 | ✅ 性能无明显下降 |
Day 9 | 兼容性测试 | QA | 兼容性测试报告 | ✅ API兼容性100% |
Day 10 | Bug修复和优化 | 后端 | 修复报告 | ✅ 关键bug已修复 |
日期 | 任务 | 负责人 | 交付物 | 验收标准 |
---|---|---|---|---|
Day 11 | 部署配置准备 | 运维 | Docker/K8s配置 | ✅ 部署脚本可用 |
Day 12 | 监控系统搭建 | 运维 | 监控配置 | ✅ 监控指标正常 |
Day 13 | 预生产部署 | 运维 | 预生产环境 | ✅ 预生产环境稳定 |
Day 14 | 生产环境部署 | 运维 | 生产环境 | ✅ 生产环境稳定 |
Day 15 | 上线后监控 | 全体 | 监控报告 | ✅ 无重大故障 |
如果失败: 延期2天,重新评估技术方案
如果失败: 评估回滚方案,或延期1周
如果失败: 执行回滚计划
风险项 | 概率 | 影响 | 风险等级 | 缓解措施 |
---|---|---|---|---|
异步兼容性问题 | 高 | 高 | 🔴 极高 | 充分测试、渐进部署、快速回滚 |
性能显著下降 | 中 | 高 | 🟡 高 | 性能基准、监控告警、优化方案 |
数据丢失/损坏 | 低 | 极高 | 🟡 高 | 数据备份、事务保护、验证机制 |
第三方依赖冲突 | 中 | 中 | 🟡 中 | 依赖版本锁定、虚拟环境隔离 |
部署失败 | 中 | 中 | 🟡 中 | 自动化部署、回滚脚本、蓝绿部署 |
触发条件: ask_agent异步改造后出现严重错误,影响核心功能
应急措施:
立即回滚 (5分钟内)
# 快速切换到备用启动方式
pkill -f "api_unified"
python citu_app.py &
问题定位 (30分钟内) ```bash
tail -1000 logs/app.log > emergency_logs.txt
python scripts/debug_async_issues.py ```
修复方案 (2小时内)
触发条件: 响应时间超过原版本50%,或并发能力下降明显
应急措施:
资源扩容 (10分钟内) ```bash
gunicorn asgi_app_new:asgi_app -w 8 -k uvicorn.workers.UvicornWorker
# 或增加内存限制 docker update --memory=4g unified-api
2. **性能分析** (1小时内)
```bash
# 使用profiler分析性能瓶颈
python -m cProfile -o profile.out api_unified.py
# 分析内存使用
python scripts/memory_profiler.py
触发条件: 发现数据丢失、损坏或不一致
应急措施:
立即停止写操作 (1分钟内)
# 设置只读模式
curl -X POST http://localhost:8084/api/v0/maintenance/readonly
数据备份和恢复 (30分钟内) ```bash
pg_dump vanna_db > emergencybackup$(date +%Y%m%d_%H%M%S).sql
psql vanna_db < backup_file.sql ```
数据验证 (1小时内) ```bash
python scripts/data_integrity_check.py
# 对比迁移前后数据 python scripts/compare_data.py ```
角色 | 负责范围 | 联系方式 |
---|---|---|
技术负责人 | 整体架构、关键技术决策 | tech-lead@example.com |
后端开发 | API迁移、异步改造 | backend-dev@example.com |
测试工程师 | 功能测试、性能测试 | qa-engineer@example.com |
运维工程师 | 部署配置、监控告警 | devops@example.com |
技术文档
项目相关文档
migration_and_integration_plan.md
- 总体方案api_compatibility_matrix.xlsx
- API兼容性矩阵performance_benchmark.md
- 性能基准报告应急联系
文档版本: v1.0
创建日期: 2025-01-15
最后更新: 2025-01-15
文档状态: 详细实施指南 - 待执行
适用范围: Custom React Agent 完整迁移项目
依赖文档: migration_and_integration_plan.md