Selaa lähdekoodia

调通了python代码块的调用功能

wangxq 3 viikkoa sitten
vanhempi
commit
111c81a3fa

+ 3 - 0
dags/config.py

@@ -43,3 +43,6 @@ STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH ="/data/archive"
 
 # 执行计划保留的数量
 EXECUTION_PLAN_KEEP_COUNT = 5
+
+# ETL作业幂等性开关
+ENABLE_ETL_IDEMPOTENCY = True

+ 504 - 4
dags/dataops_productline_execute_dag.py

@@ -25,6 +25,8 @@ from common import (
 )
 from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
 import pytz
+import pandas as pd
+import sys
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -63,6 +65,21 @@ class DecimalEncoder(json.JSONEncoder):
             return obj.isoformat()
         # 让父类处理其他类型
         return super(DecimalEncoder, self).default(obj)
+    
+
+def get_logical_exec_date(logical_date):
+    """
+    获取逻辑执行日期
+    
+    参数:
+        logical_date: 逻辑执行日期,UTC时间
+
+    返回:
+        logical_exec_date: 逻辑执行日期,北京时间
+    """
+    # 获取逻辑执行日期
+    return logical_date.in_timezone('Asia/Shanghai').to_date_string()
+
 
 #############################################
 # 脚本执行函数
