This document supplements the existing migration_and_integration_plan.md with a detailed, concrete implementation guide: complete code migration steps, configuration, and a test and verification plan.

Document hierarchy:
- migration_and_integration_plan.md: overall plan overview
- complete_migration_implementation_guide.md (this document): detailed implementation guide

QA feedback APIs (6 endpoints):

API endpoint | Method | Current state | Migration difficulty | Estimated time |
---|---|---|---|---|
/api/v0/qa_feedback/query | POST | Sync | ⭐ Easy | 30 min |
/api/v0/qa_feedback/add | POST | Sync | ⭐ Easy | 15 min |
/api/v0/qa_feedback/delete/{feedback_id} | DELETE | Sync | ⭐ Easy | 15 min |
/api/v0/qa_feedback/update/{feedback_id} | PUT | Sync | ⭐ Easy | 15 min |
/api/v0/qa_feedback/add_to_training | POST | Sync | ⭐ Easy | 30 min |
/api/v0/qa_feedback/stats | GET | Sync | ⭐ Easy | 15 min |

Redis conversation and cache management APIs (8 endpoints):

API endpoint | Method | Current state | Migration difficulty | Estimated time |
---|---|---|---|---|
/api/v0/user/{user_id}/conversations | GET | Sync | ⭐ Easy | 20 min |
/api/v0/conversation/{conv_id}/messages | GET | Sync | ⭐ Easy | 20 min |
/api/v0/conversation_stats | GET | Sync | ⭐ Easy | 15 min |
/api/v0/conversation_cleanup | POST | Sync | ⭐ Easy | 15 min |
/api/v0/embedding_cache_stats | GET | Sync | ⭐ Easy | 15 min |
/api/v0/embedding_cache_cleanup | POST | Sync | ⭐ Easy | 15 min |
/api/v0/qa_cache_stats | GET | Sync | ⭐ Easy | 15 min |
/api/v0/qa_cache_cleanup | POST | Sync | ⭐ Easy | 15 min |

Training data APIs (4 endpoints):

API endpoint | Method | Current state | Migration difficulty | Estimated time |
---|---|---|---|---|
/api/v0/training_data/stats | GET | Sync | ⭐ Easy | 15 min |
/api/v0/training_data/query | POST | Sync | ⭐ Easy | 30 min |
/api/v0/training_data/create | POST | Sync | ⭐ Easy | 45 min |
/api/v0/training_data/delete | POST | Sync | ⭐ Easy | 30 min |
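
Data Pipeline APIs, grouped by category:

API category | Endpoints | Migration difficulty | Estimated time |
---|---|---|---|
Task management | 5 | ⭐⭐ Medium | 2 hours |
File management | 3 | ⭐ Easy | 1 hour |
Database operations | 2 | ⭐ Easy | 30 min |
Monitoring and logs | 2 | ⭐ Easy | 30 min |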

API endpoint | Change type | Technical difficulty | Estimated time | Risk level |
---|---|---|---|---|
/api/v0/ask_agent | Async wrapping | ⭐⭐⭐ Complex | 4 hours | 🔴 High |

Refactoring approach:
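# Before (citu_app.py)
agent_result = asyncio.run(agent.process_question(...))

# After (new api.py)
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
    """Async-wrapped version."""
    try:
        # Sync part: parameter handling and cache check
        # ...
        # Async part: Agent call on a dedicated event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            agent_result = loop.run_until_complete(
                agent.process_question(...)
            )
        finally:
            loop.close()
            asyncio.set_event_loop(None)  # do not leave a closed loop registered
        # Sync part: result handling
        # ...
    except Exception as e:
        # Error handling: log the failure and return a 500 response
        logger.error(f"ask_agent failed: {e}")
        return jsonify(internal_error_response(
            response_text="Query processing failed, please retry later"
        )), 500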
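
The wrapping pattern above repeats in every sync endpoint that calls an async agent, so it can be factored into one helper. The following is a minimal sketch, not the project's actual code: `run_coroutine_sync` and `fake_process_question` are hypothetical names, and the fake coroutine only stands in for `agent.process_question`.

```python
import asyncio


def run_coroutine_sync(coro):
    """Run a coroutine to completion on a private event loop and return its result.

    Hypothetical helper: it assumes the calling thread has no running loop,
    which holds for a plain synchronous Flask view.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        # Always close the loop so repeated requests do not leak loops.
        loop.close()


async def fake_process_question(question: str) -> dict:
    """Stand-in for agent.process_question (assumption, for illustration only)."""
    await asyncio.sleep(0)
    return {"success": True, "response": f"echo: {question}"}


result = run_coroutine_sync(fake_process_question("hello"))
```

Creating a loop per call trades a little overhead for isolation: no request can observe another request's loop state.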

Original API: /api/v0/ask_agent  # unchanged, for simple scenarios
New React API: /api/v0/ask_react_agent  # new, for intelligent scenarios

API type | Naming format | Example | Use case |
---|---|---|---|
Original Agent | /api/v0/{action}_agent | /api/v0/ask_agent | Simple queries, low token consumption |
React Agent | /api/v0/{action}_react_agent | /api/v0/ask_react_agent | Complex reasoning, high token consumption |
Other APIs | Unchanged | /api/v0/qa_feedback/query | All other functional APIs |

# Compatibility mapping configuration
API_COMPATIBILITY_MAP = {
    # Existing API, unchanged
    "/api/v0/ask_agent": {
        "handler": "ask_agent_v0",
        "agent_type": "langgraph",
        "deprecated": False
    },
    # New React Agent API
    "/api/v0/ask_react_agent": {
        "handler": "ask_react_agent_v1",
        "agent_type": "react",
        "deprecated": False
    },
    # Possible future extension
    "/api/v0/ask_advanced_agent": {
        "handler": "ask_advanced_agent_v2",
        "agent_type": "future",
        "deprecated": False
    }
}
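
One way such a map could be consumed is a small lookup that lists the active routes for a given agent type. This is only a sketch: `routes_for_agent` is a hypothetical helper, and the map is repeated here (first two entries) so the example is self-contained.

```python
from typing import Dict, List

# Copy of the first two entries of the compatibility map, for a self-contained example.
API_COMPATIBILITY_MAP: Dict[str, dict] = {
    "/api/v0/ask_agent": {
        "handler": "ask_agent_v0", "agent_type": "langgraph", "deprecated": False,
    },
    "/api/v0/ask_react_agent": {
        "handler": "ask_react_agent_v1", "agent_type": "react", "deprecated": False,
    },
}


def routes_for_agent(agent_type: str) -> List[str]:
    """Return the non-deprecated paths served by the given agent type (hypothetical helper)."""
    return sorted(
        path
        for path, entry in API_COMPATIBILITY_MAP.items()
        if entry["agent_type"] == agent_type and not entry["deprecated"]
    )


react_routes = routes_for_agent("react")
```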

# 1. Create the react_agent directory
mkdir -p react_agent
mkdir -p config
# 2. Copy the core files (originals are kept)
cp test/custom_react_agent/agent.py react_agent/
cp test/custom_react_agent/state.py react_agent/
cp test/custom_react_agent/sql_tools.py react_agent/
cp test/custom_react_agent/shell.py react_agent/
cp test/custom_react_agent/enhanced_redis_api.py react_agent/
cp test/custom_react_agent/config.py react_agent/config_react.py  # renamed to avoid a conflict
# 3. Copy the API files to the project root (renamed)
cp test/custom_react_agent/api.py ./api_unified.py  # renamed; all APIs will be merged here later
cp test/custom_react_agent/asgi_app.py ./asgi_app_new.py  # renamed to avoid a conflict
# 4. Create the package init file
echo "# React Agent Module" > react_agent/__init__.py
# 5. Copy the dependency file
cp test/custom_react_agent/requirements.txt react_agent/
# scripts/fix_imports.py
"""
自动修正导入路径的脚本
"""
import os
import re
def fix_imports_in_file(file_path):
"""修正单个文件的导入路径"""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# 修正规则
replacements = [
(r'from test\.custom_react_agent', 'from react_agent'),
(r'import test\.custom_react_agent', 'import react_agent'),
(r'from \.agent import', 'from react_agent.agent import'),
(r'from \.config import', 'from react_agent.config_react import'),
(r'from \.state import', 'from react_agent.state import'),
(r'from \.sql_tools import', 'from react_agent.sql_tools import'),
]
for pattern, replacement in replacements:
content = re.sub(pattern, replacement, content)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
def fix_all_imports():
"""批量修正所有文件的导入路径"""
react_agent_files = [
'react_agent/agent.py',
'react_agent/state.py',
'react_agent/sql_tools.py',
'react_agent/shell.py',
'react_agent/enhanced_redis_api.py',
'api_unified.py',
'asgi_app_new.py'
]
for file_path in react_agent_files:
if os.path.exists(file_path):
fix_imports_in_file(file_path)
print(f"✅ 已修正: {file_path}")
else:
print(f"❌ 文件不存在: {file_path}")
if __name__ == "__main__":
fix_all_imports()

# Run the import-path fix script
python scripts/fix_imports.py
# Verify Python syntax
python -m py_compile react_agent/agent.py
python -m py_compile react_agent/state.py
python -m py_compile react_agent/sql_tools.py
python -m py_compile api_unified.py
# Report the result
echo "✅ Directory migration complete, syntax check passed"
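
Before rewriting files in place, the substitution rules can be checked against a sample string. The sketch below applies three of the rules from scripts/fix_imports.py to in-memory text; `rewrite_imports` and the imported name `MAX_RETRIES` are illustrative assumptions, not names taken from the project.

```python
import re

# A subset of the replacement rules from scripts/fix_imports.py.
REPLACEMENTS = [
    (r'from test\.custom_react_agent', 'from react_agent'),
    (r'import test\.custom_react_agent', 'import react_agent'),
    (r'from \.config import', 'from react_agent.config_react import'),
]


def rewrite_imports(source: str) -> str:
    """Apply every rule in order and return the rewritten source text."""
    for pattern, replacement in REPLACEMENTS:
        source = re.sub(pattern, replacement, source)
    return source


sample = (
    "from test.custom_react_agent.agent import CustomReactAgent\n"
    "from .config import MAX_RETRIES\n"
)
rewritten = rewrite_imports(sample)
```

Running the rules on a string first makes it easy to review the diff before any file is overwritten.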
# react_agent/config_react.py (修改后)
"""
React Agent 统一配置
与项目主配置保持一致
"""
import os
from core.logging import get_agent_logger, initialize_logging
# 使用项目统一日志系统
logger = get_agent_logger("ReactAgent")
# 继承主配置
try:
from app_config import (
LLM_MODEL_TYPE, API_LLM_MODEL, API_QIANWEN_CONFIG,
REDIS_URL, VECTOR_DB_TYPE
)
logger.info("✅ 成功加载主配置文件")
except ImportError as e:
logger.warning(f"⚠️ 主配置加载失败,使用默认配置: {e}")
# 默认配置
REDIS_URL = "redis://localhost:6379"
LLM_MODEL_TYPE = "api"
# React Agent 特定配置
REACT_AGENT_CONFIG = {
"default_user_id": "guest",
"max_retries": 3,
"retry_base_delay": 3,
"network_timeout": 60,
"debug_mode": True,
"max_log_length": 1000
}
# HTTP连接配置
HTTP_CONFIG = {
"max_connections": 10,
"max_keepalive_connections": 5,
"keepalive_expiry": 30.0,
"connect_timeout": 10.0,
"pool_timeout": 5.0
}
logger.info("✅ React Agent配置初始化完成")
# react_agent/agent.py (关键修改部分)
"""
Custom React Agent 实现
统一使用项目日志系统
"""
from core.logging import get_agent_logger
from .config_react import REACT_AGENT_CONFIG, logger as config_logger
class CustomReactAgent:
def __init__(self):
# 使用统一日志系统
self.logger = get_agent_logger("ReactAgent.Core")
self.config = REACT_AGENT_CONFIG
self.logger.info("🚀 CustomReactAgent 初始化开始")
# 其他初始化逻辑...
self.logger.info("✅ CustomReactAgent 初始化完成")
async def process_question(self, question: str, **kwargs):
"""处理问题的主要方法"""
self.logger.info(f"📝 开始处理问题: {question[:100]}...")
try:
# 处理逻辑...
result = await self._internal_process(question, **kwargs)
self.logger.info("✅ 问题处理完成")
return result
except Exception as e:
self.logger.error(f"❌ 问题处理失败: {str(e)}")
raise
# scripts/verify_logging.py
"""
验证日志格式统一性
"""
import logging
from core.logging import get_agent_logger
def test_logging_consistency():
"""测试日志格式一致性"""
# 测试不同模块的日志格式
loggers = {
"CituApp": get_agent_logger("CituApp"),
"ReactAgent": get_agent_logger("ReactAgent"),
"UnifiedAPI": get_agent_logger("UnifiedAPI")
}
for name, logger in loggers.items():
logger.info(f"测试 {name} 模块日志格式")
logger.warning(f"测试 {name} 模块警告日志")
logger.error(f"测试 {name} 模块错误日志")
print("✅ 日志格式统一性测试完成")
if __name__ == "__main__":
test_logging_consistency()
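
The script above only emits log lines; checking that they share one format needs an explicit assertion. The following sketch uses the standard `logging` module only. The format string and the regular expression are assumptions, since the actual format configured by `core.logging` is not shown in this document.

```python
import io
import logging
import re

# Assumed format; replace with whatever core.logging actually configures.
FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
LINE_RE = re.compile(
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} \[(INFO|WARNING|ERROR)\] [\w.]+: .+$"
)


def collect_lines(names):
    """Emit one INFO record per logger name and return the captured lines."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(FORMAT))
    for name in names:
        log = logging.getLogger(name)
        log.setLevel(logging.INFO)
        log.propagate = False  # keep the probe out of the root logger's handlers
        log.addHandler(handler)
        log.info("format probe from %s", name)
        log.removeHandler(handler)
    return stream.getvalue().splitlines()


lines = collect_lines(["CituApp", "ReactAgent", "UnifiedAPI"])
all_match = all(LINE_RE.match(line) for line in lines)
```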
# api_unified.py (完整结构)
"""
统一API服务入口
整合原有agent API、React Agent API和所有管理API
"""
import asyncio
import logging
import atexit
from datetime import datetime
from typing import Optional, Dict, Any
from flask import Flask, request, jsonify
from asgiref.wsgi import WsgiToAsgi
# === 核心导入 ===
from core.logging import get_app_logger, initialize_logging
from common.result import (
success_response, bad_request_response, not_found_response,
internal_error_response, agent_success_response, agent_error_response,
validation_failed_response, service_unavailable_response
)
# === Agent导入 ===
try:
from agent.citu_agent import get_citu_langraph_agent
ORIGINAL_AGENT_AVAILABLE = True
except ImportError as e:
print(f"⚠️ 原始Agent不可用: {e}")
ORIGINAL_AGENT_AVAILABLE = False
try:
from react_agent.agent import CustomReactAgent
REACT_AGENT_AVAILABLE = True
except ImportError as e:
print(f"⚠️ React Agent不可用: {e}")
REACT_AGENT_AVAILABLE = False
# === 公共服务导入 ===
from common.redis_conversation_manager import RedisConversationManager
from common.qa_feedback_manager import QAFeedbackManager
# === 初始化 ===
initialize_logging()
logger = get_app_logger("UnifiedAPI")
# 创建Flask应用
app = Flask(__name__)
# 全局实例
_original_agent = None
_react_agent = None
_redis_manager = RedisConversationManager()
_qa_manager = QAFeedbackManager()
# === 应用生命周期管理 ===
def initialize_agents():
"""初始化Agent实例"""
global _original_agent, _react_agent
if ORIGINAL_AGENT_AVAILABLE and _original_agent is None:
try:
_original_agent = get_citu_langraph_agent()
logger.info("✅ 原始Agent初始化成功")
except Exception as e:
logger.error(f"❌ 原始Agent初始化失败: {e}")
if REACT_AGENT_AVAILABLE and _react_agent is None:
try:
_react_agent = CustomReactAgent()
logger.info("✅ React Agent初始化成功")
except Exception as e:
logger.error(f"❌ React Agent初始化失败: {e}")
def cleanup_resources():
"""清理资源"""
global _original_agent, _react_agent
logger.info("🧹 开始清理资源...")
if _react_agent:
try:
# 如果React Agent有清理方法
if hasattr(_react_agent, 'cleanup'):
_react_agent.cleanup()
except Exception as e:
logger.error(f"React Agent清理失败: {e}")
_original_agent = None
_react_agent = None
logger.info("✅ 资源清理完成")
atexit.register(cleanup_resources)
# === 健康检查 ===
@app.route("/")
def root():
"""根路径健康检查"""
return jsonify({
"message": "统一API服务正在运行",
"version": "v1.0",
"services": {
"original_agent": ORIGINAL_AGENT_AVAILABLE,
"react_agent": REACT_AGENT_AVAILABLE,
"redis": _redis_manager.is_available(),
},
"timestamp": datetime.now().isoformat()
})
@app.route('/health', methods=['GET'])
def health_check():
"""详细健康检查"""
try:
# 检查各个组件状态
health_status = {
"status": "healthy",
"components": {
"original_agent": {
"available": ORIGINAL_AGENT_AVAILABLE,
"initialized": _original_agent is not None
},
"react_agent": {
"available": REACT_AGENT_AVAILABLE,
"initialized": _react_agent is not None
},
"redis": {
"available": _redis_manager.is_available(),
"connection": "ok" if _redis_manager.is_available() else "failed"
},
"qa_feedback": {
"available": True,
"status": "ok"
}
},
"timestamp": datetime.now().isoformat()
}
# 判断整体健康状态
all_critical_healthy = (
health_status["components"]["redis"]["available"] and
(ORIGINAL_AGENT_AVAILABLE or REACT_AGENT_AVAILABLE)
)
if not all_critical_healthy:
health_status["status"] = "degraded"
return jsonify(health_status), 503
return jsonify(health_status), 200
except Exception as e:
logger.error(f"健康检查失败: {e}")
return jsonify({
"status": "unhealthy",
"error": str(e),
"timestamp": datetime.now().isoformat()
}), 500
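
The overall status rule in the health check (Redis must be available, plus at least one agent) is easier to unit test when isolated as a pure function. A minimal sketch, with `overall_status` as a hypothetical name:

```python
from typing import Tuple


def overall_status(redis_ok: bool, original_agent_ok: bool, react_agent_ok: bool) -> Tuple[str, int]:
    """Map component availability to (status, HTTP code), mirroring the /health logic."""
    critical_healthy = redis_ok and (original_agent_ok or react_agent_ok)
    if critical_healthy:
        return "healthy", 200
    return "degraded", 503


status, code = overall_status(redis_ok=True, original_agent_ok=False, react_agent_ok=True)
```

Note that a degraded service answers with 503, so clients and tests that accept the "degraded" status also need to accept that status code.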
# === React Agent API (新版本) ===
@app.route('/api/v0/ask_react_agent', methods=['POST'])
def ask_react_agent():
"""React Agent API - 智能场景,高token消耗"""
if not REACT_AGENT_AVAILABLE:
return jsonify(service_unavailable_response(
response_text="React Agent服务不可用"
)), 503
# 确保Agent已初始化
if _react_agent is None:
initialize_agents()
if _react_agent is None:
return jsonify(service_unavailable_response(
response_text="React Agent初始化失败"
)), 503
try:
data = request.get_json(force=True)
question = data.get('question', '').strip()
user_id = data.get('user_id', 'guest')
thread_id = data.get('thread_id')
if not question:
return jsonify(bad_request_response(
response_text="问题不能为空",
missing_params=["question"]
)), 400
# 异步调用React Agent
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(
_react_agent.process_question(
question=question,
user_id=user_id,
thread_id=thread_id
)
)
finally:
loop.close()
if result.get('success', False):
return jsonify(success_response(
response_text="React Agent处理成功",
data=result
))
else:
return jsonify(agent_error_response(
response_text=result.get('error', 'React Agent处理失败'),
error_type="react_agent_error"
)), 500
except Exception as e:
logger.error(f"React Agent API错误: {str(e)}")
return jsonify(internal_error_response(
response_text="React Agent处理失败,请稍后重试"
)), 500
# === 原始Agent API (兼容版本) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""原始Agent API - 简单场景,低token消耗"""
if not ORIGINAL_AGENT_AVAILABLE:
return jsonify(service_unavailable_response(
response_text="原始Agent服务不可用"
)), 503
# 确保Agent已初始化
if _original_agent is None:
initialize_agents()
if _original_agent is None:
return jsonify(service_unavailable_response(
response_text="原始Agent初始化失败"
)), 503
# 这里会包含从citu_app.py迁移的完整ask_agent逻辑
# 包括Redis上下文管理、缓存检查、异步Agent调用等
# ... (从citu_app.py复制完整实现,添加适当的异步包装)
# === QA反馈系统API ===
@app.route('/api/v0/qa_feedback/query', methods=['POST'])
def qa_feedback_query():
"""查询反馈记录API"""
# 从citu_app.py完整迁移
# ...
@app.route('/api/v0/qa_feedback/add', methods=['POST'])
def qa_feedback_add():
"""添加反馈记录API"""
# 从citu_app.py完整迁移
# ...
# === Redis对话管理API ===
@app.route('/api/v0/user/<user_id>/conversations', methods=['GET'])
def get_user_conversations(user_id):
"""获取用户对话列表"""
# 从citu_app.py完整迁移
# ...
# === 训练数据管理API ===
@app.route('/api/v0/training_data/stats', methods=['GET'])
def training_data_stats():
"""获取训练数据统计信息"""
# 从citu_app.py完整迁移
# ...
# === Data Pipeline API ===
@app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
def create_data_pipeline_task():
"""创建数据管道任务"""
# 从citu_app.py完整迁移
# ...
# === 应用启动配置 ===
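# Note: @app.before_first_request was removed in Flask 2.3, so the agents
# are initialized eagerly when the module is imported instead.
def initialize_on_startup():
    """One-time initialization when the service starts."""
    logger.info("🚀 Unified API service starting, initializing...")
    initialize_agents()
    logger.info("✅ Unified API service initialized")

initialize_on_startup()

if __name__ == '__main__':
    logger.info("🚀 Starting the unified API service in development mode...")
    app.run(host='0.0.0.0', port=8084, debug=True)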
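
API category | Migration status | Test status | Notes |
---|---|---|---|
Health Check | ✅ Done | ✅ Passed | Adds component status checks |
React Agent | ✅ Done | ⏳ Pending | Async wrapping done |
Original Agent | ⏳ In progress | ⏳ Pending | Needs async refactor |
QA Feedback (6) | ⏳ To migrate | ⏳ Pending | Direct copy |
Redis management (8) | ⏳ To migrate | ⏳ Pending | Direct copy |
Training data (4) | ⏳ To migrate | ⏳ Pending | Direct copy |
Data Pipeline (10+) | ⏳ To migrate | ⏳ Pending | Direct copy |
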
# === 改造前 (citu_app.py) ===
@app.flask_app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
# ... 参数处理 ...
# 直接异步调用 (在Flask-WSGI中可能有问题)
agent_result = asyncio.run(agent.process_question(...))
# ... 结果处理 ...
# === 改造后 (api_unified.py) ===
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
# ... 参数处理 ...
# 安全的异步包装
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent_result = loop.run_until_complete(
agent.process_question(...)
)
finally:
loop.close()
# ... 结果处理 ...
@app.route('/api/v0/ask_agent', methods=['POST'])
def ask_agent():
"""
支持对话上下文的ask_agent API - 异步改造版本
从citu_app.py完整迁移并添加异步安全包装
"""
req = request.get_json(force=True)
question = req.get("question", None)
browser_session_id = req.get("session_id", None)
# 参数解析 (从citu_app.py复制)
user_id_input = req.get("user_id", None)
conversation_id_input = req.get("conversation_id", None)
continue_conversation = req.get("continue_conversation", False)
api_routing_mode = req.get("routing_mode", None)
VALID_ROUTING_MODES = ["database_direct", "chat_direct", "hybrid", "llm_only"]
# 参数验证
if not question:
return jsonify(bad_request_response(
response_text="缺少必需参数:question",
missing_params=["question"]
)), 400
if api_routing_mode and api_routing_mode not in VALID_ROUTING_MODES:
return jsonify(bad_request_response(
response_text=f"无效的routing_mode参数值: {api_routing_mode},支持的值: {VALID_ROUTING_MODES}",
invalid_params=["routing_mode"]
)), 400
try:
# 1. ID解析 (同步操作)
from flask import session
login_user_id = session.get('user_id') if 'user_id' in session else None
user_id = _redis_manager.resolve_user_id(
user_id_input, browser_session_id, request.remote_addr, login_user_id
)
conversation_id, conversation_status = _redis_manager.resolve_conversation_id(
user_id, conversation_id_input, continue_conversation
)
# 2. 上下文获取 (同步操作)
context = _redis_manager.get_context(conversation_id)
# 3. 上下文类型检测
context_type = None
if context:
try:
messages = _redis_manager.get_messages(conversation_id, limit=10)
for message in reversed(messages):
if message.get("role") == "assistant":
metadata = message.get("metadata", {})
context_type = metadata.get("type")
if context_type:
logger.info(f"检测到上下文类型: {context_type}")
break
except Exception as e:
logger.warning(f"获取上下文类型失败: {str(e)}")
# 4. 缓存检查 (同步操作)
cached_answer = _redis_manager.get_cached_answer(question, context)
if cached_answer:
logger.info("使用缓存答案")
return jsonify(agent_success_response(
response_type=cached_answer.get("type", "UNKNOWN"),
response=cached_answer.get("response", ""),
sql=cached_answer.get("sql"),
records=cached_answer.get("query_result"),
summary=cached_answer.get("summary"),
session_id=browser_session_id,
execution_path=cached_answer.get("execution_path", []),
classification_info=cached_answer.get("classification_info", {}),
conversation_id=conversation_id,
user_id=user_id,
is_guest_user=(user_id == "guest"),
context_used=bool(context),
from_cache=True,
conversation_status=conversation_status["status"],
conversation_message=conversation_status["message"],
requested_conversation_id=conversation_status.get("requested_id")
))
# 5. 保存用户消息 (同步操作)
_redis_manager.save_message(conversation_id, "user", question)
# 6. 构建带上下文的问题
if context:
enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
logger.info(f"使用上下文,长度: {len(context)}字符")
else:
enhanced_question = question
logger.info("新对话,无上下文")
# 7. 确定路由模式
if api_routing_mode:
effective_routing_mode = api_routing_mode
logger.info(f"使用API指定的路由模式: {effective_routing_mode}")
else:
try:
from app_config import QUESTION_ROUTING_MODE
effective_routing_mode = QUESTION_ROUTING_MODE
logger.info(f"使用配置文件路由模式: {effective_routing_mode}")
except ImportError:
effective_routing_mode = "hybrid"
logger.info(f"使用默认路由模式: {effective_routing_mode}")
# 8. 关键异步改造:Agent调用
if _original_agent is None:
initialize_agents()
if _original_agent is None:
return jsonify(service_unavailable_response(
response_text="AI服务暂时不可用,请稍后重试",
can_retry=True
)), 503
# 异步安全包装
async def process_with_agent():
"""异步处理函数"""
return await _original_agent.process_question(
question=enhanced_question,
session_id=browser_session_id,
context_type=context_type,
routing_mode=effective_routing_mode
)
# 在新的事件循环中执行异步操作
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
agent_result = loop.run_until_complete(process_with_agent())
finally:
loop.close()
asyncio.set_event_loop(None) # 清理事件循环
# 9. 处理Agent结果 (同步操作)
if agent_result.get("success", False):
response_type = agent_result.get("type", "UNKNOWN")
response_text = agent_result.get("response", "")
sql = agent_result.get("sql")
query_result = agent_result.get("query_result")
summary = agent_result.get("summary")
execution_path = agent_result.get("execution_path", [])
classification_info = agent_result.get("classification_info", {})
# 确定助手回复内容
if response_type == "DATABASE":
if response_text:
assistant_response = response_text
elif summary:
assistant_response = summary
elif query_result:
row_count = query_result.get("row_count", 0)
assistant_response = f"查询执行完成,共返回 {row_count} 条记录。"
else:
assistant_response = "数据库查询已处理。"
else:
assistant_response = response_text
# 保存助手回复
_redis_manager.save_message(
conversation_id, "assistant", assistant_response,
metadata={
"type": response_type,
"sql": sql,
"execution_path": execution_path
}
)
# 缓存答案
_redis_manager.cache_answer(question, agent_result, context)
return jsonify(agent_success_response(
response_type=response_type,
response=response_text,
sql=sql,
records=query_result,
summary=summary,
session_id=browser_session_id,
execution_path=execution_path,
classification_info=classification_info,
conversation_id=conversation_id,
user_id=user_id,
is_guest_user=(user_id == "guest"),
context_used=bool(context),
from_cache=False,
conversation_status=conversation_status["status"],
conversation_message=conversation_status["message"],
requested_conversation_id=conversation_status.get("requested_id"),
routing_mode_used=effective_routing_mode,
routing_mode_source="api" if api_routing_mode else "config"
))
else:
error_message = agent_result.get("error", "Agent处理失败")
error_code = agent_result.get("error_code", 500)
return jsonify(agent_error_response(
response_text=error_message,
error_type="agent_processing_failed",
code=error_code,
session_id=browser_session_id,
conversation_id=conversation_id,
user_id=user_id
)), error_code
except Exception as e:
logger.error(f"ask_agent执行失败: {str(e)}")
return jsonify(internal_error_response(
response_text="查询处理失败,请稍后重试"
)), 500
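
The branch in step 9 that picks the text stored as the assistant message is a good candidate for extraction, because it can then be tested without Redis or an agent. A sketch under that assumption; `pick_assistant_response` is a hypothetical helper and the English fallback strings are illustrative translations of the originals.

```python
from typing import Optional


def pick_assistant_response(
    response_type: str,
    response_text: str,
    summary: Optional[str],
    query_result: Optional[dict],
) -> str:
    """Choose the assistant message using the same precedence as ask_agent."""
    if response_type != "DATABASE":
        return response_text
    if response_text:
        return response_text
    if summary:
        return summary
    if query_result:
        row_count = query_result.get("row_count", 0)
        return f"Query finished, {row_count} rows returned."
    return "Database query processed."


message = pick_assistant_response("DATABASE", "", None, {"row_count": 3})
```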
# scripts/test_async_safety.py
"""
异步安全性测试脚本
"""
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def test_async_event_loop_isolation():
"""测试异步事件循环隔离"""
def sync_function_with_async():
"""模拟同步函数中的异步调用"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
async def async_task():
await asyncio.sleep(0.1)
return "async_result"
result = loop.run_until_complete(async_task())
return result
finally:
loop.close()
# 并发测试
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(sync_function_with_async)
for _ in range(10)
]
results = [future.result() for future in futures]
assert all(r == "async_result" for r in results)
print("✅ 异步事件循环隔离测试通过")
if __name__ == "__main__":
test_async_event_loop_isolation()
# tests/test_migration_basic.py
"""
基础迁移功能测试
"""
import pytest
import requests
from unittest.mock import patch
class TestBasicMigration:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
    def test_health_check(self, api_base_url):
        """Test the health check endpoint."""
        response = requests.get(f"{api_base_url}/health")
        # /health returns 200 when healthy and 503 when degraded
        assert response.status_code in (200, 503)
        data = response.json()
        assert data["status"] in ["healthy", "degraded"]
        assert "components" in data
def test_root_endpoint(self, api_base_url):
"""测试根路径"""
response = requests.get(f"{api_base_url}/")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "version" in data
def test_react_agent_api_availability(self, api_base_url):
"""测试React Agent API可用性"""
payload = {
"question": "测试问题",
"user_id": "test_user"
}
response = requests.post(
f"{api_base_url}/api/v0/ask_react_agent",
json=payload
)
# 应该返回有效响应 (可能是错误,但不应该是404)
assert response.status_code != 404
# tests/test_api_compatibility.py
"""
API兼容性测试
确保迁移后API行为与原版本一致
"""
import pytest
import requests
class TestAPICompatibility:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
def test_ask_agent_parameter_validation(self, api_base_url):
"""测试ask_agent参数验证"""
# 测试缺少question参数
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={}
)
assert response.status_code == 400
data = response.json()
assert "question" in data.get("missing_params", [])
# 测试无效routing_mode
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={
"question": "测试",
"routing_mode": "invalid_mode"
}
)
assert response.status_code == 400
def test_response_format_consistency(self, api_base_url):
"""测试响应格式一致性"""
payload = {
"question": "简单测试问题"
}
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json=payload
)
data = response.json()
# 检查标准响应字段
required_fields = ["code", "success", "message"]
for field in required_fields:
assert field in data, f"响应缺少必需字段: {field}"
# tests/test_async_performance.py
"""
异步性能测试
"""
import asyncio
import aiohttp
import time
import pytest
from concurrent.futures import ThreadPoolExecutor
class TestAsyncPerformance:
@pytest.fixture
def api_base_url(self):
return "http://localhost:8084"
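    @pytest.mark.asyncio  # requires the pytest-asyncio plugin; otherwise the coroutine is never run
    async def test_concurrent_requests(self, api_base_url):
        """Test concurrent request handling."""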
async with aiohttp.ClientSession() as session:
tasks = []
for i in range(10):
payload = {
"question": f"并发测试问题 {i}",
"user_id": f"test_user_{i}"
}
task = session.post(
f"{api_base_url}/api/v0/ask_react_agent",
json=payload
)
tasks.append(task)
start_time = time.time()
responses = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
# 检查响应
valid_responses = [
r for r in responses
if not isinstance(r, Exception) and r.status in [200, 400, 500]
]
assert len(valid_responses) >= 8 # 至少80%成功
assert end_time - start_time < 30 # 30秒内完成
def test_sync_async_isolation(self, api_base_url):
"""测试同步异步隔离"""
def make_request():
"""发起请求的同步函数"""
import requests
response = requests.post(
f"{api_base_url}/api/v0/ask_agent",
json={"question": "隔离测试"}
)
return response.status_code
# 多线程并发测试
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(make_request)
for _ in range(10)
]
results = [future.result() for future in futures]
# 检查是否有异步冲突
valid_status_codes = [200, 400, 500, 503]
assert all(code in valid_status_codes for code in results)
# scripts/stress_test.sh
#!/bin/bash
echo "🚀 开始压力测试..."
# 1. 基础负载测试
echo "1. 基础负载测试..."
ab -n 100 -c 5 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_react_agent
# 2. 持续负载测试
echo "2. 持续负载测试..."
ab -n 1000 -c 10 -T application/json -p tests/test_payload.json http://localhost:8084/api/v0/ask_agent
# 3. 内存泄漏检测
echo "3. 内存使用监控..."
python scripts/monitor_memory.py &
MONITOR_PID=$!
# 运行一段时间的负载
ab -n 500 -c 8 -T application/json -p tests/test_payload.json http://localhost:8084/health
# 停止监控
kill $MONITOR_PID
echo "✅ 压力测试完成"
# asgi_app_new.py (更新版)
"""
ASGI应用启动文件 - 生产环境配置
支持异步操作和性能优化
"""
import os
import logging
from asgiref.wsgi import WsgiToAsgi
# 导入统一API应用
from api_unified import app
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 性能优化配置
ASGI_CONFIG = {
"max_workers": int(os.getenv("MAX_WORKERS", "4")),
"timeout": int(os.getenv("TIMEOUT", "60")),
"keepalive": int(os.getenv("KEEPALIVE", "30")),
}
# 将Flask WSGI应用转换为ASGI应用
asgi_app = WsgiToAsgi(app)
logger.info(f"✅ ASGI应用配置完成: {ASGI_CONFIG}")
# 生产环境启动命令:
# uvicorn asgi_app_new:asgi_app --host 0.0.0.0 --port 8084 --workers 4
# 或
# gunicorn asgi_app_new:asgi_app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8084
# deploy/docker-compose.yml
version: '3.8'
services:
  unified-api:
    build:
      # Relative paths resolve against this file's directory (deploy/),
      # so the build context points one level up at the repository root.
      context: ..
      dockerfile: deploy/Dockerfile
    ports:
      - "8084:8084"
    environment:
      - PORT=8084
      - MAX_WORKERS=4
      - TIMEOUT=60
      - REDIS_URL=redis://redis:6379
      # Credentials and database name match the postgres service below
      - DATABASE_URL=postgresql://vanna_user:vanna_pass@postgres:5432/vanna_db
    depends_on:
      - redis
      - postgres
    volumes:
      - ../logs:/app/logs
      - ../data:/app/data
    restart: unless-stopped
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_DB=vanna_db
      - POSTGRES_USER=vanna_user
      - POSTGRES_PASSWORD=vanna_pass
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    restart: unless-stopped
volumes:
  redis_data:
  postgres_data:
# deploy/Dockerfile
FROM python:3.10-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
COPY react_agent/requirements.txt ./react_agent/
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -r react_agent/requirements.txt
# 复制应用代码
COPY . .
# 创建日志目录
RUN mkdir -p logs data
# 设置环境变量
ENV PYTHONPATH=/app
ENV FLASK_APP=api_unified.py
# 暴露端口
EXPOSE 8084
# 启动命令
CMD ["uvicorn", "asgi_app_new:asgi_app", "--host", "0.0.0.0", "--port", "8084", "--workers", "1"]
# monitoring/metrics.py
"""
应用监控指标收集
"""
import time
import psutil
import logging
from functools import wraps
from flask import request, g
from datetime import datetime
logger = logging.getLogger(__name__)
class MetricsCollector:
def __init__(self):
self.request_count = 0
self.error_count = 0
self.response_times = []
def record_request(self, endpoint, method, status_code, response_time):
"""记录请求指标"""
self.request_count += 1
if status_code >= 400:
self.error_count += 1
self.response_times.append(response_time)
# 保持最近1000条记录
if len(self.response_times) > 1000:
self.response_times = self.response_times[-1000:]
logger.info(f"📊 {method} {endpoint} - {status_code} - {response_time:.3f}s")
def get_stats(self):
"""获取统计信息"""
if not self.response_times:
return {
"request_count": self.request_count,
"error_count": self.error_count,
"error_rate": 0,
"avg_response_time": 0,
"system_stats": self._get_system_stats()
}
return {
"request_count": self.request_count,
"error_count": self.error_count,
"error_rate": (self.error_count / self.request_count) * 100,
"avg_response_time": sum(self.response_times) / len(self.response_times),
"max_response_time": max(self.response_times),
"min_response_time": min(self.response_times),
"system_stats": self._get_system_stats()
}
def _get_system_stats(self):
"""获取系统统计信息"""
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"timestamp": datetime.now().isoformat()
}
# 全局监控实例
metrics = MetricsCollector()
def monitor_requests(app):
"""为Flask应用添加请求监控"""
@app.before_request
def before_request():
g.start_time = time.time()
@app.after_request
def after_request(response):
response_time = time.time() - g.start_time
metrics.record_request(
endpoint=request.endpoint or 'unknown',
method=request.method,
status_code=response.status_code,
response_time=response_time
)
return response
# 添加监控端点
@app.route('/api/v0/metrics', methods=['GET'])
def get_metrics():
"""获取应用监控指标"""
return metrics.get_stats()
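
The collector keeps the last 1000 response times by re-slicing a list on every overflow. A bounded `collections.deque` gives the same sliding window without copying. The class below is a sketch of that alternative, not part of the original module; `ResponseTimeWindow` is a hypothetical name.

```python
from collections import deque


class ResponseTimeWindow:
    """Sliding window over the most recent response times (in seconds)."""

    def __init__(self, maxlen: int = 1000):
        # A deque with maxlen drops the oldest sample automatically.
        self._samples = deque(maxlen=maxlen)

    def add(self, seconds: float) -> None:
        self._samples.append(seconds)

    def summary(self) -> dict:
        if not self._samples:
            return {"count": 0, "avg": 0.0, "max": 0.0}
        return {
            "count": len(self._samples),
            "avg": sum(self._samples) / len(self._samples),
            "max": max(self._samples),
        }


window = ResponseTimeWindow(maxlen=3)
for sample in (0.1, 0.2, 0.3, 0.4):
    window.add(sample)
summary = window.summary()
```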
# monitoring/alerting.py
"""
告警系统
"""
import smtplib
import logging
from email.mime.text import MIMEText
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
class AlertManager:
def __init__(self):
self.alert_rules = {
"high_error_rate": {
"threshold": 5.0, # 5%错误率
"window": 300, # 5分钟窗口
"cooldown": 900 # 15分钟冷却期
},
"slow_response": {
"threshold": 10.0, # 10秒响应时间
"window": 180, # 3分钟窗口
"cooldown": 600 # 10分钟冷却期
},
"high_memory": {
"threshold": 85.0, # 85%内存使用率
"window": 120, # 2分钟窗口
"cooldown": 1800 # 30分钟冷却期
}
}
self.last_alerts = {}
def check_alerts(self, metrics):
"""检查告警条件"""
current_time = datetime.now()
# 检查错误率
if metrics["error_rate"] > self.alert_rules["high_error_rate"]["threshold"]:
self._trigger_alert(
"high_error_rate",
f"错误率过高: {metrics['error_rate']:.2f}%",
current_time
)
# 检查响应时间
if metrics.get("avg_response_time", 0) > self.alert_rules["slow_response"]["threshold"]:
self._trigger_alert(
"slow_response",
f"响应时间过慢: {metrics['avg_response_time']:.2f}s",
current_time
)
# 检查内存使用率
memory_percent = metrics["system_stats"]["memory_percent"]
if memory_percent > self.alert_rules["high_memory"]["threshold"]:
self._trigger_alert(
"high_memory",
f"内存使用率过高: {memory_percent:.2f}%",
current_time
)
    def _trigger_alert(self, alert_type, message, current_time):
        """Trigger an alert unless it is still in its cooldown period."""
        # Check the cooldown period
        if alert_type in self.last_alerts:
            last_alert_time = self.last_alerts[alert_type]
            cooldown_seconds = self.alert_rules[alert_type]["cooldown"]
            # total_seconds() covers the whole interval; .seconds ignores whole days
            if (current_time - last_alert_time).total_seconds() < cooldown_seconds:
                return  # still cooling down
        # Record the alert time
        self.last_alerts[alert_type] = current_time
        # Send the alert
        logger.error(f"🚨 Alert: {alert_type} - {message}")
        # Other channels can be added here: email, SMS, Slack, etc.
        self._send_email_alert(alert_type, message)
def _send_email_alert(self, alert_type, message):
"""发送邮件告警 (示例)"""
try:
# 邮件配置 (需要根据实际情况配置)
smtp_server = "smtp.example.com"
smtp_port = 587
username = "alerts@example.com"
password = "password"
msg = MIMEText(f"时间: {datetime.now()}\n类型: {alert_type}\n详情: {message}")
msg['Subject'] = f"[统一API服务] {alert_type}告警"
msg['From'] = username
msg['To'] = "admin@example.com"
with smtplib.SMTP(smtp_server, smtp_port) as server:
server.starttls()
server.login(username, password)
server.send_message(msg)
logger.info(f"✅ 告警邮件已发送: {alert_type}")
except Exception as e:
logger.error(f"❌ 告警邮件发送失败: {e}")
# 全局告警管理器
alert_manager = AlertManager()
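
Cooldown checks compare elapsed time against a number of seconds, and `timedelta` offers two easily confused values: `.seconds` holds only the seconds component (0 to 86399), while `.total_seconds()` covers the full interval. The sketch below shows the difference; `in_cooldown` is a hypothetical helper name.

```python
from datetime import datetime, timedelta


def in_cooldown(last_alert: datetime, now: datetime, cooldown_seconds: int) -> bool:
    """Return True while the time since the last alert is shorter than the cooldown."""
    return (now - last_alert).total_seconds() < cooldown_seconds


last = datetime(2025, 1, 15, 12, 0, 0)
one_minute_later = last + timedelta(seconds=60)
next_day = last + timedelta(days=1, seconds=5)

# The seconds component alone would make a day-old alert look 5 seconds old.
component_only = (next_day - last).seconds
full_interval = (next_day - last).total_seconds()
```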
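
Date | Task | Owner | Deliverable | Acceptance criteria |
---|---|---|---|---|
Day 1 | Directory structure migration | Backend | New directory layout, fixed import paths | ✅ No import errors |
Day 2 | Logging unification | Backend | Unified logging configuration | ✅ Consistent log format |
Day 3 | React Agent API integration | Backend | ask_react_agent available | ✅ API responds normally |
Day 4 | Basic API migration (1) | Backend | QA feedback and Redis management APIs | ✅ 6+8 APIs available |
Day 5 | Basic API migration (2) | Backend | Training data and Data Pipeline APIs | ✅ 4+10 APIs available |

Date | Task | Owner | Deliverable | Acceptance criteria |
---|---|---|---|---|
Day 6 | ask_agent async refactor | Backend | Async version of ask_agent | ✅ Async calls work |
Day 7 | Integration testing (1) | QA | Functional test report | ✅ Core features work |
Day 8 | Performance testing | QA | Performance test report | ✅ No significant performance drop |
Day 9 | Compatibility testing | QA | Compatibility test report | ✅ 100% API compatibility |
Day 10 | Bug fixes and optimization | Backend | Fix report | ✅ Critical bugs fixed |

Date | Task | Owner | Deliverable | Acceptance criteria |
---|---|---|---|---|
Day 11 | Deployment configuration | Ops | Docker/K8s configuration | ✅ Deployment scripts work |
Day 12 | Monitoring setup | Ops | Monitoring configuration | ✅ Metrics look normal |
Day 13 | Pre-production deployment | Ops | Pre-production environment | ✅ Pre-production stable |
Day 14 | Production deployment | Ops | Production environment | ✅ Production stable |
Day 15 | Post-launch monitoring | Everyone | Monitoring report | ✅ No major incidents |
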
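If it fails: delay by 2 days and re-evaluate the technical approach.
If it fails: evaluate the rollback plan, or delay by 1 week.
If it fails: execute the rollback plan.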
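
Risk | Probability | Impact | Risk level | Mitigation |
---|---|---|---|---|
Async compatibility issues | High | High | 🔴 Very high | Thorough testing, gradual rollout, fast rollback |
Significant performance drop | Medium | High | 🟡 High | Performance baseline, monitoring and alerting, optimization plan |
Data loss or corruption | Low | Very high | 🟡 High | Data backups, transactional protection, verification |
Third-party dependency conflicts | Medium | Medium | 🟡 Medium | Pinned dependency versions, isolated virtual environments |
Deployment failure | Medium | Medium | 🟡 Medium | Automated deployment, rollback scripts, blue-green deployment |
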
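Trigger: after the ask_agent async refactor, severe errors affect core functionality.
Emergency measures:
1. **Immediate rollback** (within 5 minutes)
```bash
# Quickly switch back to the fallback startup method.
# The pattern matches both the dev entry point (api_unified.py)
# and the ASGI entry point (asgi_app_new:asgi_app).
pkill -f "api_unified|asgi_app_new"
python citu_app.py &
```
2. **Problem diagnosis** (within 30 minutes)
```bash
tail -1000 logs/app.log > emergency_logs.txt
python scripts/debug_async_issues.py
```
3. **Fix** (within 2 hours)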
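Trigger: response time exceeds the original version's by 50%, or concurrency drops noticeably.
Emergency measures:
1. **Scale up resources** (within 10 minutes)
```bash
gunicorn asgi_app_new:asgi_app -w 8 -k uvicorn.workers.UvicornWorker
# Or raise the memory limit
docker update --memory=4g unified-api
```
2. **Performance analysis** (within 1 hour)
```bash
# Use a profiler to find performance bottlenecks
python -m cProfile -o profile.out api_unified.py
# Analyze memory usage
python scripts/memory_profiler.py
```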
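Trigger: data loss, corruption, or inconsistency is found.
Emergency measures:
1. **Stop writes immediately** (within 1 minute)
```bash
# Enable read-only mode
curl -X POST http://localhost:8084/api/v0/maintenance/readonly
```
2. **Back up and restore data** (within 30 minutes)
```bash
pg_dump vanna_db > emergencybackup$(date +%Y%m%d_%H%M%S).sql
psql vanna_db < backup_file.sql
```
3. **Verify data** (within 1 hour)
```bash
python scripts/data_integrity_check.py
# Compare data before and after the migration
python scripts/compare_data.py
```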
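
Role | Responsibility | Contact |
---|---|---|
Technical lead | Overall architecture, key technical decisions | tech-lead@example.com |
Backend developer | API migration, async refactor | backend-dev@example.com |
QA engineer | Functional and performance testing | qa-engineer@example.com |
DevOps engineer | Deployment configuration, monitoring and alerting | devops@example.com |
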
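Technical documentation

Project documents
- migration_and_integration_plan.md: overall plan
- api_compatibility_matrix.xlsx: API compatibility matrix
- performance_benchmark.md: performance benchmark report

Emergency contacts

Document version: v1.0
Created: 2025-01-15
Last updated: 2025-01-15
Document status: Detailed implementation guide, pending execution
Scope: Custom React Agent full migration project
Depends on: migration_and_integration_plan.md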