Pārlūkot izejas kodu

除了load_data.py之外,基本完成其它脚本与手工调度的测试,准备修改自动调度。

wangxq 1 nedēļu atpakaļ
vecāks
revīzija
3a6c29327a

+ 33 - 20
dags/dataops_productline_manual_trigger_dag.py

@@ -496,9 +496,10 @@ def execute_python_script(script_info):
     update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
     source_tables = script_info.get('source_tables', [])
-    frequency = script_info.get('frequency', 'daily')
+    schedule_frequency = script_info.get('schedule_frequency', 'daily')
     # 使用传入的执行日期,如果不存在则使用当前日期
     exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
+    is_manual_dag_trigger = script_info.get('is_manual_dag_trigger', False)  # 获取手动DAG触发标识
     
     # 记录开始执行
     logger.info(f"===== 开始执行物理Python脚本文件: {script_name} =====")
@@ -506,8 +507,9 @@ def execute_python_script(script_info):
     logger.info(f"更新模式: {update_mode}")
     logger.info(f"表标签: {target_table_label}")
     logger.info(f"源表: {source_tables}")
-    logger.info(f"频率: {frequency}")
+    logger.info(f"频率: {schedule_frequency}")
     logger.info(f"执行日期: {exec_date}")
+    logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
     
     # 检查脚本文件是否存在
     exists, script_path = check_script_exists(script_name)
@@ -531,8 +533,9 @@ def execute_python_script(script_info):
             run_kwargs = {
                 "table_name": target_table,
                 "update_mode": update_mode,
-                "frequency": frequency,
-                "exec_date": exec_date  # 使用传入的执行日期而不是当前日期
+                "schedule_frequency": schedule_frequency,
+                "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
+                "is_manual_dag_trigger": is_manual_dag_trigger  # 添加手动DAG触发标识
             }
             
             # 如果是structure类型,添加特殊参数
@@ -580,7 +583,8 @@ def execute_sql(script_info):
     target_table = script_info.get('target_table')
     update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
-    frequency = script_info.get('frequency', 'daily')
+    schedule_frequency = script_info.get('schedule_frequency', 'daily')
+    is_manual_dag_trigger = script_info.get('is_manual_dag_trigger', False)
     # 使用传入的执行日期,如果不存在则使用当前日期
     exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
@@ -589,8 +593,9 @@ def execute_sql(script_info):
     logger.info(f"目标表: {target_table}")
     logger.info(f"更新模式: {update_mode}")
     logger.info(f"表标签: {target_table_label}")
-    logger.info(f"频率: {frequency}")
+    logger.info(f"频率: {schedule_frequency}")
     logger.info(f"执行日期: {exec_date}")
+    logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
     
     try:
         # 记录执行开始时间
@@ -617,9 +622,10 @@ def execute_sql(script_info):
                 "target_table": target_table,
                 "script_name": script_name,
                 "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
-                "frequency": frequency,
+                "schedule_frequency": schedule_frequency,
                 "target_table_label": target_table_label,
-                "update_mode": update_mode
+                "update_mode": update_mode,
+                "is_manual_dag_trigger": is_manual_dag_trigger
             }
             
             # 如果是structure类型,添加特殊参数
@@ -667,7 +673,8 @@ def execute_python(script_info):
     target_table = script_info.get('target_table')
     update_mode = script_info.get('update_mode', 'append')
     target_table_label = script_info.get('target_table_label')
-    frequency = script_info.get('frequency', 'daily')
+    schedule_frequency = script_info.get('schedule_frequency', 'daily')
+    is_manual_dag_trigger = script_info.get('is_manual_dag_trigger', False)
     # 使用传入的执行日期,如果不存在则使用当前日期
     exec_date = script_info.get('exec_date', datetime.now().strftime('%Y-%m-%d'))
     
@@ -676,8 +683,9 @@ def execute_python(script_info):
     logger.info(f"目标表: {target_table}")
     logger.info(f"更新模式: {update_mode}")
     logger.info(f"表标签: {target_table_label}")
-    logger.info(f"频率: {frequency}")
+    logger.info(f"频率: {schedule_frequency}")
     logger.info(f"执行日期: {exec_date}")
+    logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
     
     try:
         # 记录执行开始时间
@@ -704,9 +712,10 @@ def execute_python(script_info):
                 "target_table": target_table,
                 "script_name": script_name,
                 "exec_date": exec_date,  # 使用传入的执行日期而不是当前日期
-                "frequency": frequency,
+                "schedule_frequency": schedule_frequency,
                 "target_table_label": target_table_label,
-                "update_mode": update_mode
+                "update_mode": update_mode,
+                "is_manual_dag_trigger": is_manual_dag_trigger
             }
             
             # 如果是structure类型,添加特殊参数
