Forráskód Böngészése

pipeline 版本添加了手工传参的功能,修改了部分scripts.

wangxq 2 hónapja
szülő
commit
e3481c69da

+ 28 - 5
dags/dag_dataops_pipeline_data_scheduler.py

@@ -182,7 +182,12 @@ def execute_python_script(target_table, script_name, script_exec_mode, exec_date
         # 检查并调用标准入口函数run
         if hasattr(module, "run"):
             logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
-            result = module.run(table_name=target_table, execution_mode=script_exec_mode)
+            result = module.run(
+                table_name=target_table, 
+                execution_mode=script_exec_mode,
+                exec_date=exec_date,
+                script_name=script_name
+            )
             logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
             
             # 确保result是布尔值
@@ -442,6 +447,11 @@ def prepare_dag_schedule(**kwargs):
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
+    # 检查是否是手动触发
+    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
+    if is_manual_trigger:
+        logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
+    
     # 记录重要的时间参数
     logger.info(f"【时间参数】prepare_dag_schedule: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
@@ -553,6 +563,11 @@ def check_execution_plan(**kwargs):
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
+    # 检查是否是手动触发
+    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
+    if is_manual_trigger:
+        logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
+    
     # 记录重要的时间参数
     logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info("检查数据库中的执行计划是否存在且有效")
@@ -655,6 +670,11 @@ def create_execution_plan(**kwargs):
         local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
         exec_date = local_logical_date.strftime('%Y-%m-%d')
         
+        # 检查是否是手动触发
+        is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
+        if is_manual_trigger:
+            logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
+        
         # 记录重要的时间参数
         logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
         
@@ -868,11 +888,14 @@ with DAG(
         'retries': 1,
         'retry_delay': timedelta(minutes=5)
     },
-    # 添加DAG级别参数,确保任务运行时有正确的环境
     params={
-        "scripts_path": SCRIPTS_BASE_PATH,
-        "airflow_base_path": os.path.dirname(os.path.dirname(__file__))
-    }
+        'MANUAL_TRIGGER': True, 
+    },
+    # 添加DAG级别参数,确保任务运行时有正确的环境
+    # params={
+    #     "scripts_path": SCRIPTS_BASE_PATH,
+    #     "airflow_base_path": os.path.dirname(os.path.dirname(__file__))
+    # }
 ) as dag:
     
     # 记录DAG实例化时的重要信息

+ 8 - 8
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -395,11 +395,11 @@ def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
 def prepare_pipeline_dag_schedule(**kwargs):
     """准备Pipeline DAG调度任务的主函数"""
     # 检查是否是手动触发模式
-    is_force_refresh = False
+    is_manual_trigger = False
     params = kwargs.get('params', {})
-    if params and 'FORCE_REFRESH' in params:
-        is_force_refresh = params.get('FORCE_REFRESH', False)
-        logger.info(f"接收到强制刷新参数: FORCE_REFRESH={is_force_refresh}")
+    if params and 'MANUAL_TRIGGER' in params:
+        is_manual_trigger = params.get('MANUAL_TRIGGER', False)
+        logger.info(f"接收到手动触发参数: MANUAL_TRIGGER={is_manual_trigger}")
     
     # 获取执行日期
     dag_run = kwargs.get('dag_run')
@@ -436,9 +436,9 @@ def prepare_pipeline_dag_schedule(**kwargs):
             logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
             need_create_plan = True
     
-    # 强制刷新模式覆盖以上判断
-    if is_force_refresh:
-        logger.info("强制刷新模式,将创建新的执行计划")
+    # 手动触发模式覆盖以上判断
+    if is_manual_trigger:
+        logger.info("手动触发模式,将创建新的执行计划")
         need_create_plan = True
     
     # 如果不需要创建新的执行计划,直接返回
@@ -603,7 +603,7 @@ with DAG(
         'retry_delay': timedelta(minutes=5)
     },
     params={
-        'FORCE_REFRESH': False,  # 添加强制刷新参数,默认为False
+        'MANUAL_TRIGGER': True, 
     },
 ) as dag:
     

