@@ -14,7 +14,6 @@ from app.core.graph.graph_operations import (
    get_node,
    relationship_exists,
)
-from app.core.llm.llm_service import llm_sql
from app.core.meta_data import get_formatted_time, translate_and_parse

logger = logging.getLogger(__name__)
@@ -256,6 +255,7 @@ class DataFlowService:
            "update_mode": data.get("update_mode", "append"),
            "script_type": data.get("script_type", "python"),
            "script_requirement": script_requirement_str,
+            "script_path": "",  # script path, filled in after the task completes
            "created_at": get_formatted_time(),
            "updated_at": get_formatted_time(),
        }
@@ -342,119 +342,41 @@ class DataFlowService:
        name_en: str,
    ):
        """
-        Save the script information to the PG database
+        Save the task information to the task_list table in the PG database

        Args:
            data: data containing the script information
            script_name: script name
            name_en: English name
        """
+        from app.config.config import config, current_env
+
        try:
+            # Get the configuration for the current environment
+            current_config = config.get(current_env, config["default"])
+            dataflow_schema = getattr(current_config, "DATAFLOW_SCHEMA", "dags")
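+            # (note: the schema is surfaced in the task description below so the task
+            # executor can target the right PG schema; "dags" is an assumed default)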
+
            # Extract the script-related information
            # Normalize script_requirement so it is always stored as a JSON string
            script_requirement_raw = data.get("script_requirement")
-            # Holds the rule extracted from script_requirement
-            rule_from_requirement = ""

            if script_requirement_raw is not None:
-                # If it is a dict, extract the rule field
-                if isinstance(script_requirement_raw, dict):
-                    rule_from_requirement = script_requirement_raw.get("rule", "")
-                    script_requirement = json.dumps(
-                        script_requirement_raw, ensure_ascii=False
-                    )
-                elif isinstance(script_requirement_raw, list):
+                if isinstance(script_requirement_raw, (dict, list)):
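+                    # (note: ensure_ascii=False keeps any non-ASCII text in the
+                    # requirement readable in the stored JSON)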
                    script_requirement = json.dumps(
                        script_requirement_raw, ensure_ascii=False
                    )
                else:
-                    # If it is already a string, try to parse it to extract the rule
                    script_requirement = str(script_requirement_raw)
-                    try:
-                        parsed_req = json.loads(script_requirement)
-                        if isinstance(parsed_req, dict):
-                            rule_from_requirement = parsed_req.get("rule", "")
-                    except (json.JSONDecodeError, TypeError):
-                        pass
            else:
                script_requirement = ""

-            # Prefer the frontend-supplied script_content; if it is empty, fall back to the rule extracted from script_requirement
-            script_content = data.get("script_content", "")
-            if not script_content and rule_from_requirement:
-                script_content = rule_from_requirement
-                logger.info(
-                    "script_content is empty, using the rule extracted from script_requirement: %s",
-                    rule_from_requirement,
-                )
-
-            # Handle source_table and target_table defensively (a None value would break the 'in' check)
-            source_table_raw = data.get("source_table") or ""
-            source_table = (
-                source_table_raw.split(":")[-1]
-                if ":" in source_table_raw
-                else source_table_raw
-            )
-
-            target_table_raw = data.get("target_table") or ""
-            target_table = (
-                target_table_raw.split(":")[-1]
-                if ":" in target_table_raw
-                else (target_table_raw or name_en)
-            )
-
-            script_type = data.get("script_type", "python")
-            user_name = data.get("created_by", "system")
-            target_dt_column = data.get("target_dt_column", "")
-
            # Validate required fields
-            if not target_table:
-                target_table = name_en
            if not script_name:
                raise ValueError("script_name must not be empty")

-            # Build the INSERT SQL
-            insert_sql = text(
-                """
-                INSERT INTO dags.data_transform_scripts
-                (source_table, target_table, script_name, script_type,
-                 script_requirement, script_content, user_name, create_time,
-                 update_time, target_dt_column)
-                VALUES
-                (:source_table, :target_table, :script_name, :script_type,
-                 :script_requirement, :script_content, :user_name,
-                 :create_time, :update_time, :target_dt_column)
-                ON CONFLICT (target_table, script_name)
-                DO UPDATE SET
-                    source_table = EXCLUDED.source_table,
-                    script_type = EXCLUDED.script_type,
-                    script_requirement = EXCLUDED.script_requirement,
-                    script_content = EXCLUDED.script_content,
-                    user_name = EXCLUDED.user_name,
-                    update_time = EXCLUDED.update_time,
-                    target_dt_column = EXCLUDED.target_dt_column
-                """
-            )
-
-            # Prepare the parameters
            current_time = datetime.now()
-            params = {
-                "source_table": source_table,
-                "target_table": target_table,
-                "script_name": script_name,
-                "script_type": script_type,
-                "script_requirement": script_requirement,
-                "script_content": script_content,
-                "user_name": user_name,
-                "create_time": current_time,
-                "update_time": current_time,
-                "target_dt_column": target_dt_column,
-            }

-            # Execute the insert
-            db.session.execute(insert_sql, params)
-
-            # New: save to the task_list table
+            # Save to the task_list table
            try:
                # 1. Parse script_requirement and build a detailed task description
                task_description_md = script_requirement
@@ -545,6 +467,10 @@ class DataFlowService:
                # Build the Markdown-formatted task description
                task_desc_parts = [f"# Task: {script_name}\n"]

+                # Add the DataFlow schema configuration info
+                task_desc_parts.append("## DataFlow Configuration")
+                task_desc_parts.append(f"- **Schema**: {dataflow_schema}\n")
+
                # Add the data source info
                if data_source_info:
                    task_desc_parts.append("## Data Source")
@@ -670,8 +596,20 @@ class DataFlowService:
                )
                task_description_md = script_requirement

-                # Assume the working directory is the project root; dataflows.py lives in app/core/data_flow/
-                code_path = "app/core/data_flow"
+                # Determine the task type and set code_path and code_name accordingly
+                # Remote data-source import tasks reuse the generic import_resource_data.py script
+                if data_source_info:
+                    # Remote data-source import task
+                    code_path = "datafactory/scripts"
+                    code_name = "import_resource_data.py"
+                    logger.info(
+                        f"Detected a remote data-source import task, using the generic script: "
+                        f"{code_path}/{code_name}"
+                    )
+                else:
+                    # Data transformation task: a dedicated script must be generated
+                    code_path = "datafactory/scripts"
+                    code_name = script_name
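+                    # (note: both branches share datafactory/scripts; only code_name
+                    # distinguishes the generic import script from a generated one)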

                task_insert_sql = text(
                    "INSERT INTO public.task_list\n"
@@ -686,36 +624,26 @@ class DataFlowService:
                    "task_name": script_name,
                    "task_description": task_description_md,
                    "status": "pending",
-                    "code_name": script_name,
+                    "code_name": code_name,
                    "code_path": code_path,
                    "create_by": "cursor",
                    "create_time": current_time,
                    "update_time": current_time,
                }

-                # Use a nested transaction so a failed task_list insert cannot affect the main flow
-                with db.session.begin_nested():
-                    db.session.execute(task_insert_sql, task_params)
+                db.session.execute(task_insert_sql, task_params)
+                db.session.commit()

                logger.info(f"Successfully wrote the task to the task_list table: task_name={script_name}")

            except Exception as task_error:
-                # Log the error but do not interrupt the main flow
+                db.session.rollback()
                logger.error(f"Failed to write to the task_list table: {str(task_error)}")
-                # If the task-list write must succeed, raise task_error here
-                # raise task_error
-
-            db.session.commit()
-
-            logger.info(
-                "Successfully wrote the script information to the PG database: target_table=%s, script_name=%s",
-                target_table,
-                script_name,
-            )
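+                # (note: a task_list failure now rolls back and aborts the save
+                # instead of being swallowed as before)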
+                raise task_error

        except Exception as e:
            db.session.rollback()
-            logger.error(f"Failed to write to the PG database: {str(e)}")
+            logger.error(f"Failed to save to the PG database: {str(e)}")
            raise e

    @staticmethod
@@ -823,6 +751,144 @@ class DataFlowService:
        except Exception as e:
            logger.warning(f"Failed to create tag relationship {tag_id}: {str(e)}")

+    @staticmethod
+    def update_dataflow_script_path(
+        dataflow_name: str,
+        script_path: str,
+    ) -> bool:
+        """
+        Update the script path on a DataFlow node
+
+        Once the task completes, write the path of the created Python script to the DataFlow node
+
+        Args:
+            dataflow_name: data flow name (Chinese name)
+            script_path: full path to the Python script
+
+        Returns:
+            whether the update succeeded
+        """
+        try:
+            query = """
+            MATCH (n:DataFlow {name_zh: $name_zh})
+            SET n.script_path = $script_path, n.updated_at = $updated_at
+            RETURN n
+            """
+            with connect_graph().session() as session:
+                result = session.run(
+                    query,
+                    name_zh=dataflow_name,
+                    script_path=script_path,
+                    updated_at=get_formatted_time(),
+                ).single()
+
+                if result:
+                    logger.info(
+                        f"Updated DataFlow script path: {dataflow_name} -> {script_path}"
+                    )
+                    return True
+                else:
+                    logger.warning(f"DataFlow node not found: {dataflow_name}")
+                    return False
+
+        except Exception as e:
+            logger.error(f"Failed to update the DataFlow script path: {str(e)}")
+            return False
+
+    @staticmethod
+    def get_script_content(dataflow_id: int) -> Dict[str, Any]:
+        """
+        Fetch the associated script content by DataFlow ID
+
+        Args:
+            dataflow_id: data flow ID
+
+        Returns:
+            a dict containing the script content and metadata:
+            - script_path: script path
+            - script_content: script content
+            - script_type: script type (e.g. python)
+            - dataflow_name: data flow name
+
+        Raises:
+            ValueError: if the DataFlow does not exist or its script path is empty
+            FileNotFoundError: if the script file does not exist
+        """
+        from pathlib import Path
+
+        try:
+            # Fetch the DataFlow node from Neo4j
+            query = """
+            MATCH (n:DataFlow)
+            WHERE id(n) = $dataflow_id
+            RETURN n, id(n) as node_id
+            """
+
+            with connect_graph().session() as session:
+                result = session.run(query, dataflow_id=dataflow_id).single()
+
+                if not result:
+                    raise ValueError(f"DataFlow node with ID {dataflow_id} not found")
+
+                node = result["n"]
+                node_props = dict(node)
+
+                # Get the script path
+                script_path = node_props.get("script_path", "")
+                if not script_path:
+                    raise ValueError(
+                        f"The script_path property of DataFlow (ID: {dataflow_id}) is empty"
+                    )
+
+                # Resolve the full path of the script file
+                # script_path may be relative or absolute
+                script_file = Path(script_path)
+
+                # A relative path is resolved against the project root
+                if not script_file.is_absolute():
+                    # Determine the project root (assumes the parent of the app directory is the root)
+                    project_root = Path(__file__).parent.parent.parent.parent
+                    script_file = project_root / script_path
+
+                # Check that the file exists
+                if not script_file.exists():
+                    raise FileNotFoundError(f"Script file does not exist: {script_file}")
+
+                # Read the script content
+                with script_file.open("r", encoding="utf-8") as f:
+                    script_content = f.read()
+
+                # Determine the script type
+                suffix = script_file.suffix.lower()
+                script_type_map = {
+                    ".py": "python",
+                    ".js": "javascript",
+                    ".ts": "typescript",
+                    ".sql": "sql",
+                    ".sh": "shell",
+                }
+                script_type = script_type_map.get(suffix, "text")
+
+                logger.info(
+                    f"Successfully read the script content: DataFlow ID={dataflow_id}, "
+                    f"path={script_path}, type={script_type}"
+                )
+
+                return {
+                    "script_path": script_path,
+                    "script_content": script_content,
+                    "script_type": script_type,
+                    "dataflow_id": dataflow_id,
+                    "dataflow_name": node_props.get("name_zh", ""),
+                    "dataflow_name_en": node_props.get("name_en", ""),
+                }
+
+        except (ValueError, FileNotFoundError):
+            raise
+        except Exception as e:
+            logger.error(f"Failed to fetch the script content: {str(e)}")
+            raise
+
    @staticmethod
    def update_dataflow(
        dataflow_id: int,
@@ -1118,281 +1184,6 @@ class DataFlowService:
            logger.error(f"Failed to fetch data flow logs: {str(e)}")
            raise e

-    @staticmethod
-    def create_script(request_data: Union[Dict[str, Any], str]) -> str:
-        """
-        Generate a SQL script using the Deepseek model
-
-        Args:
-            request_data: request dict containing input, output and request_content, or a JSON string
-
-        Returns:
-            the generated SQL script content
-        """
-        try:
-            logger.info(f"Start handling script generation request: {request_data}")
-            logger.info(f"request_data type: {type(request_data)}")
-
-            # Type check and normalization
-            if isinstance(request_data, str):
-                logger.warning(f"request_data is a string, trying to parse it as JSON: {request_data}")
-                try:
-                    import json
-
-                    request_data = json.loads(request_data)
-                except json.JSONDecodeError as e:
-                    raise ValueError(f"Could not parse request_data as JSON: {str(e)}") from e
-
-            if not isinstance(request_data, dict):
-                raise ValueError(
-                    f"request_data must be a dict, got: {type(request_data)}"
-                )
-
-            # 1. Parse input, output and request_content from the incoming request_data
-            input_data = request_data.get("input", "")
-            output_data = request_data.get("output", "")
-
-            request_content = request_data.get("request_data", "")
-
-            # If request_content is HTML, extract the plain text
-            if request_content and (
-                request_content.startswith("<p>") or "<" in request_content
-            ):
-                # Simple HTML tag stripping
-                import re
-
-                request_content = re.sub(r"<[^>]+>", "", request_content).strip()
-
-            if not input_data or not output_data or not request_content:
-                raise ValueError(
-                    "Missing required parameters: input='{}', output='{}', "
-                    "request_content='{}' must not be empty".format(
-                        input_data,
-                        output_data,
-                        request_content[:100] if request_content else "",
-                    )
-                )
-
-            logger.info(
-                "Parsed - input: %s, output: %s, request_content: %s",
-                input_data,
-                output_data,
-                request_content,
-            )
-
-            # 2. Parse the tables in input and generate the source-table DDL
-            source_tables_ddl = []
-            input_tables = []
-            if input_data:
-                tables = [
-                    table.strip() for table in input_data.split(",") if table.strip()
-                ]
-                for table in tables:
-                    ddl = DataFlowService._parse_table_and_get_ddl(table, "input")
-                    if ddl:
-                        input_tables.append(table)
-                        source_tables_ddl.append(ddl)
-                    else:
-                        logger.warning(f"Could not get the DDL structure of input table {table}")
-
-            # 3. Parse the table in output and generate the target-table DDL
-            target_table_ddl = ""
-            if output_data:
-                target_table_ddl = DataFlowService._parse_table_and_get_ddl(
-                    output_data.strip(), "output"
-                )
-                if not target_table_ddl:
-                    logger.warning(f"Could not get the DDL structure of output table {output_data}")
-
-            # 4. Build the prompt following the Deepseek-prompt.txt framework
-            prompt_parts = []
-
-            # Opening - role definition
-            prompt_parts.append(
-                "You are a database engineer building aggregation logic in a PostgreSQL database. "
-                "Please generate a standard PostgreSQL SQL script for the following requirement:"
-            )
-
-            # Dynamically generate the source-table section (item 1)
-            for i, (table, ddl) in enumerate(zip(input_tables, source_tables_ddl), 1):
-                table_name = table.split(":")[-1] if ":" in table else table
-                prompt_parts.append(f"{i}. There is a source table {table_name}, defined as follows:")
-                prompt_parts.append(ddl)
-                prompt_parts.append("")  # blank-line separator
-
-            # Dynamically generate the target-table section (item 2)
-            if target_table_ddl:
-                target_table_name = (
-                    output_data.split(":")[-1] if ":" in output_data else output_data
-                )
-                next_index = len(input_tables) + 1
-                prompt_parts.append(
-                    f"{next_index}. There is a target table {target_table_name}, defined as follows:"
-                )
-                prompt_parts.append(target_table_ddl)
-                prompt_parts.append("")  # blank-line separator
-
-            # Dynamically generate the processing-logic section (item 3)
-            next_index = (
-                len(input_tables) + 2 if target_table_ddl else len(input_tables) + 1
-            )
-            prompt_parts.append(f"{next_index}. The processing logic is: {request_content}")
-            prompt_parts.append("")  # blank-line separator
-
-            # Fixed technical requirements (items 4-8)
-            tech_requirements = [
-                (
-                    f"{next_index + 1}. The script should use standard PostgreSQL syntax, "
-                    "suitable for invocation from Airflow, Python scripts, or a scheduler;"
-                ),
-                f"{next_index + 2}. Do not use UPSERT or ON CONFLICT",
-                f"{next_index + 3}. Output the SQL directly, with no explanation.",
-                (
-                    f'{next_index + 4}. Give the SQL an English name of at least three words separated by "_", '
-                    "in snake_case, and write that name as a comment inside the returned SQL."
-                ),
-                (
-                    f"{next_index + 5}. When the generated SQL inserts into the target table, write the current "
-                    "timestamp now() into the create_time field; do not touch the update_time field"
-                ),
-            ]
-
-            prompt_parts.extend(tech_requirements)
-
-            # Assemble the full prompt
-            full_prompt = "\n".join(prompt_parts)
-
-            logger.info(f"Full prompt length: {len(full_prompt)}")
-            logger.info(f"Full prompt content: {full_prompt}")
-
-            # 5. Call the LLM to generate the SQL script
-            logger.info("Calling the Deepseek model to generate the SQL script")
-            script_content = llm_sql(full_prompt)
-
-            if not script_content:
-                raise ValueError("The Deepseek model returned empty content")
-
-            # Make sure the return value is text
-            if not isinstance(script_content, str):
-                script_content = str(script_content)
-
-            logger.info(f"SQL script generated successfully, content length: {len(script_content)}")
-
-            return script_content
-
-        except Exception as e:
-            logger.error(f"Failed to generate the SQL script: {str(e)}")
-            raise e
-
-    @staticmethod
-    def _parse_table_and_get_ddl(table_str: str, table_type: str) -> str:
-        """
-        Parse the table format (A:B) and generate DDL from metadata queried from Neo4j
-
-        Args:
-            table_str: table string in the form "label:name_en"
-            table_type: table type, used for logging (input/output)
-
-        Returns:
-            the table structure as a DDL string
-        """
-        try:
-            # Parse the A:B format
-            if ":" not in table_str:
-                logger.error(f"Bad table format, expected 'label:name_en': {table_str}")
-                return ""
-
-            parts = table_str.split(":", 1)
-            if len(parts) != 2:
-                logger.error(f"Failed to parse the table format: {table_str}")
-                return ""
-
-            label = parts[0].strip()
-            name_en = parts[1].strip()
-
-            if not label or not name_en:
-                logger.error(f"Label or English name is empty: label={label}, name_en={name_en}")
-                return ""
-
-            logger.info(f"Querying the {table_type} table: label={label}, name_en={name_en}")
-
-            # Query the node and its associated metadata from Neo4j
-            with connect_graph().session() as session:
-                # Query the node and its associated metadata
-                cypher = f"""
-                MATCH (n:{label} {{name_en: $name_en}})
-                OPTIONAL MATCH (n)-[:INCLUDES]->(m:DataMeta)
-                RETURN n, collect(m) as metadata
-                """
-
-                result = session.run(
-                    cypher,  # type: ignore[arg-type]
-                    {"name_en": name_en},
-                )
-                record = result.single()
-
-                if not record:
-                    logger.error(f"Node not found: label={label}, name_en={name_en}")
-                    return ""
-
-                node = record["n"]
-                metadata = record["metadata"]
-
-                logger.info(f"Node found, associated metadata count: {len(metadata)}")
-
-                # Generate the table structure as DDL
-                ddl_lines = []
-                ddl_lines.append(f"CREATE TABLE {name_en} (")
-
-                if metadata:
-                    column_definitions = []
-                    for meta in metadata:
-                        if meta:  # make sure meta is not empty
-                            meta_props = dict(meta)
-                            column_name = meta_props.get(
-                                "name_en",
-                                meta_props.get("name_zh", "unknown_column"),
-                            )
-                            data_type = meta_props.get("data_type", "VARCHAR(255)")
-                            comment = meta_props.get("name_zh", "")
-
-                            # Build the column definition
-                            column_def = f"    {column_name} {data_type}"
-                            if comment:
-                                column_def += f" COMMENT '{comment}'"
-
-                            column_definitions.append(column_def)
-
-                    if column_definitions:
-                        ddl_lines.append(",\n".join(column_definitions))
-                    else:
-                        ddl_lines.append("    id BIGINT PRIMARY KEY COMMENT 'primary key ID'")
-                else:
-                    # Add a default column when there is no metadata
-                    ddl_lines.append("    id BIGINT PRIMARY KEY COMMENT 'primary key ID'")
-
-                ddl_lines.append(");")
-
-                # Add the table comment
-                node_props = dict(node)
-                table_comment = node_props.get(
-                    "name_zh", node_props.get("describe", name_en)
-                )
-                if table_comment and table_comment != name_en:
-                    ddl_lines.append(
-                        f"COMMENT ON TABLE {name_en} IS '{table_comment}';"
-                    )
-
-                ddl_content = "\n".join(ddl_lines)
-                logger.info(f"{table_type} table DDL generated successfully: {name_en}")
-                logger.debug(f"Generated DDL: {ddl_content}")
-
-                return ddl_content
-
-        except Exception as e:
-            logger.error(f"Failed to parse the table format and generate DDL: {str(e)}")
-            return ""
-
    @staticmethod
    def _generate_businessdomain_ddl(
        session,