@@ -750,7 +759,7 @@ def choose_executor(script_info):
     返回:
         function: 执行函数
     """
-    script_type = script_info.get('script_type', 'python_script').lower()
+    script_type = script_info.get('script_type').lower()
     target_table_label = script_info.get('target_table_label')
     
     # 根据脚本类型和目标表标签选择执行函数
@@ -792,14 +801,14 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
     
     # 情况1: 同时提供脚本名称和目标表名
     if script_name and target_table:
-        logger.info(f"方案1: 同时提供了脚本名称和目标表名")
+        logger.info(f"情况1: 同时提供了脚本名称和目标表名")
         script_info = get_complete_script_info(script_name, target_table)
         if script_info:
             all_script_infos.append(script_info)
     
     # 情况2: 只提供脚本名称,自动查找目标表
     elif script_name and not target_table:
-        logger.info(f"方案2: 只提供了脚本名称,自动查找目标表")
+        logger.info(f"情况2: 只提供了脚本名称,自动查找目标表")
         target_table = find_target_table_for_script(script_name)
         if target_table:
             logger.info(f"找到脚本 {script_name} 对应的目标表: {target_table}")
@@ -811,7 +820,7 @@ def prepare_script_info(script_name=None, target_table=None, dependency_level=No
     
     # 情况3: 只提供目标表名,查找并处理相关的脚本
     elif not script_name and target_table:
-        logger.info(f"方案3: 只提供了目标表名,查找相关的脚本")
+        logger.info(f"情况3: 只提供了目标表名,查找相关的脚本")
         
         # 首先检查是否为structure类型的DataResource表
         table_label = get_table_label(target_table)
@@ -942,6 +951,7 @@ def execute_script_chain(**context):
     """
     执行依赖链中的所有脚本
     """
+      
     # 获取依赖链和执行日期
     ti = context['ti']
     dependency_chain = ti.xcom_pull(task_ids='prepare_dependency_chain', key='dependency_chain')
@@ -975,7 +985,7 @@ def execute_script_chain(**context):
     for idx, script_info in enumerate(dependency_chain, 1):
         script_name = script_info['script_name']
         target_table = script_info['target_table']
-        script_type = script_info.get('script_type', 'python_script')
+        script_type = script_info.get('script_type')
         
         logger.info(f"===== 执行脚本 {idx}/{len(dependency_chain)}: {script_name} -> {target_table} (类型: {script_type}) =====")
         
@@ -985,7 +995,9 @@ def execute_script_chain(**context):
         # 将执行日期添加到脚本信息中
         script_info['exec_date'] = exec_date
         script_info['logical_date'] = logical_date
-        
+
+        # 这个参数用来标识脚本是不是被手工DAG触发的,它会影响脚本执行之前的幂等性操作。
+        script_info['is_manual_dag_trigger'] = True
         # 执行脚本
         success = executor(script_info)
         
@@ -1035,7 +1047,7 @@ def generate_execution_report(**context):
     # 统计不同类型脚本数量
     script_types = {}
     for result in results:
-        script_type = result.get('script_type', 'python_script')
+        script_type = result.get('script_type')
         if script_type not in script_types:
             script_types[script_type] = 0
         script_types[script_type] += 1
@@ -1059,7 +1071,7 @@ def generate_execution_report(**context):
     for idx, result in enumerate(results, 1):
         script_name = result['script_name']
         target_table = result['target_table']
-        script_type = result.get('script_type', 'python_script')
+        script_type = result.get('script_type')
         success = result['success']
         status = "✓ 成功" if success else "✗ 失败"
         report.append(f"{idx}. {script_name} -> {target_table} ({script_type}): {status}")
@@ -1104,6 +1116,7 @@ with DAG(
         task_id='execute_script_chain',
         python_callable=execute_script_chain,
         provide_context=True,
+        trigger_rule='all_success'  # 修改为仅在上游任务成功时执行
     )
     
     # 任务3: 生成执行报告

+ 75 - 42
dataops_scripts/execution_python.py

@@ -171,7 +171,7 @@ def execute_sql(sql, params=None):
         if conn:
             conn.close()
 
-def run(script_type=None, target_table=None, script_name=None, exec_date=None, frequency=None, **kwargs):
+def run(script_type=None, target_table=None, script_name=None, exec_date=None, schedule_frequency=None, **kwargs):
     """
     执行Python脚本主入口函数
     
@@ -180,7 +180,7 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
         target_table (str): 目标表名
         script_name (str): 脚本名称
         exec_date (str): 执行日期,格式为YYYY-MM-DD