+ 1 - 1
dags/dag_dataops_pipeline_summary_scheduler.py

@@ -358,7 +358,7 @@ with DAG(
         external_task_id="data_processing_phase.processing_completed",
         mode="reschedule",  # 使用 reschedule 模式,不会占用 worker
         timeout=7200,  # 等待超时时间为 2 小时
-        poke_interval=30,  # 每15秒检查一次
+        poke_interval=30,  # 每30秒检查一次
         allowed_states=["success", "failed", "skipped"],  # 允许的状态包括成功、失败和跳过
         failed_states=None,  # 不设置失败状态,确保无论主 DAG 状态如何都会继续执行
         execution_date_fn=print_target_date,  # 用于调试的日期打印函数

+ 68 - 17
dataops/scripts/book_sale_amt_daily_clean.py

@@ -19,10 +19,19 @@ logging.basicConfig(
 
 logger = logging.getLogger("book_sale_amt_daily_clean")
 
-def clean_daily_book_sales():
+def clean_daily_book_sales(table_name=None, exec_date=None, execution_mode=None, script_name=None):
     """清洗日度图书销售额数据的函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
+    # 获取当前脚本的文件名(如果没有传入)
+    if script_name is None:
+        script_name = os.path.basename(__file__)
+    
+    # 打印所有传入的参数
+    logger.info(f"===== 传入参数信息 (处理函数内) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"script_name: {script_name}")
+    logger.info(f"======================================")
     
     logger.info(f"开始执行日度图书销售额数据清洗 - 脚本: {script_name}")
     
@@ -33,10 +42,30 @@ def clean_daily_book_sales():
         
         logger.info("执行数据清洗流程...")
         
-        # 模拟处理步骤
-        today = datetime.now()
-        yesterday = today - timedelta(days=1)
-        date_str = yesterday.strftime('%Y-%m-%d')
+        # 尝试使用传入的日期,如果没有则使用昨天
+        if exec_date:
+            if isinstance(exec_date, str):
+                try:
+                    date_obj = datetime.strptime(exec_date, '%Y-%m-%d')
+                    date_str = exec_date
+                except ValueError:
+                    today = datetime.now()
+                    yesterday = today - timedelta(days=1)
+                    date_str = yesterday.strftime('%Y-%m-%d')
+                    logger.warning(f"无法解析传入的exec_date: {exec_date},使用昨天日期: {date_str}")
+            else:
+                try:
+                    date_str = exec_date.strftime('%Y-%m-%d')
+                except:
+                    today = datetime.now()
+                    yesterday = today - timedelta(days=1)
+                    date_str = yesterday.strftime('%Y-%m-%d')
+                    logger.warning(f"无法格式化传入的exec_date,使用昨天日期: {date_str}")
+        else:
+            today = datetime.now()
+            yesterday = today - timedelta(days=1)
+            date_str = yesterday.strftime('%Y-%m-%d')
+            logger.info(f"未传入exec_date,使用昨天日期: {date_str}")
         
         logger.info(f"正在清洗 {date_str} 的数据")
         
@@ -65,34 +94,46 @@ def clean_daily_book_sales():
         # 模拟写入数据库
         success_rate = random.random()
         if success_rate > 0.1:  # 90%的成功率
-            logger.info(f"表 {date_str} 数据清洗成功,已处理并写入")
+            logger.info(f"表 {table_name} 数据清洗成功,处理日期: {date_str}")
             return True
         else:
-            logger.error(f"表 {date_str} 数据清洗或写入过程中出现随机错误")
+            logger.error(f"表 {table_name} 数据清洗或写入过程中出现随机错误")
             return False
     except Exception as e:
         logger.error(f"清洗日度图书销售额数据时出错: {str(e)}")
         return False
 
-def run(table_name, execution_mode, **kwargs):
+def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
     """
     统一入口函数,符合Airflow动态脚本调用规范
     
     参数:
         table_name (str): 要处理的表名
         execution_mode (str): 执行模式 (append/full_refresh)
+        exec_date: 执行日期
+        script_name: 脚本名称
         **kwargs: 其他可能的参数
     
     返回:
         bool: 执行成功返回True,否则返回False
     """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    # 打印所有传入的参数
+    logger.info(f"===== 传入参数信息 (入口函数内) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"script_name: {script_name}")
+    
+    # 打印所有可能的额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}")
+    logger.info(f"======================================")
     
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
+    # 如果没有提供脚本名,使用当前脚本的文件名
+    if script_name is None:
+        script_name = os.path.basename(__file__)
     
     # 记录详细的执行信息
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
     logger.info(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
     
     # 根据执行模式判断处理逻辑
@@ -103,7 +144,12 @@ def run(table_name, execution_mode, **kwargs):
         logger.info("执行增量模式 - 只清洗最新一天的数据")
     
     # 调用实际处理函数
-    result = clean_daily_book_sales()
+    result = clean_daily_book_sales(
+        table_name=table_name, 
+        exec_date=exec_date,
+        execution_mode=execution_mode, 
+        script_name=script_name
+    )
     
     logger.info(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
     logger.info(f"处理结果: {'成功' if result else '失败'}")
@@ -111,5 +157,10 @@ def run(table_name, execution_mode, **kwargs):
     return result
 
 if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="book_sale_amt_daily", execution_mode="append") 
+    # 直接执行时调用统一入口函数,带上所有参数作为测试
+    run(
+        table_name="book_sale_amt_daily", 
+        execution_mode="append",
+        exec_date=datetime.now().strftime('%Y-%m-%d'),
+        script_name=os.path.basename(__file__)
+    ) 

+ 52 - 15
dataops/scripts/books_total_process.py

@@ -4,6 +4,7 @@
 import logging
 import sys
 import os
+from datetime import datetime
 
 # 配置日志记录器
 logging.basicConfig(
@@ -16,39 +17,75 @@ logging.basicConfig(
 
 logger = logging.getLogger("book_total_process")
 
-def process_book_data():
+def process_book_data(table_name=None, execution_date=None, execution_mode=None, script_name=None):
     """处理图书数据的示例函数"""
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-       
-    # 使用logger.info输出脚本名称
-    logger.info(f"当前脚本名称是 {script_name} - 来自logger.info输出")
+    # 获取当前脚本的文件名(如果没有传入)
+    if script_name is None:
+        script_name = os.path.basename(__file__)
+    
+    # 使用print输出所有参数
+    print(f"===== 参数信息 (print输出) =====")
+    print(f"table_name: {table_name}")
+    print(f"exec_date: {execution_date}")
+    print(f"execution_mode: {execution_mode}")
+    print(f"script_name: {script_name}")
+    print(f"================================")
+    
+    # 使用logger.info输出所有参数
+    logger.info(f"===== 参数信息 (logger输出) =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"exec_date: {execution_date}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"script_name: {script_name}")
+    logger.info(f"================================")
     
     return True
 
-def run(table_name, execution_mode, **kwargs):
+def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
     """
     统一入口函数,符合Airflow动态脚本调用规范
     
     参数:
         table_name (str): 要处理的表名
         execution_mode (str): 执行模式 (append/full_refresh)
+        exec_date: 执行日期
+        script_name: 脚本名称
         **kwargs: 其他可能的参数
     
     返回:
         bool: 执行成功返回True,否则返回False
     """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始执行脚本...")
+    
+    # 打印所有传入的参数
+    logger.info(f"===== 传入参数信息 =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"script_name: {script_name}")
     
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
+    # 打印所有可能的额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}")
+    logger.info(f"========================")
     
-    # 同时使用print和logger输出以便比较
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    # 如果没有传入script_name,使用当前脚本名
+    if script_name is None:
+        script_name = os.path.basename(__file__)
     
     # 实际调用内部处理函数
-    return process_book_data()
+    return process_book_data(
+        table_name=table_name,
+        execution_date=exec_date,
+        execution_mode=execution_mode,
+        script_name=script_name
+    )
 
 if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="books", execution_mode="full_refresh")
+    # 直接执行时调用统一入口函数,传入测试参数
+    run(
+        table_name="books", 
+        execution_mode="full_refresh",
+        exec_date=datetime.now(),
+        script_name=os.path.basename(__file__)
+    )

+ 46 - 15
dataops/scripts/load_data.py

@@ -17,43 +17,74 @@ logging.basicConfig(
 
 logger = logging.getLogger("load_data")
 
-def load_data_from_source(source_name="default", execution_date=None):
+def load_data_from_source(source_name="default", execution_date=None, execution_mode=None, script_name=None):
     """从数据源加载数据的示例函数"""
     if execution_date is None:
         execution_date = datetime.now()
     
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
+    # 获取当前脚本的文件名(如果没有传入)
+    if script_name is None:
+        script_name = os.path.basename(__file__)
     
-    # 使用print输出脚本名称
-    print(f"当前脚本名称是 {script_name} - 来自print输出 - 正在处理{source_name}数据")
+    # 使用print输出所有参数
+    print(f"===== 参数信息 (print输出) =====")
+    print(f"table_name: {source_name}")
+    print(f"exec_date: {execution_date}")
+    print(f"execution_mode: {execution_mode}")
+    print(f"script_name: {script_name}")
+    print(f"================================")
     
-    # 使用logger.info输出脚本名称
-    logger.info(f"当前脚本名称是 {script_name} - 来自logger.info输出 - 执行日期: {execution_date}")
+    # 使用logger.info输出所有参数
+    logger.info(f"===== 参数信息 (logger输出) =====")
+    logger.info(f"table_name: {source_name}")
+    logger.info(f"exec_date: {execution_date}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"script_name: {script_name}")
+    logger.info(f"================================")
     
     return True
 
-def run(table_name, execution_mode, **kwargs):
+def run(table_name, execution_mode, exec_date=None, script_name=None, **kwargs):
     """
     统一入口函数,符合Airflow动态脚本调用规范
     
     参数:
         table_name (str): 要处理的表名
         execution_mode (str): 执行模式 (append/full_refresh)
+        exec_date: 执行日期
+        script_name: 脚本名称
         **kwargs: 其他可能的参数
     
     返回:
         bool: 执行成功返回True,否则返回False
     """
-    logger.info(f"通过统一入口函数run()调用 - 处理表: {table_name}, 模式: {execution_mode}")
+    logger.info(f"开始执行脚本...")
     
-    # 获取当前脚本的文件名
-    script_name = os.path.basename(__file__)
-    logger.info(f"[统一入口] 脚本 {script_name} 正在处理表 {table_name}, 模式: {execution_mode}")
+    # 打印所有传入的参数
+    logger.info(f"===== 传入参数信息 =====")
+    logger.info(f"table_name: {table_name}")
+    logger.info(f"execution_mode: {execution_mode}")
+    logger.info(f"exec_date: {exec_date}")
+    logger.info(f"script_name: {script_name}")
+    
+    # 打印所有可能的额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}")
+    logger.info(f"========================")
     
     # 实际调用内部处理函数
-    return load_data_from_source(source_name=table_name)
+    return load_data_from_source(
+        source_name=table_name, 
+        execution_date=exec_date, 
+        execution_mode=execution_mode, 
+        script_name=script_name
+    )
 
 if __name__ == "__main__":
-    # 直接执行时调用统一入口函数
-    run(table_name="test_table", execution_mode="append")
+    # 直接执行时调用统一入口函数,传入测试参数
+    run(
+        table_name="test_table", 
+        execution_mode="append", 
+        exec_date=datetime.now(),
+        script_name=os.path.basename(__file__)
+    )