Sfoglia il codice sorgente

添加 get_cn_exec_date()函数

wangxq 3 settimane fa
parent
commit
80395761d1
1 ha cambiato i file con 6 aggiunte e 8 eliminazioni
  1. 6 8
      dags/dataops_productline_execute_dag.py

+ 6 - 8
dags/dataops_productline_execute_dag.py

@@ -67,7 +67,7 @@ class DecimalEncoder(json.JSONEncoder):
         return super(DecimalEncoder, self).default(obj)
     
 
-def get_logical_exec_date(logical_date):
+def get_cn_exec_date(logical_date):
     """
     获取逻辑执行日期
     
@@ -78,7 +78,9 @@ def get_logical_exec_date(logical_date):
         logical_exec_date: 逻辑执行日期,北京时间
     """
     # 获取逻辑执行日期
-    return logical_date.in_timezone('Asia/Shanghai').to_date_string()
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    return exec_date
 
 
 #############################################
@@ -112,10 +114,6 @@ def execute_script(script_id, script_name, target_table, exec_date, script_exec_
     for key, value in kwargs.items():
         logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
 
-    # 获取逻辑执行日期
-    logical_exec_date = get_logical_exec_date(exec_date)
-    logger.info(f"逻辑执行日期: {logical_exec_date}")
-
     # 检查script_name是否为空
     if not script_name:
         logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
@@ -743,6 +741,7 @@ def check_execution_plan(**kwargs):
     """
     dag_run = kwargs.get('dag_run')
     logical_date = dag_run.logical_date
+
     local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
@@ -856,8 +855,7 @@ def create_execution_plan(**kwargs):
     try:
         dag_run = kwargs.get('dag_run')
         logical_date = dag_run.logical_date
-        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-        exec_date = local_logical_date.strftime('%Y-%m-%d')
+        exec_date = get_cn_exec_date(logical_date)
         
         # 检查是否是手动触发
         is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False