Procházet zdrojové kódy

增加了对load_file.py的支持

wangxq před 3 týdny
rodič
revize
25d64887fd

+ 82 - 28
dags/dag_dataops_pipeline_data_scheduler.py

@@ -155,6 +155,10 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
     logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
 
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
     # 检查script_name是否为空
     if not script_name:
         logger.error(f"表 {target_table} 的script_name为空,无法执行")
@@ -178,16 +182,27 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
         spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
         module = importlib.util.module_from_spec(spec)
         spec.loader.exec_module(module)
+
+
         
         # 检查并调用标准入口函数run
         if hasattr(module, "run"):
             logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
-            result = module.run(
-                table_name=target_table, 
-                execution_mode=script_exec_mode,
-                exec_date=exec_date,
-                script_name=script_name
-            )
+            # 构建完整的参数字典
+            run_params = {
+                "table_name": target_table,
+                "execution_mode": script_exec_mode,
+                "exec_date": exec_date
+            }
+
+            ## 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'frequency']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key] 
+
+            # 调用脚本的run函数
+            logger.info(f"调用run函数并传递参数: {run_params}")
+            result = module.run(**run_params)
             logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
             
             # 确保result是布尔值
@@ -491,22 +506,29 @@ def prepare_dag_schedule(**kwargs):
     
     for table in valid_tables:
         if table.get('target_table_label') == 'DataResource':
-            resource_tasks.append({
+            task_info = {
                 "source_table": table.get('source_table'),
                 "target_table": table['target_table'],
                 "target_table_label": "DataResource",
                 "script_name": table.get('script_name'),
-                "script_exec_mode": table.get('script_exec_mode', 'append')
-            })
+                "script_exec_mode": table.get('script_exec_mode', 'append'),
+                "frequency": table.get('frequency')
+            }
+            # 为structure类型添加特殊属性
+            if table.get('target_type') == "structure":
+                task_info["target_type"] = "structure"
+                task_info["storage_location"] = table.get('storage_location')
+
+            resource_tasks.append(task_info)
         elif table.get('target_table_label') == 'DataModel':
             model_tasks.append({
                 "source_table": table.get('source_table'),
                 "target_table": table['target_table'],
                 "target_table_label": "DataModel",
                 "script_name": table.get('script_name'),
-                "script_exec_mode": table.get('script_exec_mode', 'append')
-            })
-    
+                "script_exec_mode": table.get('script_exec_mode', 'append'),
+                "frequency": table.get('frequency')
+            })    
     # 获取依赖关系
     model_table_names = [t['target_table'] for t in model_tasks]
     dependencies = {}
@@ -725,26 +747,46 @@ def create_execution_plan(**kwargs):
         
         return empty_plan
 
-def process_resource(target_table, script_name, script_exec_mode, exec_date):
+def process_resource(target_table, script_name, script_exec_mode, exec_date,**kwargs):
     """处理单个资源表"""
     task_id = f"resource_{target_table}"
     logger.info(f"===== 开始执行 {task_id} =====")
     logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
-    
+
     # 确保exec_date是字符串
     if not isinstance(exec_date, str):
         exec_date = str(exec_date)
         logger.info(f"将exec_date转换为字符串: {exec_date}")
+
+        # 获取额外参数
+    target_type = kwargs.get('target_type')
+    storage_location = kwargs.get('storage_location')
+    frequency = kwargs.get('frequency')
     
     try:
-        # 使用新的函数执行脚本,不依赖数据库
+        # 使用新的函数执行脚本,传递相应参数
         logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
