@@ -1,4 +1,8 @@
 # 给dataops 对话助手返回结果
+# 初始化日志系统 - 必须在最前面
+from core.logging import initialize_logging, get_app_logger, set_log_context, clear_log_context
+initialize_logging()
+
 from vanna.flask import VannaFlaskApp
 from core.vanna_llm_factory import create_vanna_instance
 from flask import request, jsonify
@@ -15,13 +19,10 @@ import sqlparse  # 用于SQL语法检查
 from common.redis_conversation_manager import RedisConversationManager  # 添加Redis对话管理器导入

 from common.qa_feedback_manager import QAFeedbackManager
-from common.result import success_response, bad_request_response, not_found_response, internal_error_response
-
-
 from common.result import (  # 统一导入所有需要的响应函数
-    bad_request_response, service_unavailable_response,
+    success_response, bad_request_response, not_found_response, internal_error_response,
+    error_response, service_unavailable_response,
     agent_success_response, agent_error_response,
-    internal_error_response, success_response,
     validation_failed_response
 )
 from app_config import (  # 添加Redis相关配置导入
@@ -31,6 +32,9 @@ from app_config import (  # 添加Redis相关配置导入
     ENABLE_QUESTION_ANSWER_CACHE
 )

+# 创建app logger
+logger = get_app_logger("CituApp")
+
 # 设置默认的最大返回行数
 DEFAULT_MAX_RETURN_ROWS = 200
 MAX_RETURN_ROWS = API_MAX_RETURN_ROWS if API_MAX_RETURN_ROWS is not None else DEFAULT_MAX_RETURN_ROWS
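The first hunk imports `set_log_context` and `clear_log_context`, but no later hunk uses them. A minimal sketch of how they could be bound to the request lifecycle — assuming `core.logging` keeps per-thread context and accepts arbitrary keyword fields, and that `app.flask_app` is the Flask instance the routes below register on:

    import uuid

    @app.flask_app.before_request
    def _bind_log_context():
        # correlate every logger call made during this request with one id
        set_log_context(request_id=uuid.uuid4().hex)

    @app.flask_app.teardown_request
    def _clear_log_context(exc=None):
        # avoid leaking context into the next request handled by this worker
        clear_log_context()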
@@ -131,9 +135,9 @@ def ask_full():
         if ENABLE_RESULT_SUMMARY:
             try:
                 summary = vn.generate_summary(question=question, df=df)
-                print(f"[INFO] 成功生成摘要: {summary}")
+                logger.info(f"成功生成摘要: {summary}")
             except Exception as e:
-                print(f"[WARNING] 生成摘要失败: {str(e)}")
+                logger.warning(f"生成摘要失败: {str(e)}")
                 summary = None

         # 构建返回数据
@@ -156,7 +160,7 @@ def ask_full():
         ))

     except Exception as e:
-        print(f"[ERROR] ask_full执行失败: {str(e)}")
+        logger.error(f"ask_full执行失败: {str(e)}")

         # 即使发生异常,也检查是否有业务层面的解释
         if hasattr(vn, 'last_llm_explanation') and vn.last_llm_explanation:
@@ -219,7 +223,7 @@ def citu_run_sql():
         ))

     except Exception as e:
-        print(f"[ERROR] citu_run_sql执行失败: {str(e)}")
+        logger.error(f"citu_run_sql执行失败: {str(e)}")
         from common.result import internal_error_response
         return jsonify(internal_error_response(
             response_text=f"SQL执行失败,请检查SQL语句是否正确"
@@ -245,27 +249,27 @@ def ask_cached():
     try:
         # 生成conversation_id
         # 调试:查看generate_id的实际行为
-        print(f"[DEBUG] 输入问题: '{question}'")
+        logger.debug(f"输入问题: '{question}'")
         conversation_id = app.cache.generate_id(question=question)
-        print(f"[DEBUG] 生成的conversation_id: {conversation_id}")
+        logger.debug(f"生成的conversation_id: {conversation_id}")

         # 再次用相同问题测试
         conversation_id2 = app.cache.generate_id(question=question)
-        print(f"[DEBUG] 再次生成的conversation_id: {conversation_id2}")
-        print(f"[DEBUG] 两次ID是否相同: {conversation_id == conversation_id2}")
+        logger.debug(f"再次生成的conversation_id: {conversation_id2}")
+        logger.debug(f"两次ID是否相同: {conversation_id == conversation_id2}")

         # 检查缓存
         cached_sql = app.cache.get(id=conversation_id, field="sql")

         if cached_sql is not None:
             # 缓存命中
-            print(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
+            logger.info(f"[CACHE HIT] 使用缓存结果: {conversation_id}")
             sql = cached_sql
             df = app.cache.get(id=conversation_id, field="df")
             summary = app.cache.get(id=conversation_id, field="summary")
         else:
             # 缓存未命中,执行新查询
-            print(f"[CACHE MISS] 执行新查询: {conversation_id}")
+            logger.info(f"[CACHE MISS] 执行新查询: {conversation_id}")

             sql, df, _ = vn.ask(
                 question=question,
@@ -301,9 +305,9 @@ def ask_cached():
         if ENABLE_RESULT_SUMMARY and isinstance(df, pd.DataFrame) and not df.empty:
             try:
                 summary = vn.generate_summary(question=question, df=df)
-                print(f"[INFO] 成功生成摘要: {summary}")
+                logger.info(f"成功生成摘要: {summary}")
             except Exception as e:
-                print(f"[WARNING] 生成摘要失败: {str(e)}")
+                logger.warning(f"生成摘要失败: {str(e)}")
                 summary = None

         app.cache.set(id=conversation_id, field="summary", value=summary)
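The ask_cached flow above is a cache-aside pattern: derive a deterministic id from the question (the paired debug lines check that `generate_id` really is stable across calls), read the `sql`/`df`/`summary` fields on a hit, and populate them after a miss. Condensed into a sketch that assumes only the `app.cache` and `vn.ask` calls already shown in the hunks:

    def answer_with_cache(question: str):
        cid = app.cache.generate_id(question=question)   # deterministic key
        sql = app.cache.get(id=cid, field="sql")
        if sql is not None:                              # hit: reuse stored results
            df = app.cache.get(id=cid, field="df")
        else:                                            # miss: compute, then store
            sql, df, _ = vn.ask(question=question)
            app.cache.set(id=cid, field="sql", value=sql)
            app.cache.set(id=cid, field="df", value=df)
        return sql, df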
@@ -348,7 +352,7 @@ def ask_cached():
         ))

     except Exception as e:
-        print(f"[ERROR] ask_cached执行失败: {str(e)}")
+        logger.error(f"ask_cached执行失败: {str(e)}")
         from common.result import internal_error_response
         return jsonify(internal_error_response(
             response_text="查询处理失败,请稍后重试"
@@ -386,10 +390,10 @@ def citu_train_question_sql():
         # 正确的调用方式:同时传递question和sql
         if question:
             training_id = vn.train(question=question, sql=sql)
-            print(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
+            logger.info(f"训练成功,训练ID为:{training_id},问题:{question},SQL:{sql}")
         else:
             training_id = vn.train(sql=sql)
-            print(f"训练成功,训练ID为:{training_id},SQL:{sql}")
+            logger.info(f"训练成功,训练ID为:{training_id},SQL:{sql}")

         from common.result import success_response
         return jsonify(success_response(
@@ -418,23 +422,23 @@ def get_citu_langraph_agent():
     if citu_langraph_agent is None:
         try:
             from agent.citu_agent import CituLangGraphAgent
-            print("[CITU_APP] 开始创建LangGraph Agent实例...")
+            logger.info("开始创建LangGraph Agent实例...")
             citu_langraph_agent = CituLangGraphAgent()
-            print("[CITU_APP] LangGraph Agent实例创建成功")
+            logger.info("LangGraph Agent实例创建成功")
         except ImportError as e:
-            print(f"[CRITICAL] Agent模块导入失败: {str(e)}")
-            print("[CRITICAL] 请检查agent模块是否存在以及依赖是否正确安装")
+            logger.critical(f"Agent模块导入失败: {str(e)}")
+            logger.critical("请检查agent模块是否存在以及依赖是否正确安装")
             raise Exception(f"Agent模块导入失败: {str(e)}")
         except Exception as e:
-            print(f"[CRITICAL] LangGraph Agent实例创建失败: {str(e)}")
-            print(f"[CRITICAL] 错误类型: {type(e).__name__}")
+            logger.critical(f"LangGraph Agent实例创建失败: {str(e)}")
+            logger.critical(f"错误类型: {type(e).__name__}")
             # 提供更有用的错误信息
             if "config" in str(e).lower():
-                print("[CRITICAL] 可能是配置文件问题,请检查配置")
+                logger.critical("可能是配置文件问题,请检查配置")
             elif "llm" in str(e).lower():
-                print("[CRITICAL] 可能是LLM连接问题,请检查LLM配置")
+                logger.critical("可能是LLM连接问题,请检查LLM配置")
             elif "tool" in str(e).lower():
-                print("[CRITICAL] 可能是工具加载问题,请检查工具模块")
+                logger.critical("可能是工具加载问题,请检查工具模块")
             raise Exception(f"Agent初始化失败: {str(e)}")
     return citu_langraph_agent
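`get_citu_langraph_agent` builds a module-level singleton lazily; under a multi-threaded WSGI server, two concurrent first requests can race and construct the agent twice. A hedged sketch of a lock-guarded variant (double-checked, so the steady state stays lock-free):

    import threading

    _agent_lock = threading.Lock()

    def get_citu_langraph_agent():
        global citu_langraph_agent
        if citu_langraph_agent is None:          # fast path once initialized
            with _agent_lock:
                if citu_langraph_agent is None:  # re-check under the lock
                    from agent.citu_agent import CituLangGraphAgent
                    citu_langraph_agent = CituLangGraphAgent()
        return citu_langraph_agent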
@@ -495,15 +499,15 @@ def ask_agent():
                 metadata = message.get("metadata", {})
                 context_type = metadata.get("type")
                 if context_type:
-                    print(f"[AGENT_API] 检测到上下文类型: {context_type}")
+                    logger.info(f"[AGENT_API] 检测到上下文类型: {context_type}")
                     break
         except Exception as e:
-            print(f"[WARNING] 获取上下文类型失败: {str(e)}")
+            logger.warning(f"获取上下文类型失败: {str(e)}")

         # 4. 检查缓存(新逻辑:放宽使用条件,严控存储条件)
         cached_answer = redis_conversation_manager.get_cached_answer(question, context)
         if cached_answer:
-            print(f"[AGENT_API] 使用缓存答案")
+            logger.info(f"[AGENT_API] 使用缓存答案")

             # 确定缓存答案的助手回复内容(使用与非缓存相同的优先级逻辑)
             cached_response_type = cached_answer.get("type", "UNKNOWN")
@@ -567,31 +571,31 @@ def ask_agent():
         # 6. 构建带上下文的问题
         if context:
             enhanced_question = f"\n[CONTEXT]\n{context}\n\n[CURRENT]\n{question}"
-            print(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
+            logger.info(f"[AGENT_API] 使用上下文,长度: {len(context)}字符")
         else:
             enhanced_question = question
-            print(f"[AGENT_API] 新对话,无上下文")
+            logger.info(f"[AGENT_API] 新对话,无上下文")

         # 7. 确定最终使用的路由模式(优先级逻辑)
         if api_routing_mode:
             # API传了参数,优先使用
             effective_routing_mode = api_routing_mode
-            print(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
+            logger.info(f"[AGENT_API] 使用API指定的路由模式: {effective_routing_mode}")
         else:
             # API没传参数,使用配置文件
             try:
                 from app_config import QUESTION_ROUTING_MODE
                 effective_routing_mode = QUESTION_ROUTING_MODE
-                print(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
+                logger.info(f"[AGENT_API] 使用配置文件路由模式: {effective_routing_mode}")
             except ImportError:
                 effective_routing_mode = "hybrid"
-                print(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")
+                logger.info(f"[AGENT_API] 配置文件读取失败,使用默认路由模式: {effective_routing_mode}")

         # 8. 现有Agent处理逻辑(修改为传递路由模式)
         try:
             agent = get_citu_langraph_agent()
         except Exception as e:
-            print(f"[CRITICAL] Agent初始化失败: {str(e)}")
+            logger.critical(f"Agent初始化失败: {str(e)}")
             return jsonify(service_unavailable_response(
                 response_text="AI服务暂时不可用,请稍后重试",
                 can_retry=True
@@ -687,7 +691,7 @@ def ask_agent():
         )), error_code

     except Exception as e:
-        print(f"[ERROR] ask_agent执行失败: {str(e)}")
+        logger.error(f"ask_agent执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="查询处理失败,请稍后重试"
         )), 500
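Step 7 of ask_agent resolves the routing mode with a three-level priority: an explicit API parameter wins, then `QUESTION_ROUTING_MODE` from `app_config`, then a hard-coded `"hybrid"` fallback. Factored into a helper it becomes easy to unit-test; a sketch that mirrors the hunk exactly:

    def resolve_routing_mode(api_routing_mode=None):
        """Priority: API parameter > app_config > 'hybrid'."""
        if api_routing_mode:
            return api_routing_mode
        try:
            from app_config import QUESTION_ROUTING_MODE
            return QUESTION_ROUTING_MODE
        except ImportError:
            return "hybrid"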
@@ -757,7 +761,7 @@ def agent_health():

     # 检查3: LLM连接(简单测试)
     try:
-        from agent.utils import get_compatible_llm
+        from agent.tools.utils import get_compatible_llm
         llm = get_compatible_llm()
         health_data["checks"]["llm_connection"] = llm is not None
     except Exception as e:
@@ -784,9 +788,9 @@ def agent_health():
             health_data["status"] = "degraded"
             health_data["message"] = "部分组件异常"
     except Exception as e:
-        print(f"[ERROR] 健康检查异常: {str(e)}")
+        logger.error(f"健康检查异常: {str(e)}")
         import traceback
-        print(f"[ERROR] 详细健康检查错误: {traceback.format_exc()}")
+        logger.error(f"详细健康检查错误: {traceback.format_exc()}")
         health_data["status"] = "degraded"
         health_data["message"] = f"完整测试失败: {str(e)}"

@@ -803,9 +807,9 @@ def agent_health():
         return jsonify(health_error_response(**health_data)), 503

     except Exception as e:
-        print(f"[ERROR] 顶层健康检查异常: {str(e)}")
+        logger.error(f"顶层健康检查异常: {str(e)}")
         import traceback
-        print(f"[ERROR] 详细错误信息: {traceback.format_exc()}")
+        logger.error(f"详细错误信息: {traceback.format_exc()}")
         from common.result import internal_error_response
         return jsonify(internal_error_response(
             response_text="健康检查失败,请稍后重试"
@@ -1517,7 +1521,7 @@ def training_error_question_sql():
         question = data.get('question')
         sql = data.get('sql')

-        print(f"[DEBUG] 接收到错误SQL训练请求: question={question}, sql={sql}")
+        logger.debug(f"接收到错误SQL训练请求: question={question}, sql={sql}")

         if not question or not sql:
             from common.result import bad_request_response
@@ -1535,7 +1539,7 @@ def training_error_question_sql():
         # 使用vn实例的train_error_sql方法存储错误SQL
         id = vn.train_error_sql(question=question, sql=sql)

-        print(f"[INFO] 成功存储错误SQL,ID: {id}")
+        logger.info(f"成功存储错误SQL,ID: {id}")

         from common.result import success_response
         return jsonify(success_response(
@@ -1547,7 +1551,7 @@ def training_error_question_sql():
         ))

     except Exception as e:
-        print(f"[ERROR] 存储错误SQL失败: {str(e)}")
+        logger.error(f"存储错误SQL失败: {str(e)}")
         from common.result import internal_error_response
         return jsonify(internal_error_response(
             response_text="存储错误SQL失败,请稍后重试"
@@ -1593,7 +1597,7 @@ def get_user_conversations(user_id: str):
                 conversation['conversation_title'] = "空对话"

             except Exception as e:
-                print(f"[WARNING] 获取对话标题失败 {conversation_id}: {str(e)}")
+                logger.warning(f"获取对话标题失败 {conversation_id}: {str(e)}")
                 conversation['conversation_title'] = "对话"

         return jsonify(success_response(
@@ -1747,7 +1751,7 @@ def get_user_conversations_with_messages(user_id: str):
         ))

     except Exception as e:
-        print(f"[ERROR] 获取用户完整对话数据失败: {str(e)}")
+        logger.error(f"获取用户完整对话数据失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取用户对话数据失败,请稍后重试"
         )), 500
@@ -1770,7 +1774,7 @@ def embedding_cache_stats():
         ))

     except Exception as e:
-        print(f"[ERROR] 获取embedding缓存统计失败: {str(e)}")
+        logger.error(f"获取embedding缓存统计失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取embedding缓存统计失败,请稍后重试"
         )), 500
@@ -1801,7 +1805,7 @@ def embedding_cache_cleanup():
             )), 500

     except Exception as e:
-        print(f"[ERROR] 清空embedding缓存失败: {str(e)}")
+        logger.error(f"清空embedding缓存失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="清空embedding缓存失败,请稍后重试"
         )), 500
@@ -1827,15 +1831,15 @@ def get_qa_feedback_manager():
             elif 'vn' in globals():
                 vanna_instance = vn
             else:
-                print("[INFO] 未找到可用的vanna实例,将创建新的数据库连接")
+                logger.info("未找到可用的vanna实例,将创建新的数据库连接")
         except Exception as e:
-            print(f"[INFO] 获取vanna实例失败: {e},将创建新的数据库连接")
+            logger.info(f"获取vanna实例失败: {e},将创建新的数据库连接")
             vanna_instance = None

         qa_feedback_manager = QAFeedbackManager(vanna_instance=vanna_instance)
-        print("[CITU_APP] QA反馈管理器实例创建成功")
+        logger.info("QA反馈管理器实例创建成功")
     except Exception as e:
-        print(f"[CRITICAL] QA反馈管理器创建失败: {str(e)}")
+        logger.critical(f"QA反馈管理器创建失败: {str(e)}")
         raise Exception(f"QA反馈管理器初始化失败: {str(e)}")
     return qa_feedback_manager

@@ -1904,7 +1908,7 @@ def qa_feedback_query():
         ))

     except Exception as e:
-        print(f"[ERROR] qa_feedback_query执行失败: {str(e)}")
+        logger.error(f"qa_feedback_query执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="查询反馈记录失败,请稍后重试"
         )), 500
@@ -1929,7 +1933,7 @@ def qa_feedback_delete(feedback_id):
             )), 404

     except Exception as e:
-        print(f"[ERROR] qa_feedback_delete执行失败: {str(e)}")
+        logger.error(f"qa_feedback_delete执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="删除反馈记录失败,请稍后重试"
         )), 500
@@ -1973,7 +1977,7 @@ def qa_feedback_update(feedback_id):
             )), 404

     except Exception as e:
-        print(f"[ERROR] qa_feedback_update执行失败: {str(e)}")
+        logger.error(f"qa_feedback_update执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="更新反馈记录失败,请稍后重试"
         )), 500
@@ -2026,7 +2030,7 @@ def qa_feedback_add_to_training():
                         sql=record['sql']
                     )
                     positive_count += 1
-                    print(f"[TRAINING] 正向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
+                    logger.info(f"正向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
                 else:
                     # 负向反馈 - 加入错误SQL训练集
                     training_id = vn.train_error_sql(
@@ -2034,18 +2038,18 @@ def qa_feedback_add_to_training():
                         sql=record['sql']
                     )
                     negative_count += 1
-                    print(f"[TRAINING] 负向训练成功 - ID: {record['id']}, TrainingID: {training_id}")
+                    logger.info(f"负向训练成功 - ID: {record['id']}, TrainingID: {training_id}")

                 successfully_trained_ids.append(record['id'])

             except Exception as e:
-                print(f"[ERROR] 训练失败 - 反馈ID: {record['id']}, 错误: {e}")
+                logger.error(f"训练失败 - 反馈ID: {record['id']}, 错误: {e}")
                 error_count += 1

         # 更新训练状态
         if successfully_trained_ids:
             updated_count = manager.mark_training_status(successfully_trained_ids, True)
-            print(f"[TRAINING] 批量更新训练状态完成,影响 {updated_count} 条记录")
+            logger.info(f"批量更新训练状态完成,影响 {updated_count} 条记录")

         # 构建响应
         total_processed = positive_count + negative_count + already_trained_count + error_count
@@ -2070,7 +2074,7 @@ def qa_feedback_add_to_training():
         ))

     except Exception as e:
-        print(f"[ERROR] qa_feedback_add_to_training执行失败: {str(e)}")
+        logger.error(f"qa_feedback_add_to_training执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="添加训练数据失败,请稍后重试"
         )), 500
@@ -2123,7 +2127,7 @@ def qa_feedback_add():
         ))

     except Exception as e:
-        print(f"[ERROR] qa_feedback_add执行失败: {str(e)}")
+        logger.error(f"qa_feedback_add执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="创建反馈记录失败,请稍后重试"
         )), 500
@@ -2158,7 +2162,7 @@ def qa_feedback_stats():
         ))

     except Exception as e:
-        print(f"[ERROR] qa_feedback_stats执行失败: {str(e)}")
+        logger.error(f"qa_feedback_stats执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取统计信息失败,请稍后重试"
         )), 500
@@ -2178,7 +2182,7 @@ def qa_cache_stats():
         ))

     except Exception as e:
-        print(f"[ERROR] 获取问答缓存统计失败: {str(e)}")
+        logger.error(f"获取问答缓存统计失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取问答缓存统计失败,请稍后重试"
         )), 500
@@ -2209,7 +2213,7 @@ def qa_cache_list():
         ))

     except Exception as e:
-        print(f"[ERROR] 获取问答缓存列表失败: {str(e)}")
+        logger.error(f"获取问答缓存列表失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取问答缓存列表失败,请稍后重试"
         )), 500
@@ -2235,7 +2239,7 @@ def qa_cache_cleanup():
         ))

     except Exception as e:
-        print(f"[ERROR] 清空问答缓存失败: {str(e)}")
+        logger.error(f"清空问答缓存失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="清空问答缓存失败,请稍后重试"
         )), 500
@@ -2367,7 +2371,7 @@ def get_total_training_count():
             return len(training_data)
         return 0
     except Exception as e:
-        print(f"[WARNING] 获取训练数据总数失败: {e}")
+        logger.warning(f"获取训练数据总数失败: {e}")
         return 0

 @app.flask_app.route('/api/v0/training_data/query', methods=['POST'])
@@ -2460,7 +2464,7 @@ def training_data_query():
         ))

     except Exception as e:
-        print(f"[ERROR] training_data_query执行失败: {str(e)}")
+        logger.error(f"training_data_query执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="查询训练数据失败,请稍后重试"
         )), 500
@@ -2520,20 +2524,51 @@ def training_data_create():
         # 获取创建后的总记录数
         current_total = get_total_training_count()

-        return jsonify(success_response(
-            response_text="训练数据创建完成",
-            data={
-                "total_requested": len(data_list),
-                "successfully_created": successful_count,
-                "failed_count": len(data_list) - successful_count,
-                "results": results,
-                "summary": type_summary,
-                "current_total_count": current_total
-            }
-        ))
+        # 根据实际执行结果决定响应状态
+        failed_count = len(data_list) - successful_count
+
+        if failed_count == 0:
+            # 全部成功
+            return jsonify(success_response(
+                response_text="训练数据创建完成",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            ))
+        elif successful_count == 0:
+            # 全部失败
+            return jsonify(error_response(
+                response_text="训练数据创建失败",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            )), 400
+        else:
+            # 部分成功,部分失败
+            return jsonify(error_response(
+                response_text=f"训练数据创建部分成功,成功{successful_count}条,失败{failed_count}条",
+                data={
+                    "total_requested": len(data_list),
+                    "successfully_created": successful_count,
+                    "failed_count": failed_count,
+                    "results": results,
+                    "summary": type_summary,
+                    "current_total_count": current_total
+                }
+            )), 207

     except Exception as e:
-        print(f"[ERROR] training_data_create执行失败: {str(e)}")
+        logger.error(f"training_data_create执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="创建训练数据失败,请稍后重试"
         )), 500
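Both batch endpoints (create above, delete in the next hunk) now map outcomes to HTTP status the same way: 200 when every item succeeded, 400 when all failed, 207 Multi-Status for a mixed result. Since the payload is identical across the three branches, the mapping could be factored out; a sketch assuming the `success_response`/`error_response` signatures used above:

    def batch_outcome_response(succeeded: int, failed: int, action: str, data: dict):
        # 200 all ok / 400 all failed / 207 Multi-Status for partial success
        if failed == 0:
            return jsonify(success_response(response_text=f"{action}完成", data=data)), 200
        if succeeded == 0:
            return jsonify(error_response(response_text=f"{action}失败", data=data)), 400
        return jsonify(error_response(
            response_text=f"{action}部分成功,成功{succeeded}条,失败{failed}条",
            data=data
        )), 207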
@@ -2591,21 +2626,54 @@ def training_data_delete():
         # 获取删除后的总记录数
         current_total = get_total_training_count()

-        return jsonify(success_response(
-            response_text="训练数据删除完成",
-            data={
-                "total_requested": len(ids),
-                "successfully_deleted": len(deleted_ids),
-                "failed_count": len(failed_ids),
-                "deleted_ids": deleted_ids,
-                "failed_ids": failed_ids,
-                "failed_details": failed_details,
-                "current_total_count": current_total
-            }
-        ))
+        # 根据实际执行结果决定响应状态
+        failed_count = len(failed_ids)
+
+        if failed_count == 0:
+            # 全部成功
+            return jsonify(success_response(
+                response_text="训练数据删除完成",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            ))
+        elif len(deleted_ids) == 0:
+            # 全部失败
+            return jsonify(error_response(
+                response_text="训练数据删除失败",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            )), 400
+        else:
+            # 部分成功,部分失败
+            return jsonify(error_response(
+                response_text=f"训练数据删除部分成功,成功{len(deleted_ids)}条,失败{failed_count}条",
+                data={
+                    "total_requested": len(ids),
+                    "successfully_deleted": len(deleted_ids),
+                    "failed_count": failed_count,
+                    "deleted_ids": deleted_ids,
+                    "failed_ids": failed_ids,
+                    "failed_details": failed_details,
+                    "current_total_count": current_total
+                }
+            )), 207

     except Exception as e:
-        print(f"[ERROR] training_data_delete执行失败: {str(e)}")
+        logger.error(f"training_data_delete执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="删除训练数据失败,请稍后重试"
         )), 500
@@ -2666,7 +2734,7 @@ def training_data_stats():
         ))

     except Exception as e:
-        print(f"[ERROR] training_data_stats执行失败: {str(e)}")
+        logger.error(f"training_data_stats执行失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取统计信息失败,请稍后重试"
         )), 500
@@ -2702,7 +2770,7 @@ def cache_overview_full():
         ))

     except Exception as e:
-        print(f"[ERROR] 获取综合缓存概览失败: {str(e)}")
+        logger.error(f"获取综合缓存概览失败: {str(e)}")
         return jsonify(internal_error_response(
             response_text="获取缓存概览失败,请稍后重试"
         )), 500
@@ -2748,5 +2816,1665 @@ const chatSession = new ChatSession();
 chatSession.askQuestion("各年龄段客户的流失率如何?");
 """

-print("正在启动Flask应用: http://localhost:8084")
-app.run(host="0.0.0.0", port=8084, debug=True)
+# ==================== Data Pipeline API ====================
+
+# 导入简化的Data Pipeline模块
+import asyncio
+import os
+from threading import Thread
+from flask import send_file
+
+from data_pipeline.api.simple_workflow import SimpleWorkflowManager
+from data_pipeline.api.simple_file_manager import SimpleFileManager
+
+# 创建简化的管理器
+data_pipeline_manager = None
+data_pipeline_file_manager = None
+
+def get_data_pipeline_manager():
+    """获取Data Pipeline管理器单例"""
+    global data_pipeline_manager
+    if data_pipeline_manager is None:
+        data_pipeline_manager = SimpleWorkflowManager()
+    return data_pipeline_manager
+
+def get_data_pipeline_file_manager():
+    """获取Data Pipeline文件管理器单例"""
+    global data_pipeline_file_manager
+    if data_pipeline_file_manager is None:
+        data_pipeline_file_manager = SimpleFileManager()
+    return data_pipeline_file_manager
+
+# ==================== 简化的Data Pipeline API端点 ====================
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['POST'])
+def create_data_pipeline_task():
+    """创建数据管道任务"""
+    try:
+        req = request.get_json(force=True)
+
+        # table_list_file和business_context现在都是可选参数
+        # 如果未提供table_list_file,将使用文件上传模式
+
+        # 创建任务(支持可选的db_connection参数)
+        manager = get_data_pipeline_manager()
+        task_id = manager.create_task(
+            table_list_file=req.get('table_list_file'),
+            business_context=req.get('business_context'),
+            db_name=req.get('db_name'),  # 可选参数,用于指定特定数据库名称
+            db_connection=req.get('db_connection'),  # 可选参数,用于指定数据库连接字符串
+            task_name=req.get('task_name'),  # 可选参数,用于指定任务名称
+            enable_sql_validation=req.get('enable_sql_validation', True),
+            enable_llm_repair=req.get('enable_llm_repair', True),
+            modify_original_file=req.get('modify_original_file', True),
+            enable_training_data_load=req.get('enable_training_data_load', True)
+        )
+
+        # 获取任务信息
+        task_info = manager.get_task_status(task_id)
+
+        response_data = {
+            "task_id": task_id,
+            "task_name": task_info.get('task_name'),
+            "status": task_info.get('status'),
+            "created_at": task_info.get('created_at').isoformat() if task_info.get('created_at') else None
+        }
+
+        # 检查是否为文件上传模式
+        file_upload_mode = not req.get('table_list_file')
+        response_message = "任务创建成功"
+
+        if file_upload_mode:
+            response_data["file_upload_mode"] = True
+            response_data["next_step"] = f"POST /api/v0/data_pipeline/tasks/{task_id}/upload-table-list"
+            response_message += ",请上传表清单文件后再执行任务"
+
+        return jsonify(success_response(
+            response_text=response_message,
+            data=response_data
+        )), 201
+
+    except Exception as e:
+        logger.error(f"创建数据管道任务失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="创建任务失败,请稍后重试"
+        )), 500
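+# Usage sketch for the endpoint above (host/port are assumptions; 8084 matches
+# the app.run() call this patch removes). Omitting table_list_file selects the
+# file-upload mode, and the 201 response's next_step points at the upload URL:
+#
+#   curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks \
+#        -H "Content-Type: application/json" \
+#        -d '{"business_context": "高速公路服务区", "db_name": "highway_db"}'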
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/execute', methods=['POST'])
+def execute_data_pipeline_task(task_id):
+    """执行数据管道任务"""
+    try:
+        req = request.get_json(force=True) if request.is_json else {}
+        execution_mode = req.get('execution_mode', 'complete')
+        step_name = req.get('step_name')
+
+        # 验证执行模式
+        if execution_mode not in ['complete', 'step']:
+            return jsonify(bad_request_response(
+                response_text="无效的执行模式,必须是 'complete' 或 'step'",
+                invalid_params=['execution_mode']
+            )), 400
+
+        # 如果是步骤执行模式,验证步骤名称
+        if execution_mode == 'step':
+            if not step_name:
+                return jsonify(bad_request_response(
+                    response_text="步骤执行模式需要指定step_name",
+                    missing_params=['step_name']
+                )), 400
+
+            valid_steps = ['ddl_generation', 'qa_generation', 'sql_validation', 'training_load']
+            if step_name not in valid_steps:
+                return jsonify(bad_request_response(
+                    response_text=f"无效的步骤名称,支持的步骤: {', '.join(valid_steps)}",
+                    invalid_params=['step_name']
+                )), 400
+
+        # 检查任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # 使用subprocess启动独立进程执行任务
+        def run_task_subprocess():
+            try:
+                import subprocess
+                import sys
+                from pathlib import Path
+
+                # 构建执行命令
+                python_executable = sys.executable
+                script_path = Path(__file__).parent / "data_pipeline" / "task_executor.py"
+
+                cmd = [
+                    python_executable,
+                    str(script_path),
+                    "--task-id", task_id,
+                    "--execution-mode", execution_mode
+                ]
+
+                if step_name:
+                    cmd.extend(["--step-name", step_name])
+
+                logger.info(f"启动任务进程: {' '.join(cmd)}")
+
+                # 启动后台进程(不等待完成)
+                process = subprocess.Popen(
+                    cmd,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    text=True,
+                    cwd=Path(__file__).parent
+                )
+
+                logger.info(f"任务进程已启动: PID={process.pid}, task_id={task_id}")
+
+            except Exception as e:
+                logger.error(f"启动任务进程失败: {task_id}, 错误: {str(e)}")
+
+        # 在新线程中启动subprocess(避免阻塞API响应)
+        thread = Thread(target=run_task_subprocess, daemon=True)
+        thread.start()
+
+        response_data = {
+            "task_id": task_id,
+            "execution_mode": execution_mode,
+            "step_name": step_name if execution_mode == 'step' else None,
+            "message": "任务正在后台执行,请通过状态接口查询进度"
+        }
+
+        return jsonify(success_response(
+            response_text="任务执行已启动",
+            data=response_data
+        )), 202
+
+    except Exception as e:
+        logger.error(f"启动数据管道任务执行失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="启动任务执行失败,请稍后重试"
+        )), 500
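+# A caveat on the detached launch above, offered as a suggestion: the child
+# writes to stdout=PIPE/stderr=PIPE but nothing ever drains those pipes, so a
+# very chatty task_executor.py could fill the OS pipe buffer and stall. One
+# hedged alternative is to redirect into the task's own directory (path assumed
+# to follow the data_pipeline/training_data/<task_id> layout used below):
+#
+#   log_path = Path(__file__).parent / "data_pipeline" / "training_data" / task_id / "executor.out"
+#   with open(log_path, "ab") as out:
+#       subprocess.Popen(cmd, stdout=out, stderr=subprocess.STDOUT, cwd=Path(__file__).parent)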
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>', methods=['GET'])
+def get_data_pipeline_task_status(task_id):
+    """
+    获取数据管道任务状态
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取任务状态成功",
+        "data": {
+            "task_id": "task_20250627_143052",
+            "status": "in_progress",
+            "step_status": {
+                "ddl_generation": "completed",
+                "qa_generation": "running",
+                "sql_validation": "pending",
+                "training_load": "pending"
+            },
+            "created_at": "2025-06-27T14:30:52",
+            "started_at": "2025-06-27T14:31:00",
+            "parameters": {...},
+            "current_execution": {...},
+            "total_executions": 2
+        }
+    }
+    """
+    try:
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # 获取步骤状态
+        steps = manager.get_task_steps(task_id)
+        current_step = None
+        for step in steps:
+            if step['step_status'] == 'running':
+                current_step = step
+                break
+
+        # 构建步骤状态摘要
+        step_status_summary = {}
+        for step in steps:
+            step_status_summary[step['step_name']] = step['step_status']
+
+        response_data = {
+            "task_id": task_info['task_id'],
+            "task_name": task_info.get('task_name'),
+            "status": task_info['status'],
+            "step_status": step_status_summary,
+            "created_at": task_info['created_at'].isoformat() if task_info.get('created_at') else None,
+            "started_at": task_info['started_at'].isoformat() if task_info.get('started_at') else None,
+            "completed_at": task_info['completed_at'].isoformat() if task_info.get('completed_at') else None,
+            "parameters": task_info.get('parameters', {}),
+            "result": task_info.get('result'),
+            "error_message": task_info.get('error_message'),
+            "current_step": {
+                "execution_id": current_step['execution_id'],
+                "step": current_step['step_name'],
+                "status": current_step['step_status'],
+                "started_at": current_step['started_at'].isoformat() if current_step and current_step.get('started_at') else None
+            } if current_step else None,
+            "total_steps": len(steps),
+            "steps": [{
+                "step_name": step['step_name'],
+                "step_status": step['step_status'],
+                "started_at": step['started_at'].isoformat() if step.get('started_at') else None,
+                "completed_at": step['completed_at'].isoformat() if step.get('completed_at') else None,
+                "error_message": step.get('error_message')
+            } for step in steps]
+        }
+
+        return jsonify(success_response(
+            response_text="获取任务状态成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"获取数据管道任务状态失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务状态失败,请稍后重试"
+        )), 500
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs', methods=['GET'])
+def get_data_pipeline_task_logs(task_id):
+    """
+    获取数据管道任务日志(从任务目录文件读取)
+
+    查询参数:
+    - limit: 日志行数限制,默认100
+    - level: 日志级别过滤,可选
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取任务日志成功",
+        "data": {
+            "task_id": "task_20250627_143052",
+            "logs": [
+                {
+                    "timestamp": "2025-06-27 14:30:52",
+                    "level": "INFO",
+                    "message": "任务开始执行"
+                }
+            ],
+            "total": 15,
+            "source": "file"
+        }
+    }
+    """
+    try:
+        limit = request.args.get('limit', 100, type=int)
+        level = request.args.get('level')
+
+        # 限制最大查询数量
+        limit = min(limit, 1000)
+
+        manager = get_data_pipeline_manager()
+
+        # 验证任务是否存在
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # 获取任务目录下的日志文件
+        import os
+        from pathlib import Path
+
+        # 获取项目根目录的绝对路径
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+        log_file = task_dir / "data_pipeline.log"
+
+        logs = []
+        if log_file.exists():
+            try:
+                # 读取日志文件的最后N行
+                with open(log_file, 'r', encoding='utf-8') as f:
+                    lines = f.readlines()
+
+                # 取最后limit行
+                recent_lines = lines[-limit:] if len(lines) > limit else lines
+
+                # 解析日志行
+                import re
+                log_pattern = r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.+?): (.+)$'
+
+                for line in recent_lines:
+                    line = line.strip()
+                    if not line:
+                        continue
+
+                    match = re.match(log_pattern, line)
+                    if match:
+                        timestamp, log_level, logger_name, message = match.groups()
+
+                        # 级别过滤
+                        if level and log_level != level.upper():
+                            continue
+
+                        logs.append({
+                            "timestamp": timestamp,
+                            "level": log_level,
+                            "logger": logger_name,
+                            "message": message
+                        })
+                    else:
+                        # 处理多行日志(如异常堆栈)
+                        if logs:
+                            logs[-1]["message"] += f"\n{line}"
+
+            except Exception as e:
+                logger.error(f"读取日志文件失败: {e}")
+
+        response_data = {
+            "task_id": task_id,
+            "logs": logs,
+            "total": len(logs),
+            "source": "file",
+            "log_file": str(log_file) if log_file.exists() else None
+        }
+
+        return jsonify(success_response(
+            response_text="获取任务日志成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"获取数据管道任务日志失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务日志失败,请稍后重试"
+        )), 500
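+# The log_pattern regex above assumes entries shaped like this illustrative
+# line (timestamp/level/message values taken from the docstring; the logger
+# name is hypothetical):
+#
+#   2025-06-27 14:30:52 [INFO] DataPipeline: 任务开始执行
+#
+# which yields timestamp / level / logger / message groups; lines that do not
+# match (e.g. stack-trace continuations) fall through to the else-branch and
+# are appended to the previous entry's message.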
+
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['GET'])
+def list_data_pipeline_tasks():
+    """获取数据管道任务列表"""
+    try:
+        limit = request.args.get('limit', 50, type=int)
+        offset = request.args.get('offset', 0, type=int)
+        status_filter = request.args.get('status')
+
+        # 限制查询数量
+        limit = min(limit, 100)
+
+        manager = get_data_pipeline_manager()
+        tasks = manager.get_tasks_list(
+            limit=limit,
+            offset=offset,
+            status_filter=status_filter
+        )
+
+        # 格式化任务列表
+        formatted_tasks = []
+        for task in tasks:
+            formatted_tasks.append({
+                "task_id": task.get('task_id'),
+                "task_name": task.get('task_name'),
+                "status": task.get('status'),
+                "step_status": task.get('step_status'),
+                "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
+                "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
+                "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
+                "created_by": task.get('by_user'),
+                "db_name": task.get('db_name'),
+                "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
+                # 新增字段
+                "directory_exists": task.get('directory_exists', True),  # 默认为True,兼容旧数据
+                "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
+            })
+
+        response_data = {
+            "tasks": formatted_tasks,
+            "total": len(formatted_tasks),
+            "limit": limit,
+            "offset": offset
+        }
+
+        return jsonify(success_response(
+            response_text="获取任务列表成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"获取数据管道任务列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务列表失败,请稍后重试"
+        )), 500
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/query', methods=['POST'])
+def query_data_pipeline_tasks():
+    """
+    高级查询数据管道任务列表
+
+    支持复杂筛选、排序、分页功能
+
+    请求体:
+    {
+        "page": 1,                          // 页码,必须大于0,默认1
+        "page_size": 20,                    // 每页大小,1-100之间,默认20
+        "status": "completed",              // 可选,任务状态筛选:"pending"|"running"|"completed"|"failed"|"cancelled"
+        "task_name": "highway",             // 可选,任务名称模糊搜索,最大100字符
+        "created_by": "user123",            // 可选,创建者精确匹配
+        "db_name": "highway_db",            // 可选,数据库名称精确匹配
+        "created_time_start": "2025-01-01T00:00:00",    // 可选,创建时间范围开始
+        "created_time_end": "2025-12-31T23:59:59",      // 可选,创建时间范围结束
+        "started_time_start": "2025-01-01T00:00:00",    // 可选,开始时间范围开始
+        "started_time_end": "2025-12-31T23:59:59",      // 可选,开始时间范围结束
+        "completed_time_start": "2025-01-01T00:00:00",  // 可选,完成时间范围开始
+        "completed_time_end": "2025-12-31T23:59:59",    // 可选,完成时间范围结束
+        "sort_by": "created_at",            // 可选,排序字段:"created_at"|"started_at"|"completed_at"|"task_name"|"status",默认"created_at"
+        "sort_order": "desc"                // 可选,排序方向:"asc"|"desc",默认"desc"
+    }
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "查询任务列表成功",
+        "data": {
+            "tasks": [...],
+            "pagination": {
+                "page": 1,
+                "page_size": 20,
+                "total": 150,
+                "total_pages": 8,
+                "has_next": true,
+                "has_prev": false
+            },
+            "filters_applied": {...},
+            "sort_applied": {...},
+            "query_time": "0.045s"
+        }
+    }
+    """
+    try:
+        # 获取请求数据
+        req = request.get_json(force=True) if request.is_json else {}
+
+        # 解析参数,设置默认值
+        page = req.get('page', 1)
+        page_size = req.get('page_size', 20)
+        status = req.get('status')
+        task_name = req.get('task_name')
+        created_by = req.get('created_by')
+        db_name = req.get('db_name')
+        created_time_start = req.get('created_time_start')
+        created_time_end = req.get('created_time_end')
+        started_time_start = req.get('started_time_start')
+        started_time_end = req.get('started_time_end')
+        completed_time_start = req.get('completed_time_start')
+        completed_time_end = req.get('completed_time_end')
+        sort_by = req.get('sort_by', 'created_at')
+        sort_order = req.get('sort_order', 'desc')
+
+        # 参数验证
+        # 验证分页参数
+        if page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须大于0",
+                invalid_params=['page']
+            )), 400
+
+        if page_size < 1 or page_size > 100:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须在1-100之间",
+                invalid_params=['page_size']
+            )), 400
+
+        # 验证任务名称长度
+        if task_name and len(task_name) > 100:
+            return jsonify(bad_request_response(
+                response_text="任务名称搜索关键词最大长度为100字符",
+                invalid_params=['task_name']
+            )), 400
+
+        # 验证排序参数
+        allowed_sort_fields = ['created_at', 'started_at', 'completed_at', 'task_name', 'status']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"不支持的排序字段: {sort_by},支持的字段: {', '.join(allowed_sort_fields)}",
+                invalid_params=['sort_by']
+            )), 400
+
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是 'asc' 或 'desc'",
+                invalid_params=['sort_order']
+            )), 400
+
+        # 验证状态筛选
+        if status:
+            allowed_statuses = ['pending', 'running', 'completed', 'failed', 'cancelled']
+            if status not in allowed_statuses:
+                return jsonify(bad_request_response(
+                    response_text=f"不支持的状态值: {status},支持的状态: {', '.join(allowed_statuses)}",
+                    invalid_params=['status']
+                )), 400
+
+        # 调用管理器执行查询
+        manager = get_data_pipeline_manager()
+        result = manager.query_tasks_advanced(
+            page=page,
+            page_size=page_size,
+            status=status,
+            task_name=task_name,
+            created_by=created_by,
+            db_name=db_name,
+            created_time_start=created_time_start,
+            created_time_end=created_time_end,
+            started_time_start=started_time_start,
+            started_time_end=started_time_end,
+            completed_time_start=completed_time_start,
+            completed_time_end=completed_time_end,
+            sort_by=sort_by,
+            sort_order=sort_order
+        )
+
+        # 格式化任务列表
+        formatted_tasks = []
+        for task in result['tasks']:
+            formatted_tasks.append({
+                "task_id": task.get('task_id'),
+                "task_name": task.get('task_name'),
+                "status": task.get('status'),
+                "step_status": task.get('step_status'),
+                "created_at": task['created_at'].isoformat() if task.get('created_at') else None,
+                "started_at": task['started_at'].isoformat() if task.get('started_at') else None,
+                "completed_at": task['completed_at'].isoformat() if task.get('completed_at') else None,
+                "created_by": task.get('by_user'),
+                "db_name": task.get('db_name'),
+                "business_context": task.get('parameters', {}).get('business_context') if task.get('parameters') else None,
+                "directory_exists": task.get('directory_exists', True),
+                "updated_at": task['updated_at'].isoformat() if task.get('updated_at') else None
+            })
+
+        # 构建响应数据
+        response_data = {
+            "tasks": formatted_tasks,
+            "pagination": result['pagination'],
+            "filters_applied": {
+                k: v for k, v in {
+                    "status": status,
+                    "task_name": task_name,
+                    "created_by": created_by,
+                    "db_name": db_name,
+                    "created_time_start": created_time_start,
+                    "created_time_end": created_time_end,
+                    "started_time_start": started_time_start,
+                    "started_time_end": started_time_end,
+                    "completed_time_start": completed_time_start,
+                    "completed_time_end": completed_time_end
+                }.items() if v
+            },
+            "sort_applied": {
+                "sort_by": sort_by,
+                "sort_order": sort_order
+            },
+            "query_time": result.get('query_time', '0.000s')
+        }
+
+        return jsonify(success_response(
+            response_text="查询任务列表成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"查询数据管道任务列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务列表失败,请稍后重试"
+        )), 500
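+# Illustrative request for the advanced query endpoint above, using values
+# from its docstring (host/port are assumptions):
+#
+#   curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/query \
+#        -H "Content-Type: application/json" \
+#        -d '{"page": 1, "page_size": 20, "status": "completed",
+#             "task_name": "highway", "sort_by": "created_at", "sort_order": "desc"}'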
+
+# ==================== 表检查API端点 ====================
+
+from data_pipeline.api.table_inspector_api import TableInspectorAPI
+
+@app.flask_app.route('/api/v0/database/tables', methods=['POST'])
+def get_database_tables():
+    """
+    获取数据库表列表
+
+    请求体:
+    {
+        "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",  // 可选,不传则使用默认配置
+        "schema": "public,ods",          // 可选,支持多个schema用逗号分隔,默认为public
+        "table_name_pattern": "ods_*"    // 可选,表名模式匹配,支持通配符:ods_*、*_dim、*fact*、ods_%
+    }
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表列表成功",
+        "data": {
+            "tables": ["public.table1", "public.table2", "ods.table3"],
+            "total": 3,
+            "schemas": ["public", "ods"],
+            "table_name_pattern": "ods_*"
+        }
+    }
+    """
+    try:
+        req = request.get_json(force=True)
+
+        # 处理数据库连接参数(可选)
+        db_connection = req.get('db_connection')
+        if not db_connection:
+            # 使用app_config的默认数据库配置
+            import app_config
+            db_params = app_config.APP_DB_CONFIG
+            db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
+            logger.info("使用默认数据库配置获取表列表")
+        else:
+            logger.info("使用用户指定的数据库配置获取表列表")
+
+        # 可选参数
+        schema = req.get('schema', '')
+        table_name_pattern = req.get('table_name_pattern')
+
+        # 创建表检查API实例
+        table_inspector = TableInspectorAPI()
+
+        # 使用asyncio运行异步方法
+        async def get_tables():
+            return await table_inspector.get_tables_list(db_connection, schema, table_name_pattern)
+
+        # 在新的事件循环中运行异步方法
+        try:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            tables = loop.run_until_complete(get_tables())
+        finally:
+            loop.close()
+
+        # 解析schema信息
+        parsed_schemas = table_inspector._parse_schemas(schema)
+
+        response_data = {
+            "tables": tables,
+            "total": len(tables),
+            "schemas": parsed_schemas,
+            "db_connection_info": {
+                "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
+            }
+        }
+
+        # 如果使用了表名模式,添加到响应中
+        if table_name_pattern:
+            response_data["table_name_pattern"] = table_name_pattern
+
+        return jsonify(success_response(
+            response_text="获取表列表成功",
+            data=response_data
+        )), 200
+
+    except Exception as e:
+        logger.error(f"获取数据库表列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text=f"获取表列表失败: {str(e)}"
+        )), 500
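+# The new_event_loop/run_until_complete/close sequence above predates
+# asyncio.run(); on Python 3.7+ an equivalent, offered as a suggestion with
+# the same fresh-loop-per-request semantics, would be:
+#
+#   tables = asyncio.run(
+#       table_inspector.get_tables_list(db_connection, schema, table_name_pattern))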
+
+@app.flask_app.route('/api/v0/database/table/ddl', methods=['POST'])
+def get_table_ddl():
+    """
+    获取表的DDL语句或MD文档
+
+    请求体:
+    {
+        "db_connection": "postgresql://postgres:postgres@192.168.67.1:5432/highway_db",  // 可选,不传则使用默认配置
+        "table": "public.test",
+        "business_context": "这是高速公路服务区的相关数据",  // 可选
+        "type": "ddl"  // 可选,支持ddl/md/both,默认为ddl
+    }
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表DDL成功",
+        "data": {
+            "ddl": "create table public.test (...);",
+            "md": "## test表...",  // 仅当type为md或both时返回
+            "table_info": {
+                "table_name": "test",
+                "schema_name": "public",
+                "full_name": "public.test",
+                "comment": "测试表",
+                "field_count": 10,
+                "row_count": 1000
+            },
+            "fields": [...]
+        }
+    }
+    """
+    try:
+        req = request.get_json(force=True)
+
+        # 处理参数(table仍为必需,db_connection可选)
+        table = req.get('table')
+        db_connection = req.get('db_connection')
+
+        if not table:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数:table",
+                missing_params=['table']
+            )), 400
+
+        if not db_connection:
+            # 使用app_config的默认数据库配置
+            import app_config
+            db_params = app_config.APP_DB_CONFIG
+            db_connection = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"
+            logger.info("使用默认数据库配置获取表DDL")
+        else:
+            logger.info("使用用户指定的数据库配置获取表DDL")
+
+        # 可选参数
+        business_context = req.get('business_context', '')
+        output_type = req.get('type', 'ddl')
+
+        # 验证type参数
+        valid_types = ['ddl', 'md', 'both']
+        if output_type not in valid_types:
+            return jsonify(bad_request_response(
+                response_text=f"无效的type参数: {output_type},支持的值: {valid_types}",
+                invalid_params=['type']
+            )), 400
+
+        # 创建表检查API实例
+        table_inspector = TableInspectorAPI()
+
+        # 使用asyncio运行异步方法
+        async def get_ddl():
+            return await table_inspector.get_table_ddl(
+                db_connection=db_connection,
+                table=table,
+                business_context=business_context,
+                output_type=output_type
+            )
+
+        # 在新的事件循环中运行异步方法
+        try:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            result = loop.run_until_complete(get_ddl())
+        finally:
+            loop.close()
+
+        response_data = {
+            **result,
+            "generation_info": {
+                "business_context": business_context,
+                "output_type": output_type,
+                "has_llm_comments": bool(business_context),
+                "database": db_connection.split('/')[-1].split('?')[0] if '/' in db_connection else "unknown"
+            }
+        }
+
+        return jsonify(success_response(
+            response_text=f"获取表{output_type.upper()}成功",
+            data=response_data
+        )), 200
+
+    except Exception as e:
+        logger.error(f"获取表DDL失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text=f"获取表{output_type.upper() if 'output_type' in locals() else 'DDL'}失败: {str(e)}"
+        )), 500
+
+# ==================== Data Pipeline 文件管理 API ====================
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['GET'])
+def get_data_pipeline_task_files(task_id):
+    """获取任务文件列表"""
+    try:
+        file_manager = get_data_pipeline_file_manager()
+
+        # 获取任务文件
+        files = file_manager.get_task_files(task_id)
+        directory_info = file_manager.get_directory_info(task_id)
+
+        # 格式化文件信息
+        formatted_files = []
+        for file_info in files:
+            formatted_files.append({
+                "file_name": file_info['file_name'],
+                "file_type": file_info['file_type'],
+                "file_size": file_info['file_size'],
+                "file_size_formatted": file_info['file_size_formatted'],
+                "created_at": file_info['created_at'].isoformat() if file_info.get('created_at') else None,
+                "modified_at": file_info['modified_at'].isoformat() if file_info.get('modified_at') else None,
+                "is_readable": file_info['is_readable']
+            })
+
+        response_data = {
+            "task_id": task_id,
+            "files": formatted_files,
+            "directory_info": directory_info
+        }
+
+        return jsonify(success_response(
+            response_text="获取任务文件列表成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"获取任务文件列表失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取任务文件列表失败,请稍后重试"
+        )), 500
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files/<file_name>', methods=['GET'])
+def download_data_pipeline_task_file(task_id, file_name):
+    """下载任务文件"""
+    try:
+        logger.info(f"开始下载文件: task_id={task_id}, file_name={file_name}")
+
+        # 直接构建文件路径,避免依赖数据库
+        from pathlib import Path
+        import os
+
+        # 获取项目根目录的绝对路径
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+        file_path = task_dir / file_name
+
+        logger.info(f"文件路径: {file_path}")
+
+        # 检查文件是否存在
+        if not file_path.exists():
+            logger.warning(f"文件不存在: {file_path}")
+            return jsonify(not_found_response(
+                response_text=f"文件不存在: {file_name}"
+            )), 404
+
+        # 检查是否为文件(而不是目录)
+        if not file_path.is_file():
+            logger.warning(f"路径不是文件: {file_path}")
+            return jsonify(bad_request_response(
+                response_text=f"路径不是有效文件: {file_name}"
+            )), 400
+
+        # 安全检查:确保文件在允许的目录内
+        try:
+            file_path.resolve().relative_to(task_dir.resolve())
+        except ValueError:
+            logger.warning(f"文件路径不安全: {file_path}")
+            return jsonify(bad_request_response(
+                response_text="非法的文件路径"
+            )), 400
+
+        # 检查文件是否可读
+        if not os.access(file_path, os.R_OK):
+            logger.warning(f"文件不可读: {file_path}")
+            return jsonify(bad_request_response(
+                response_text="文件不可读"
+            )), 400
+
+        logger.info(f"开始发送文件: {file_path}")
+        return send_file(
+            file_path,
+            as_attachment=True,
+            download_name=file_name
+        )
+
+    except Exception as e:
+        logger.error(f"下载任务文件失败: task_id={task_id}, file_name={file_name}, 错误: {str(e)}", exc_info=True)
+        return jsonify(internal_error_response(
+            response_text="下载文件失败,请稍后重试"
+        )), 500
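+# The relative_to() call above is the path-traversal guard: the requested
+# path is resolved and must remain under task_dir, so a crafted name such as
+# "../../app_config.py" raises ValueError and the request is answered with 400.
+# For example (illustrative):
+#
+#   (task_dir / "../secret.txt").resolve().relative_to(task_dir.resolve())
+#   # -> raises ValueError: not in the subpath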
+
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/upload-table-list', methods=['POST'])
+def upload_table_list_file(task_id):
+    """
+    上传表清单文件
+
+    表单参数:
+    - file: 要上传的表清单文件(multipart/form-data)
+
+    响应:
+    {
+        "success": true,
+        "code": 200,
+        "message": "表清单文件上传成功",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "filename": "table_list.txt",
+            "file_size": 1024,
+            "file_size_formatted": "1.0 KB"
+        }
+    }
+    """
+    try:
+        # 验证任务是否存在
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # 检查是否有文件上传
+        if 'file' not in request.files:
+            return jsonify(bad_request_response(
+                response_text="请选择要上传的表清单文件",
+                missing_params=['file']
+            )), 400
+
+        file = request.files['file']
+
+        # 验证文件名
+        if file.filename == '':
+            return jsonify(bad_request_response(
+                response_text="请选择有效的文件"
+            )), 400
+
+        try:
+            # 使用文件管理器上传文件
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.upload_table_list_file(task_id, file)
+
+            response_data = {
+                "task_id": task_id,
+                "filename": result["filename"],
+                "file_size": result["file_size"],
+                "file_size_formatted": result["file_size_formatted"],
+                "upload_time": result["upload_time"].isoformat() if result.get("upload_time") else None
+            }
+
+            return jsonify(success_response(
+                response_text="表清单文件上传成功",
+                data=response_data
+            )), 200
+
+        except ValueError as e:
+            # 文件验证错误(如文件太大、空文件等)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"上传表清单文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="文件上传失败,请稍后重试"
+            )), 500
+
+    except Exception as e:
+        logger.error(f"处理表清单文件上传请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理上传请求失败,请稍后重试"
+        )), 500
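+# Illustrative multipart upload for the endpoint above (task id taken from the
+# docstring; host/port are assumptions):
+#
+#   curl -X POST http://localhost:8084/api/v0/data_pipeline/tasks/task_20250701_123456/upload-table-list \
+#        -F "file=@table_list.txt"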
|
|
|
+
|
|
|
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list-info', methods=['GET'])
+def get_table_list_info(task_id):
+    """
+    Get information about the task's table-list file.
+
+    Response:
+    {
+        "success": true,
+        "code": 200,
+        "message": "获取表清单文件信息成功",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "has_file": true,
+            "filename": "table_list.txt",
+            "file_path": "./data_pipeline/training_data/task_20250701_123456/table_list.txt",
+            "file_size": 1024,
+            "file_size_formatted": "1.0 KB",
+            "uploaded_at": "2025-07-01T12:34:56",
+            "table_count": 5,
+            "is_readable": true
+        }
+    }
+    """
+    try:
+        file_manager = get_data_pipeline_file_manager()
+
+        # Fetch the table-list file info
+        table_list_info = file_manager.get_table_list_file_info(task_id)
+
+        response_data = {
+            "task_id": task_id,
+            "has_file": table_list_info.get("exists", False),
+            **table_list_info
+        }
+
+        return jsonify(success_response(
+            response_text="获取表清单文件信息成功",
+            data=response_data
+        ))
+
+    except Exception as e:
+        logger.error(f"获取表清单文件信息失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="获取表清单文件信息失败,请稍后重试"
+        )), 500
+
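+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# Reading back the table-list metadata; "has_file" tells the caller whether an
+# upload (or a POSTed table list) already exists for the task.
+def _example_get_table_list_info(task_id):
+    import requests
+    url = f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/table-list-info"
+    resp = requests.get(url, timeout=30)
+    resp.raise_for_status()
+    return resp.json()["data"]
+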
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/table-list', methods=['POST'])
+def create_table_list_from_names(task_id):
+    """
+    Create table_list.txt from a POSTed list of table names.
+
+    Request body:
+    {
+        "tables": ["table1", "schema.table2", "table3"]
+    }
+    or:
+    {
+        "tables": "table1,schema.table2,table3"
+    }
+
+    Response:
+    {
+        "success": true,
+        "code": 200,
+        "message": "表清单已成功创建",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "filename": "table_list.txt",
+            "table_count": 3,
+            "file_size": 45,
+            "file_size_formatted": "45 B",
+            "created_time": "2025-07-01T12:34:56"
+        }
+    }
+    """
+    try:
+        # Verify that the task exists
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # Parse the request body
+        req = request.get_json(force=True)
+        tables_param = req.get('tables')
+
+        if not tables_param:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数:tables",
+                missing_params=['tables']
+            )), 400
+
+        # Accept both supported formats for the tables parameter
+        try:
+            if isinstance(tables_param, str):
+                # Comma-separated string
+                table_names = [name.strip() for name in tables_param.split(',') if name.strip()]
+            elif isinstance(tables_param, list):
+                # JSON array
+                table_names = [str(name).strip() for name in tables_param if str(name).strip()]
+            else:
+                return jsonify(bad_request_response(
+                    response_text="tables参数格式错误,应为字符串(逗号分隔)或数组"
+                )), 400
+
+            if not table_names:
+                return jsonify(bad_request_response(
+                    response_text="表名列表不能为空"
+                )), 400
+
+        except Exception as e:
+            return jsonify(bad_request_response(
+                response_text=f"解析tables参数失败: {str(e)}"
+            )), 400
+
+        try:
+            # Create the table-list file through the file manager
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.create_table_list_from_names(task_id, table_names)
+
+            response_data = {
+                "task_id": task_id,
+                "filename": result["filename"],
+                "table_count": result["table_count"],
+                "unique_table_count": result["unique_table_count"],
+                "file_size": result["file_size"],
+                "file_size_formatted": result["file_size_formatted"],
+                "created_time": result["created_time"].isoformat() if result.get("created_time") else None,
+                # table_names is always a list at this point, so len() is safe
+                "original_count": len(table_names)
+            }
+
+            return jsonify(success_response(
+                response_text=f"表清单已成功创建,包含 {result['table_count']} 个表",
+                data=response_data
+            )), 200
+
+        except ValueError as e:
+            # Table-name validation errors (bad format, count limits, etc.)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"创建表清单文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="创建表清单文件失败,请稍后重试"
+            )), 500
+
+    except Exception as e:
+        logger.error(f"处理表清单创建请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理请求失败,请稍后重试"
+        )), 500
+
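+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# Both payload shapes accepted by the endpoint above: a JSON array or a
+# comma-separated string. Assumes localhost:8084 and `requests`.
+def _example_create_table_list(task_id):
+    import requests
+    url = f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/table-list"
+    # Array form
+    resp = requests.post(url, json={"tables": ["table1", "schema.table2", "table3"]}, timeout=30)
+    resp.raise_for_status()
+    # Equivalent comma-separated form
+    resp = requests.post(url, json={"tables": "table1,schema.table2,table3"}, timeout=30)
+    resp.raise_for_status()
+    return resp.json()["data"]["table_count"]
+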
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/files', methods=['POST'])
+def upload_file_to_task(task_id):
+    """
+    Upload a file into the given task's directory.
+
+    Form parameters:
+    - file: the file to upload (multipart/form-data)
+    - overwrite_mode: how to handle a name clash (backup, replace, skip); defaults to backup
+
+    Supported file types:
+    - .ddl: DDL files
+    - .md: Markdown documents
+    - .txt: plain-text files
+    - .json: JSON files
+    - .sql: SQL files
+    - .csv: CSV files
+
+    Overwrite modes:
+    - backup: back up the existing file (default)
+    - replace: overwrite in place
+    - skip: skip the upload
+
+    Response:
+    {
+        "success": true,
+        "code": 200,
+        "message": "文件上传成功",
+        "data": {
+            "task_id": "task_20250701_123456",
+            "uploaded_file": {
+                "filename": "test.ddl",
+                "size": 1024,
+                "size_formatted": "1.0 KB",
+                "uploaded_at": "2025-07-01T12:34:56",
+                "overwrite_mode": "backup"
+            },
+            "backup_info": {  // only returned when overwrite_mode is backup and the file already existed
+                "had_existing_file": true,
+                "backup_filename": "test.ddl_bak1",
+                "backup_version": 1,
+                "backup_created_at": "2025-07-01T12:34:56"
+            }
+        }
+    }
+    """
+    try:
+        # Verify that the task exists
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # Check that a file was actually uploaded
+        if 'file' not in request.files:
+            return jsonify(bad_request_response(
+                response_text="请选择要上传的文件",
+                missing_params=['file']
+            )), 400
+
+        file = request.files['file']
+
+        # Validate the filename
+        if file.filename == '':
+            return jsonify(bad_request_response(
+                response_text="请选择有效的文件"
+            )), 400
+
+        # Read and validate the overwrite mode
+        overwrite_mode = request.form.get('overwrite_mode', 'backup')
+        valid_modes = ['backup', 'replace', 'skip']
+        if overwrite_mode not in valid_modes:
+            return jsonify(bad_request_response(
+                response_text=f"无效的overwrite_mode参数: {overwrite_mode},支持的值: {valid_modes}",
+                invalid_params=['overwrite_mode']
+            )), 400
+
+        try:
+            # Delegate the upload to the file manager
+            file_manager = get_data_pipeline_file_manager()
+            result = file_manager.upload_file_to_task(task_id, file, file.filename, overwrite_mode)
+
+            # The manager may have skipped the upload (overwrite_mode == "skip")
+            if result.get('skipped'):
+                return jsonify(success_response(
+                    response_text=result.get('message', '文件已存在,跳过上传'),
+                    data=result
+                )), 200
+
+            return jsonify(success_response(
+                response_text="文件上传成功",
+                data=result
+            )), 200
+
+        except ValueError as e:
+            # File validation errors (too large, empty, unsupported type, ...)
+            return jsonify(bad_request_response(
+                response_text=str(e)
+            )), 400
+        except Exception as e:
+            logger.error(f"上传文件失败: {str(e)}")
+            return jsonify(internal_error_response(
+                response_text="文件上传失败,请稍后重试"
+            )), 500
+
+    except Exception as e:
+        logger.error(f"处理文件上传请求失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="处理上传请求失败,请稍后重试"
+        )), 500
+
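+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# Uploading a DDL file with an explicit overwrite mode; "skip" turns a name
+# clash into a no-op instead of creating a backup.
+def _example_upload_file(task_id, local_path, overwrite_mode="backup"):
+    import requests
+    url = f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/files"
+    with open(local_path, "rb") as f:
+        resp = requests.post(
+            url,
+            files={"file": f},
+            data={"overwrite_mode": overwrite_mode},  # backup | replace | skip
+            timeout=30,
+        )
+    resp.raise_for_status()
+    return resp.json()
+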
+# ==================== Task directory deletion API ====================
+
+import shutil
+from pathlib import Path
+from datetime import datetime
+import psycopg2
+from app_config import PGVECTOR_CONFIG
+
+def delete_task_directory_simple(task_id, delete_database_records=False):
+    """
+    Minimal task-directory deletion:
+    - remove the data_pipeline/training_data/{task_id} directory
+    - update the directory_exists flag in the database
+    - optionally delete the database records as well
+    """
+    try:
+        # 1. Delete the directory
+        project_root = Path(__file__).parent.absolute()
+        task_dir = project_root / "data_pipeline" / "training_data" / task_id
+
+        deleted_files_count = 0
+        deleted_size = 0
+
+        if task_dir.exists():
+            # Collect stats before deleting
+            for file_path in task_dir.rglob('*'):
+                if file_path.is_file():
+                    deleted_files_count += 1
+                    deleted_size += file_path.stat().st_size
+
+            # Remove the directory tree
+            shutil.rmtree(task_dir)
+            directory_deleted = True
+        else:
+            directory_deleted = False
+
+        # 2. Update the database
+        database_records_deleted = False
+
+        try:
+            conn = psycopg2.connect(**PGVECTOR_CONFIG)
+            cur = conn.cursor()
+
+            if delete_database_records:
+                # Delete step records first, then the task record itself
+                cur.execute("DELETE FROM data_pipeline_task_steps WHERE task_id = %s", (task_id,))
+                cur.execute("DELETE FROM data_pipeline_tasks WHERE task_id = %s", (task_id,))
+                database_records_deleted = True
+            else:
+                # Only flag the directory as gone
+                cur.execute("""
+                    UPDATE data_pipeline_tasks
+                    SET directory_exists = FALSE, updated_at = CURRENT_TIMESTAMP
+                    WHERE task_id = %s
+                """, (task_id,))
+
+            conn.commit()
+            cur.close()
+            conn.close()
+
+        except Exception as db_error:
+            logger.error(f"数据库操作失败: {db_error}")
+            # A database failure does not undo the file-deletion result
+
+        # 3. Human-readable size formatting
+        def format_size(size_bytes):
+            if size_bytes < 1024:
+                return f"{size_bytes} B"
+            elif size_bytes < 1024**2:
+                return f"{size_bytes/1024:.1f} KB"
+            elif size_bytes < 1024**3:
+                return f"{size_bytes/(1024**2):.1f} MB"
+            else:
+                return f"{size_bytes/(1024**3):.1f} GB"
+
+        return {
+            "success": True,
+            "task_id": task_id,
+            "directory_deleted": directory_deleted,
+            "database_records_deleted": database_records_deleted,
+            "deleted_files_count": deleted_files_count,
+            "deleted_size": format_size(deleted_size),
+            "deleted_at": datetime.now().isoformat()
+        }
+
+    except Exception as e:
+        logger.error(f"删除任务目录失败: {task_id}, 错误: {str(e)}")
+        return {
+            "success": False,
+            "task_id": task_id,
+            "error": str(e),
+            "error_code": "DELETE_FAILED"
+        }
+
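+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# The helper above can also be driven from a maintenance script without going
+# through HTTP, e.g. to prune a known list of stale task directories.
+def _example_prune_task_directories(task_ids):
+    results = [delete_task_directory_simple(tid, delete_database_records=False)
+               for tid in task_ids]
+    # Each result carries success, deleted_files_count and a formatted size
+    failed = [r["task_id"] for r in results if not r["success"]]
+    return {"requested": len(task_ids), "failed": failed}
+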
+@app.flask_app.route('/api/v0/data_pipeline/tasks', methods=['DELETE'])
+def delete_tasks():
+    """Delete task directories (supports single and batch deletion)."""
+    try:
+        # Parse the request body
+        req = request.get_json(force=True)
+
+        # Validate required parameters
+        task_ids = req.get('task_ids')
+        confirm = req.get('confirm')
+
+        if not task_ids:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数: task_ids",
+                missing_params=['task_ids']
+            )), 400
+
+        if not confirm:
+            return jsonify(bad_request_response(
+                response_text="缺少必需参数: confirm",
+                missing_params=['confirm']
+            )), 400
+
+        if confirm is not True:
+            return jsonify(bad_request_response(
+                response_text="confirm参数必须为true以确认删除操作"
+            )), 400
+
+        if not isinstance(task_ids, list) or len(task_ids) == 0:
+            return jsonify(bad_request_response(
+                response_text="task_ids必须是非空的任务ID列表"
+            )), 400
+
+        # Optional parameters
+        delete_database_records = req.get('delete_database_records', False)
+        continue_on_error = req.get('continue_on_error', True)
+
+        # Perform the (batch) deletion
+        deleted_tasks = []
+        failed_tasks = []
+
+        for task_id in task_ids:
+            result = delete_task_directory_simple(task_id, delete_database_records)
+
+            if result["success"]:
+                deleted_tasks.append(result)
+                # Note: freed-space accounting is simplified here; the formatted
+                # size strings in each result are not parsed back into bytes.
+            else:
+                failed_tasks.append({
+                    "task_id": task_id,
+                    "error": result["error"],
+                    "error_code": result.get("error_code", "UNKNOWN")
+                })
+
+                if not continue_on_error:
+                    break
+
+        # Build the response
+        summary = {
+            "total_requested": len(task_ids),
+            "successfully_deleted": len(deleted_tasks),
+            "failed": len(failed_tasks)
+        }
+
+        batch_result = {
+            "deleted_tasks": deleted_tasks,
+            "failed_tasks": failed_tasks,
+            "summary": summary,
+            "deleted_at": datetime.now().isoformat()
+        }
+
+        if len(task_ids) == 1:
+            # Single deletion
+            if summary["failed"] == 0:
+                message = "任务目录删除成功"
+            else:
+                message = "任务目录删除失败"
+        else:
+            # Batch deletion
+            if summary["failed"] == 0:
+                message = "批量删除完成"
+            elif summary["successfully_deleted"] == 0:
+                message = "批量删除失败"
+            else:
+                message = "批量删除部分完成"
+
+        return jsonify(success_response(
+            response_text=message,
+            data=batch_result
+        )), 200
+
+    except Exception as e:
+        logger.error(f"删除任务失败: 错误: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="删除任务失败,请稍后重试"
+        )), 500
+
+
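+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# Batch-deleting tasks via the endpoint above; confirm must be the JSON
+# boolean true, and continue_on_error keeps going past individual failures.
+def _example_delete_tasks(task_ids):
+    import requests
+    url = "http://localhost:8084/api/v0/data_pipeline/tasks"
+    resp = requests.delete(url, json={
+        "task_ids": task_ids,
+        "confirm": True,
+        "delete_database_records": False,
+        "continue_on_error": True,
+    }, timeout=60)
+    resp.raise_for_status()
+    return resp.json()["data"]["summary"]
+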
+@app.flask_app.route('/api/v0/data_pipeline/tasks/<task_id>/logs/query', methods=['POST'])
+def query_data_pipeline_task_logs(task_id):
+    """
+    Advanced query over a data-pipeline task's logs.
+
+    Supports filtering, sorting, and pagination.
+
+    Request body:
+    {
+        "page": 1,                            // page number, must be > 0, default 1
+        "page_size": 50,                      // page size, between 1 and 500, default 50
+        "level": "ERROR",                     // optional level filter: "DEBUG"|"INFO"|"WARNING"|"ERROR"|"CRITICAL"
+        "start_time": "2025-01-01 00:00:00",  // optional start of time range (YYYY-MM-DD HH:MM:SS)
+        "end_time": "2025-01-02 23:59:59",    // optional end of time range (YYYY-MM-DD HH:MM:SS)
+        "keyword": "failed",                  // optional keyword (substring match on the message)
+        "logger_name": "DDLGenerator",        // optional exact logger-name match
+        "step_name": "ddl_generation",        // optional exact step-name match
+        "sort_by": "timestamp",               // optional sort field: "timestamp"|"level"|"logger"|"step"|"line_number", default "timestamp"
+        "sort_order": "desc"                  // optional sort direction: "asc"|"desc", default "desc"
+    }
+
+    Response:
+    {
+        "success": true,
+        "code": 200,
+        "message": "查询任务日志成功",
+        "data": {
+            "logs": [
+                {
+                    "timestamp": "2025-07-01 14:30:52",
+                    "level": "INFO",
+                    "logger": "SimpleWorkflowExecutor",
+                    "step": "ddl_generation",
+                    "message": "开始DDL生成",
+                    "line_number": 15
+                }
+            ],
+            "pagination": {
+                "page": 1,
+                "page_size": 50,
+                "total": 1000,
+                "total_pages": 20,
+                "has_next": true,
+                "has_prev": false
+            },
+            "log_file_info": {
+                "exists": true,
+                "file_path": "/path/to/log/file",
+                "file_size": 1024000,
+                "file_size_formatted": "1.0 MB",
+                "last_modified": "2025-07-01T14:30:52",
+                "total_lines": 5000
+            },
+            "query_time": "0.123s"
+        }
+    }
+    """
+    try:
+        # Verify that the task exists
+        manager = get_data_pipeline_manager()
+        task_info = manager.get_task_status(task_id)
+        if not task_info:
+            return jsonify(not_found_response(
+                response_text=f"任务不存在: {task_id}"
+            )), 404
+
+        # Parse the request body
+        request_data = request.get_json() or {}
+
+        def _is_valid_time_format(time_str):
+            """Return True if the string matches one of the supported time formats."""
+            if not time_str:
+                return True
+
+            # Supported time formats
+            time_formats = [
+                '%Y-%m-%d %H:%M:%S',      # 2025-01-01 00:00:00
+                '%Y-%m-%d',               # 2025-01-01
+                '%Y-%m-%dT%H:%M:%S',      # 2025-01-01T00:00:00
+                '%Y-%m-%dT%H:%M:%S.%f',   # 2025-01-01T00:00:00.123456
+            ]
+
+            # datetime is imported at module level above
+            for fmt in time_formats:
+                try:
+                    datetime.strptime(time_str, fmt)
+                    return True
+                except ValueError:
+                    continue
+            return False
+
+        # Extract parameters
+        page = request_data.get('page', 1)
+        page_size = request_data.get('page_size', 50)
+        level = request_data.get('level')
+        start_time = request_data.get('start_time')
+        end_time = request_data.get('end_time')
+        keyword = request_data.get('keyword')
+        logger_name = request_data.get('logger_name')
+        step_name = request_data.get('step_name')
+        sort_by = request_data.get('sort_by', 'timestamp')
+        sort_order = request_data.get('sort_order', 'desc')
+
+        # Validate pagination parameters
+        if not isinstance(page, int) or page < 1:
+            return jsonify(bad_request_response(
+                response_text="页码必须是大于0的整数"
+            )), 400
+
+        if not isinstance(page_size, int) or page_size < 1 or page_size > 500:
+            return jsonify(bad_request_response(
+                response_text="每页大小必须是1-500之间的整数"
+            )), 400
+
+        # Validate the log level
+        if level and level.upper() not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
+            return jsonify(bad_request_response(
+                response_text="日志级别必须是DEBUG、INFO、WARNING、ERROR、CRITICAL之一"
+            )), 400
+
+        # Validate the time formats
+        if not _is_valid_time_format(start_time):
+            return jsonify(bad_request_response(
+                response_text="开始时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+
+        if not _is_valid_time_format(end_time):
+            return jsonify(bad_request_response(
+                response_text="结束时间格式无效,支持格式:YYYY-MM-DD HH:MM:SS 或 YYYY-MM-DD"
+            )), 400
+
+        # Limit the keyword length
+        if keyword and len(keyword) > 200:
+            return jsonify(bad_request_response(
+                response_text="关键字长度不能超过200个字符"
+            )), 400
+
+        # Validate the sort field
+        allowed_sort_fields = ['timestamp', 'level', 'logger', 'step', 'line_number']
+        if sort_by not in allowed_sort_fields:
+            return jsonify(bad_request_response(
+                response_text=f"排序字段必须是以下之一: {', '.join(allowed_sort_fields)}"
+            )), 400
+
+        # Validate the sort direction
+        if sort_order.lower() not in ['asc', 'desc']:
+            return jsonify(bad_request_response(
+                response_text="排序方向必须是asc或desc"
+            )), 400
+
+        # Create a workflow executor and run the log query
+        from data_pipeline.api.simple_workflow import SimpleWorkflowExecutor
+        executor = SimpleWorkflowExecutor(task_id)
+
+        try:
+            result = executor.query_logs_advanced(
+                page=page,
+                page_size=page_size,
+                level=level,
+                start_time=start_time,
+                end_time=end_time,
+                keyword=keyword,
+                logger_name=logger_name,
+                step_name=step_name,
+                sort_by=sort_by,
+                sort_order=sort_order
+            )
+
+            return jsonify(success_response(
+                response_text="查询任务日志成功",
+                data=result
+            ))
+
+        finally:
+            executor.cleanup()
+
+    except Exception as e:
+        logger.error(f"查询数据管道任务日志失败: {str(e)}")
+        return jsonify(internal_error_response(
+            response_text="查询任务日志失败,请稍后重试"
+        )), 500
+
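+# --- Usage sketch (illustrative; not called anywhere in the app) ---
+# Querying the first page of ERROR-level log lines for a task, newest first.
+def _example_query_task_logs(task_id):
+    import requests
+    url = f"http://localhost:8084/api/v0/data_pipeline/tasks/{task_id}/logs/query"
+    resp = requests.post(url, json={
+        "page": 1,
+        "page_size": 50,
+        "level": "ERROR",
+        "sort_by": "timestamp",
+        "sort_order": "desc",
+    }, timeout=30)
+    resp.raise_for_status()
+    return resp.json()["data"]["logs"]
+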
+
+if __name__ == '__main__':
+    logger.info("启动Flask应用: http://localhost:8084")
+    app.run(host="0.0.0.0", port=8084, debug=True)