Sfoglia il codice sorgente

完成对execute_dag和prepare_dag.py的修改,不再从参数中判断是否手工触发了,web ui的参数仅做展示用途。

wangxq 1 settimana fa
parent
commit
b6b86838a9

+ 10 - 10
dags/dataops_productline_execute_dag.py

@@ -545,10 +545,12 @@ def check_execution_plan(**kwargs):
     exec_date, local_logical_date = get_cn_exec_date(logical_date)
     
     # 检查是否是手动触发
-    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
-    if is_manual_trigger:
+    dag_run = kwargs['dag_run']
+    logger.info(f"This DAG run was triggered via: {dag_run.run_type}")
+
+    if dag_run.external_trigger:
         logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
-    
+       
     # 记录重要的时间参数
     logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info("检查数据库中的执行计划是否存在且有效")
@@ -947,10 +949,10 @@ def create_execution_plan(**kwargs):
         dag_run = kwargs.get('dag_run')
         logical_date = dag_run.logical_date
         exec_date, local_logical_date = get_cn_exec_date(logical_date)
-        
+
+        logger.info(f"This DAG run was triggered via: {dag_run.run_type}")        
         # 检查是否是手动触发
-        is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
-        if is_manual_trigger:
+        if dag_run.external_trigger:
             logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
         
         # 记录重要的时间参数
@@ -961,7 +963,7 @@ def create_execution_plan(**kwargs):
         
         # 如果找不到执行计划,则从数据库获取
         if not execution_plan:
-            logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
+            logger.info(f"未从XCom中找到执行计划,从数据库获取。使用执行日期: {exec_date}")
             execution_plan = get_execution_plan_from_db(exec_date)
             
             if not execution_plan:
@@ -1004,9 +1006,7 @@ with DAG(
         'retries': 1,
         'retry_delay': timedelta(minutes=5)
     },
-    params={
-        'MANUAL_TRIGGER': False, 
-    }
+    params={"TRIGGERED_VIA_UI": True},# 触发 UI 弹出配置页面
 ) as dag:
     
     # 记录DAG实例化时的重要信息

+ 13 - 14
dags/dataops_productline_prepare_dag.py

@@ -725,11 +725,12 @@ def check_execution_plan_in_db(**kwargs):
     """
     # 获取执行日期
     dag_run = kwargs.get('dag_run')
-    logical_date = dag_run.logical_date
+    logger.info(f"This DAG run was triggered via: {dag_run.run_type}")
+    logical_date = dag_run.logical_date    
     exec_date, local_logical_date = get_cn_exec_date(logical_date)
     logger.info(f"logical_date: {logical_date} ")
-    logger.info(f"local_logical_date {local_logical_date} ")
-    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
+    logger.info(f"local_logical_date {local_logical_date} ")
+    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
    
     # 检查数据库中是否存在执行计划
     try:
@@ -1169,18 +1170,18 @@ def prepare_productline_dag_schedule(**kwargs):
     try:
         # 检查是否是手动触发模式
         is_manual_trigger = False
-        params = kwargs.get('params', {})
-        if params and 'MANUAL_TRIGGER' in params:
-            is_manual_trigger = params.get('MANUAL_TRIGGER', False)
-            if is_manual_trigger:
-                logger.info(f"接收到手动触发参数: MANUAL_TRIGGER={is_manual_trigger}")
+        dag_run = kwargs['dag_run']
+        logger.info(f"This DAG run was triggered via: {dag_run.run_type}")
+        
+        if dag_run.external_trigger:
+            is_manual_trigger = True
+            logger.info("This DAG run was manually triggered.")
         
         # 获取执行日期
         dag_run = kwargs.get('dag_run')
         logical_date = dag_run.logical_date
-        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-        exec_date = local_logical_date.strftime('%Y-%m-%d')
-        logger.info(f"开始准备执行日期 {exec_date} 的产品线调度任务")
+        exec_date, local_logical_date = get_cn_exec_date(logical_date)
+        logger.info(f"开始准备执行日期 {exec_date} 的创建执行计划的调度任务")
         
         # 检查是否需要创建新的执行计划
         need_create_plan = False
@@ -1465,9 +1466,7 @@ with DAG(
         'retries': 1,
         'retry_delay': timedelta(minutes=5)
     },
-    params={
-        'MANUAL_TRIGGER': False, 
-    },
+    params={"TRIGGERED_VIA_UI": True},  # 触发 UI 弹出配置页面
 ) as dag:
     
     # 任务开始标记