Browse Source

准备修改load_file.py

wangxq 3 weeks ago
parent
commit
d9ddddb866
1 changed files with 37 additions and 255 deletions
  1. 37 255
      dags/dataops_productline_execute_dag.py

+ 37 - 255
dags/dataops_productline_execute_dag.py

@@ -76,245 +76,19 @@ def get_cn_exec_date(logical_date):
 
 
     返回:
     返回:
         logical_exec_date: 逻辑执行日期,北京时间
         logical_exec_date: 逻辑执行日期,北京时间
+        local_logical_date: 北京时区的logical_date
     """
     """
     # 获取逻辑执行日期
     # 获取逻辑执行日期
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
-    return exec_date
+    return exec_date, local_logical_date
 
 
 
 
 #############################################
 #############################################
 # 脚本执行函数
 # 脚本执行函数
 #############################################
 #############################################
 
 
-def execute_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
-    """
-    执行单个脚本并返回执行结果
-    
-    参数:
-        script_id: 脚本ID
-        script_name: 脚本文件名
-        target_table: 目标表名
-        exec_date: 执行日期
-        script_exec_mode: 执行模式
-        **kwargs: 其他参数,如source_tables、target_type等
-    
-    返回:
-        bool: 脚本执行结果
-    """
-    # 添加详细日志
-    logger.info(f"===== 开始执行脚本 {script_id} =====")
-    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
-    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
-    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
-
-    # 记录额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
-
-    # 检查script_name是否为空
-    if not script_name:
-        logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
-        return False
-        
-    # 记录执行开始时间
-    start_time = datetime.now()
-    
-    try:
-        # 导入和执行脚本模块
-        import importlib.util
-        import sys
-        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
-        
-        # 获取脚本类型
-        script_type = kwargs.get('script_type', 'python_script')
-        
-        # 只有当脚本类型为 sql_script 或 python_script 时才检查文件是否存在
-        if script_type in ['sql_script', 'python_script']:
-            if not os.path.exists(script_path):
-                logger.error(f"脚本文件不存在: {script_path}")
-                return False
-            
-            # 动态导入模块
-            spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
-            module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(module)
-        else:
-            # 对于其他类型,例如默认python类型,我们将使用其他执行方式
-            logger.info(f"脚本类型为 {script_type},不检查脚本文件是否存在")
-            # 这里我们将直接退出,因为对于python类型应使用execute_python_script函数
-            logger.error(f"脚本类型为 {script_type},应使用专用函数执行,而不是execute_script")
-            return False
-        
-        # 检查并调用标准入口函数run
-        if hasattr(module, "run"):
-            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
-            # 构建完整的参数字典
-            run_params = {
-                "table_name": target_table,
-                "execution_mode": script_exec_mode,
-                "exec_date": exec_date
-            }
-
-            ## 添加可能的额外参数
-            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
-                if key in kwargs and kwargs[key] is not None:
-                    run_params[key] = kwargs[key] 
-
-            # 调用脚本的run函数
-            logger.info(f"调用run函数并传递参数: {run_params}")
-            result = module.run(**run_params)
-            logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
-            
-            # 确保result是布尔值
-            if result is None:
-                logger.warning(f"脚本返回值为None,转换为False")
-                result = False
-            elif not isinstance(result, bool):
-                original_result = result
-                result = bool(result)
-                logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
-            
-            # 记录结束时间和结果
-            end_time = datetime.now()
-            duration = (end_time - start_time).total_seconds()
-            logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
-            
-            return result
-        else:
-            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
-            return False
-    except Exception as e:
-        # 处理异常
-        logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
-        logger.info(f"===== 脚本执行异常结束 =====")
-        import traceback
-        logger.error(traceback.format_exc())
-        
-        # 确保不会阻塞DAG
-        return False
-
-def execute_sql_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
-    """
-    执行SQL脚本并返回执行结果
-    
-    参数:
-        script_id: 脚本ID
-        script_name: 脚本名称
-        target_table: 目标表名
-        exec_date: 执行日期
-        script_exec_mode: 执行模式
-        **kwargs: 其他参数
-    
-    返回:
-        bool: 脚本执行结果
-    """
-    # 添加详细日志
-    logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
-    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
-    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
-    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
-    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
-
-    # 记录额外参数
-    for key, value in kwargs.items():
-        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
-
-    # 记录执行开始时间
-    start_time = datetime.now()
-
-    try:
-        # 导入和执行execution_sql模块
-        import importlib.util
-        import sys
-        exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
-
-        # 对于SQL类型的脚本,我们不检查它是否作为文件存在
-        # 但是我们需要检查execution_sql.py是否存在
-        if not os.path.exists(exec_sql_path):
-            logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
-            return False
-
-        # 动态导入execution_sql模块
-        try:
-            spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
-            exec_sql_module = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(exec_sql_module)
-            logger.info(f"成功导入 execution_sql 模块")
-        except Exception as import_err:
-            logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
-            import traceback
-            logger.error(traceback.format_exc())
-            return False
-
-        # 检查并调用标准入口函数run
-        if hasattr(exec_sql_module, "run"):
-            logger.info(f"调用执行SQL脚本的标准入口函数 run()")
-
-            # 获取频率参数
-            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
-            
-            # 构建完整的参数字典
-            run_params = {
-                "script_type": "sql",
-                "target_table": target_table,
-                "script_name": script_name,
-                "exec_date": exec_date,
-                "frequency": frequency,
-                "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
-                "execution_mode": script_exec_mode  # 传递执行模式参数
-            }
-
-            # 添加可能的额外参数
-            for key in ['target_type', 'storage_location', 'source_tables']:
-                if key in kwargs and kwargs[key] is not None:
-                    run_params[key] = kwargs[key]
-
-            # 调用execution_sql.py的run函数
-            logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
-            result = exec_sql_module.run(**run_params)
-            logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
-
-            # 确保result是布尔值
-            if result is None:
-                logger.warning(f"SQL脚本返回值为None,转换为False")
-                result = False
-            elif not isinstance(result, bool):
-                original_result = result
-                result = bool(result)
-                logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
-
-            # 记录结束时间和结果
-            end_time = datetime.now()
-            duration = (end_time - start_time).total_seconds()
-            logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
-
-            return result
-        else:
-            logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
-            return False
-
-    except Exception as e:
-        # 处理异常
-        logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
-        logger.info(f"===== SQL脚本执行异常结束 =====")
-        import traceback
-        logger.error(traceback.format_exc())
-        
-        # 确保不会阻塞DAG
-        return False
-
-# 重命名此函数为execute_python_script
-def execute_python_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+def execute_python_script(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
     """
     """
     执行Python脚本文件并返回执行结果
     执行Python脚本文件并返回执行结果
     
     
@@ -322,20 +96,25 @@ def execute_python_script(script_id, script_name, target_table, exec_date, scrip
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本文件名(.py文件)
         script_name: 脚本文件名(.py文件)
         target_table: 目标表名
         target_table: 目标表名
-        exec_date: 执行日期
         script_exec_mode: 执行模式
         script_exec_mode: 执行模式
+        frequency: 执行频率
         **kwargs: 其他参数,如source_tables、target_type等
         **kwargs: 其他参数,如source_tables、target_type等
     
     
     返回:
     返回:
         bool: 脚本执行结果
         bool: 脚本执行结果
     """
     """
+    # 获取执行日期
+    logical_date = kwargs.get('logical_date', datetime.now())
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
+    
     # 添加详细日志
     # 添加详细日志
     logger.info(f"===== 开始执行Python脚本文件 {script_id} =====")
     logger.info(f"===== 开始执行Python脚本文件 {script_id} =====")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"【时间参数】execute_python_script: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
     for key, value in kwargs.items():
     for key, value in kwargs.items():
@@ -371,11 +150,12 @@ def execute_python_script(script_id, script_name, target_table, exec_date, scrip
             run_params = {
             run_params = {
                 "table_name": target_table,
                 "table_name": target_table,
                 "execution_mode": script_exec_mode,
                 "execution_mode": script_exec_mode,
-                "exec_date": exec_date
+                "exec_date": exec_date,
+                "frequency": frequency
             }
             }
 
 
             ## 添加可能的额外参数
             ## 添加可能的额外参数
-            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
+            for key in ['target_type', 'storage_location', 'source_tables']:
                 if key in kwargs and kwargs[key] is not None:
                 if key in kwargs and kwargs[key] is not None:
                     run_params[key] = kwargs[key] 
                     run_params[key] = kwargs[key] 
 
 
@@ -416,7 +196,7 @@ def execute_python_script(script_id, script_name, target_table, exec_date, scrip
         return False
         return False
 
 
 # 使用execute_sql函数代替之前的execute_sql_script
 # 使用execute_sql函数代替之前的execute_sql_script
-def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+def execute_sql(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
     """
     """
     执行SQL脚本并返回执行结果
     执行SQL脚本并返回执行结果
     
     
@@ -424,20 +204,25 @@ def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mod
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本名称(数据库中的名称)
         script_name: 脚本名称(数据库中的名称)
         target_table: 目标表名
         target_table: 目标表名
-        exec_date: 执行日期
         script_exec_mode: 执行模式
         script_exec_mode: 执行模式
+        frequency: 执行频率
         **kwargs: 其他参数
         **kwargs: 其他参数
     
     
     返回:
     返回:
         bool: 脚本执行结果
         bool: 脚本执行结果
     """
     """
+    # 获取执行日期
+    logical_date = kwargs.get('logical_date', datetime.now())
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
+    
     # 添加详细日志
     # 添加详细日志
     logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
     logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"【时间参数】execute_sql: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
     for key, value in kwargs.items():
     for key, value in kwargs.items():
@@ -473,9 +258,6 @@ def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mod
         # 检查并调用标准入口函数run
         # 检查并调用标准入口函数run
         if hasattr(exec_sql_module, "run"):
         if hasattr(exec_sql_module, "run"):
             logger.info(f"调用执行SQL脚本的标准入口函数 run()")
             logger.info(f"调用执行SQL脚本的标准入口函数 run()")
-
-            # 获取频率参数
-            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
             
             
             # 构建完整的参数字典
             # 构建完整的参数字典
             run_params = {
             run_params = {
@@ -531,7 +313,7 @@ def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mod
         return False
         return False
 
 
 # 使用execute_python函数代替之前的execute_python_script
 # 使用execute_python函数代替之前的execute_python_script
-def execute_python(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+def execute_python(script_id, script_name, target_table, script_exec_mode, frequency, **kwargs):
     """
     """
     执行Python脚本并返回执行结果
     执行Python脚本并返回执行结果
     
     
@@ -539,20 +321,25 @@ def execute_python(script_id, script_name, target_table, exec_date, script_exec_
         script_id: 脚本ID
         script_id: 脚本ID
         script_name: 脚本名称(数据库中的名称)
         script_name: 脚本名称(数据库中的名称)
         target_table: 目标表名
         target_table: 目标表名
-        exec_date: 执行日期
         script_exec_mode: 执行模式
         script_exec_mode: 执行模式
+        frequency: 执行频率
         **kwargs: 其他参数
         **kwargs: 其他参数
     
     
     返回:
     返回:
         bool: 脚本执行结果
         bool: 脚本执行结果
     """
     """
+    # 获取执行日期
+    logical_date = kwargs.get('logical_date', datetime.now())
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
+    
     # 添加详细日志
     # 添加详细日志
     logger.info(f"===== 开始执行Python脚本 {script_id} =====")
     logger.info(f"===== 开始执行Python脚本 {script_id} =====")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
-    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+    logger.info(f"frequency: {frequency}, 类型: {type(frequency)}")
+    logger.info(f"【时间参数】execute_python: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
 
 
     # 记录额外参数
     # 记录额外参数
     for key, value in kwargs.items():
     for key, value in kwargs.items():
@@ -588,9 +375,6 @@ def execute_python(script_id, script_name, target_table, exec_date, script_exec_
         # 检查并调用标准入口函数run
         # 检查并调用标准入口函数run
         if hasattr(exec_python_module, "run"):
         if hasattr(exec_python_module, "run"):
             logger.info(f"调用执行Python脚本的标准入口函数 run()")
             logger.info(f"调用执行Python脚本的标准入口函数 run()")
-
-            # 获取频率参数
-            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
             
             
             # 构建完整的参数字典
             # 构建完整的参数字典
             run_params = {
             run_params = {
@@ -742,8 +526,7 @@ def check_execution_plan(**kwargs):
     dag_run = kwargs.get('dag_run')
     dag_run = kwargs.get('dag_run')
     logical_date = dag_run.logical_date
     logical_date = dag_run.logical_date
 
 
-    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    exec_date, local_logical_date = get_cn_exec_date(logical_date)
     
     
     # 检查是否是手动触发
     # 检查是否是手动触发
     is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
     is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
@@ -855,7 +638,7 @@ def create_execution_plan(**kwargs):
     try:
     try:
         dag_run = kwargs.get('dag_run')
         dag_run = kwargs.get('dag_run')
         logical_date = dag_run.logical_date
         logical_date = dag_run.logical_date
-        exec_date = get_cn_exec_date(logical_date)
+        exec_date, local_logical_date = get_cn_exec_date(logical_date)
         
         
         # 检查是否是手动触发
         # 检查是否是手动触发
         is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
         is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
@@ -863,7 +646,7 @@ def create_execution_plan(**kwargs):
             logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
             logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
         
         
         # 记录重要的时间参数
         # 记录重要的时间参数
-        logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+        logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}")
         
         
         # 从XCom获取执行计划
         # 从XCom获取执行计划
         execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
         execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
@@ -1025,13 +808,15 @@ with DAG(
                     "script_id": script_id,
                     "script_id": script_id,
                     "script_name": script_name,
                     "script_name": script_name,
                     "target_table": target_table,
                     "target_table": target_table,
-                    "exec_date": str(exec_date),
                     "script_exec_mode": script_exec_mode,
                     "script_exec_mode": script_exec_mode,
-                    "source_tables": source_tables
+                    "source_tables": source_tables,
+                    "frequency": script.get("frequency", "daily"),  # 显式添加frequency参数
+                    "target_table_label": target_table_label,
+                    # logical_date会在任务执行时由Airflow自动添加
                 }
                 }
                 
                 
                 # 添加特殊参数(如果有)
                 # 添加特殊参数(如果有)
-                for key in ['target_type', 'storage_location', 'frequency']:
+                for key in ['target_type', 'storage_location']:
                     if key in script and script[key] is not None:
                     if key in script and script[key] is not None:
                         op_kwargs[key] = script[key]
                         op_kwargs[key] = script[key]
                 
                 
@@ -1053,9 +838,6 @@ with DAG(
                     logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
                     logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
                     python_callable = execute_python_script
                     python_callable = execute_python_script
                     
                     
-                # 确保target_table_label被传递
-                op_kwargs["target_table_label"] = target_table_label
-                
                 # 创建任务
                 # 创建任务
                 script_task = PythonOperator(
                 script_task = PythonOperator(
                     task_id=task_id,
                     task_id=task_id,