
Prepare to modify the airflow_exec_plan table and test timestamp generation

wangxq 1 month ago
parent
commit
0c1735befc

+ 25 - 16
dags/dag_dataops_pipeline_data_scheduler.py

@@ -16,6 +16,7 @@ import logging
 import networkx as nx
 import json
 import os
+import pendulum
 from decimal import Decimal
 from common import (
     get_pg_conn, 
@@ -436,11 +437,13 @@ def filter_invalid_tables(tables_info):
 
 def prepare_dag_schedule(**kwargs):
     """准备DAG调度任务的主函数"""
-    exec_date = kwargs.get('ds') or get_today_date()
-    execution_date = kwargs.get('execution_date')
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
     
     # 记录重要的时间参数
-    logger.info(f"【时间参数】prepare_dag_schedule: ds={exec_date}, execution_date={execution_date}")
+    logger.info(f"【时间参数】prepare_dag_schedule: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
     
     # 1. 获取启用的表
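
This is the pattern the commit applies in every changed task: instead of reading ds / execution_date from kwargs, the task takes dag_run.logical_date (a timezone-aware UTC datetime in Airflow 2.x) and converts it to Asia/Shanghai before formatting, so exec_date falls on the local calendar day. A minimal standalone sketch of that conversion; the helper name and the example timestamp are illustrative, not part of the commit:

    import pendulum

    def to_local_exec_date(logical_date, tz='Asia/Shanghai'):
        # logical_date is an aware UTC datetime; converting before strftime keeps
        # a late-UTC run on the correct local (UTC+8) calendar day.
        local_dt = pendulum.instance(logical_date).in_timezone(tz)
        return local_dt.strftime('%Y-%m-%d')

    # 2024-05-01T16:00:00+00:00 is already 2024-05-02 00:00 in Asia/Shanghai
    print(to_local_exec_date(pendulum.datetime(2024, 5, 1, 16, 0)))  # -> 2024-05-02
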
@@ -543,11 +546,13 @@ def check_execution_plan(**kwargs):
     检查执行计划是否存在且有效
     返回False将阻止所有下游任务执行
     """
-    execution_date = kwargs.get('execution_date')
-    exec_date = kwargs.get('ds') or get_today_date()
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
     
     # 记录重要的时间参数
-    logger.info(f"【时间参数】check_execution_plan: ds={exec_date}, execution_date={execution_date}")
+    logger.info(f"【时间参数】check_execution_plan: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info("检查数据库中的执行计划是否存在且有效")
     
     # 从数据库获取执行计划
@@ -643,11 +648,13 @@ def get_table_dependencies(table_names):
 def create_execution_plan(**kwargs):
     """准备执行计划的函数,使用从准备阶段传递的数据"""
     try:
-        execution_date = kwargs.get('execution_date')
-        exec_date = kwargs.get('ds') or get_today_date()
+        dag_run = kwargs.get('dag_run')
+        logical_date = dag_run.logical_date
+        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+        exec_date = local_logical_date.strftime('%Y-%m-%d')
         
         # 记录重要的时间参数
-        logger.info(f"【时间参数】create_execution_plan: ds={exec_date}, execution_date={execution_date}")
+        logger.info(f"【时间参数】create_execution_plan: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
         
         # 从XCom获取执行计划
         execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
@@ -807,7 +814,7 @@ def get_execution_stats(exec_date):
             }
         
         dag_run = dag_runs[0]
-        logger.debug(f"找到最近的DAG运行: execution_date={dag_run.execution_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
+        logger.debug(f"找到最近的DAG运行: logical_date={dag_run.logical_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
         
         # 直接查询最近更新的任务实例,不再通过execution_date过滤
         # 只通过dag_id过滤,按更新时间降序排序
@@ -864,7 +871,7 @@ def get_execution_stats(exec_date):
         # 汇总统计信息
         stats = {
             "exec_date": exec_date,
-            "dag_run_execution_date": dag_run.execution_date,
+            "dag_run_logical_date": dag_run.logical_date,
             "dag_run_updated_at": dag_run.updated_at,
             "total_tasks": total_tasks,
             "type_counts": type_counts,
@@ -904,11 +911,13 @@ def generate_execution_report(exec_date, stats):
 
 def summarize_execution(**kwargs):
     """简化的汇总执行情况函数,只判断整个作业是否成功"""
-    exec_date = kwargs.get('ds') or get_today_date()
-    execution_date = kwargs.get('execution_date')
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
     
-    # 记录重要的时间参数,仅供参考
-    logger.debug(f"【时间参数】summarize_execution: ds={exec_date}, execution_date={execution_date}")
+    # 记录重要的时间参数
+    logger.debug(f"【时间参数】summarize_execution: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.debug(f"开始汇总执行日期 {exec_date} 的执行情况")
     
     # 获取任务实例对象
@@ -939,7 +948,7 @@ def summarize_execution(**kwargs):
         # 获取状态
         dag_run = dag_runs[0]  # 取最近更新的DAG运行
         state = dag_run.state
-        logger.debug(f"找到最近的DAG运行: execution_date={dag_run.execution_date}, updated_at={dag_run.updated_at}, state={state}")
+        logger.debug(f"找到最近的DAG运行: logical_date={dag_run.logical_date}, updated_at={dag_run.updated_at}, state={state}")
         logger.debug(f"DAG {dag_id} 的状态为: {state}")
         
         # 判断是否成功

+ 17 - 7
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -11,6 +11,7 @@ import re
 import glob
 from pathlib import Path
 import hashlib
+import pendulum
 from common import (
     get_pg_conn, 
     get_neo4j_driver,
@@ -358,7 +359,10 @@ def prepare_pipeline_dag_schedule(**kwargs):
         logger.info(f"接收到强制刷新参数: FORCE_REFRESH={is_force_refresh}")
     
     # 获取执行日期
-    exec_date = kwargs.get('ds') or get_today_date()
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
     logger.info(f"开始准备执行日期 {exec_date} 的Pipeline调度任务")
     
     # 检查是否需要创建新的执行计划
@@ -367,7 +371,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
     # 条件1: 数据库中不存在当天的执行计划
     has_plan_in_db = check_execution_plan_in_db(exec_date)
     if not has_plan_in_db:
-        logger.info(f"数据库中不存在执行日期 {exec_date} 的执行计划,需要创建新的执行计划")
+        logger.info(f"数据库中不存在执行日期exec_date {exec_date} 的执行计划,需要创建新的执行计划")
         need_create_plan = True
     
     # 条件2: schedule_status表中的数据发生了变更
@@ -505,6 +509,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
                 dag_id = dag_run.dag_id
                 run_id = dag_run.run_id
                 logical_date = dag_run.logical_date
+                local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
             else:
                 # 如果无法获取dag_run,使用默认值
                 dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dag_dataops_pipeline_prepare_scheduler"
@@ -516,7 +521,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
                 execution_plan=execution_plan,
                 dag_id=dag_id,
                 run_id=run_id,
-                logical_date=logical_date,
+                logical_date=local_logical_date,
                 ds=exec_date
             )
             
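
Passing local_logical_date into this call means the timestamp stored with the execution plan is the Asia/Shanghai wall-clock value rather than the UTC one. Both name the same instant; only the serialized offset differs, as this small standalone example (values illustrative) shows:

    import pendulum

    logical_date = pendulum.datetime(2024, 5, 1, 16, 0)             # UTC, as Airflow provides it
    local_logical_date = logical_date.in_timezone('Asia/Shanghai')  # same instant, +08:00 offset

    print(logical_date.isoformat())             # 2024-05-01T16:00:00+00:00
    print(local_logical_date.isoformat())       # 2024-05-02T00:00:00+08:00
    print(logical_date == local_logical_date)   # True: only the representation changes
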
@@ -545,8 +550,14 @@ def check_execution_plan_db(**kwargs):
     返回False将阻止所有下游任务执行
     """
     # 获取执行日期
-    exec_date = kwargs.get('ds') or get_today_date()
-    logger.info(f"检查执行日期 {exec_date} 的执行计划是否存在于数据库中")
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    logger.info(f"logical_date: {logical_date} ")
+    logger.info(f"local_logical_date {local_logical_date} ")
+    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
+   
     
     # 检查数据库中是否存在执行计划
     conn = get_pg_conn()
@@ -568,8 +579,7 @@ def check_execution_plan_db(**kwargs):
         # 检查执行计划内容是否有效
         try:
             # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
-            plan_data = result[0]
-            
+            plan_data = result[0]            
             # 检查必要字段
             if "exec_date" not in plan_data:
                 logger.error("执行计划缺少exec_date字段")