-        frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
+        schedule_frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
         **kwargs: 其他参数
     
     返回:
@@ -193,7 +193,8 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
     logger.info(f"目标表: {target_table}")
     logger.info(f"脚本名称: {script_name}")
     logger.info(f"执行日期: {exec_date}")
-    logger.info(f"频率: {frequency}")
+    logger.info(f"频率: {schedule_frequency}")
+    
     
     # 记录其他参数
     for key, value in kwargs.items():
@@ -216,7 +217,7 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
         logger.error("未提供执行日期")
         return False
     
-    if not frequency:
+    if not schedule_frequency:
         logger.error("未提供频率")
         return False
 
@@ -230,29 +231,31 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
         logger.info(f"成功获取脚本内容,长度: {len(script_code)} 字符")
         
         # 日期计算
-        try:
-            # 直接使用script_utils.get_date_range计算日期范围
-            logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={frequency}")
-            start_date, end_date = script_utils.get_date_range(exec_date, frequency)
-            logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
-        except Exception as date_err:
-            logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
-            # 使用简单的默认日期范围计算
-            date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
-            if frequency.lower() == 'daily':
-                start_date = date_obj.strftime('%Y-%m-%d')
-                end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
-            else:
-                # 对其他频率使用默认范围
-                start_date = exec_date
-                end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
-            logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
+        # try:
+        #     # 直接使用script_utils.get_date_range计算日期范围
+        #     logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
+        #     start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+        #     logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+        # except Exception as date_err:
+        #     logger.error(f"日期处理失败: {str(date_err)}", exc_info=True)
+        #     # 使用简单的默认日期范围计算
+        #     date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+        #     if schedule_frequency.lower() == 'daily':
+        #         start_date = date_obj.strftime('%Y-%m-%d')
+        #         end_date = (date_obj + timedelta(days=1)).strftime('%Y-%m-%d')
+        #     else:
+        #         # 对其他频率使用默认范围
+        #         start_date = exec_date
+        #         end_date = (date_obj + timedelta(days=30)).strftime('%Y-%m-%d')
+        #     logger.warning(f"使用默认日期范围计算: start_date={start_date}, end_date={end_date}")
         
         # 检查是否开启ETL幂等性
         target_table_label = kwargs.get('target_table_label', '')
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
+        is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False) 
         
         logger.info(f"脚本更新模式: {script_exec_mode}")
+        logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
         
         # 导入config模块获取幂等性开关
         try:
@@ -265,39 +268,69 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None, f
         logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
         logger.info(f"目标表标签: {target_table_label}")
         
-        # 如果开启了ETL幂等性处理
+        # 如果开启了ETL幂等性处理,并且是由手工dag触发的。
         if enable_idempotency:
             # 处理append模式
             if script_exec_mode.lower() == 'append':
-                logger.info("当前为append模式,开始考虑ETL幂等性处理")
-                
-                # 检查是否有目标日期列
-                if target_dt_column:
-                    logger.info(f"找到目标日期列 {target_dt_column},将进行数据清理")
+                logger.info("当前为append模式,且ETL幂等性开关被打开") 
+                if is_manual_dag_trigger:
+                    logger.info("manual dag被手工触发")         
+                           # 检查是否有目标日期列
+                    logger.info(f"使用script_utils.get_date_range计算日期范围,参数: exec_date={exec_date}, frequency={schedule_frequency}")
+                    start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                    logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+
+                    if target_dt_column:
+                        logger.info(f"找到目标日期列 {target_dt_column},将进行数据清理")
+                        
+                        # 生成DELETE语句
+                        delete_sql = f"""DELETE FROM {target_table}
+    WHERE {target_dt_column} >= '{start_date}'
+    AND {target_dt_column} < '{end_date}';"""
+                        
+                        logger.info(f"生成的DELETE语句: {delete_sql}")
+                        
+                        # 执行DELETE SQL
+                        logger.info("执行清理SQL以实现幂等性")
+                        delete_success, delete_result = execute_sql(delete_sql)
+                        
+                        if delete_success:
+                            if isinstance(delete_result, dict) and "affected_rows" in delete_result:
+                                logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                            else:
+                                logger.info("清理SQL执行成功")
+                        else:
+                            logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                            # 继续执行原始Python脚本
+                            logger.warning("继续执行原始Python脚本")
+                    else:
+                        logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
+                        logger.warning("将直接执行原始Python脚本,可能导致数据重复")
+                else:
+                    logger.info(f"不是manual dag手工触发,对目标表 {target_table} create_time 进行数据清理")
                     
