|
|
@@ -845,14 +845,15 @@ class DataProductService:
|
|
|
max_depth: int = 10,
|
|
|
) -> dict[str, Any]:
|
|
|
"""
|
|
|
- Recursively trace the data production chain
|
|
|
+ Trace the data production chain (using breadth-first traversal)
|
|
|
|
|
|
Tracing logic (walking upstream from the target node):
|
|
|
- 1. From the current BusinessDomain, find the DataFlow pointing to it via an OUTPUT relationship
|
|
|
+ 1. From the current BusinessDomain, find the DataFlow pointing to it via an OUTPUT relationship (reverse lookup)
|
|
|
2. Read the DataFlow's script_requirement as the data-flow definition
|
|
|
3. From the DataFlow, find the upstream BusinessDomains connected via INPUT relationships
|
|
|
4. Match data in each BusinessDomain against the keys of sample_data
|
|
|
- 5. Recurse until a BusinessDomain is no longer pointed to by any DataFlow OUTPUT
|
|
|
+ 5. Enqueue the newly found BusinessDomains and continue the traversal
|
|
|
+ 6. Loop until no BusinessDomain is pointed to by a DataFlow OUTPUT
|
|
|
|
|
|
Args:
|
|
|
session: Neo4j session
|
|
|
@@ -863,19 +864,17 @@ class DataProductService:
|
|
|
Returns:
|
|
|
A dict containing nodes, lines, and lineage_depth
|
|
|
"""
|
|
|
- nodes: list[dict[str, Any]] = []
|
|
|
- lines: list[dict[str, Any]] = []
|
|
|
- visited_bd: set[int] = set()
|
|
|
- visited_df: set[int] = set()
|
|
|
+ nodes_dict: dict[int, dict[str, Any]] = {} # node map: {node_id: node_props}
|
|
|
+ lines_dict: dict[str, dict[str, Any]] = {} # relationship map: {rel_key: rel_props}
|
|
|
+ processed_bd: set[int] = set() # IDs of BusinessDomain nodes already processed
|
|
|
+ processed_df: set[int] = set() # IDs of DataFlow nodes already processed
|
|
|
|
|
|
- def trace_upstream(bd_id: int, depth: int) -> int:
|
|
|
- """Recursively trace the upstream production chain"""
|
|
|
- if depth >= max_depth or bd_id in visited_bd:
|
|
|
- return depth
|
|
|
+ # Breadth-first traversal driven by a queue of (bd_id, depth) tuples
|
|
|
+ queue: list[tuple[int, int]] = [(target_bd_id, 0)]
|
|
|
+ max_depth_reached = 0
|
|
|
|
|
|
- visited_bd.add(bd_id)
|
|
|
-
|
|
|
- # Fetch the BusinessDomain node's info and fields
|
|
|
+ def get_business_domain_node(bd_id: int, depth: int) -> dict[str, Any] | None:
|
|
|
+ """Fetch the full info for a BusinessDomain node (including its fields)"""
|
|
|
# Use a CALL subquery to avoid issues with nested aggregate functions
|
|
|
bd_query = """
|
|
|
MATCH (bd:BusinessDomain)
|
|
|
@@ -898,7 +897,7 @@ class DataProductService:
|
|
|
"""
|
|
|
bd_result = session.run(bd_query, {"bd_id": bd_id}).single()
|
|
|
if not bd_result:
|
|
|
- return depth
|
|
|
+ return None
|
|
|
|
|
|
bd_node = dict(bd_result["bd"])
|
|
|
bd_labels = bd_result["bd_labels"]
|
|
|
@@ -919,71 +918,75 @@ class DataProductService:
|
|
|
elif name_en and name_en in sample_data:
|
|
|
matched_data[name_en] = sample_data[name_en]
|
|
|
|
|
|
- # Append the BusinessDomain node
|
|
|
- nodes.append(
|
|
|
- {
|
|
|
- "id": bd_id,
|
|
|
- "node_type": "BusinessDomain",
|
|
|
- "name_zh": bd_node.get("name_zh") or bd_node.get("name", ""),
|
|
|
- "name_en": bd_node.get("name_en", ""),
|
|
|
- "labels": bd_labels,
|
|
|
- "depth": depth,
|
|
|
- "is_target": depth == 0,
|
|
|
- "is_source": "DataResource" in bd_labels,
|
|
|
- "fields": fields,
|
|
|
- "matched_data": matched_data,
|
|
|
- }
|
|
|
- )
|
|
|
+ return {
|
|
|
+ "id": bd_id,
|
|
|
+ "node_type": "BusinessDomain",
|
|
|
+ "name_zh": bd_node.get("name_zh") or bd_node.get("name", ""),
|
|
|
+ "name_en": bd_node.get("name_en", ""),
|
|
|
+ "labels": bd_labels,
|
|
|
+ "depth": depth,
|
|
|
+ "is_target": depth == 0,
|
|
|
+ "is_source": "DataResource" in bd_labels,
|
|
|
+ "fields": fields,
|
|
|
+ "matched_data": matched_data,
|
|
|
+ }
|
|
|
+
|
|
|
+ while queue:
|
|
|
+ current_bd_id, current_depth = queue.pop(0)
|
|
|
+
|
|
|
+ # Enforce the depth limit and skip nodes already processed
|
|
|
+ if current_depth >= max_depth or current_bd_id in processed_bd:
|
|
|
+ continue
|
|
|
+
|
|
|
+ processed_bd.add(current_bd_id)
|
|
|
+
|
|
|
+ # Fetch and record the current BusinessDomain node
|
|
|
+ bd_node_info = get_business_domain_node(current_bd_id, current_depth)
|
|
|
+ if bd_node_info:
|
|
|
+ nodes_dict[current_bd_id] = bd_node_info
|
|
|
+ max_depth_reached = max(max_depth_reached, current_depth)
|
|
|
|
|
|
- # Find DataFlows pointing to the current BD via OUTPUT relationships
|
|
|
+ # Find DataFlows pointing to the current BD via OUTPUT relationships (reverse direction)
|
|
|
+ # i.e.: (df:DataFlow)-[:OUTPUT]->(bd:BusinessDomain)
|
|
|
df_query = """
|
|
|
- MATCH (df:DataFlow)-[:OUTPUT]->(bd:BusinessDomain)
|
|
|
+ MATCH (df:DataFlow)-[r:OUTPUT]->(bd:BusinessDomain)
|
|
|
WHERE id(bd) = $bd_id
|
|
|
RETURN df, id(df) as df_id, labels(df) as df_labels
|
|
|
"""
|
|
|
- df_results = session.run(df_query, {"bd_id": bd_id}).data()
|
|
|
-
|
|
|
- if not df_results:
|
|
|
- return depth # no upstream; stop tracing
|
|
|
-
|
|
|
- max_depth_reached = depth
|
|
|
+ df_results = session.run(df_query, {"bd_id": current_bd_id}).data()
|
|
|
|
|
|
for df_record in df_results:
|
|
|
df_id = df_record["df_id"]
|
|
|
- if df_id in visited_df:
|
|
|
- continue
|
|
|
- visited_df.add(df_id)
|
|
|
-
|
|
|
df_node = dict(df_record["df"])
|
|
|
|
|
|
- # 添加 DataFlow 节点
|
|
|
- nodes.append(
|
|
|
- {
|
|
|
+ # If this DataFlow has not been processed yet, record its node info
|
|
|
+ if df_id not in processed_df:
|
|
|
+ processed_df.add(df_id)
|
|
|
+ nodes_dict[df_id] = {
|
|
|
"id": df_id,
|
|
|
"node_type": "DataFlow",
|
|
|
"name_zh": df_node.get("name_zh") or df_node.get("name", ""),
|
|
|
"name_en": df_node.get("name_en", ""),
|
|
|
"labels": df_record["df_labels"],
|
|
|
- "depth": depth,
|
|
|
+ "depth": current_depth,
|
|
|
"script_requirement": df_node.get("script_requirement", ""),
|
|
|
"script_name": df_node.get("script_name", ""),
|
|
|
"script_type": df_node.get("script_type", ""),
|
|
|
"update_mode": df_node.get("update_mode", ""),
|
|
|
}
|
|
|
- )
|
|
|
|
|
|
# Record the OUTPUT relationship
|
|
|
- lines.append(
|
|
|
- {
|
|
|
+ rel_key = f"OUTPUT_{df_id}_{current_bd_id}"
|
|
|
+ if rel_key not in lines_dict:
|
|
|
+ lines_dict[rel_key] = {
|
|
|
"from": df_id,
|
|
|
- "to": bd_id,
|
|
|
+ "to": current_bd_id,
|
|
|
"text": "OUTPUT",
|
|
|
}
|
|
|
- )
|
|
|
|
|
|
- # Find the upstream BusinessDomains
|
|
|
+ # Find the source BusinessDomains connected to this DataFlow via INPUT relationships
|
|
|
input_query = """
|
|
|
- MATCH (source:BusinessDomain)-[:INPUT]->(df:DataFlow)
|
|
|
+ MATCH (source:BusinessDomain)-[r:INPUT]->(df:DataFlow)
|
|
|
WHERE id(df) = $df_id
|
|
|
RETURN id(source) as source_id
|
|
|
"""
|
|
|
@@ -993,26 +996,22 @@ class DataProductService:
|
|
|
source_id = input_record["source_id"]
|
|
|
|
|
|
# Record the INPUT relationship
|
|
|
- lines.append(
|
|
|
- {
|
|
|
+ input_rel_key = f"INPUT_{source_id}_{df_id}"
|
|
|
+ if input_rel_key not in lines_dict:
|
|
|
+ lines_dict[input_rel_key] = {
|
|
|
"from": source_id,
|
|
|
"to": df_id,
|
|
|
"text": "INPUT",
|
|
|
}
|
|
|
- )
|
|
|
-
|
|
|
- # Recurse into the upstream trace
|
|
|
- reached = trace_upstream(source_id, depth + 1)
|
|
|
- max_depth_reached = max(max_depth_reached, reached)
|
|
|
|
|
|
- return max_depth_reached
|
|
|
-
|
|
|
- actual_depth = trace_upstream(target_bd_id, 0)
|
|
|
+ # If the source BusinessDomain has not been processed, enqueue it to continue the traversal
|
|
|
+ if source_id not in processed_bd:
|
|
|
+ queue.append((source_id, current_depth + 1))
|
|
|
|
|
|
return {
|
|
|
- "nodes": nodes,
|
|
|
- "lines": lines,
|
|
|
- "lineage_depth": actual_depth,
|
|
|
+ "nodes": list(nodes_dict.values()),
|
|
|
+ "lines": list(lines_dict.values()),
|
|
|
+ "lineage_depth": max_depth_reached,
|
|
|
}
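The switch from recursion to an explicit queue, and from `nodes`/`lines` lists to dicts keyed by node ID and by a composite relationship key, means nodes and edges reachable along multiple paths are emitted exactly once. A minimal, self-contained sketch of the same pattern, with a toy adjacency map standing in for the Neo4j queries (and `collections.deque`, since the `list.pop(0)` used above is O(n) per dequeue):

```python
from collections import deque
from typing import Any

def trace_upstream_bfs(
    graph: dict[int, list[int]],  # toy adjacency map: node -> upstream node IDs
    target_id: int,
    max_depth: int = 10,
) -> dict[str, Any]:
    nodes: dict[int, dict[str, Any]] = {}   # dedup nodes by ID
    lines: dict[str, dict[str, Any]] = {}   # dedup edges by composite key
    processed: set[int] = set()
    queue: deque[tuple[int, int]] = deque([(target_id, 0)])
    max_depth_reached = 0

    while queue:
        node_id, depth = queue.popleft()
        if depth >= max_depth or node_id in processed:
            continue
        processed.add(node_id)
        nodes[node_id] = {"id": node_id, "depth": depth, "is_target": depth == 0}
        max_depth_reached = max(max_depth_reached, depth)
        for upstream_id in graph.get(node_id, []):
            rel_key = f"FEEDS_{upstream_id}_{node_id}"  # direction + endpoints identify the edge
            if rel_key not in lines:
                lines[rel_key] = {"from": upstream_id, "to": node_id}
            if upstream_id not in processed:
                queue.append((upstream_id, depth + 1))

    return {
        "nodes": list(nodes.values()),
        "lines": list(lines.values()),
        "lineage_depth": max_depth_reached,
    }

# trace_upstream_bfs({1: [2, 3], 2: [4], 3: [4]}, target_id=1)
# visits node 4 once even though two paths reach it.
```

Swapping the `queue` list in the service for a `deque` would be a drop-in change.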
|
|
|
|
|
|
|
|
|
@@ -1951,14 +1950,27 @@ class DataOrderService:
|
|
|
f"name_zh={target_bd_name_zh}, name_en={target_bd_name_en}"
|
|
|
)
|
|
|
|
|
|
+ # 2.1 If the order specifies a data source, create a COME_FROM relationship
|
|
|
+ if order.data_source:
|
|
|
+ create_datasource_rel_query = """
|
|
|
+ MATCH (bd:BusinessDomain), (ds:DataSource)
|
|
|
+ WHERE id(bd) = $bd_id AND id(ds) = $ds_id
|
|
|
+ CREATE (bd)-[:COME_FROM]->(ds)
|
|
|
+ """
|
|
|
+ session.run(
|
|
|
+ create_datasource_rel_query,
|
|
|
+ {"bd_id": target_bd_id, "ds_id": order.data_source},
|
|
|
+ )
|
|
|
+ logger.info(
|
|
|
+ f"建立 COME_FROM 关系: {target_bd_id} -> "
|
|
|
+ f"DataSource:{order.data_source}"
|
|
|
+ )
|
|
|
+
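Note that `CREATE` is not idempotent: regenerating resources for the same order would add a duplicate COME_FROM edge. If re-runs are possible, a `MERGE` variant (a sketch, using the same session API as above) avoids that:

```python
# Sketch: MERGE creates the relationship only if it does not already exist.
merge_datasource_rel_query = """
    MATCH (bd:BusinessDomain), (ds:DataSource)
    WHERE id(bd) = $bd_id AND id(ds) = $ds_id
    MERGE (bd)-[:COME_FROM]->(ds)
"""
session.run(
    merge_datasource_rel_query,
    {"bd_id": target_bd_id, "ds_id": order.data_source},
)
```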
|
|
|
# 3. Create the DataFlow node
|
|
|
dataflow_name_en = f"DF_{order.order_no}"
|
|
|
dataflow_name_zh = f"{target_bd_name_zh}_数据流程"
|
|
|
|
|
|
- # Build script_requirement (contains the full data-processing definition)
|
|
|
- input_domain_names = [
|
|
|
- d.get("name_zh", d.get("name_en", "")) for d in matched_domains
|
|
|
- ]
|
|
|
+ # Collect the list of input-domain IDs
|
|
|
input_domain_ids = [d["id"] for d in matched_domains]
|
|
|
|
|
|
# Build the structured script_requirement (JSON format)
|
|
|
@@ -1974,12 +1986,18 @@ class DataOrderService:
|
|
|
script_requirement_dict, ensure_ascii=False
|
|
|
)
|
|
|
|
|
|
+ # Preset the script path (kept consistent with code_path/code_name in _create_task_record)
|
|
|
+ code_path = "datafactory/scripts"
|
|
|
+ code_name = dataflow_name_en
|
|
|
+ script_path = f"{code_path}/{code_name}.py"
|
|
|
+
|
|
|
create_dataflow_query = """
|
|
|
CREATE (df:DataFlow {
|
|
|
name_en: $name_en,
|
|
|
name_zh: $name_zh,
|
|
|
script_requirement: $script_requirement,
|
|
|
- script_type: 'pending',
|
|
|
+ script_type: 'python',
|
|
|
+ script_path: $script_path,
|
|
|
update_mode: 'full',
|
|
|
status: 'inactive',
|
|
|
created_at: datetime(),
|
|
|
@@ -1994,6 +2012,7 @@ class DataOrderService:
|
|
|
"name_en": dataflow_name_en,
|
|
|
"name_zh": dataflow_name_zh,
|
|
|
"script_requirement": script_requirement_str,
|
|
|
+ "script_path": script_path,
|
|
|
"created_by": "system",
|
|
|
"order_id": order.id,
|
|
|
},
|
|
|
@@ -2031,15 +2050,35 @@ class DataOrderService:
|
|
|
|
|
|
logger.info(f"建立 OUTPUT 关系: {dataflow_id} -> {target_bd_id}")
|
|
|
|
|
|
- # 6. Create a task record in the task_list table
|
|
|
+ # 6. Register the data product
|
|
|
+ product_id = DataOrderService._register_order_data_product(
|
|
|
+ order=order,
|
|
|
+ target_bd_id=target_bd_id,
|
|
|
+ target_bd_name_zh=target_bd_name_zh,
|
|
|
+ target_bd_name_en=target_bd_name_en,
|
|
|
+ dataflow_id=dataflow_id,
|
|
|
+ dataflow_name_en=dataflow_name_en,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Update the order's result_product_id
|
|
|
+ if product_id:
|
|
|
+ order.result_product_id = product_id
|
|
|
+ db.session.commit()
|
|
|
+ logger.info(
|
|
|
+ f"订单关联数据产品: order_id={order.id}, product_id={product_id}"
|
|
|
+ )
|
|
|
+
|
|
|
+ # 7. Create a task record in the task_list table
|
|
|
task_id = DataOrderService._create_task_record(
|
|
|
order=order,
|
|
|
dataflow_name_en=dataflow_name_en,
|
|
|
dataflow_name_zh=dataflow_name_zh,
|
|
|
dataflow_id=dataflow_id,
|
|
|
- input_domain_names=input_domain_names,
|
|
|
- target_bd_name_zh=target_bd_name_zh,
|
|
|
+ source_table_ids=input_domain_ids,
|
|
|
+ target_bd_id=target_bd_id,
|
|
|
+ update_mode="full",
|
|
|
processing_logic=processing_logic,
|
|
|
+ product_id=product_id,
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
@@ -2049,21 +2088,90 @@ class DataOrderService:
|
|
|
"dataflow_name": dataflow_name_en,
|
|
|
"input_domain_ids": input_domain_ids,
|
|
|
"task_id": task_id,
|
|
|
+ "product_id": product_id,
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"生成订单资源失败: {str(e)}")
|
|
|
raise
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def _register_order_data_product(
|
|
|
+ order: DataOrder,
|
|
|
+ target_bd_id: int,
|
|
|
+ target_bd_name_zh: str,
|
|
|
+ target_bd_name_en: str,
|
|
|
+ dataflow_id: int,
|
|
|
+ dataflow_name_en: str,
|
|
|
+ ) -> int | None:
|
|
|
+ """
|
|
|
+ Register a data product for the order
|
|
|
+
|
|
|
+ Args:
|
|
|
+ order: the data-order object
|
|
|
+ target_bd_id: target BusinessDomain node ID
|
|
|
+ target_bd_name_zh: Chinese name of the target BusinessDomain
|
|
|
+ target_bd_name_en: English name of the target BusinessDomain
|
|
|
+ dataflow_id: DataFlow node ID
|
|
|
+ dataflow_name_en: English name of the DataFlow
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ ID of the created data product, or None on failure
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # Get the schema from the order's data source
|
|
|
+ target_schema = "public"
|
|
|
+ if order.data_source:
|
|
|
+ with neo4j_driver.get_session() as session:
|
|
|
+ query = """
|
|
|
+ MATCH (ds:DataSource)
|
|
|
+ WHERE id(ds) = $ds_id
|
|
|
+ RETURN ds.schema as schema
|
|
|
+ """
|
|
|
+ result = session.run(query, ds_id=order.data_source).single()
|
|
|
+ if result and result.get("schema"):
|
|
|
+ target_schema = result["schema"]
|
|
|
+
|
|
|
+ # Use the BusinessDomain's English name as the target table name
|
|
|
+ target_table = target_bd_name_en
|
|
|
+
|
|
|
+ # Use the order's extraction purpose or description as the product description
|
|
|
+ description = order.extraction_purpose or order.description
|
|
|
+
|
|
|
+ # Register through the data product service
|
|
|
+ product = DataProductService.register_data_product(
|
|
|
+ product_name=target_bd_name_zh,
|
|
|
+ product_name_en=target_bd_name_en,
|
|
|
+ target_table=target_table,
|
|
|
+ target_schema=target_schema,
|
|
|
+ description=description,
|
|
|
+ source_dataflow_id=dataflow_id,
|
|
|
+ source_dataflow_name=dataflow_name_en,
|
|
|
+ created_by=order.created_by or "system",
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ f"订单数据产品注册成功: order_id={order.id}, "
|
|
|
+ f"product_id={product.id}, name={target_bd_name_zh}"
|
|
|
+ )
|
|
|
+ return product.id
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"注册订单数据产品失败: {str(e)}")
|
|
|
+ # A data-product registration failure does not block the main flow
|
|
|
+ return None
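For orientation while reviewing, a signature consistent with the keyword arguments passed above. This is a hypothetical reconstruction from the call site, not the actual `DataProductService.register_data_product` definition:

```python
# Hypothetical sketch inferred from the keyword arguments used above;
# NOT the real DataProductService implementation.
class DataProductService:
    @staticmethod
    def register_data_product(
        product_name: str,
        product_name_en: str,
        target_table: str,
        target_schema: str,
        description: str | None,
        source_dataflow_id: int,
        source_dataflow_name: str,
        created_by: str,
    ) -> "DataProduct":
        """Persist a data-product record and return the ORM object (exposing .id)."""
        ...
```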
|
|
|
+
|
|
|
@staticmethod
|
|
|
def _create_task_record(
|
|
|
order: DataOrder,
|
|
|
dataflow_name_en: str,
|
|
|
dataflow_name_zh: str,
|
|
|
dataflow_id: int,
|
|
|
- input_domain_names: list[str],
|
|
|
- target_bd_name_zh: str,
|
|
|
+ source_table_ids: list[int],
|
|
|
+ target_bd_id: int,
|
|
|
+ update_mode: str,
|
|
|
processing_logic: str,
|
|
|
+ product_id: int | None = None,
|
|
|
) -> int | None:
|
|
|
"""
|
|
|
Create a task record in the task_list table
|
|
|
@@ -2073,8 +2181,9 @@ class DataOrderService:
|
|
|
dataflow_name_en: English name of the DataFlow
|
|
|
dataflow_name_zh: Chinese name of the DataFlow
|
|
|
dataflow_id: DataFlow node ID
|
|
|
- input_domain_names: list of input-domain names
|
|
|
- target_bd_name_zh: Chinese name of the target BusinessDomain
|
|
|
+ source_table_ids: list of source-table BusinessDomain IDs
|
|
|
+ target_bd_id: target BusinessDomain ID
|
|
|
+ update_mode: update mode (append or full)
|
|
|
processing_logic: the data-processing logic
|
|
|
|
|
|
Returns:
|
|
|
@@ -2084,48 +2193,125 @@ class DataOrderService:
|
|
|
|
|
|
from sqlalchemy import text
|
|
|
|
|
|
+ from app.core.data_flow.dataflows import DataFlowService
|
|
|
+ from app.services.neo4j_driver import neo4j_driver as neo4j_drv
|
|
|
+
|
|
|
try:
|
|
|
current_time = datetime.now()
|
|
|
|
|
|
+ # Fetch the DDL and data-source info for the source and target tables
|
|
|
+ source_tables_info = []
|
|
|
+ target_tables_info = []
|
|
|
+
|
|
|
+ with neo4j_drv.get_session() as session:
|
|
|
+ # Process the source tables
|
|
|
+ for bd_id in source_table_ids:
|
|
|
+ ddl_info = DataFlowService._generate_businessdomain_ddl(
|
|
|
+ session, bd_id, is_target=False
|
|
|
+ )
|
|
|
+ if ddl_info:
|
|
|
+ source_tables_info.append(ddl_info)
|
|
|
+
|
|
|
+ # Process the target table
|
|
|
+ ddl_info = DataFlowService._generate_businessdomain_ddl(
|
|
|
+ session, target_bd_id, is_target=True, update_mode=update_mode
|
|
|
+ )
|
|
|
+ if ddl_info:
|
|
|
+ target_tables_info.append(ddl_info)
|
|
|
+
|
|
|
# Build the task description in Markdown format
|
|
|
- task_description_parts = [
|
|
|
- f"# Task: {dataflow_name_en}\n",
|
|
|
- "## DataFlow Configuration",
|
|
|
- f"- **DataFlow ID**: {dataflow_id}",
|
|
|
- f"- **DataFlow Name**: {dataflow_name_zh}",
|
|
|
- f"- **Order ID**: {order.id}",
|
|
|
- f"- **Order No**: {order.order_no}\n",
|
|
|
- "## Source Tables",
|
|
|
- ]
|
|
|
+ task_desc_parts = [f"# Task: {dataflow_name_en}\n"]
|
|
|
+
|
|
|
+ # Add related information (used by the workflow callback)
|
|
|
+ task_desc_parts.append("## Related Information")
|
|
|
+ task_desc_parts.append(f"- **Order ID**: {order.id}")
|
|
|
+ task_desc_parts.append(f"- **Order No**: {order.order_no}")
|
|
|
+ task_desc_parts.append(f"- **DataFlow ID**: {dataflow_id}")
|
|
|
+ task_desc_parts.append(f"- **DataFlow Name**: {dataflow_name_zh}")
|
|
|
+ if product_id:
|
|
|
+ task_desc_parts.append(f"- **Product ID**: {product_id}")
|
|
|
+ task_desc_parts.append("")
|
|
|
+
|
|
|
+ # Add source-table info (DDL and data source)
|
|
|
+ if source_tables_info:
|
|
|
+ task_desc_parts.append("## Source Tables")
|
|
|
+ for info in source_tables_info:
|
|
|
+ task_desc_parts.append(f"### {info['table_name']}")
|
|
|
+ if info.get("data_source"):
|
|
|
+ ds = info["data_source"]
|
|
|
+ task_desc_parts.append("**Data Source**")
|
|
|
+ task_desc_parts.append(f"- **Type**: {ds.get('type', 'N/A')}")
|
|
|
+ task_desc_parts.append(f"- **Host**: {ds.get('host', 'N/A')}")
|
|
|
+ task_desc_parts.append(f"- **Port**: {ds.get('port', 'N/A')}")
|
|
|
+ task_desc_parts.append(
|
|
|
+ f"- **Database**: {ds.get('database', 'N/A')}"
|
|
|
+ )
|
|
|
+ task_desc_parts.append(
|
|
|
+ f"- **Schema**: {ds.get('schema', 'N/A')}\n"
|
|
|
+ )
|
|
|
+ task_desc_parts.append("**DDL**")
|
|
|
+ task_desc_parts.append(f"```sql\n{info['ddl']}\n```\n")
|
|
|
+
|
|
|
+ # Add target-table info (DDL and data source)
|
|
|
+ if target_tables_info:
|
|
|
+ task_desc_parts.append("## Target Tables")
|
|
|
+ for info in target_tables_info:
|
|
|
+ task_desc_parts.append(f"### {info['table_name']}")
|
|
|
+ if info.get("data_source"):
|
|
|
+ ds = info["data_source"]
|
|
|
+ task_desc_parts.append("**Data Source**")
|
|
|
+ task_desc_parts.append(f"- **Type**: {ds.get('type', 'N/A')}")
|
|
|
+ task_desc_parts.append(f"- **Host**: {ds.get('host', 'N/A')}")
|
|
|
+ task_desc_parts.append(f"- **Port**: {ds.get('port', 'N/A')}")
|
|
|
+ task_desc_parts.append(
|
|
|
+ f"- **Database**: {ds.get('database', 'N/A')}"
|
|
|
+ )
|
|
|
+ task_desc_parts.append(
|
|
|
+ f"- **Schema**: {ds.get('schema', 'N/A')}\n"
|
|
|
+ )
|
|
|
+ task_desc_parts.append("**DDL**")
|
|
|
+ task_desc_parts.append(f"```sql\n{info['ddl']}\n```\n")
|
|
|
+
|
|
|
+ # Add the update-mode description
|
|
|
+ task_desc_parts.append("## Update Mode")
|
|
|
+ if update_mode == "append":
|
|
|
+ task_desc_parts.append("- **Mode**: Append (追加模式)")
|
|
|
+ task_desc_parts.append(
|
|
|
+ "- **Description**: 新数据将追加到目标表,不删除现有数据\n"
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ task_desc_parts.append("- **Mode**: Full Refresh (全量更新)")
|
|
|
+ task_desc_parts.append(
|
|
|
+ "- **Description**: 目标表将被清空后重新写入数据\n"
|
|
|
+ )
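To make the two modes concrete, here is a sketch of how an executor script might apply them to a SQL target. This is an assumption for illustration; the real scripts are generated later in the pipeline, and the table and connection names are placeholders:

```python
def write_target(conn, rows: list[tuple], target_table: str, update_mode: str) -> None:
    """Apply 'full' (truncate, then insert) or 'append' (insert only) semantics.

    conn is any DB-API connection (e.g. psycopg2); target_table is a trusted
    identifier taken from the BusinessDomain metadata, not user input.
    """
    with conn.cursor() as cur:
        if update_mode == "full":
            cur.execute(f"TRUNCATE TABLE {target_table}")  # clear before rewriting
        # Two-column rows for brevity; real scripts would match the target DDL.
        cur.executemany(f"INSERT INTO {target_table} VALUES (%s, %s)", rows)
    conn.commit()
```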
|
|
|
|
|
|
- # Add input-domain info
|
|
|
- for name in input_domain_names:
|
|
|
- task_description_parts.append(f"- {name}")
|
|
|
-
|
|
|
- task_description_parts.extend(
|
|
|
- [
|
|
|
- "",
|
|
|
- "## Target Table",
|
|
|
- f"- {target_bd_name_zh}\n",
|
|
|
- "## Update Mode",
|
|
|
- "- **Mode**: Full Refresh (全量更新)",
|
|
|
- "- **Description**: 目标表将被清空后重新写入数据\n",
|
|
|
- "## Request Content",
|
|
|
- processing_logic or order.description,
|
|
|
- "",
|
|
|
- "## Implementation Steps",
|
|
|
- "1. 连接数据源,读取源数据表",
|
|
|
- "2. 根据处理逻辑执行数据转换",
|
|
|
- "3. 写入目标数据表",
|
|
|
- "4. 完成后回调更新订单状态为 onboard",
|
|
|
- ]
|
|
|
+ # Add the request content
|
|
|
+ if processing_logic:
|
|
|
+ task_desc_parts.append("## Request Content")
|
|
|
+ task_desc_parts.append(f"{processing_logic}\n")
|
|
|
+
|
|
|
+ # Add the implementation steps
|
|
|
+ task_desc_parts.append("## Implementation Steps")
|
|
|
+ task_desc_parts.append(
|
|
|
+ "1. Extract data from source tables as specified in the DDL"
|
|
|
+ )
|
|
|
+ task_desc_parts.append(
|
|
|
+ "2. Apply transformation logic according to the rule:"
|
|
|
+ )
|
|
|
+ if processing_logic:
|
|
|
+ task_desc_parts.append(f" - Rule: {processing_logic}")
|
|
|
+ task_desc_parts.append(
|
|
|
+ "3. Generate Python program to implement the data transformation logic"
|
|
|
+ )
|
|
|
+ task_desc_parts.append(
|
|
|
+ f"4. Write transformed data to target table using {update_mode} mode"
|
|
|
)
|
|
|
|
|
|
- task_description_md = "\n".join(task_description_parts)
|
|
|
+ task_description_md = "\n".join(task_desc_parts)
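Rendered, the task description assembled above looks roughly like this (all values illustrative; DDL elided):

```markdown
# Task: DF_ORD20240001

## Related Information
- **Order ID**: 42
- **Order No**: ORD20240001
- **DataFlow ID**: 1001
- **DataFlow Name**: 订单汇总_数据流程
- **Product ID**: 7

## Source Tables
### ods_orders
**Data Source**
- **Type**: postgresql
- **Host**: db.example.internal
- **Port**: 5432
- **Database**: warehouse
- **Schema**: public

**DDL**
(sql fence containing the source-table DDL)

## Update Mode
- **Mode**: Full Refresh (全量更新)
- **Description**: 目标表将被清空后重新写入数据
```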
|
|
|
|
|
|
# Script path and name
|
|
|
code_path = "datafactory/scripts"
|
|
|
- code_name = f"{dataflow_name_en}.py"
|
|
|
+ code_name = dataflow_name_en
|
|
|
|
|
|
# Insert into the task_list table
|
|
|
task_insert_sql = text(
|
|
|
@@ -2157,6 +2343,24 @@ class DataOrderService:
|
|
|
logger.info(
|
|
|
f"成功创建任务记录: task_id={task_id}, task_name={dataflow_name_en}"
|
|
|
)
|
|
|
+
|
|
|
+ # Automatically generate the n8n workflow JSON file
|
|
|
+ try:
|
|
|
+ workflow_path = DataOrderService._generate_n8n_workflow(
|
|
|
+ script_name=dataflow_name_en,
|
|
|
+ code_name=code_name,
|
|
|
+ code_path=code_path,
|
|
|
+ update_mode=update_mode,
|
|
|
+ order_id=order.id,
|
|
|
+ dataflow_id=dataflow_id,
|
|
|
+ product_id=product_id,
|
|
|
+ )
|
|
|
+ if workflow_path:
|
|
|
+ logger.info(f"成功生成n8n工作流文件: {workflow_path}")
|
|
|
+ except Exception as wf_error:
|
|
|
+ logger.warning(f"生成n8n工作流文件失败: {str(wf_error)}")
|
|
|
+ # A workflow-generation failure does not affect the main flow
|
|
|
+
|
|
|
return task_id
|
|
|
|
|
|
except Exception as e:
|
|
|
@@ -2165,6 +2369,325 @@ class DataOrderService:
|
|
|
# Task-record creation failure does not block the main flow; return None
|
|
|
return None
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def _generate_n8n_workflow(
|
|
|
+ script_name: str,
|
|
|
+ code_name: str,
|
|
|
+ code_path: str,
|
|
|
+ update_mode: str = "full",
|
|
|
+ order_id: int | None = None,
|
|
|
+ dataflow_id: int | None = None,
|
|
|
+ product_id: int | None = None,
|
|
|
+ ) -> str | None:
|
|
|
+ """
|
|
|
+ Automatically generate an n8n workflow JSON file
|
|
|
+
|
|
|
+ The generated workflow contains the following steps:
|
|
|
+ 1. Schedule trigger
|
|
|
+ 2. Execute the script over SSH
|
|
|
+ 3. Check the execution result
|
|
|
+ 4. On success, call the onboard endpoint to update the order status
|
|
|
+ 5. Set the success/failure response
|
|
|
+
|
|
|
+ Args:
|
|
|
+ script_name: script/task name
|
|
|
+ code_name: code file name
|
|
|
+ code_path: code path
|
|
|
+ update_mode: update mode
|
|
|
+ order_id: associated data-order ID (used for the status-update callback)
|
|
|
+ dataflow_id: associated DataFlow ID
|
|
|
+ product_id: associated data-product ID
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Path of the generated workflow file, or None on failure
|
|
|
+ """
|
|
|
+ import uuid
|
|
|
+ from datetime import datetime, timezone
|
|
|
+ from pathlib import Path
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Resolve the project root directory
|
|
|
+ project_root = Path(__file__).parent.parent.parent.parent
|
|
|
+
|
|
|
+ # Ensure the workflows directory exists
|
|
|
+ workflows_dir = project_root / "datafactory" / "workflows"
|
|
|
+ workflows_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+
|
|
|
+ # Build the workflow file name
|
|
|
+ workflow_filename = f"{script_name}_workflow.json"
|
|
|
+ workflow_path = workflows_dir / workflow_filename
|
|
|
+
|
|
|
+ # Generate unique node IDs
|
|
|
+ def gen_id():
|
|
|
+ return str(uuid.uuid4())
|
|
|
+
|
|
|
+ # Build the full SSH command, including venv activation
|
|
|
+ # Note: the n8n server is separate from the application server, so an SSH node is required
|
|
|
+ ssh_command = (
|
|
|
+ f"cd /opt/dataops-platform && source venv/bin/activate && "
|
|
|
+ f"python {code_path}/{code_name}.py"
|
|
|
+ )
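`code_name` is derived from `dataflow_name_en` and interpolated into the shell command unquoted, so a name containing spaces or shell metacharacters would break (or inject into) the command. A hedged hardening sketch using `shlex.quote`:

```python
import shlex

# Quote the interpolated script path so unusual dataflow names stay inert in the shell.
script_arg = shlex.quote(f"{code_path}/{code_name}.py")
ssh_command = (
    "cd /opt/dataops-platform && source venv/bin/activate && "
    f"python {script_arg}"
)
```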
|
|
|
+
|
|
|
+ # API base URL (hardcoded for now; should ideally come from configuration)
|
|
|
+ api_base_url = "http://192.168.3.143:5000"
|
|
|
+
|
|
|
+ # Build the node list
|
|
|
+ nodes = [
|
|
|
+ # 1. Schedule trigger
|
|
|
+ {
|
|
|
+ "parameters": {
|
|
|
+ "rule": {
|
|
|
+ "interval": [
|
|
|
+ {
|
|
|
+ "field": "days",
|
|
|
+ "daysInterval": 1,
|
|
|
+ "triggerAtHour": 1,
|
|
|
+ "triggerAtMinute": 0,
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Schedule Trigger",
|
|
|
+ "type": "n8n-nodes-base.scheduleTrigger",
|
|
|
+ "typeVersion": 1.2,
|
|
|
+ "position": [250, 300],
|
|
|
+ },
|
|
|
+ # 2. Execute the script over SSH
|
|
|
+ {
|
|
|
+ "parameters": {
|
|
|
+ "resource": "command",
|
|
|
+ "operation": "execute",
|
|
|
+ "command": ssh_command,
|
|
|
+ "cwd": "/opt/dataops-platform",
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Execute Script",
|
|
|
+ "type": "n8n-nodes-base.ssh",
|
|
|
+ "typeVersion": 1,
|
|
|
+ "position": [450, 300],
|
|
|
+ "credentials": {
|
|
|
+ "sshPassword": {
|
|
|
+ "id": "pYTwwuyC15caQe6y",
|
|
|
+ "name": "SSH Password account",
|
|
|
+ }
|
|
|
+ },
|
|
|
+ },
|
|
|
+ # 3. Check the execution result
|
|
|
+ {
|
|
|
+ "parameters": {
|
|
|
+ "conditions": {
|
|
|
+ "options": {
|
|
|
+ "caseSensitive": True,
|
|
|
+ "leftValue": "",
|
|
|
+ "typeValidation": "strict",
|
|
|
+ },
|
|
|
+ "conditions": [
|
|
|
+ {
|
|
|
+ "id": "condition-success",
|
|
|
+ "leftValue": "={{ $json.code }}",
|
|
|
+ "rightValue": 0,
|
|
|
+ "operator": {
|
|
|
+ "type": "number",
|
|
|
+ "operation": "equals",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "combinator": "and",
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Check Result",
|
|
|
+ "type": "n8n-nodes-base.if",
|
|
|
+ "typeVersion": 2,
|
|
|
+ "position": [650, 300],
|
|
|
+ },
|
|
|
+ # 4. Success response
|
|
|
+ {
|
|
|
+ "parameters": {
|
|
|
+ "assignments": {
|
|
|
+ "assignments": [
|
|
|
+ {
|
|
|
+ "id": "result-success",
|
|
|
+ "name": "status",
|
|
|
+ "value": "success",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "result-message",
|
|
|
+ "name": "message",
|
|
|
+ "value": f"{script_name} 执行成功",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "result-output",
|
|
|
+ "name": "output",
|
|
|
+ "value": "={{ $json.stdout }}",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "result-time",
|
|
|
+ "name": "executionTime",
|
|
|
+ "value": "={{ $now.toISO() }}",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Success Response",
|
|
|
+ "type": "n8n-nodes-base.set",
|
|
|
+ "typeVersion": 3.4,
|
|
|
+ "position": [1050, 100],
|
|
|
+ },
|
|
|
+ # 5. Failure response
|
|
|
+ {
|
|
|
+ "parameters": {
|
|
|
+ "assignments": {
|
|
|
+ "assignments": [
|
|
|
+ {
|
|
|
+ "id": "error-status",
|
|
|
+ "name": "status",
|
|
|
+ "value": "error",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "error-message",
|
|
|
+ "name": "message",
|
|
|
+ "value": f"{script_name} 执行失败",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "error-output",
|
|
|
+ "name": "error",
|
|
|
+ "value": "={{ $json.stderr }}",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "error-code",
|
|
|
+ "name": "exitCode",
|
|
|
+ "value": "={{ $json.code }}",
|
|
|
+ "type": "number",
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "id": "error-time",
|
|
|
+ "name": "executionTime",
|
|
|
+ "value": "={{ $now.toISO() }}",
|
|
|
+ "type": "string",
|
|
|
+ },
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Error Response",
|
|
|
+ "type": "n8n-nodes-base.set",
|
|
|
+ "typeVersion": 3.4,
|
|
|
+ "position": [850, 500],
|
|
|
+ },
|
|
|
+ ]
|
|
|
+
|
|
|
+ # Build the connection graph
|
|
|
+ connections: dict[str, Any] = {
|
|
|
+ "Schedule Trigger": {
|
|
|
+ "main": [[{"node": "Execute Script", "type": "main", "index": 0}]]
|
|
|
+ },
|
|
|
+ "Execute Script": {
|
|
|
+ "main": [[{"node": "Check Result", "type": "main", "index": 0}]]
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ # If an order ID is present, add a node that calls the onboard endpoint
|
|
|
+ if order_id:
|
|
|
+ # Add an HTTP Request node that calls the onboard endpoint
|
|
|
+ onboard_request_body = {
|
|
|
+ "dataflow_id": dataflow_id,
|
|
|
+ "processed_by": "n8n-workflow",
|
|
|
+ }
|
|
|
+ if product_id:
|
|
|
+ onboard_request_body["product_id"] = product_id
|
|
|
+
|
|
|
+ onboard_node = {
|
|
|
+ "parameters": {
|
|
|
+ "method": "POST",
|
|
|
+ "url": f"{api_base_url}/api/dataservice/orders/{order_id}/onboard",
|
|
|
+ "sendHeaders": True,
|
|
|
+ "headerParameters": {
|
|
|
+ "parameters": [
|
|
|
+ {
|
|
|
+ "name": "Content-Type",
|
|
|
+ "value": "application/json",
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ "sendBody": True,
|
|
|
+ "specifyBody": "json",
|
|
|
+ "jsonBody": json.dumps(
|
|
|
+ onboard_request_body, ensure_ascii=False
|
|
|
+ ),
|
|
|
+ "options": {
|
|
|
+ "timeout": 30000,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ "id": gen_id(),
|
|
|
+ "name": "Update Order Status",
|
|
|
+ "type": "n8n-nodes-base.httpRequest",
|
|
|
+ "typeVersion": 4.2,
|
|
|
+ "position": [850, 200],
|
|
|
+ "continueOnFail": True,
|
|
|
+ }
|
|
|
+ nodes.append(onboard_node)
|
|
|
+
|
|
|
+ # Update the connections: on success, call the onboard endpoint first, then set the success response
|
|
|
+ connections["Check Result"] = {
|
|
|
+ "main": [
|
|
|
+ [{"node": "Update Order Status", "type": "main", "index": 0}],
|
|
|
+ [{"node": "Error Response", "type": "main", "index": 0}],
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ connections["Update Order Status"] = {
|
|
|
+ "main": [[{"node": "Success Response", "type": "main", "index": 0}]]
|
|
|
+ }
|
|
|
+ else:
|
|
|
+ # Without an order ID, keep the original connections
|
|
|
+ connections["Check Result"] = {
|
|
|
+ "main": [
|
|
|
+ [{"node": "Success Response", "type": "main", "index": 0}],
|
|
|
+ [{"node": "Error Response", "type": "main", "index": 0}],
|
|
|
+ ]
|
|
|
+ }
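For reference, the callback issued by the "Update Order Status" node is equivalent to the following `requests` sketch (same URL and payload as built above, using the variables in scope; `requests` is only for illustration):

```python
import requests

# Equivalent of the "Update Order Status" HTTP node built above.
resp = requests.post(
    f"{api_base_url}/api/dataservice/orders/{order_id}/onboard",
    json={
        "dataflow_id": dataflow_id,
        "processed_by": "n8n-workflow",
        "product_id": product_id,  # the node includes this key only when a product exists
    },
    timeout=30,
)
resp.raise_for_status()
```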
|
|
|
+
|
|
|
+ workflow_json = {
|
|
|
+ "name": f"{script_name}_工作流",
|
|
|
+ "nodes": nodes,
|
|
|
+ "connections": connections,
|
|
|
+ "active": False,
|
|
|
+ "settings": {"executionOrder": "v1"},
|
|
|
+ "versionId": "1",
|
|
|
+ "meta": {
|
|
|
+ "templateCredsSetupCompleted": False,
|
|
|
+ "instanceId": "dataops-platform",
|
|
|
+ },
|
|
|
+ "tags": [
|
|
|
+ {
|
|
|
+ "createdAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
|
|
+ "updatedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
|
|
|
+ "id": "1",
|
|
|
+ "name": "数据流程",
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ }
|
|
|
+
|
|
|
+ # Write the workflow file
|
|
|
+ with open(workflow_path, "w", encoding="utf-8") as f:
|
|
|
+ json.dump(workflow_json, f, ensure_ascii=False, indent=2)
|
|
|
+
|
|
|
+ logger.info(f"成功生成n8n工作流文件: {workflow_path}")
|
|
|
+ return str(workflow_path)
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"生成n8n工作流失败: {str(e)}")
|
|
|
+ return None
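Since the file is meant to be imported into n8n, a quick structural sanity check helps catch wiring mistakes before import. A minimal sketch; the invariant asserted is that every connection endpoint names a node defined in `nodes` (the example path is illustrative):

```python
import json

def check_workflow(path: str) -> None:
    """Assert that every connection endpoint references an existing node."""
    with open(path, encoding="utf-8") as f:
        wf = json.load(f)
    node_names = {n["name"] for n in wf["nodes"]}
    for source, outputs in wf["connections"].items():
        assert source in node_names, f"unknown source node: {source}"
        for branch in outputs["main"]:
            for link in branch:
                assert link["node"] in node_names, f"unknown target: {link['node']}"

# check_workflow("datafactory/workflows/DF_ORD20240001_workflow.json")  # illustrative path
```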
|
|
|
+
|
|
|
@staticmethod
|
|
|
def set_order_onboard(
|
|
|
order_id: int,
|