فهرست منبع

prepare和data已经可以正常工作,后面计划重新添加一个summary的DAG.

wangxq 1 ماه پیش
والد
کامیت
c7c9aa395e
2 فایل‌های تغییر یافته به همراه 90 افزوده شده و 106 حذف شده
  1. 29 21
      dags/dag_dataops_pipeline_data_scheduler.py
  2. 61 85
      dags/dag_dataops_pipeline_prepare_scheduler.py

+ 29 - 21
dags/dag_dataops_pipeline_data_scheduler.py

@@ -80,10 +80,10 @@ def validate_database_connection():
             
             # 检查最近的执行记录
             cursor.execute("""
-                SELECT ds, COUNT(*) as record_count
+                SELECT exec_date, COUNT(*) as record_count
                 FROM airflow_exec_plans
-                GROUP BY ds
-                ORDER BY ds DESC
+                GROUP BY exec_date
+                ORDER BY exec_date DESC
                 LIMIT 3
             """)
             recent_dates = cursor.fetchall()
@@ -443,7 +443,7 @@ def prepare_dag_schedule(**kwargs):
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
     # 记录重要的时间参数
-    logger.info(f"【时间参数】prepare_dag_schedule: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+    logger.info(f"【时间参数】prepare_dag_schedule: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
     
     # 1. 获取启用的表
@@ -530,6 +530,8 @@ def prepare_dag_schedule(**kwargs):
     # 创建执行计划
     execution_plan = {
         "exec_date": exec_date,
+        "logical_date": logical_date,
+        "local_logical_date": local_logical_date,
         "resource_tasks": resource_tasks,
         "model_tasks": model_tasks,
         "dependencies": dependencies
@@ -552,7 +554,7 @@ def check_execution_plan(**kwargs):
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
     # 记录重要的时间参数
-    logger.info(f"【时间参数】check_execution_plan: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+    logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.info("检查数据库中的执行计划是否存在且有效")
     
     # 从数据库获取执行计划
@@ -654,7 +656,7 @@ def create_execution_plan(**kwargs):
         exec_date = local_logical_date.strftime('%Y-%m-%d')
         
         # 记录重要的时间参数
-        logger.info(f"【时间参数】create_execution_plan: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+        logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
         
         # 从XCom获取执行计划
         execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
@@ -814,7 +816,9 @@ def get_execution_stats(exec_date):
             }
         
         dag_run = dag_runs[0]
-        logger.debug(f"找到最近的DAG运行: logical_date={dag_run.logical_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
+        logical_date = dag_run.logical_date
+        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+        logger.debug(f"找到最近的DAG运行: logical_date={logical_date}, local_logical_date={local_logical_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
         
         # 直接查询最近更新的任务实例,不再通过execution_date过滤
         # 只通过dag_id过滤,按更新时间降序排序
@@ -871,6 +875,8 @@ def get_execution_stats(exec_date):
         # 汇总统计信息
         stats = {
             "exec_date": exec_date,
+            "logical_date": logical_date,
+            "local_logical_date": local_logical_date,
             "dag_run_logical_date": dag_run.logical_date,
             "dag_run_updated_at": dag_run.updated_at,
             "total_tasks": total_tasks,
@@ -917,7 +923,7 @@ def summarize_execution(**kwargs):
     exec_date = local_logical_date.strftime('%Y-%m-%d')
     
     # 记录重要的时间参数
-    logger.debug(f"【时间参数】summarize_execution: ds={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+    logger.debug(f"【时间参数】summarize_execution: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
     logger.debug(f"开始汇总执行日期 {exec_date} 的执行情况")
     
     # 获取任务实例对象
@@ -976,6 +982,8 @@ def summarize_execution(**kwargs):
     if not stats or 'total_tasks' not in stats:
         stats = {
             "exec_date": exec_date,
+            "logical_date": logical_date,
+            "local_logical_date": local_logical_date,
             "total_tasks": 0,
             "type_counts": {},
             "success_count": 0,
@@ -1030,11 +1038,11 @@ def get_execution_plan_from_db(ds):
     execution_plan = None
     
     try:
-        # 查询条件a: 当前日期=表的ds,如果有多条记录,取insert_time最大的一条
+        # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取insert_time最大的一条
         cursor.execute("""
             SELECT plan, run_id, insert_time
             FROM airflow_exec_plans
-            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND ds = %s
+            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND exec_date = %s
             ORDER BY insert_time DESC
             LIMIT 1
         """, (ds,))
@@ -1043,7 +1051,7 @@ def get_execution_plan_from_db(ds):
         if result:
             # 获取计划、run_id和insert_time
             plan_json, run_id, insert_time = result
-            logger.info(f"找到当前日期 ds={ds} 的执行计划记录,run_id: {run_id}, insert_time: {insert_time}")
+            logger.info(f"找到当前日期 exec_date={ds} 的执行计划记录,run_id: {run_id}, insert_time: {insert_time}")
             
             # 处理plan_json可能已经是dict的情况
             if isinstance(plan_json, dict):
@@ -1053,21 +1061,21 @@ def get_execution_plan_from_db(ds):
                 
             return execution_plan
         
-        # 查询条件b: 找不到当前日期的记录,查找ds<当前ds的最新记录
-        logger.info(f"未找到当前日期 ds={ds} 的执行计划记录,尝试查找历史记录")
+        # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
+        logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
         cursor.execute("""
-            SELECT plan, run_id, insert_time, ds
+            SELECT plan, run_id, insert_time, exec_date
             FROM airflow_exec_plans
-            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND ds < %s
-            ORDER BY ds DESC, insert_time DESC
+            WHERE dag_id = 'dag_dataops_pipeline_prepare_scheduler' AND exec_date < %s
+            ORDER BY exec_date DESC, insert_time DESC
             LIMIT 1
         """, (ds,))
         result = cursor.fetchone()
         
         if result:
-            # 获取计划、run_id、insert_time和ds
+            # 获取计划、run_id、insert_time和exec_date
             plan_json, run_id, insert_time, plan_ds = result
-            logger.info(f"找到历史执行计划记录,ds: {plan_ds}, run_id: {run_id}, insert_time: {insert_time}")
+            logger.info(f"找到历史执行计划记录,exec_date: {plan_ds}, run_id: {run_id}, insert_time: {insert_time}")
             
             # 处理plan_json可能已经是dict的情况
             if isinstance(plan_json, dict):
@@ -1078,7 +1086,7 @@ def get_execution_plan_from_db(ds):
             return execution_plan
         
         # 找不到任何执行计划记录
-        logger.error(f"在数据库中未找到任何执行计划记录,当前DAG ds={ds}")
+        logger.error(f"在数据库中未找到任何执行计划记录,当前DAG exec_date={ds}")
         return None
         
     except Exception as e:
@@ -1212,7 +1220,7 @@ with DAG(
         
         # 检查是否成功获取到执行计划
         if execution_plan is None:
-            error_msg = f"无法从数据库获取有效的执行计划,当前DAG ds={exec_date}"
+            error_msg = f"无法从数据库获取有效的执行计划,当前DAG exec_date={exec_date}"
             logger.error(error_msg)
             # 使用全局变量而不是异常来强制DAG失败
             raise ValueError(error_msg)
@@ -1230,7 +1238,7 @@ with DAG(
         
         # 如果执行计划为空(没有任务),也应该失败
         if not resource_tasks and not model_tasks:
-            error_msg = f"执行计划中没有任何任务,当前DAG ds={exec_date}"
+            error_msg = f"执行计划中没有任何任务,当前DAG exec_date={exec_date}"
             logger.error(error_msg)
             raise ValueError(error_msg)
         

+ 61 - 85
dags/dag_dataops_pipeline_prepare_scheduler.py

@@ -283,26 +283,66 @@ def get_subscription_state_hash():
         cursor.close()
         conn.close()
 
-def check_execution_plan_in_db(exec_date):
+def check_execution_plan_in_db(**kwargs):
     """
-    检查数据库中是否已存在指定日期的执行计划
-    
-    参数:
-        exec_date (str): 执行日期,格式为YYYY-MM-DD
-        
-    返回:
-        bool: 如果存在返回True,否则返回False
+    检查当天的执行计划是否存在于数据库中
+    返回False将阻止所有下游任务执行
     """
+    # 获取执行日期
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    logger.info(f"logical_date: {logical_date} ")
+    logger.info(f"local_logical_date {local_logical_date} ")
+    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
+   
+    
+    # 检查数据库中是否存在执行计划
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
         cursor.execute("""
-            SELECT COUNT(*)
+            SELECT plan
             FROM airflow_exec_plans
-            WHERE ds = %s
+            WHERE exec_date = %s
+            ORDER BY logical_date DESC
+            LIMIT 1
         """, (exec_date,))
-        count = cursor.fetchone()[0]
-        return count > 0
+        
+        result = cursor.fetchone()
+        if not result:
+            logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
+            return False
+        
+        # 检查执行计划内容是否有效
+        try:
+            # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
+            plan_data = result[0]            
+            # 检查必要字段
+            if "exec_date" not in plan_data:
+                logger.error("执行计划缺少exec_date字段")
+                return False
+                
+            if not isinstance(plan_data.get("resource_tasks", []), list):
+                logger.error("执行计划的resource_tasks字段无效")
+                return False
+                
+            if not isinstance(plan_data.get("model_tasks", []), list):
+                logger.error("执行计划的model_tasks字段无效")
+                return False
+            
+            # 检查是否有任务数据
+            resource_tasks = plan_data.get("resource_tasks", [])
+            model_tasks = plan_data.get("model_tasks", [])
+            
+            logger.info(f"执行计划验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
+            return True
+            
+        except Exception as je:
+            logger.error(f"处理执行计划数据时出错: {str(je)}")
+            return False
+        
     except Exception as e:
         logger.error(f"检查数据库中执行计划时出错: {str(e)}")
         return False
@@ -331,15 +371,18 @@ def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
         # 将执行计划转换为JSON字符串
         plan_json = json.dumps(execution_plan)
         
+        # 获取本地时间
+        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+        
         # 插入记录
         cursor.execute("""
             INSERT INTO airflow_exec_plans
-            (dag_id, run_id, logical_date, ds, plan)
-            VALUES (%s, %s, %s, %s, %s)
-        """, (dag_id, run_id, logical_date, ds, plan_json))
+            (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
         
         conn.commit()
-        logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, ds={ds}")
+        logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
         return True
     except Exception as e:
         logger.error(f"保存执行计划到数据库时出错: {str(e)}")
@@ -369,7 +412,7 @@ def prepare_pipeline_dag_schedule(**kwargs):
     need_create_plan = False
     
     # 条件1: 数据库中不存在当天的执行计划
-    has_plan_in_db = check_execution_plan_in_db(exec_date)
+    has_plan_in_db = check_execution_plan_in_db(**kwargs)
     if not has_plan_in_db:
         logger.info(f"数据库中不存在执行日期exec_date {exec_date} 的执行计划,需要创建新的执行计划")
         need_create_plan = True
@@ -544,73 +587,6 @@ def prepare_pipeline_dag_schedule(**kwargs):
     
     return len(valid_tables)  # 返回有效表数量
 
-def check_execution_plan_db(**kwargs):
-    """
-    检查当天的执行计划是否存在于数据库中
-    返回False将阻止所有下游任务执行
-    """
-    # 获取执行日期
-    dag_run = kwargs.get('dag_run')
-    logical_date = dag_run.logical_date
-    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-    exec_date = local_logical_date.strftime('%Y-%m-%d')
-    logger.info(f"logical_date: {logical_date} ")
-    logger.info(f"local_logical_date {local_logical_date} ")
-    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
-   
-    
-    # 检查数据库中是否存在执行计划
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT plan
-            FROM airflow_exec_plans
-            WHERE ds = %s
-            ORDER BY logical_date DESC
-            LIMIT 1
-        """, (exec_date,))
-        
-        result = cursor.fetchone()
-        if not result:
-            logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
-            return False
-        
-        # 检查执行计划内容是否有效
-        try:
-            # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
-            plan_data = result[0]            
-            # 检查必要字段
-            if "exec_date" not in plan_data:
-                logger.error("执行计划缺少exec_date字段")
-                return False
-                
-            if not isinstance(plan_data.get("resource_tasks", []), list):
-                logger.error("执行计划的resource_tasks字段无效")
-                return False
-                
-            if not isinstance(plan_data.get("model_tasks", []), list):
-                logger.error("执行计划的model_tasks字段无效")
-                return False
-            
-            # 检查是否有任务数据
-            resource_tasks = plan_data.get("resource_tasks", [])
-            model_tasks = plan_data.get("model_tasks", [])
-            
-            logger.info(f"执行计划验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
-            return True
-            
-        except Exception as je:
-            logger.error(f"处理执行计划数据时出错: {str(je)}")
-            return False
-        
-    except Exception as e:
-        logger.error(f"检查数据库中执行计划时出错: {str(e)}")
-        return False
-    finally:
-        cursor.close()
-        conn.close()
-
 # 创建DAG
 with DAG(
     "dag_dataops_pipeline_prepare_scheduler",
@@ -648,7 +624,7 @@ with DAG(
     # 检查执行计划是否存在于数据库中
     check_plan_in_db = ShortCircuitOperator(
         task_id="check_execution_plan_in_db",
-        python_callable=check_execution_plan_db,
+        python_callable=check_execution_plan_in_db,
         provide_context=True,
         dag=dag
     )