-                    # 生成DELETE语句
+                    start_date, end_date = script_utils.get_one_day_range(exec_date)
+
+                        # 生成DELETE语句
                     delete_sql = f"""DELETE FROM {target_table}
-WHERE {target_dt_column} >= '{start_date}'
-  AND {target_dt_column} < '{end_date}';"""
-                    
+    WHERE create_time >= '{start_date}'
+    AND create_time < '{end_date}';"""
+                        
                     logger.info(f"生成的DELETE语句: {delete_sql}")
-                    
+
                     # 执行DELETE SQL
-                    logger.info("执行清理SQL以实现幂等性")
+                    logger.info("基于create_time执行清理SQL以实现幂等性")
                     delete_success, delete_result = execute_sql(delete_sql)
-                    
+
                     if delete_success:
                         if isinstance(delete_result, dict) and "affected_rows" in delete_result:
-                            logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                            logger.info(f"基于create_time,清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
                         else:
-                            logger.info("清理SQL执行成功")
+                            logger.info("基于create_time,清理SQL执行成功")
                     else:
-                        logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                        logger.error(f"基于create_time,清理SQL执行失败: {delete_result.get('error', '未知错误')}")
                         # 继续执行原始Python脚本
                         logger.warning("继续执行原始Python脚本")
-                else:
-                    logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
-                    logger.warning("将直接执行原始Python脚本,可能导致数据重复")
             
             # 处理full_refresh模式
             elif script_exec_mode.lower() == 'full_refresh':
@@ -395,7 +428,7 @@ if __name__ == "__main__":
     parser.add_argument('--target-table', type=str, required=True, help='目标表名')
     parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
     parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
-    parser.add_argument('--frequency', type=str, required=True, 
+    parser.add_argument('--schedule_frequency', type=str, required=True, 
                         choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], 
                         help='频率: daily, weekly, monthly, quarterly, yearly')
     parser.add_argument('--update-mode', type=str, default='append',
@@ -409,7 +442,7 @@ if __name__ == "__main__":
         "target_table": args.target_table,
         "script_name": args.script_name,
         "exec_date": args.exec_date,
-        "frequency": args.frequency,
+        "schedule_frequency": args.schedule_frequency,
         "update_mode": args.update_mode
     }
     

+ 118 - 70
dataops_scripts/execution_sql.py

@@ -35,27 +35,27 @@ except ImportError as e:
     logger.error(f"无法直接导入script_utils方法: {str(e)}")
     
     # 尝试备用方法1:完整路径导入
-    try:
-        sys.path.append(os.path.dirname(current_dir))  # 添加父目录
-        from dataops_scripts.script_utils import get_pg_config
-        logger.info("使用完整路径成功导入script_utils模块的方法")
-    except ImportError as e2:
-        logger.error(f"使用完整路径导入失败: {str(e2)}")
+    # try:
+    #     sys.path.append(os.path.dirname(current_dir))  # 添加父目录
+    #     from dataops_scripts.script_utils import get_pg_config
+    #     logger.info("使用完整路径成功导入script_utils模块的方法")
+    # except ImportError as e2:
+    #     logger.error(f"使用完整路径导入失败: {str(e2)}")
         
-        # 尝试备用方法2:动态导入
-        try:
-            import importlib.util
-            script_utils_path = os.path.join(current_dir, "script_utils.py")
-            logger.info(f"尝试从路径动态导入: {script_utils_path}")
+    #     # 尝试备用方法2:动态导入
+    #     try:
+    #         import importlib.util
+    #         script_utils_path = os.path.join(current_dir, "script_utils.py")
+    #         logger.info(f"尝试从路径动态导入: {script_utils_path}")
             
-            spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
-            script_utils = importlib.util.module_from_spec(spec)
-            spec.loader.exec_module(script_utils)
-            get_pg_config = script_utils.get_pg_config
-            logger.info("通过动态导入成功加载script_utils模块的方法")
-        except Exception as e3:
-            logger.error(f"动态导入也失败: {str(e3)}")
-            raise ImportError(f"无法导入script_utils模块的方法,所有方法都失败")
+    #         spec = importlib.util.spec_from_file_location("script_utils", script_utils_path)
+    #         script_utils = importlib.util.module_from_spec(spec)
+    #         spec.loader.exec_module(script_utils)
+    #         get_pg_config = script_utils.get_pg_config
+    #         logger.info("通过动态导入成功加载script_utils模块的方法")
+    #     except Exception as e3:
+    #         logger.error(f"动态导入也失败: {str(e3)}")
+    #         raise ImportError(f"无法导入script_utils模块的方法,所有方法都失败")
 
 # 使用script_utils中的方法获取配置
 try:
@@ -235,7 +235,7 @@ def render_sql_template(sql_content, template_params):
         raise
 
 def run(script_type=None, target_table=None, script_name=None, exec_date=None, 
-        frequency=None, **kwargs):
+        schedule_frequency=None, **kwargs):
     """
     执行SQL脚本主入口函数
     