-        result = execute_python_script(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
+        
+        # 构建参数字典
+        script_params = {
+            "target_table": target_table,
+            "script_name": script_name,
+            "script_exec_mode": script_exec_mode,
+            "exec_date": exec_date,
+            "frequency": frequency
+        }
+        
+        # 添加特殊参数(如果有)
+        if target_type == "structure":
+            logger.info(f"处理structure类型的资源表,文件路径: {storage_location}")
+            script_params["target_type"] = target_type
+            script_params["storage_location"] = storage_location
+        
+        # logger.debug 打印所有的script_params
+        logger.debug(f"script_params: {script_params}")
+        
+        # 执行脚本
+        result = execute_python_script(**script_params)
         logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
         return result
     except Exception as e:
@@ -756,6 +798,7 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
     finally:
         logger.info(f"===== 结束执行 {task_id} =====")
 
+
 def process_model(target_table, script_name, script_exec_mode, exec_date):
     """处理单个模型表"""
     task_id = f"model_{target_table}"
@@ -1014,19 +1057,30 @@ with DAG(
             
             # 创建安全的任务ID
             safe_table_name = table_name.replace(".", "_").replace("-", "_")
+
+            # 构建op_kwargs参数
+            op_kwargs = {
+                "target_table": table_name,
+                "script_name": script_name,
+                "script_exec_mode": exec_mode,
+                "exec_date": str(exec_date)
+            }
+
+            # 添加特殊参数(如果有)
+            if "target_type" in task_info and task_info["target_type"] == "structure":
+                op_kwargs["target_type"] = task_info["target_type"]
+                op_kwargs["storage_location"] = task_info.get("storage_location")
+            
+            # 添加frequency参数(如果有)
+            if "frequency" in task_info:
+                op_kwargs["frequency"] = task_info["frequency"]
             
             # 确保所有任务都是data_processing_phase的一部分
             with data_group:
                 resource_task = PythonOperator(
                     task_id=f"resource_{safe_table_name}",
                     python_callable=process_resource,
-                    op_kwargs={
-                        "target_table": table_name,
-                        "script_name": script_name,
-                        "script_exec_mode": exec_mode,
-                        # 确保使用字符串而不是可能是默认(非字符串)格式的执行日期
-                        "exec_date": str(exec_date)
-                    },
+                    op_kwargs=op_kwargs,
                     retries=TASK_RETRY_CONFIG["retries"],
                     retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
                 )

+ 90 - 6
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -134,7 +134,8 @@ def get_table_info_from_neo4j(table_name):
             # 查询表标签和状态
             query_table = """
                 MATCH (t {en_name: $table_name})
-                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
+                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
+                       t.type AS type, t.storage_location AS storage_location
             """
             result = session.run(query_table, table_name=table_name)
             record = result.single()
@@ -145,12 +146,37 @@ def get_table_info_from_neo4j(table_name):
                 table_info['target_table_status'] = record.get("status", True)  # 默认为True
                 # table_info['default_update_frequency'] = record.get("frequency")
                 table_info['frequency'] = record.get("frequency")
+                table_info['target_type'] = record.get("type")  # 获取type属性
+                table_info['storage_location'] = record.get("storage_location")  # 获取storage_location属性
                 
                 # 根据标签类型查询关系和脚本信息
                 if "DataResource" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                    # 检查是否为structure类型
+                    if table_info.get('target_type') == "structure":
+                        # 对于structure类型,设置默认值,不查询关系
+                        table_info['source_table'] = None
+                        table_info['script_name'] = "load_file.py"
+                        table_info['script_type'] = "python"
+                        
+                        # 获取执行模式,注意csv类型的DataResource,它没有上游,所以exec_mode属性只能被写到节点上。
+                        query_exec_mode = """
+                            MATCH (t {en_name: $table_name})
+                            RETURN t.script_exec_mode AS script_exec_mode
+                        """
+                        result = session.run(query_exec_mode, table_name=table_name)
+                        record = result.single()
+                        if record and record.get("script_exec_mode"):
+                            table_info['script_exec_mode'] = record.get("script_exec_mode")
+                        else:
+                            # 如果没有找到执行模式,使用默认值
+                            table_info['script_exec_mode'] = "append"
+                            logger.info(f"表 {table_name} 未指定执行模式,使用默认值: append")
+
+                        return table_info
+                    else:
+                        query_rel = """
+                            MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                            RETURN source.en_name AS source_table, rel.script_name AS script_name,
                                rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
                     """
                 elif "DataModel" in labels:
@@ -233,6 +259,57 @@ def process_dependencies(tables_info):
     
     return list(all_tables.values())
 
+def process_resource(target_table, script_name, script_exec_mode, exec_date, **kwargs):
+    """Process a single resource table by running its script.
+
+    Builds the keyword arguments for ``execute_python_script`` and calls it.
+
+    Args:
+        target_table: Name of the resource table; also used to build the
+            ``resource_<target_table>`` task id that appears in the log lines.
+        script_name: Name of the script to run for this table.
+        script_exec_mode: Execution mode forwarded unchanged to the script.
+        exec_date: Execution date; converted with ``str()`` if it is not
+            already a string.
+        **kwargs: Optional extras. Only ``target_type``, ``storage_location``
+            and ``frequency`` are read; any other keys are ignored here.
+
+    Returns:
+        Whatever ``execute_python_script`` returns, or ``False`` if any
+        exception is raised while preparing or running the script.
+    """
+    # NOTE(review): execute_python_script is not defined in the visible part of
+    # this diff for this file - confirm it is defined or imported in this module.
+    task_id = f"resource_{target_table}"
+    logger.info(f"===== 开始执行 {task_id} =====")
+    logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
+    
+    # Make sure exec_date is a string
+    if not isinstance(exec_date, str):
+        exec_date = str(exec_date)
+        logger.info(f"将exec_date转换为字符串: {exec_date}")
+    
+    # Pull the optional extra parameters (None when absent)
+    target_type = kwargs.get('target_type')
+    storage_location = kwargs.get('storage_location')
+    frequency = kwargs.get('frequency')
+    
+    try:
+        # Run the script through execute_python_script, passing the matching parameters
+        logger.info(f"调用execute_python_script: target_table={target_table}, script_name={script_name}")
+        
+        # Build the parameter dictionary with the always-present arguments
+        script_params = {
+            "target_table": target_table,
+            "script_name": script_name,
+            "script_exec_mode": script_exec_mode,
+            "exec_date": exec_date
+        }
+        
+        # Add the special parameters (if any): only "structure" tables carry
+        # target_type and storage_location through to the script
+        if target_type == "structure":
+            logger.info(f"处理structure类型的资源表,文件路径: {storage_location}")
+            script_params["target_type"] = target_type
+            script_params["storage_location"] = storage_location
+        
+        # frequency is forwarded only when truthy (None and "" are dropped)
+        if frequency:
+            script_params["frequency"] = frequency
+        
+        # Execute the script
+        result = execute_python_script(**script_params)
+        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
+        return result
+    except Exception as e:
+        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        logger.info(f"===== 结束执行 {task_id} (失败) =====")
+        return False
+    finally:
+        # Runs on both the success and the failure path, so a failed run logs
+        # the "(失败)" end marker above and then this plain end marker as well.
+        logger.info(f"===== 结束执行 {task_id} =====")
+        
+
 def filter_invalid_tables(tables_info):
     """过滤无效表及其依赖,使用NetworkX构建依赖图"""
     # 构建表名到索引的映射
@@ -555,14 +632,21 @@ def prepare_pipeline_dag_schedule(**kwargs):
         
         for table in valid_tables:
             if table.get('target_table_label') == 'DataResource':
-                resource_tasks.append({
+                task_info = {
                     "source_table": table.get('source_table'),
                     "target_table": table['target_table'],
                     "target_table_label": "DataResource",
                     "script_name": table.get('script_name'),
                     "script_exec_mode": table.get('script_exec_mode', 'append'),
                     "frequency": table.get('frequency')
-                })
+                }
+                # 为structure类型添加特殊属性
+                if table.get('target_type') == "structure":
+                    task_info["target_type"] = "structure"
+                    task_info["storage_location"] = table.get('storage_location')  
+                                  
+                resource_tasks.append(task_info)
+
             elif table.get('target_table_label') == 'DataModel':
                 model_tasks.append({
                     "source_table": table.get('source_table'),