@@ -91,6 +108,237 @@ def execute_script(script_id, script_name, target_table, exec_date, script_exec_
     logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
     logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
 
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
+    # 获取逻辑执行日期
+    logical_exec_date = get_logical_exec_date(exec_date)
+    logger.info(f"逻辑执行日期: {logical_exec_date}")
+
+    # 检查script_name是否为空
+    if not script_name:
+        logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
+        return False
+        
+    # 记录执行开始时间
+    start_time = datetime.now()
+    
+    try:
+        # 导入和执行脚本模块
+        import importlib.util
+        import sys
+        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
+        
+        # 获取脚本类型
+        script_type = kwargs.get('script_type', 'python_script')
+        
+        # 只有当脚本类型为 sql_script 或 python_script 时才检查文件是否存在
+        if script_type in ['sql_script', 'python_script']:
+            if not os.path.exists(script_path):
+                logger.error(f"脚本文件不存在: {script_path}")
+                return False
+            
+            # 动态导入模块
+            spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
+            module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(module)
+        else:
+            # 对于其他类型,例如默认python类型,我们将使用其他执行方式
+            logger.info(f"脚本类型为 {script_type},不检查脚本文件是否存在")
+            # 这里我们将直接退出,因为对于python类型应使用execute_python_script函数
+            logger.error(f"脚本类型为 {script_type},应使用专用函数执行,而不是execute_script")
+            return False
+        
+        # 检查并调用标准入口函数run
+        if hasattr(module, "run"):
+            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
+            # 构建完整的参数字典
+            run_params = {
+                "table_name": target_table,
+                "execution_mode": script_exec_mode,
+                "exec_date": exec_date
+            }
+
+            ## 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key] 
+
+            # 调用脚本的run函数
+            logger.info(f"调用run函数并传递参数: {run_params}")
+            result = module.run(**run_params)
+            logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+            
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+            
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+            
+            return result
+        else:
+            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
+            return False
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== 脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 确保不会阻塞DAG
+        return False
+
+def execute_sql_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+    """
+    执行SQL脚本并返回执行结果
+    
+    参数:
+        script_id: 脚本ID
+        script_name: 脚本名称
+        target_table: 目标表名
+        exec_date: 执行日期
+        script_exec_mode: 执行模式
+        **kwargs: 其他参数
+    
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
+    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
+    # 记录执行开始时间
+    start_time = datetime.now()
+
+    try:
+        # 导入和执行execution_sql模块
+        import importlib.util
+        import sys
+        exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
+
+        # 对于SQL类型的脚本,我们不检查它是否作为文件存在
+        # 但是我们需要检查execution_sql.py是否存在
+        if not os.path.exists(exec_sql_path):
+            logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
+            return False
+
+        # 动态导入execution_sql模块
+        try:
+            spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
+            exec_sql_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(exec_sql_module)
+            logger.info(f"成功导入 execution_sql 模块")
+        except Exception as import_err:
+            logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return False
+
+        # 检查并调用标准入口函数run
+        if hasattr(exec_sql_module, "run"):
+            logger.info(f"调用执行SQL脚本的标准入口函数 run()")
+
+            # 获取频率参数
+            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
+            
+            # 构建完整的参数字典
+            run_params = {
+                "script_type": "sql",
+                "target_table": target_table,
+                "script_name": script_name,
+                "exec_date": exec_date,
+                "frequency": frequency,
+                "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
+                "execution_mode": script_exec_mode  # 传递执行模式参数
+            }
+
+            # 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'source_tables']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key]
+
+            # 调用execution_sql.py的run函数
+            logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
+            result = exec_sql_module.run(**run_params)
+            logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"SQL脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+
+            return result
+        else:
+            logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
+            return False
+
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== SQL脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 确保不会阻塞DAG
+        return False
+
+# 重命名此函数为execute_python_script
+def execute_python_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+    """
+    执行Python脚本文件并返回执行结果
+    
+    参数:
+        script_id: 脚本ID
+        script_name: 脚本文件名(.py文件)
+        target_table: 目标表名
+        exec_date: 执行日期
+        script_exec_mode: 执行模式
+        **kwargs: 其他参数,如source_tables、target_type等
+    
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行Python脚本文件 {script_id} =====")
+    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
     # 记录额外参数
     for key, value in kwargs.items():
         logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
@@ -120,7 +368,7 @@ def execute_script(script_id, script_name, target_table, exec_date, script_exec_
         
         # 检查并调用标准入口函数run
         if hasattr(module, "run"):
-            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
+            logger.info(f"调用脚本文件 {script_name} 的标准入口函数 run()")
             # 构建完整的参数字典
             run_params = {
                 "table_name": target_table,
@@ -157,7 +405,7 @@ def execute_script(script_id, script_name, target_table, exec_date, script_exec_
             logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
             return False
     except Exception as e:
-        # a处理异常
+        # 处理异常
         logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
         end_time = datetime.now()
         duration = (end_time - start_time).total_seconds()
@@ -169,6 +417,236 @@ def execute_script(script_id, script_name, target_table, exec_date, script_exec_
         # 确保不会阻塞DAG
         return False
 
+# 使用execute_sql函数代替之前的execute_sql_script
+def execute_sql(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+    """
+    执行SQL脚本并返回执行结果
+    
+    参数:
+        script_id: 脚本ID
+        script_name: 脚本名称(数据库中的名称)
+        target_table: 目标表名
+        exec_date: 执行日期
+        script_exec_mode: 执行模式
+        **kwargs: 其他参数
+    
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行SQL脚本 {script_id} =====")
+    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
+    # 记录执行开始时间
+    start_time = datetime.now()
+
+    try:
+        # 导入和执行execution_sql模块
+        import importlib.util
+        import sys
+        exec_sql_path = os.path.join(SCRIPTS_BASE_PATH, "execution_sql.py")
+
+        # 对于SQL类型的脚本,我们不检查它是否作为文件存在
+        # 但是我们需要检查execution_sql.py是否存在
+        if not os.path.exists(exec_sql_path):
+            logger.error(f"SQL执行脚本文件不存在: {exec_sql_path}")
+            return False
+
+        # 动态导入execution_sql模块
+        try:
+            spec = importlib.util.spec_from_file_location("execution_sql", exec_sql_path)
+            exec_sql_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(exec_sql_module)
+            logger.info(f"成功导入 execution_sql 模块")
+        except Exception as import_err:
+            logger.error(f"导入 execution_sql 模块时出错: {str(import_err)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return False
+
+        # 检查并调用标准入口函数run
+        if hasattr(exec_sql_module, "run"):
+            logger.info(f"调用执行SQL脚本的标准入口函数 run()")
+
+            # 获取频率参数
+            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
+            
+            # 构建完整的参数字典
+            run_params = {
+                "script_type": "sql",
+                "target_table": target_table,
+                "script_name": script_name,
+                "exec_date": exec_date,
+                "frequency": frequency,
+                "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签,用于ETL幂等性判断
+                "execution_mode": script_exec_mode  # 传递执行模式参数
+            }
+
+            # 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'source_tables']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key]
+
+            # 调用execution_sql.py的run函数
+            logger.info(f"调用SQL执行脚本的run函数并传递参数: {run_params}")
+            result = exec_sql_module.run(**run_params)
+            logger.info(f"SQL脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"SQL脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"SQL脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"SQL脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+
+            return result
+        else:
+            logger.error(f"执行SQL脚本 execution_sql.py 中未定义标准入口函数 run(),无法执行")
+            return False
+
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行SQL脚本 {script_id} 出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"SQL脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== SQL脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 确保不会阻塞DAG
+        return False
+
+# 使用execute_python函数代替之前的execute_python_script
+def execute_python(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+    """
+    执行Python脚本并返回执行结果
+    
+    参数:
+        script_id: 脚本ID
+        script_name: 脚本名称(数据库中的名称)
+        target_table: 目标表名
+        exec_date: 执行日期
+        script_exec_mode: 执行模式
+        **kwargs: 其他参数
+    
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行Python脚本 {script_id} =====")
+    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
+    # 记录执行开始时间
+    start_time = datetime.now()
+
+    try:
+        # 导入和执行execution_python模块
+        import importlib.util
+        import sys
+        exec_python_path = os.path.join(SCRIPTS_BASE_PATH, "execution_python.py")
+
+        # 对于Python类型的脚本,我们不检查它是否作为文件存在
+        # 但是我们需要检查execution_python.py是否存在
+        if not os.path.exists(exec_python_path):
+            logger.error(f"Python执行脚本文件不存在: {exec_python_path}")
+            return False
+
+        # 动态导入execution_python模块
+        try:
+            spec = importlib.util.spec_from_file_location("execution_python", exec_python_path)
+            exec_python_module = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(exec_python_module)
+            logger.info(f"成功导入 execution_python 模块")
+        except Exception as import_err:
+            logger.error(f"导入 execution_python 模块时出错: {str(import_err)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            return False
+
+        # 检查并调用标准入口函数run
+        if hasattr(exec_python_module, "run"):
+            logger.info(f"调用执行Python脚本的标准入口函数 run()")
+
+            # 获取频率参数
+            frequency = kwargs.get('frequency', 'daily')  # 默认为daily
+            
+            # 构建完整的参数字典
+            run_params = {
+                "script_type": "python",
+                "target_table": target_table,
+                "script_name": script_name,
+                "exec_date": exec_date,
+                "frequency": frequency,
+                "target_table_label": kwargs.get('target_table_label', ''), # 传递目标表标签
+                "execution_mode": script_exec_mode  # 传递执行模式参数
+            }
+
+            # 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'source_tables']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key]
+
+            # 调用execution_python.py的run函数
+            logger.info(f"调用Python执行脚本的run函数并传递参数: {run_params}")
+            result = exec_python_module.run(**run_params)
+            logger.info(f"Python脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"Python脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"Python脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"Python脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+
+            return result
+        else:
+            logger.error(f"执行Python脚本 execution_python.py 中未定义标准入口函数 run(),无法执行")
+            return False
+
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行Python脚本 {script_id} 出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"Python脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== Python脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 确保不会阻塞DAG
+        return False
+
 #############################################
 # 执行计划获取和处理函数
 #############################################
@@ -446,7 +924,7 @@ with DAG(
     now = datetime.now()
     now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
     default_exec_date = get_today_date()
-    logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期: {default_exec_date}")
+    logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期(用于初始化,非实际执行日期): {default_exec_date}")
     
     #############################################
     # 准备阶段: 检查并创建执行计划
@@ -528,6 +1006,7 @@ with DAG(
                 script_type = script.get("script_type", "python")
                 script_exec_mode = script.get("script_exec_mode", "append")
                 source_tables = script.get("source_tables", [])
+                target_table_label = script.get("target_table_label", "")
                 
                 # 使用描述性的任务ID,包含脚本名称和目标表
                 # 提取文件名
@@ -558,10 +1037,31 @@ with DAG(
                     if key in script and script[key] is not None:
                         op_kwargs[key] = script[key]
                 
+                # 根据脚本类型和目标表标签选择执行函数
+                if script_type.lower() == 'sql' and target_table_label == 'DataModel':
+                    # 使用SQL脚本执行函数
+                    logger.info(f"脚本 {script_id} 是SQL类型且目标表标签为DataModel,使用execute_sql函数执行")
+                    python_callable = execute_sql
+                elif script_type.lower() == 'python' and target_table_label == 'DataModel':
+                    # 使用Python脚本执行函数
+                    logger.info(f"脚本 {script_id} 是Python类型且目标表标签为DataModel,使用execute_python函数执行")
+                    python_callable = execute_python
+                elif script_type.lower() == 'python_script':
+                    # 使用Python脚本文件执行函数
+                    logger.info(f"脚本 {script_id} 是python_script类型,使用execute_python_script函数执行")
+                    python_callable = execute_python_script
+                else:
+                    # 默认使用Python脚本文件执行函数
+                    logger.warning(f"未识别的脚本类型 {script_type},使用默认execute_python_script函数执行")
+                    python_callable = execute_python_script
+                    
+                # 确保target_table_label被传递
+                op_kwargs["target_table_label"] = target_table_label
+                
                 # 创建任务
                 script_task = PythonOperator(
                     task_id=task_id,
-                    python_callable=execute_script,
+                    python_callable=python_callable,
                     op_kwargs=op_kwargs,
                     retries=TASK_RETRY_CONFIG["retries"],
                     retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])

+ 447 - 0
dataops/scripts/execution_python.py

@@ -0,0 +1,447 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import sys
+import os
+import logging
+from datetime import datetime
+import psycopg2
+import textwrap
+from airflow.exceptions import AirflowException
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[logging.StreamHandler(sys.stdout)]
+)
+logger = logging.getLogger("execution_python")
+
+# 将同级目录加入到Python搜索路径
+current_dir = os.path.dirname(os.path.abspath(__file__))
+if current_dir not in sys.path:
+    sys.path.append(current_dir)
+
+# 尝试导入script_utils,使用多级导入策略
+try:
+    import script_utils
+    logger.info("成功导入script_utils模块")
+except ImportError as e:
+    logger.error(f"无法直接导入script_utils: {str(e)}")
+    
+    # 尝试备用方法1:完整路径导入
+    try:
+        sys.path.append(os.path.dirname(current_dir))  # 添加父目录
+        import dataops.scripts.script_utils as script_utils
+        logger.info("使用完整路径成功导入script_utils模块")
+    except ImportError as e2:
+        logger.error(f"使用完整路径导入失败: {str(e2)}")
+        
+        # 尝试备用方法2:动态导入
+        try:
+            import importlib.util
+            script_utils_path = os.path.join(current_dir, "script_utils.py")
+            logger.info(f"尝试从路径动态导入: {script_utils_path}")
+            
+            spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
+            script_utils = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(script_utils)
+            logger.info("通过动态导入成功加载script_utils模块")
+        except Exception as e3:
+            logger.error(f"动态导入也失败: {str(e3)}")
+            raise ImportError(f"无法导入script_utils模块,所有方法都失败")
+
+# 动态导入 config
+def get_config():
+    """
+    从config模块导入配置
+    
+    返回:
+        dict: PG_CONFIG 数据库连接配置
+    """
+    # 默认配置
+    default_pg_config = {
+        "host": "localhost",
+        "port": 5432,
+        "user": "postgres",
+        "password": "postgres",
+        "database": "dataops"
+    }
+    try:
+        config = __import__('config')
+        logger.info("从config模块直接导入配置")
+        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
+        return pg_config
+    except ImportError:
+        logger.warning("未找到 config.py,使用默认数据库配置")
+        return default_pg_config
+
+# 导入配置
+PG_CONFIG = get_config()
+logger.info(f"配置加载完成: 数据库连接={PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}")
+
+def get_pg_conn():
+    """获取PostgreSQL连接"""
+    return psycopg2.connect(**PG_CONFIG)
+
+def get_python_script(target_table, script_name):
+    """
+    从data_transform_scripts表中获取Python脚本内容和目标日期列
+    
+    参数:
+        target_table (str): 目标表名
+        script_name (str): 脚本名称
+    
+    返回:
+        tuple: (script_content, target_dt_column) 脚本内容和目标日期列
+    """
+    logger.info(f"加载Python脚本: target_table={target_table}, script_name={script_name}")
+    conn = None
+    cursor = None
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        query = """
+            SELECT script_content, target_dt_column
+            FROM data_transform_scripts
+            WHERE target_table = %s AND script_name = %s LIMIT 1
+        """
+        
+        logger.info(f"执行SQL查询: {query}")
+        logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")
+        
+        cursor.execute(query, (target_table, script_name))
+        result = cursor.fetchone()
+        
+        if result is None:
+            logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
+            return None, None
+        
+        # 获取脚本内容和目标日期列
+        script_content = result[0]
+        target_dt_column = result[1] if len(result) > 1 else None
+        
+        # 记录结果
+        logger.info(f"目标日期列: {target_dt_column if target_dt_column else '未设置'}")
+        
+        # 记录脚本内容,但可能很长,只记录前500个字符和后100个字符
+        if len(script_content) > 600:
+            logger.info(f"成功获取脚本内容,总长度: {len(script_content)}字符")
+            logger.info(f"脚本内容前500字符: \n{script_content[:500]}")
+        else:
+            logger.info(f"成功获取脚本内容,内容如下: \n{script_content}")
+        
+        return script_content, target_dt_column
+    except Exception as e:
+        logger.error(f"查询脚本出错: {str(e)}", exc_info=True)
+        return None, None
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def execute_sql(sql, params=None):
+    """
+    执行SQL语句
+    
+    参数:
+        sql (str): SQL语句
+        params (dict, optional): SQL参数
+    
+    返回:
+        tuple: (成功标志, 影响的行数或结果)
+    """
+    conn = None
+    cursor = None
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        # 记录SQL(不包含敏感参数)
+        # 由于SQL可能很长,只记录前200个字符
+        if len(sql) > 200:
+            logger.info(f"执行SQL (前200字符): {sql[:200]}...")
+        else:
+            logger.info(f"执行SQL: {sql}")
+            
+        if params:
+            logger.info(f"SQL参数: {params}")
+        
+        # 执行SQL
+        cursor.execute(sql, params)
+        
+        # 获取影响的行数
+        if cursor.rowcount >= 0:
+            affected_rows = cursor.rowcount
+            logger.info(f"SQL执行成功,影响了 {affected_rows} 行")
+        else:
+            affected_rows = 0
+            logger.info("SQL执行成功,但无法确定影响的行数")
+        
+        # 如果是SELECT语句,获取结果
+        if sql.strip().upper().startswith("SELECT"):
+            result = cursor.fetchall()
+            logger.info(f"查询返回 {len(result)} 行结果")
+            conn.commit()
+            return True, {"affected_rows": affected_rows, "result": result}
+        else:
+            # 对于非SELECT语句,提交事务
+            conn.commit()
+            return True, {"affected_rows": affected_rows}
+            
+    except Exception as e:
+        logger.error(f"执行SQL时出错: {str(e)}", exc_info=True)
+        if conn:
+            conn.rollback()
+        return False, {"error": str(e)}
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def run(script_type=None, target_table=None, script_name=None, exec_date=None, frequency=None, **kwargs):
+    """
+    执行Python脚本主入口函数
+    
+    参数:
+        script_type (str): 脚本类型,必须为'python'
+        target_table (str): 目标表名
+        script_name (str): 脚本名称
+        exec_date (str): 执行日期,格式为YYYY-MM-DD
+        frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
+        **kwargs: 其他参数
+    
+    返回:
+        bool: 是否执行成功
+    """
+    # 记录开始执行的时间
+    start_time = datetime.now()
+    logger.info("===== 开始执行 Python 脚本 =====")
+    logger.info(f"脚本类型: {script_type}")
+    logger.info(f"目标表: {target_table}")
+    logger.info(f"脚本名称: {script_name}")
+    logger.info(f"执行日期: {exec_date}")
+    logger.info(f"频率: {frequency}")
+    
+    # 记录其他参数
+    for key, value in kwargs.items():
+        logger.info(f"其他参数 - {key}: {value}")
+
+    # 验证必要参数
+    if not script_type or script_type.lower() != 'python':
+        logger.error(f"脚本类型必须为'python',当前为: {script_type}")
+        return False
+    
+    if not target_table:
+        logger.error("未提供目标表名")
+        return False
+    
+    if not script_name:
+        logger.error("未提供脚本名称")
+        return False
+    
+    if not exec_date:
+        logger.error("未提供执行日期")
+        return False
+    
+    if not frequency:
+        logger.error("未提供频率")
+        return False
+
+    try:
+        # 获取Python脚本和目标日期列
+        script_code, target_dt_column = get_python_script(target_table, script_name)
+        if not script_code:
+            logger.error("未获取到 Python 脚本内容")
+            return False
+        
+        logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
+        
+        # 日期计算
+        try:
+            start_date, end_date = script_utils.get_date_range(exec_date, frequency)
+            logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+        except Exception as date_err:
+            logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
+            return False
+        
+        # 检查是否开启ETL幂等性
+        target_table_label = kwargs.get('target_table_label', '')
+        script_exec_mode = kwargs.get('execution_mode', 'append')  # 默认为append
+        
+        logger.info(f"脚本执行模式: {script_exec_mode}")
+        
+        # 导入config模块获取幂等性开关
+        try:
+            config = __import__('config')
+            enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
+        except ImportError:
+            logger.warning("无法导入config模块获取幂等性开关,默认为False")
+            enable_idempotency = False
+        
+        logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
+        logger.info(f"目标表标签: {target_table_label}")
+        
+        # 如果开启了ETL幂等性处理
+        if enable_idempotency:
+            # 处理append模式
+            if script_exec_mode.lower() == 'append':
+                logger.info("当前为append模式,开始考虑ETL幂等性处理")
+                
+                # 检查是否有目标日期列
+                if target_dt_column:
+                    logger.info(f"找到目标日期列 {target_dt_column},将进行数据清理")
+                    
+                    # 生成DELETE语句
+                    delete_sql = f"""DELETE FROM {target_table}
+WHERE {target_dt_column} >= '{start_date}'
+  AND {target_dt_column} < '{end_date}';"""
+                    
+                    logger.info(f"生成的DELETE语句: {delete_sql}")
+                    
+                    # 执行DELETE SQL
+                    logger.info("执行清理SQL以实现幂等性")
+                    delete_success, delete_result = execute_sql(delete_sql)
+                    
+                    if delete_success:
+                        if isinstance(delete_result, dict) and "affected_rows" in delete_result:
+                            logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                        else:
+                            logger.info("清理SQL执行成功")
+                    else:
+                        logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                        # 继续执行原始Python脚本
+                        logger.warning("继续执行原始Python脚本")
+                else:
+                    logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
+                    logger.warning("将直接执行原始Python脚本,可能导致数据重复")
+            
+            # 处理full_refresh模式
+            elif script_exec_mode.lower() == 'full_refresh':
+                logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
+                
+                # 构建TRUNCATE语句
+                truncate_sql = f"TRUNCATE TABLE {target_table};"
+                logger.info(f"生成的TRUNCATE SQL: {truncate_sql}")
+                
+                # 执行TRUNCATE操作
+                truncate_success, truncate_result = execute_sql(truncate_sql)
+                
+                if truncate_success:
+                    logger.info(f"TRUNCATE TABLE {target_table} 执行成功,表已清空")
+                else:
+                    error_msg = truncate_result.get("error", "未知错误")
+                    logger.error(f"TRUNCATE TABLE执行失败: {error_msg}")
+                    # 继续执行原始Python脚本
+                    logger.warning("TRUNCATE失败,继续执行原始Python脚本")
+            
+            else:
+                logger.info(f"当前执行模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
+        else:
+            logger.info("未开启ETL幂等性,直接执行Python脚本")
+
+        # 准备执行上下文
+        conn = get_pg_conn()  # 在外层先拿到连接
+        exec_globals = {
+            "conn": conn,  # 提供数据库连接对象
+            "execute_sql": execute_sql,   # 提供SQL执行方法
+            "datetime": datetime,         # 提供datetime模块
+            "psycopg2": psycopg2,         # 提供psycopg2模块
+            "os": os,                     # 提供os模块
+            "sys": sys                    # 提供sys模块
+        }
+        
+        exec_locals = {
+            "start_date": start_date,
+            "end_date": end_date,
+            "logger": logger,
+            "target_table": target_table,
+            **kwargs
+        }
+
+        # 安全执行Python片段
+        try:
+            # 开始执行 Python 片段...
+            logger.info("开始执行 Python 片段...")
+            
+            # 执行脚本 - psycopg2自动开始事务
+            exec(textwrap.dedent(script_code), exec_globals, exec_locals)
+            
+            # 检查事务是否仍然处于活动状态
+            # psycopg2没有_in_transaction属性,使用status代替
+            if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
+                conn.commit()
+                logger.info("在外层提交未完成的事务")
+            
+            # 记录执行时间
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info("Python 脚本执行成功")
+            logger.info(f"===== Python脚本执行完成 (成功) =====")
+            logger.info(f"总耗时: {duration:.2f}秒")
+            
+            return True
+        except Exception as e:
+            # 检查事务状态进行回滚
+            if conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
+                conn.rollback()
+                logger.info("事务已回滚")
+            
+            # 记录错误信息
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.error(f"执行 Python 脚本失败: {str(e)}", exc_info=True)
+            logger.info(f"===== Python脚本执行完成 (失败) =====")
+            logger.info(f"总耗时: {duration:.2f}秒")
+            
+            # 将异常重新抛出,让Airflow知道任务失败
+            raise AirflowException(f"Python脚本执行失败: {str(e)}")
+        finally:
+            # 安全关闭连接
+            conn.close()
+            logger.info("数据库连接已关闭")
+    except Exception as e:
+        # 捕获所有未处理的异常
+        logger.error(f"执行Python脚本时发生未预期的错误: {str(e)}", exc_info=True)
+        # 抛出异常,让Airflow知道任务失败
+        raise AirflowException(f"执行Python脚本时发生未预期的错误: {str(e)}")
+
+if __name__ == "__main__":
+    import argparse
+    parser = argparse.ArgumentParser(description='执行Python脚本片段')
+    parser.add_argument('--target-table', type=str, required=True, help='目标表名')
+    parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
+    parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
+    parser.add_argument('--frequency', type=str, required=True, 
+                        choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], 
+                        help='频率: daily, weekly, monthly, quarterly, yearly')
+    parser.add_argument('--execution-mode', type=str, default='append', 
+                        choices=['append', 'full_refresh'], 
+                        help='执行模式: append(追加), full_refresh(全量刷新)')
+    
+    args = parser.parse_args()
+
+    run_kwargs = {
+        "script_type": "python",
+        "target_table": args.target_table,
+        "script_name": args.script_name,
+        "exec_date": args.exec_date,
+        "frequency": args.frequency,
+        "execution_mode": args.execution_mode
+    }
+    
+    logger.info("命令行测试执行参数: " + str(run_kwargs))
+
+    try:
+        success = run(**run_kwargs)
+        if success:
+            logger.info("Python脚本执行成功")
+            sys.exit(0)
+        else:
+            logger.error("Python脚本执行失败")
+            sys.exit(1)
+    except Exception as e:
+        logger.error(f"执行失败: {str(e)}")
+        sys.exit(1)

+ 487 - 0
dataops/scripts/execution_sql.py

@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import sys
+import os
+from datetime import datetime
+import psycopg2
+import jinja2
+# 修改导入方式,避免使用相对导入
+import sys
+import os
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("execution_sql")
+
+# 将同级目录加入到Python搜索路径
+current_dir = os.path.dirname(os.path.abspath(__file__))
+if current_dir not in sys.path:
+    sys.path.append(current_dir)
+
+# 尝试导入script_utils
+try:
+    import script_utils
+    logger.info("成功导入script_utils模块")
+except ImportError as e:
+    logger.error(f"无法直接导入script_utils: {str(e)}")
+    
+    # 尝试备用方法1:完整路径导入
+    try:
+        sys.path.append(os.path.dirname(current_dir))  # 添加父目录
+        import dataops.scripts.script_utils as script_utils
+        logger.info("使用完整路径成功导入script_utils模块")
+    except ImportError as e2:
+        logger.error(f"使用完整路径导入失败: {str(e2)}")
+        
+        # 尝试备用方法2:动态导入
+        try:
+            import importlib.util
+            script_utils_path = os.path.join(current_dir, "script_utils.py")
+            logger.info(f"尝试从路径动态导入: {script_utils_path}")
+            
+            spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
+            script_utils = importlib.util.module_from_spec(spec)
+            spec.loader.exec_module(script_utils)
+            logger.info("通过动态导入成功加载script_utils模块")
+        except Exception as e3:
+            logger.error(f"动态导入也失败: {str(e3)}")
+            raise ImportError(f"无法导入script_utils模块,所有方法都失败")
+
+# 添加健壮的导入机制
+def get_config():
+    """
+    从config模块导入配置
+    
+    返回:
+        dict: PG_CONFIG 数据库连接配置
+    """
+    # 默认配置
+    default_pg_config = {
+        "host": "localhost",
+        "port": 5432,
+        "user": "postgres",
+        "password": "postgres",
+        "database": "dataops",
+    }
+    
+    try:
+        # 动态导入,避免IDE警告
+        config = __import__('config')
+        logger.info("从config模块直接导入配置")
+        pg_config = getattr(config, 'PG_CONFIG', default_pg_config)
+        return pg_config
+    except ImportError:
+        # 使用默认配置
+        logger.warning("无法导入config模块,使用默认值")
+        return default_pg_config
+
+# 导入配置
+PG_CONFIG = get_config()
+logger.info(f"配置加载完成: 数据库连接={PG_CONFIG['host']}:{PG_CONFIG['port']}/{PG_CONFIG['database']}")
+
+def get_pg_conn():
+    """获取PostgreSQL连接"""
+    return psycopg2.connect(**PG_CONFIG)
+
+def get_script_content(target_table, script_name):
+    """
+    从data_transform_scripts表中获取脚本内容和目标日期列
+    
+    参数:
+        target_table (str): 目标表名
+        script_name (str): 脚本名称
+    
+    返回:
+        tuple: (script_content, target_dt_column) 脚本内容和目标日期列
+    """
+    logger.info(f"正在从data_transform_scripts表获取脚本内容,目标表: {target_table},脚本名称: {script_name}")
+    conn = None
+    cursor = None
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        query = """
+            SELECT script_content, target_dt_column 
+            FROM data_transform_scripts 
+            WHERE target_table = %s AND script_name = %s 
+            LIMIT 1
+        """
+        
+        logger.info(f"执行SQL查询: {query}")
+        logger.info(f"查询参数: target_table={target_table}, script_name={script_name}")
+        
+        cursor.execute(query, (target_table, script_name))
+        result = cursor.fetchone()
+        
+        if result is None:
+            logger.error(f"未找到目标表 '{target_table}' 和脚本名 '{script_name}' 对应的脚本")
+            return None, None
+        
+        # 获取脚本内容和目标日期列    
+        script_content = result[0]
+        target_dt_column = result[1] if len(result) > 1 else None
+        
+        # 记录结果
+        logger.info(f"目标日期列: {target_dt_column if target_dt_column else '未设置'}")
+        
+        # 记录脚本内容,但可能很长,只记录前500个字符和后100个字符
+        if len(script_content) > 600:
+            logger.info(f"成功获取脚本内容,总长度: {len(script_content)}字符")
+            logger.info(f"脚本内容前500字符: \n{script_content[:500]}")
+            logger.info(f"脚本内容后100字符: \n{script_content[-100:]}")
+        else:
+            logger.info(f"成功获取脚本内容,内容如下: \n{script_content}")
+            
+        return script_content, target_dt_column
+        
+    except Exception as e:
+        logger.error(f"获取脚本内容时出错: {str(e)}", exc_info=True)
+        return None, None
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def execute_sql(sql, params=None):
+    """
+    执行SQL语句
+    
+    参数:
+        sql (str): SQL语句
+        params (dict, optional): SQL参数
+    
+    返回:
+        tuple: (成功标志, 影响的行数或结果)
+    """
+    conn = None
+    cursor = None
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        # 记录SQL(不包含敏感参数)
+        # 由于SQL可能很长,只记录前200个字符
+        if len(sql) > 200:
+            logger.info(f"执行SQL (前200字符): {sql[:200]}...")
+        else:
+            logger.info(f"执行SQL: {sql}")
+            
+        if params:
+            logger.info(f"SQL参数: {params}")
+        
+        # 执行SQL
+        cursor.execute(sql, params)
+        
+        # 获取影响的行数
+        if cursor.rowcount >= 0:
+            affected_rows = cursor.rowcount
+            logger.info(f"SQL执行成功,影响了 {affected_rows} 行")
+        else:
+            affected_rows = 0
+            logger.info("SQL执行成功,但无法确定影响的行数")
+        
+        # 如果是SELECT语句,获取结果
+        if sql.strip().upper().startswith("SELECT"):
+            result = cursor.fetchall()
+            logger.info(f"查询返回 {len(result)} 行结果")
+            conn.commit()
+            return True, {"affected_rows": affected_rows, "result": result}
+        else:
+            # 对于非SELECT语句,提交事务
+            conn.commit()
+            return True, {"affected_rows": affected_rows}
+            
+    except Exception as e:
+        logger.error(f"执行SQL时出错: {str(e)}", exc_info=True)
+        if conn:
+            conn.rollback()
+        return False, {"error": str(e)}
+    finally:
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+
+def render_sql_template(sql_content, template_params):
+    """
+    使用Jinja2渲染SQL模板
+    
+    参数:
+        sql_content (str): SQL模板内容
+        template_params (dict): 模板参数
+    
+    返回:
+        str: 渲染后的SQL
+    """
+    try:
+        # 创建Jinja2环境
+        env = jinja2.Environment(
+            autoescape=False,
+            undefined=jinja2.StrictUndefined  # 严格模式,未定义变量会抛出异常
+        )
+        
+        # 创建模板并渲染
+        template = env.from_string(sql_content)
+        rendered_sql = template.render(**template_params)
+        
+        logger.info("SQL模板渲染成功")
+        
+        # 记录渲染后的SQL,只记录前200个字符和后100个字符
+        if len(rendered_sql) > 300:
+            logger.info(f"渲染后SQL前200字符: \n{rendered_sql[:200]}...")
+            logger.info(f"渲染后SQL后100字符: \n...{rendered_sql[-100:]}")
+        else:
+            logger.info(f"渲染后SQL: \n{rendered_sql}")
+            
+        return rendered_sql
+    except Exception as e:
+        logger.error(f"渲染SQL模板时出错: {str(e)}", exc_info=True)
+        raise
+
+def run(script_type=None, target_table=None, script_name=None, exec_date=None, 
+        frequency=None, **kwargs):
+    """
+    执行SQL脚本主入口函数
+    
+    参数:
+        script_type (str): 脚本类型,必须为'sql'
+        target_table (str): 目标表名
+        script_name (str): 脚本名称
+        exec_date (str): 执行日期,格式为YYYY-MM-DD
+        frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
+        **kwargs: 其他参数
+    
+    返回:
+        bool: 是否执行成功
+    """
+    # 记录开始执行的时间
+    start_time = datetime.now()
+    logger.info(f"===== 开始执行 SQL 脚本 =====")
+    logger.info(f"脚本类型: {script_type}")
+    logger.info(f"目标表: {target_table}")
+    logger.info(f"脚本名称: {script_name}")
+    logger.info(f"执行日期: {exec_date}")
+    logger.info(f"频率: {frequency}")
+    
+    # 记录其他参数
+    for key, value in kwargs.items():
+        logger.info(f"其他参数 - {key}: {value}")
+    
+    # 验证必要参数
+    if not script_type or script_type.lower() != 'sql':
+        logger.error(f"脚本类型必须为'sql',当前为: {script_type}")
+        return False
+    
+    if not target_table:
+        logger.error("未提供目标表名")
+        return False
+    
+    if not script_name:
+        logger.error("未提供脚本名称")
+        return False
+    
+    if not exec_date:
+        logger.error("未提供执行日期")
+        return False
+    
+    if not frequency:
+        logger.error("未提供频率")
+        return False
+    
+    try:
+        # 获取脚本内容,直接从数据库获取,不检查文件是否存在
+        sql_content, target_dt_column = get_script_content(target_table, script_name)
+        if not sql_content:
+            logger.error("无法获取脚本内容,执行失败")
+            return False
+        
+        logger.info(f"成功获取脚本内容,长度: {len(sql_content)} 字符")
+        
+        # 计算日期范围
+        try:
+            start_date, end_date = script_utils.get_date_range(exec_date, frequency)
+            logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+        except Exception as date_err:
+            logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
+            return False
+        
+        # 准备模板参数
+        template_params = {
+            'start_date': start_date,
+            'end_date': end_date,
+            # 可以添加更多默认参数
+        }
+        
+        # 检查是否开启ETL幂等性
+        target_table_label = kwargs.get('target_table_label', '')
+        script_exec_mode = kwargs.get('execution_mode', 'append')  # 默认为append
+        
+        logger.info(f"脚本执行模式: {script_exec_mode}")
+        
+        # 导入config模块获取幂等性开关
+        try:
+            config = __import__('config')
+            enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
+        except ImportError:
+            logger.warning("无法导入config模块获取幂等性开关,默认为False")
+            enable_idempotency = False
+        
+        logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
+        logger.info(f"目标表标签: {target_table_label}")
+        
+        # 如果开启了ETL幂等性且是SQL类型
+        if script_type.lower() == 'sql' and enable_idempotency:
+            
+            # 处理append模式
+            if script_exec_mode.lower() == 'append':
+                logger.info("当前为append模式,开始考虑ETL幂等性处理")
+                
+                # 检查是否有目标日期列
+                if target_dt_column:
+                    logger.info(f"找到目标日期列 {target_dt_column},将生成DELETE语句")
+                    
+                    # 生成DELETE语句
+                    delete_sql = f"""DELETE FROM {target_table}
+WHERE {target_dt_column} >= '{{{{ start_date }}}}'
+  AND {target_dt_column} < '{{{{ end_date }}}}';"""
+                    
+                    logger.info(f"生成的DELETE语句: {delete_sql}")
+                    
+                    # 渲染DELETE SQL
+                    try:
+                        rendered_delete_sql = render_sql_template(delete_sql, template_params)
+                        logger.info("成功渲染清理SQL")
+                    except Exception as render_err:
+                        logger.error(f"渲染清理SQL时出错: {str(render_err)}", exc_info=True)
+                        # 即使清理SQL失败,仍然继续执行后续SQL
+                        logger.warning("继续执行原始SQL")
+                    else:
+                        # 执行DELETE SQL
+                        logger.info("执行清理SQL以实现幂等性")
+                        delete_success, delete_result = execute_sql(rendered_delete_sql)
+                        
+                        if delete_success:
+                            if isinstance(delete_result, dict) and "affected_rows" in delete_result:
+                                logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                            else:
+                                logger.info("清理SQL执行成功")
+                        else:
+                            logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                            # 继续执行原始SQL
+                            logger.warning("继续执行原始SQL")
+                else:
+                    logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
+                    logger.warning("将直接执行原始SQL,可能导致数据重复")
+            
+            # 处理full_refresh模式
+            elif script_exec_mode.lower() == 'full_refresh':
+                logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
+                
+                # 构建TRUNCATE语句
+                truncate_sql = f"TRUNCATE TABLE {target_table};"
+                logger.info(f"生成的TRUNCATE SQL: {truncate_sql}")
+                
+                # 执行TRUNCATE操作
+                truncate_success, truncate_result = execute_sql(truncate_sql)
+                
+                if truncate_success:
+                    logger.info(f"TRUNCATE TABLE {target_table} 执行成功,表已清空")
+                else:
+                    error_msg = truncate_result.get("error", "未知错误")
+                    logger.error(f"TRUNCATE TABLE执行失败: {error_msg}")
+                    # 继续执行原始SQL
+                    logger.warning("TRUNCATE失败,继续执行原始SQL")
+            
+            else:
+                logger.info(f"当前执行模式 {script_exec_mode} 不是append或full_refresh,不执行幂等性处理")
+        else:
+            logger.info("未满足ETL幂等性处理条件,直接执行原始SQL")
+        
+        # 渲染原始SQL模板
+        try:
+            rendered_sql = render_sql_template(sql_content, template_params)
+        except Exception as render_err:
+            logger.error(f"渲染SQL模板时出错: {str(render_err)}", exc_info=True)
+            return False
+        
+        # 执行原始SQL
+        success, result = execute_sql(rendered_sql)
+        
+        if success:
+            # 记录执行结果
+            if isinstance(result, dict):
+                if "affected_rows" in result:
+                    logger.info(f"SQL执行成功,影响了 {result['affected_rows']} 行数据")
+                if "result" in result:
+                    logger.info(f"SQL查询返回 {len(result['result'])} 行结果")
+            else:
+                logger.info("SQL执行成功")
+            
+            # 记录执行时间
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"===== SQL脚本执行完成 (成功) =====")
+            logger.info(f"总耗时: {duration:.2f}秒")
+            
+            return True
+        else:
+            # 记录错误信息
+            error_msg = result.get("error", "未知错误")
+            logger.error(f"SQL执行失败: {error_msg}")
+            
+            # 记录执行时间
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"===== SQL脚本执行完成 (失败) =====")
+            logger.info(f"总耗时: {duration:.2f}秒")
+            
+            return False
+            
+    except Exception as e:
+        # 捕获所有未处理的异常
+        logger.error(f"执行SQL脚本时发生未预期的错误: {str(e)}", exc_info=True)
+        return False
+
+if __name__ == "__main__":
+    # 直接执行时的测试代码
+    import argparse
+    
+    parser = argparse.ArgumentParser(description='执行SQL脚本')
+    parser.add_argument('--target-table', type=str, required=True, help='目标表名')
+    parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
+    parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
+    parser.add_argument('--frequency', type=str, required=True, 
+                        choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], 
+                        help='频率: daily, weekly, monthly, quarterly, yearly')
+    
+    args = parser.parse_args()
+    
+    # 构造必要的 kwargs
+    run_kwargs = {
+        "script_type": "sql",
+        "target_table": args.target_table,
+        "script_name": args.script_name,
+        "exec_date": args.exec_date,
+        "frequency": args.frequency,
+    }
+
+    logger.info("命令行测试执行参数: " + str(run_kwargs))
+
+    success = run(**run_kwargs)
+    
+    if success:
+        logger.info("SQL脚本执行成功")
+        sys.exit(0)
+    else:
+        logger.error("SQL脚本执行失败")
+        sys.exit(1) 

+ 242 - 0
dataops/scripts/script_utils.py

@@ -0,0 +1,242 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import logging
+import sys
+from datetime import datetime, timedelta
+import pytz
+import re  # 添加re模块以支持正则表达式
+
+# 配置日志记录器
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler(sys.stdout)
+    ]
+)
+
+logger = logging.getLogger("script_utils")
+
+def get_date_range(exec_date, frequency):
+    """
+    根据执行日期和频率,计算开始日期和结束日期
+    
+    参数:
+        exec_date (str): 执行日期,格式为 YYYY-MM-DD
+        frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
+    
+    返回:
+        tuple: (start_date, end_date) 格式为 YYYY-MM-DD 的字符串
+    """
+    logger.info(f"计算日期范围 - 执行日期: {exec_date}, 频率: {frequency}")
+    
+    # 将输入的日期转换为上海时区的datetime对象
+    shanghai_tz = pytz.timezone('Asia/Shanghai')
+    
+    try:
+        # 解析输入的exec_date
+        if isinstance(exec_date, str):
+            date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+        elif isinstance(exec_date, datetime):
+            date_obj = exec_date
+        else:
+            raise ValueError(f"不支持的exec_date类型: {type(exec_date)}")
+        
+        # 转换为上海时区
+        date_obj = shanghai_tz.localize(date_obj)
+        logger.info(f"上海时区的执行日期: {date_obj}")
+        
+        # 根据不同频率计算日期范围
+        if frequency.lower() == 'daily':
+            # 每日: start_date = exec_date, end_date = exec_date + 1 day
+            start_date = date_obj.strftime('%Y-%m-%d')
+            end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
+            
+        elif frequency.lower() == 'weekly':
+            # 每周: start_date = 本周一, end_date = 下周一
+            days_since_monday = date_obj.weekday()  # 0=周一, 6=周日
+            monday = date_obj - timedelta(days=days_since_monday)
+            next_monday = monday + timedelta(days=7)
+            
+            start_date = monday.strftime('%Y-%m-%d')
+            end_date = next_monday.strftime('%Y-%m-%d')
+            
+        elif frequency.lower() == 'monthly':
+            # 每月: start_date = 本月第一天, end_date = 下月第一天
+            first_day = date_obj.replace(day=1)
+            
+            # 计算下个月的第一天
+            if first_day.month == 12:
+                next_month_first_day = first_day.replace(year=first_day.year + 1, month=1)
+            else:
+                next_month_first_day = first_day.replace(month=first_day.month + 1)
+                
+            start_date = first_day.strftime('%Y-%m-%d')
+            end_date = next_month_first_day.strftime('%Y-%m-%d')
+            
+        elif frequency.lower() == 'quarterly':
+            # 每季度: start_date = 本季度第一天, end_date = 下季度第一天
+            quarter = (date_obj.month - 1) // 3 + 1  # 1-4季度
+            first_month_of_quarter = (quarter - 1) * 3 + 1  # 季度的第一个月
+            
+            quarter_first_day = date_obj.replace(month=first_month_of_quarter, day=1)
+            
+            # 计算下个季度的第一天
+            if quarter == 4:
+                next_quarter_first_day = quarter_first_day.replace(year=quarter_first_day.year + 1, month=1)
+            else:
+                next_quarter_first_day = quarter_first_day.replace(month=first_month_of_quarter + 3)
+                
+            start_date = quarter_first_day.strftime('%Y-%m-%d')
+            end_date = next_quarter_first_day.strftime('%Y-%m-%d')
+            
+        elif frequency.lower() == 'yearly':
+            # 每年: start_date = 本年第一天, end_date = 下年第一天
+            year_first_day = date_obj.replace(month=1, day=1)
+            next_year_first_day = date_obj.replace(year=date_obj.year + 1, month=1, day=1)
+            
+            start_date = year_first_day.strftime('%Y-%m-%d')
+            end_date = next_year_first_day.strftime('%Y-%m-%d')
+            
+        else:
+            logger.error(f"不支持的频率: {frequency}")
+            raise ValueError(f"不支持的频率: {frequency}")
+        
+        logger.info(f"计算结果 - 开始日期: {start_date}, 结束日期: {end_date}")
+        return start_date, end_date
+        
+    except Exception as e:
+        logger.error(f"计算日期范围时出错: {str(e)}", exc_info=True)
+        raise 
+
+
+import re
+from typing import Dict, List, Optional, Set
+
+def extract_source_fields_linked_to_template(sql: str, jinja_vars: List[str]) -> Set[str]:
+    """
+    从 SQL 中提取和 jinja 模板变量绑定的源字段(支持各种形式)
+    """
+    fields = set()
+    sql = re.sub(r"\s+", " ", sql)
+
+    for var in jinja_vars:
+        # 普通比较、函数包裹
+        pattern = re.compile(
+            r"""
+            (?P<field>
+                (?:\w+\s*\(\s*)?           # 可选函数开始(如 DATE(
+                [\w\.]+                    # 字段名
+                (?:\s+AS\s+\w+)?           # 可选 CAST 形式
+                \)?                        # 可选右括号
+            )
+            \s*(=|<|>|<=|>=)\s*['"]?\{\{\s*""" + var + r"""\s*\}\}['"]?
+            """, re.IGNORECASE | re.VERBOSE
+        )
+        fields.update(match.group("field").strip() for match in pattern.finditer(sql))
+
+        # BETWEEN '{{ start_date }}' AND '{{ end_date }}'
+        if var == "start_date":
+            pattern_between = re.compile(
+                r"""(?P<field>
+                        (?:\w+\s*\(\s*)?[\w\.]+(?:\s+AS\s+\w+)?\)?  # 字段(函数包裹可选)
+                    )
+                    \s+BETWEEN\s+['"]?\{\{\s*start_date\s*\}\}['"]?\s+AND\s+['"]?\{\{\s*end_date\s*\}\}
+                """, re.IGNORECASE | re.VERBOSE
+            )
+            fields.update(match.group("field").strip() for match in pattern_between.finditer(sql))
+
+    return {extract_core_field(f) for f in fields}
+
+def extract_core_field(expr: str) -> str:
+    """
+    清洗函数包裹的字段表达式:DATE(sd.sale_date) -> sd.sale_date, CAST(...) -> ...
+    """
+    expr = re.sub(r"CAST\s*\(\s*([\w\.]+)\s+AS\s+\w+\s*\)", r"\1", expr, flags=re.IGNORECASE)
+    expr = re.sub(r"\b\w+\s*\(\s*([\w\.]+)\s*\)", r"\1", expr)
+    return expr.strip()
+
+def parse_select_aliases(sql: str) -> Dict[str, str]:
+    """
+    提取 SELECT 中的字段别名映射:原字段 -> 目标别名
+    """
+    sql = re.sub(r"\s+", " ", sql)
+    select_clause_match = re.search(r"SELECT\s+(.*?)\s+FROM", sql, re.IGNORECASE)
+    if not select_clause_match:
+        return {}
+
+    select_clause = select_clause_match.group(1)
+    mappings = {}
+    for expr in select_clause.split(","):
+        expr = expr.strip()
+        alias_match = re.match(r"([\w\.]+)\s+AS\s+([\w]+)", expr, re.IGNORECASE)
+        if alias_match:
+            source, alias = alias_match.groups()
+            mappings[source.strip()] = alias.strip()
+
+    return mappings
+
+def find_target_date_field(sql: str, jinja_vars: List[str] = ["start_date", "end_date"]) -> Optional[str]:
+    """
+    从 SQL 中找出与模板时间变量绑定的目标表字段(只返回一个)
+    """
+    source_fields = extract_source_fields_linked_to_template(sql, jinja_vars)
+    alias_map = parse_select_aliases(sql)
+
+    # 匹配 SELECT 中的映射字段
+    for src_field in source_fields:
+        if src_field in alias_map:
+            return alias_map[src_field]  # 源字段映射的目标字段
+
+    # 若未通过 AS 映射,可能直接 SELECT sd.sale_date(裸字段)
+    for src_field in source_fields:
+        if '.' not in src_field:
+            return src_field  # 裸字段直接作为目标字段名
+
+    return None
+
+
+def generate_delete_sql(sql_content, target_table=None):
+    """
+    根据SQL脚本内容生成用于清理数据的DELETE语句
+    
+    参数:
+        sql_content (str): 原始SQL脚本内容
+        target_table (str, optional): 目标表名,如果SQL脚本中无法解析出表名时使用
+    
+    返回:
+        str: DELETE语句,用于清理数据
+    """
+    logger.info("生成清理SQL语句,实现ETL作业幂等性")
+    
+    # 如果提供了目标表名,直接使用
+    if target_table:
+        logger.info(f"使用提供的目标表名: {target_table}")
+        delete_stmt = f"""DELETE FROM {target_table}
+WHERE summary_date >= '{{{{ start_date }}}}'
+  AND summary_date < '{{{{ end_date }}}}';"""
+        logger.info(f"生成的清理SQL: {delete_stmt}")
+        return delete_stmt
+    
+    # 尝试从SQL内容中解析出目标表名
+    try:
+        # 简单解析,尝试找出INSERT语句的目标表
+        # 匹配 INSERT INTO xxx 或 INSERT INTO "xxx" 或 INSERT INTO `xxx` 或 INSERT INTO [xxx]
+        insert_match = re.search(r'INSERT\s+INTO\s+(?:["\[`])?([a-zA-Z0-9_\.]+)(?:["\]`])?', sql_content, re.IGNORECASE)
+        
+        if insert_match:
+            table_name = insert_match.group(1)
+            logger.info(f"从SQL中解析出目标表名: {table_name}")
+            
+            delete_stmt = f"""DELETE FROM {table_name}
+WHERE summary_date >= '{{{{ start_date }}}}'
+  AND summary_date < '{{{{ end_date }}}}';"""
+            logger.info(f"生成的清理SQL: {delete_stmt}")
+            return delete_stmt
+        else:
+            logger.warning("无法从SQL中解析出目标表名,无法生成清理SQL")
+            return None
+            
+    except Exception as e:
+        logger.error(f"解析SQL生成清理语句时出错: {str(e)}", exc_info=True)
+        return None