@@ -244,7 +244,7 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         target_table (str): 目标表名
         script_name (str): 脚本名称
         exec_date (str): 执行日期,格式为YYYY-MM-DD
-        frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
+        schedule_frequency (str): 频率,可选值为 daily, weekly, monthly, quarterly, yearly
         **kwargs: 其他参数
     
     返回:
@@ -257,7 +257,7 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
     logger.info(f"目标表: {target_table}")
     logger.info(f"脚本名称: {script_name}")
     logger.info(f"执行日期: {exec_date}")
-    logger.info(f"频率: {frequency}")
+    logger.info(f"频率: {schedule_frequency}")
     
     # 记录其他参数
     for key, value in kwargs.items():
@@ -280,7 +280,7 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         logger.error("未提供执行日期")
         return False
     
-    if not frequency:
+    if not schedule_frequency:
         logger.error("未提供频率")
         return False
     
@@ -293,26 +293,13 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
         
         logger.info(f"成功获取脚本内容,长度: {len(sql_content)} 字符")
         
-        # 计算日期范围
-        try:
-            start_date, end_date = script_utils.get_date_range(exec_date, frequency)
-            logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
-        except Exception as date_err:
-            logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
-            return False
-        
-        # 准备模板参数
-        template_params = {
-            'start_date': start_date,
-            'end_date': end_date,
-            # 可以添加更多默认参数
-        }
-        
         # 检查是否开启ETL幂等性
         target_table_label = kwargs.get('target_table_label', '')
         script_exec_mode = kwargs.get('update_mode', 'append')  # 只使用update_mode
+        is_manual_dag_trigger = kwargs.get('is_manual_dag_trigger', False)  # 新增:获取是否为手动触发的DAG
         
         logger.info(f"脚本更新模式: {script_exec_mode}")
+        logger.info(f"是否手工DAG触发: {is_manual_dag_trigger}")
         
         # 导入config模块获取幂等性开关
         try:
@@ -332,42 +319,86 @@ def run(script_type=None, target_table=None, script_name=None, exec_date=None,
             if script_exec_mode.lower() == 'append':
                 logger.info("当前为append模式,开始考虑ETL幂等性处理")
                 
-                # 检查是否有目标日期列
-                if target_dt_column:
-                    logger.info(f"找到目标日期列 {target_dt_column},将生成DELETE语句")
+                if is_manual_dag_trigger:
+                    logger.info("manual dag被手工触发")
+                    # 计算日期范围
+                    try:
+                        start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                        logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+                    except Exception as date_err:
+                        logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
+                        return False
                     
-                    # 生成DELETE语句
-                    delete_sql = f"""DELETE FROM {target_table}
+                    # 检查是否有目标日期列
+                    if target_dt_column:
+                        logger.info(f"找到目标日期列 {target_dt_column},将生成DELETE语句")
+                        
+                        # 生成DELETE语句
+                        delete_sql = f"""DELETE FROM {target_table}
 WHERE {target_dt_column} >= '{{{{ start_date }}}}'
   AND {target_dt_column} < '{{{{ end_date }}}}';"""
-                    
-                    logger.info(f"生成的DELETE语句: {delete_sql}")
-                    
-                    # 渲染DELETE SQL
-                    try:
-                        rendered_delete_sql = render_sql_template(delete_sql, template_params)
-                        logger.info("成功渲染清理SQL")
-                    except Exception as render_err:
-                        logger.error(f"渲染清理SQL时出错: {str(render_err)}", exc_info=True)
-                        # 即使清理SQL失败,仍然继续执行后续SQL
-                        logger.warning("继续执行原始SQL")
-                    else:
-                        # 执行DELETE SQL
-                        logger.info("执行清理SQL以实现幂等性")
-                        delete_success, delete_result = execute_sql(rendered_delete_sql)
                         
-                        if delete_success:
-                            if isinstance(delete_result, dict) and "affected_rows" in delete_result:
-                                logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
-                            else:
-                                logger.info("清理SQL执行成功")
-                        else:
-                            logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
-                            # 继续执行原始SQL
+                        logger.info(f"生成的DELETE语句: {delete_sql}")
+                        
+                        # 准备模板参数
+                        template_params = {
+                            'start_date': start_date,
+                            'end_date': end_date,
+                            # 可以添加更多默认参数
+                        }
+                        
+                        # 渲染DELETE SQL
+                        try:
+                            rendered_delete_sql = render_sql_template(delete_sql, template_params)
+                            logger.info("成功渲染清理SQL")
+                        except Exception as render_err:
+                            logger.error(f"渲染清理SQL时出错: {str(render_err)}", exc_info=True)
+                            # 即使清理SQL失败,仍然继续执行后续SQL
                             logger.warning("继续执行原始SQL")
+                        else:
+                            # 执行DELETE SQL
+                            logger.info("执行清理SQL以实现幂等性")
+                            delete_success, delete_result = execute_sql(rendered_delete_sql)
+                            
+                            if delete_success:
+                                if isinstance(delete_result, dict) and "affected_rows" in delete_result:
+                                    logger.info(f"清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                                else:
+                                    logger.info("清理SQL执行成功")
+                            else:
+                                logger.error(f"清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                                # 继续执行原始SQL
+                                logger.warning("继续执行原始SQL")
+                    else:
+                        logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
+                        logger.warning("将直接执行原始SQL,可能导致数据重复")
                 else:
-                    logger.warning(f"目标表 {target_table} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
-                    logger.warning("将直接执行原始SQL,可能导致数据重复")
+                    logger.info(f"不是manual dag手工触发,对目标表 {target_table} create_time 进行数据清理")
+                    
+                    # 获取一天范围的日期
+                    start_date, end_date = script_utils.get_one_day_range(exec_date)
+                    logger.info(f"使用一天范围的日期: start_date={start_date}, end_date={end_date}")
+                    
+                    # 生成基于create_time的DELETE语句
+                    delete_sql = f"""DELETE FROM {target_table}
+WHERE create_time >= '{start_date}'
+  AND create_time < '{end_date}';"""
+                    
+                    logger.info(f"生成的基于create_time的DELETE语句: {delete_sql}")
+                    
+                    # 执行DELETE SQL
+                    logger.info("基于create_time执行清理SQL以实现幂等性")
+                    delete_success, delete_result = execute_sql(delete_sql)
+                    
+                    if delete_success:
+                        if isinstance(delete_result, dict) and "affected_rows" in delete_result:
+                            logger.info(f"基于create_time,清理SQL执行成功,删除了 {delete_result['affected_rows']} 行数据")
+                        else:
+                            logger.info("基于create_time,清理SQL执行成功")
+                    else:
+                        logger.error(f"基于create_time,清理SQL执行失败: {delete_result.get('error', '未知错误')}")
+                        # 继续执行原始SQL
+                        logger.warning("继续执行原始SQL")
             
             # 处理full_refresh模式
             elif script_exec_mode.lower() == 'full_refresh':
@@ -393,6 +424,23 @@ WHERE {target_dt_column} >= '{{{{ start_date }}}}'
         else:
             logger.info("未满足ETL幂等性处理条件,直接执行原始SQL")
         
+        # 计算日期范围(如果之前未计算)
+        if 'start_date' not in locals() or 'end_date' not in locals():
+            try:
+                start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+            except Exception as date_err:
+                logger.error(f"计算日期范围时出错: {str(date_err)}", exc_info=True)
+                return False
+        
+        # 准备模板参数(如果之前未准备)
+        if 'template_params' not in locals():
+            template_params = {
+                'start_date': start_date,
+                'end_date': end_date,
+                # 可以添加更多默认参数
+            }
+        
         # 渲染原始SQL模板
         try:
             rendered_sql = render_sql_template(sql_content, template_params)
@@ -446,7 +494,7 @@ if __name__ == "__main__":
     parser.add_argument('--target-table', type=str, required=True, help='目标表名')
     parser.add_argument('--script-name', type=str, required=True, help='脚本名称')
     parser.add_argument('--exec-date', type=str, required=True, help='执行日期 (YYYY-MM-DD)')
-    parser.add_argument('--frequency', type=str, required=True, 
+    parser.add_argument('--schedule_frequency', type=str, required=True, 
                         choices=['daily', 'weekly', 'monthly', 'quarterly', 'yearly'], 
                         help='频率: daily, weekly, monthly, quarterly, yearly')
     
@@ -458,7 +506,7 @@ if __name__ == "__main__":
         "target_table": args.target_table,
         "script_name": args.script_name,
         "exec_date": args.exec_date,
-        "frequency": args.frequency,
+        "schedule_frequency": args.schedule_frequency,
     }
 
     logger.info("命令行测试执行参数: " + str(run_kwargs))

+ 152 - 24
dataops_scripts/load_file.py

@@ -334,7 +334,7 @@ def load_file_to_table(file_path, table_name, update_mode='append'):
         return False
 
 def run(table_name, update_mode='append', exec_date=None, target_type=None, 
-        storage_location=None, frequency=None, script_name=None, **kwargs):
+        storage_location=None, schedule_frequency=None, script_name=None, is_manual_dag_trigger=False, **kwargs):
     """
     统一入口函数,支持通配符路径,处理并归档文件
     """
@@ -351,7 +351,8 @@ def run(table_name, update_mode='append', exec_date=None, target_type=None,
     logger.info(f"资源类型: {target_type}, 文件相对路径模式: {storage_location}")
     logger.info(f"基准上传路径: {STRUCTURE_UPLOAD_BASE_PATH}")
     logger.info(f"基准归档路径: {STRUCTURE_UPLOAD_ARCHIVE_BASE_PATH}")
-    logger.info(f"更新频率: {frequency}")
+    logger.info(f"更新频率: {schedule_frequency}")
+    logger.info(f"是否手工触发 manual DAG: {is_manual_dag_trigger}")
     
     # 记录其他参数
     for key, value in kwargs.items():
@@ -429,28 +430,155 @@ def run(table_name, update_mode='append', exec_date=None, target_type=None,
     found_files = list(set(found_files))  # 去重
     logger.info(f"总共找到 {len(found_files)} 个匹配文件: {found_files}")
 
-    # 如果是全量刷新,在处理任何文件前清空表
-    if update_mode == 'full_refresh':
-        conn = None
-        cursor = None
-        try:
-            conn = get_pg_conn()
-            cursor = conn.cursor()
-            # 假设表在 public schema,并为表名加引号
-            logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
-            cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
-            conn.commit()
-            logger.info("表 public.\"" + table_name + "\" 已清空。")
-        except Exception as e:
-            logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
-            if conn:
-                conn.rollback()
-            return False # 清空失败则直接失败退出
-        finally:
-            if cursor:
-                 cursor.close()
-            if conn:
-                 conn.close()
+    # 检查是否开启ETL幂等性
+    try:
+        config = __import__('config')
+        enable_idempotency = getattr(config, 'ENABLE_ETL_IDEMPOTENCY', False)
+    except ImportError:
+        logger.warning("无法导入config模块获取幂等性开关,默认为False")
+        enable_idempotency = False
+    
+    logger.info(f"ETL幂等性开关状态: {enable_idempotency}")
+    
+    # 如果开启了ETL幂等性处理
+    if enable_idempotency:
+        # 处理append模式
+        if update_mode.lower() == 'append':
+            logger.info("当前为append模式,且ETL幂等性开关被打开")
+            conn = None
+            cursor = None
+            try:
+                conn = get_pg_conn()
+                cursor = conn.cursor()
+                
+                if is_manual_dag_trigger:
+                    logger.info("manual dag被手工触发")
+                    # 计算日期范围
+                    try:
+                        start_date, end_date = script_utils.get_date_range(exec_date, schedule_frequency)
+                        logger.info(f"计算得到的日期范围: start_date={start_date}, end_date={end_date}")
+                        
+                        # 尝试获取目标表的日期列
+                        logger.info(f"查询表 {table_name} 的target_dt_column...")
+                        target_dt_query = """
+                            SELECT target_dt_column 
+                            FROM data_transform_scripts 
+                            WHERE target_table = %s 
+                            LIMIT 1
+                        """
+                        cursor.execute(target_dt_query, (table_name,))
+                        result = cursor.fetchone()
+                        
+                        target_dt_column = result[0] if result and result[0] else None
+                        
+                        if target_dt_column:
+                            logger.info(f"找到目标日期列 {target_dt_column},将生成DELETE语句")
+                            # 生成DELETE语句
+                            delete_sql = f"""DELETE FROM {table_name}
+    WHERE {target_dt_column} >= '{start_date}'
+    AND {target_dt_column} < '{end_date}';"""
+                            
+                            logger.info(f"生成的DELETE语句: {delete_sql}")
+                            
+                            # 执行DELETE操作
+                            cursor.execute(delete_sql)
+                            affected_rows = cursor.rowcount
+                            conn.commit()
+                            logger.info(f"清理SQL执行成功,删除了 {affected_rows} 行数据")
+                        else:
+                            logger.warning(f"目标表 {table_name} 没有设置目标日期列(target_dt_column),无法生成DELETE语句实现幂等性")
+                            logger.warning("将直接执行原始导入,可能导致数据重复")
+                    except Exception as date_err:
+                        logger.error(f"计算日期范围或执行DELETE时出错: {str(date_err)}", exc_info=True)
+                        # 继续执行后续导入
+                        logger.warning("继续执行原始导入")
+                else:
+                    logger.info(f"不是manual dag手工触发,对目标表 {table_name} create_time 进行数据清理")
+                    
+                    # 获取一天范围的日期
+                    start_date, end_date = script_utils.get_one_day_range(exec_date)
+                    logger.info(f"使用一天范围的日期: start_date={start_date}, end_date={end_date}")
+                    
+                    # 生成基于create_time的DELETE语句
+                    delete_sql = f"""DELETE FROM {table_name}
+    WHERE create_time >= '{start_date}'
+    AND create_time < '{end_date}';"""
+                    
+                    logger.info(f"生成的基于create_time的DELETE语句: {delete_sql}")
+                    
+                    try:
+                        # 执行DELETE操作
+                        cursor.execute(delete_sql)
+                        affected_rows = cursor.rowcount
+                        conn.commit()
+                        logger.info(f"基于create_time,清理SQL执行成功,删除了 {affected_rows} 行数据")
+                    except Exception as del_err:
+                        logger.error(f"基于create_time,清理SQL执行失败: {str(del_err)}", exc_info=True)
+                        conn.rollback()
+                        # 继续执行后续导入
+                        logger.warning("继续执行原始导入")
+            except Exception as e:
+                logger.error(f"ETL幂等性处理出错: {str(e)}", exc_info=True)
+                if conn and conn.status == psycopg2.extensions.STATUS_IN_TRANSACTION:
+                    conn.rollback()
+            finally:
+                if cursor:
+                    cursor.close()
+                if conn:
+                    conn.close()
+        
+        # 处理full_refresh模式
+        elif update_mode.lower() == 'full_refresh':
+            # 保持原有的full_refresh逻辑,不做修改
+            logger.info("当前为full_refresh模式,将执行TRUNCATE操作")
+            conn = None
+            cursor = None
+            try:
+                conn = get_pg_conn()
+                cursor = conn.cursor()
+                # 假设表在 public schema,并为表名加引号
+                logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
+                cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
+                conn.commit()
+                logger.info("表 public.\"" + table_name + "\" 已清空。")
+            except Exception as e:
+                logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
+                if conn:
+                    conn.rollback()
+                return False # 清空失败则直接失败退出
+            finally:
+                if cursor:
+                     cursor.close()
+                if conn:
+                     conn.close()
+        else:
+            logger.info(f"当前更新模式 {update_mode} 不是append或full_refresh,不执行幂等性处理")
+    else:
+        # 如果未开启ETL幂等性处理,保持原有的full_refresh逻辑
+        if update_mode == 'full_refresh':
+            logger.info("ETL幂等性未开启,但当前为full_refresh模式,仍将执行TRUNCATE操作")
+            conn = None
+            cursor = None
+            try:
+                conn = get_pg_conn()
+                cursor = conn.cursor()
+                # 假设表在 public schema,并为表名加引号
+                logger.info(f"执行全量刷新,清空表 public.\"{table_name}\"")
+                cursor.execute(f'TRUNCATE TABLE public.\"{table_name}\"')
+                conn.commit()
+                logger.info("表 public.\"" + table_name + "\" 已清空。")
+            except Exception as e:
+                logger.error("清空表 public.\"" + table_name + "\" 时出错: " + str(e))
+                if conn:
+                    conn.rollback()
+                return False # 清空失败则直接失败退出
+            finally:
+                if cursor:
+                     cursor.close()
+                if conn:
+                     conn.close()
+        else:
+            logger.info("ETL幂等性未开启,直接执行文件加载")
 
     # 处理并归档每个找到的文件
     processed_files_count = 0

+ 22 - 1
dataops_scripts/script_utils.py

@@ -362,4 +362,25 @@ WHERE summary_date >= '{{{{ start_date }}}}'
             
     except Exception as e:
         logger.error(f"解析SQL生成清理语句时出错: {str(e)}", exc_info=True)
-        return None
+        return None
+
+def get_one_day_range(exec_date):
+    """
+    根据exec_date返回当天的00:00:00和次日00:00:00,均为datetime对象
+    参数:
+        exec_date (str 或 datetime): 执行日期,格式为YYYY-MM-DD或datetime对象
+    返回:
+        tuple(datetime, datetime): (start_datetime, end_datetime)
+    """
+    shanghai_tz = pytz.timezone('Asia/Shanghai')
+    if isinstance(exec_date, str):
+        date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+    elif isinstance(exec_date, datetime):
+        date_obj = exec_date
+    else:
+        raise ValueError(f"不支持的exec_date类型: {type(exec_date)}")
+    # 当天00:00:00
+    start_datetime = shanghai_tz.localize(datetime(date_obj.year, date_obj.month, date_obj.day, 0, 0, 0))
+    # 次日00:00:00
+    end_datetime = start_datetime + timedelta(days=1)
+    return start_datetime, end_datetime