
Pipeline version has basically passed debugging

wangxq 1 month ago
parent commit 7ba906cca4

+ 1 - 263
dags/dag_dataops_pipeline_data_scheduler.py

@@ -767,250 +767,6 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
     finally:
         logger.info(f"===== 结束执行 {task_id} =====")
 
-#############################################
-# Phase 3: Summary Phase functions
-#############################################
-
-def get_execution_stats(exec_date):
-    """
-    Collect execution statistics using the Airflow API,
-    no longer relying on the airflow_dag_schedule table
-    """
-    from airflow.models import DagRun, TaskInstance
-    from airflow.utils.state import State
-    from sqlalchemy import desc
-    from airflow import settings
-    
-    # Log the raw input parameter, for reference only
-    logger.debug(f"[exec date] get_execution_stats received exec_date: {exec_date}, type: {type(exec_date)}")
-    logger.debug(f"Collecting execution statistics for execution date {exec_date}")
-    
-    # ID of the current DAG
-    dag_id = "dag_dataops_pipeline_data_scheduler"
-    
-    try:
-        # Query the most recent DAG run directly instead of relying on an exact execution date
-        logger.debug("Ignoring the exact execution date; querying the most recent DAG run directly")
-        session = settings.Session()
-        
-        # Fetch the most recent DAG run, ordered by updated_at descending
-        dag_runs = session.query(DagRun).filter(
-            DagRun.dag_id == dag_id
-        ).order_by(desc(DagRun.updated_at)).limit(1).all()
-        
-        if not dag_runs:
-            logger.warning(f"No runs found for DAG {dag_id}")
-            session.close()
-            return {
-                "exec_date": exec_date,
-                "total_tasks": 0,
-                "type_counts": {},
-                "success_count": 0,
-                "fail_count": 0,
-                "pending_count": 0,
-                "success_rate": 0,
-                "avg_duration": None,
-                "min_duration": None,
-                "max_duration": None,
-                "failed_tasks": []
-            }
-        
-        dag_run = dag_runs[0]
-        logical_date = dag_run.logical_date
-        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-        logger.debug(f"找到最近的DAG运行: logical_date={logical_date}, local_logical_date={local_logical_date}, updated_at={dag_run.updated_at}, state={dag_run.state}")
-        
-        # 直接查询最近更新的任务实例,不再通过execution_date过滤
-        # 只通过dag_id过滤,按更新时间降序排序
-        task_instances = session.query(TaskInstance).filter(
-            TaskInstance.dag_id == dag_id
-        ).order_by(desc(TaskInstance.updated_at)).limit(100).all()  # 获取最近100条记录
-        
-        # 日志记录找到的任务实例数量
-        logger.debug(f"找到 {len(task_instances)} 个最近的任务实例")
-        
-        # 关闭会话
-        session.close()
-        
-        # 统计任务状态
-        total_tasks = len(task_instances)
-        success_count = len([ti for ti in task_instances if ti.state == State.SUCCESS])
-        fail_count = len([ti for ti in task_instances if ti.state in (State.FAILED, State.UPSTREAM_FAILED)])
-        pending_count = total_tasks - success_count - fail_count
-        
-        # 计算成功率
-        success_rate = (success_count / total_tasks * 100) if total_tasks > 0 else 0
-        
-        # 计算执行时间
-        durations = []
-        for ti in task_instances:
-            if ti.start_date and ti.end_date:
-                duration = (ti.end_date - ti.start_date).total_seconds()
-                durations.append(duration)
-        
-        avg_duration = sum(durations) / len(durations) if durations else None
-        min_duration = min(durations) if durations else None
-        max_duration = max(durations) if durations else None
-        
-        # Per-type counts
-        type_counts = {
-            "resource": len([ti for ti in task_instances if ti.task_id.startswith("resource_")]),
-            "model": len([ti for ti in task_instances if ti.task_id.startswith("model_")])
-        }
-        
-        # Collect details of failed tasks
-        failed_tasks = []
-        for ti in task_instances:
-            if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
-                task_dict = {
-                    "task_id": ti.task_id,
-                    "state": ti.state,
-                }
-                
-                if ti.start_date and ti.end_date:
-                    task_dict["exec_duration"] = (ti.end_date - ti.start_date).total_seconds()
-                
-                failed_tasks.append(task_dict)
-        
-        # Assemble the summary statistics
-        stats = {
-            "exec_date": exec_date,
-            "logical_date": logical_date,
-            "local_logical_date": local_logical_date,
-            "dag_run_logical_date": dag_run.logical_date,
-            "dag_run_updated_at": dag_run.updated_at,
-            "total_tasks": total_tasks,
-            "type_counts": type_counts,
-            "success_count": success_count,
-            "fail_count": fail_count,
-            "pending_count": pending_count,
-            "success_rate": success_rate,
-            "avg_duration": avg_duration,
-            "min_duration": min_duration,
-            "max_duration": max_duration,
-            "failed_tasks": failed_tasks
-        }
-        
-        return stats
-    except Exception as e:
-        logger.error(f"Error while collecting execution statistics: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # Re-raise on error so the task is marked as failed
-        raise
-
-def generate_execution_report(exec_date, stats):
-    """Generate a simplified execution report containing only basic information"""
-    # Build the simplified report
-    report = []
-    report.append("========== Data-ops system execution report ==========")
-    report.append(f"Execution date: {exec_date}")
-    report.append(f"Total tasks: {stats['total_tasks']}")
-    
-    # Join the report into a single string
-    report_str = "\n".join(report)
-    
-    # Write it to the log
-    logger.info("\n" + report_str)
-    
-    return report_str
-
-def summarize_execution(**kwargs):
-    """Simplified summary function that only decides whether the whole job succeeded"""
-    dag_run = kwargs.get('dag_run')
-    logical_date = dag_run.logical_date
-    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
-    exec_date = local_logical_date.strftime('%Y-%m-%d')
-    
-    # Log the relevant time parameters
-    logger.debug(f"[time parameters] summarize_execution: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
-    logger.debug(f"Summarizing execution for execution date {exec_date}")
-    
-    # Get the task instance object
-    task_instance = kwargs.get('ti')
-    dag_id = task_instance.dag_id
-    
-    # Get the DAG run state by querying the most recent run directly
-    from airflow.models import DagRun
-    from airflow.utils.state import State
-    from sqlalchemy import desc
-    from airflow import settings
-    
-    logger.debug("Querying the most recently updated DAG run record directly, independent of the execution date")
-    session = settings.Session()
-    
-    # Fetch the most recent DAG run, ordered by updated_at descending
-    dag_runs = session.query(DagRun).filter(
-        DagRun.dag_id == dag_id
-    ).order_by(desc(DagRun.updated_at)).limit(1).all()
-    
-    session.close()
-    
-    if not dag_runs:
-        logger.warning(f"No runs found for DAG {dag_id}")
-        state = "UNKNOWN"
-        success = False
-    else:
-        # Read the state of the most recently updated DAG run
-        dag_run = dag_runs[0]
-        state = dag_run.state
-        logger.debug(f"Found the most recent DAG run: logical_date={dag_run.logical_date}, updated_at={dag_run.updated_at}, state={state}")
-        logger.debug(f"State of DAG {dag_id}: {state}")
-        
-        # Decide whether the run succeeded
-        success = (state == State.SUCCESS)
-    
-    # Collect more detailed execution statistics; get_execution_stats ignores the exact date
-    stats = get_execution_stats(exec_date)
-    
-    # Build a short report
-    if success:
-        report = f"DAG {dag_id} completed successfully on {exec_date}."
-        if stats:
-            report += f" {stats.get('total_tasks', 0)} tasks in total: " \
-                      f"{stats.get('success_count', 0)} succeeded, " \
-                      f"{stats.get('fail_count', 0)} failed."
-    else:
-        report = f"DAG {dag_id} did not complete successfully on {exec_date}; state: {state}."
-        if stats and stats.get('failed_tasks'):
-            report += f" {len(stats.get('failed_tasks', []))} tasks failed."
-    
-    # Log the result
-    logger.info(report)
-    
-    # If stats is empty or lacks the total_tasks field, build a complete status record
-    if not stats or 'total_tasks' not in stats:
-        stats = {
-            "exec_date": exec_date,
-            "logical_date": logical_date,
-            "local_logical_date": local_logical_date,
-            "total_tasks": 0,
-            "type_counts": {},
-            "success_count": 0,
-            "fail_count": 0,
-            "pending_count": 0,
-            "success_rate": 0,
-            "avg_duration": None,
-            "min_duration": None,
-            "max_duration": None,
-            "failed_tasks": [],
-            "success": success,
-            "dag_id": dag_id,
-            "dag_run_state": state
-        }
-    else:
-        # Add the success flag to stats
-        stats["success"] = success
-    
-    # Push the results to XCom
-    task_instance.xcom_push(key='execution_stats', value=stats)
-    task_instance.xcom_push(key='execution_report', value=report)
-    task_instance.xcom_push(key='execution_success', value=success)
-    
-    # Generate the simplified execution report
-    simple_report = generate_execution_report(exec_date, stats)
-    
-    return simple_report
 
 # New function for fetching the execution plan from the database
 def get_execution_plan_from_db(ds):
@@ -1181,27 +937,9 @@ with DAG(
         # Set dependencies
         start_processing >> processing_completed
     
-    #############################################
-    # Phase 3: Summary Phase
-    #############################################
-    with TaskGroup("summary_phase") as summary_group:
-        # Summarize the execution
-        summarize_task = PythonOperator(
-            task_id="summarize_execution",
-            python_callable=summarize_execution,
-            provide_context=True
-        )
-        
-        # Marker for summary completion
-        summary_completed = EmptyOperator(
-            task_id="summary_completed"
-        )
-        
-        # Set task dependencies
-        summarize_task >> summary_completed
     
     # Set dependencies between the phases
-    prepare_group >> data_group >> summary_group
+    prepare_group >> data_group
 
     # Try to fetch the execution plan from the database
     try:

+ 394 - 0
dags/dag_dataops_pipeline_summary_scheduler.py

@@ -0,0 +1,394 @@
+# dag_dataops_pipeline_summary_scheduler.py
+"""
+数据管道执行统计汇总 DAG
+
+功能:
+1. 依赖主数据处理 DAG (dag_dataops_pipeline_data_scheduler) 的完成
+2. 收集主 DAG 的执行统计信息
+3. 生成执行报告
+4. 无论主 DAG 执行成功与否都会运行
+"""
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from datetime import datetime, timedelta
+import logging
+import json
+import pendulum
+import pytz
+from airflow.models import DagRun, TaskInstance
+from airflow.utils.state import State
+from sqlalchemy import desc
+from airflow import settings
+from common import get_today_date
+
+# Create the logger
+logger = logging.getLogger(__name__)
+
+# Toggle verbose debug logging
+ENABLE_DEBUG_LOGGING = True
+
+def log_debug(message):
+    """Log a debug message, but only when debug mode is enabled"""
+    if ENABLE_DEBUG_LOGGING:
+        logger.info(f"[DEBUG] {message}")
+
+def print_target_date(dt):
+    """
+    Log and return the execution date used by the ExternalTaskSensor
+    """
+    # Convert to China Standard Time
+    local_dt = pendulum.instance(dt).in_timezone('Asia/Shanghai')
+
+    logger.info("===== Target date info for the ExternalTaskSensor =====")
+    logger.info("Source DAG: dag_dataops_pipeline_summary_scheduler")
+    logger.info("Target DAG: dag_dataops_pipeline_data_scheduler")
+    logger.info("Target task: data_processing_phase.processing_completed")
+    logger.info(f"Execution date being matched (UTC): {dt}")
+    logger.info(f"Execution date being matched (Beijing time): {local_dt}")
+    logger.info(f"Date string (UTC): {dt.strftime('%Y-%m-%dT%H:%M:%S')}")
+    logger.info(f"Date string (Beijing time): {local_dt.strftime('%Y-%m-%dT%H:%M:%S')}")
+    logger.info(f"UTC tzinfo: {dt.tzinfo}")
+    logger.info(f"Date type: {type(dt)}")
+    logger.info("=======================================")
+    # Must return the original date unmodified
+    return dt
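+# Note: ExternalTaskSensor matches the external DAG run by logical date, so this
+# function must return dt unchanged; it assumes both DAGs run on the same schedule.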
+
+
+def collect_pipeline_stats(**kwargs):
+    """
+    Collect execution statistics for the main DAG from the Airflow metadata database
+    """
+    # Get the date and time information for the current run
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    
+    # Log the relevant time parameters
+    logger.info(f"[time parameters] collect_pipeline_stats: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+    logger.info(f"Collecting pipeline execution statistics for execution date {exec_date}")
+    
+    # ID of the main DAG
+    target_dag_id = "dag_dataops_pipeline_data_scheduler"
+    
+    try:
+        # Create a database session
+        session = settings.Session()
+        
+        # Query the main DAG's run whose execution date matches this summary run
+        dag_runs = session.query(DagRun).filter(
+            DagRun.dag_id == target_dag_id,
+            DagRun.execution_date == local_logical_date  # exact match on the current execution date
+        ).order_by(desc(DagRun.execution_date)).limit(1).all()
+        
+        if not dag_runs:
+            logger.warning(f"No run records found for DAG {target_dag_id}")
+            session.close()
+            return {
+                "exec_date": exec_date,
+                "dag_id": target_dag_id,
+                "status": "NOT_FOUND",
+                "total_tasks": 0,
+                "success_count": 0,
+                "fail_count": 0,
+                "skipped_count": 0,
+                "upstream_failed_count": 0,
+                "duration": None,
+                "start_time": None,
+                "end_time": None
+            }
+        
+        # Get the matched DAG run
+        dag_run = dag_runs[0]
+        dag_run_id = dag_run.run_id
+        dag_start_time = dag_run.start_date
+        dag_end_time = dag_run.end_date
+        dag_state = dag_run.state
+        dag_execution_date = dag_run.execution_date
+        
+        # Compute the DAG run duration
+        dag_duration = None
+        if dag_start_time and dag_end_time:
+            dag_duration = (dag_end_time - dag_start_time).total_seconds()
+        
+        # Time-zone conversion
+        if dag_start_time:
+            dag_start_time_local = pendulum.instance(dag_start_time).in_timezone('Asia/Shanghai')
+            dag_start_time_str = dag_start_time_local.strftime('%Y-%m-%d %H:%M:%S')
+        else:
+            dag_start_time_str = 'N/A'
+            
+        if dag_end_time:
+            dag_end_time_local = pendulum.instance(dag_end_time).in_timezone('Asia/Shanghai')
+            dag_end_time_str = dag_end_time_local.strftime('%Y-%m-%d %H:%M:%S')
+        else:
+            dag_end_time_str = 'N/A'
+        
+        # Fetch all task instances belonging to this run
+        task_instances = session.query(TaskInstance).filter(
+            TaskInstance.dag_id == target_dag_id,
+            TaskInstance.run_id == dag_run_id
+        ).all()
+        
+        # Close the session
+        session.close()
+        
+        # Tally task states
+        total_tasks = len(task_instances)
+        success_count = sum(1 for ti in task_instances if ti.state == State.SUCCESS)
+        fail_count = sum(1 for ti in task_instances if ti.state == State.FAILED)
+        skipped_count = sum(1 for ti in task_instances if ti.state == State.SKIPPED)
+        upstream_failed_count = sum(1 for ti in task_instances if ti.state == State.UPSTREAM_FAILED)
+        
+        # Count tasks of each type
+        resource_task_count = sum(1 for ti in task_instances if "resource_" in ti.task_id)
+        model_task_count = sum(1 for ti in task_instances if "model_" in ti.task_id)
+        
+        # Find the longest-running tasks
+        task_durations = []
+        for ti in task_instances:
+            if ti.start_date and ti.end_date:
+                duration = (ti.end_date - ti.start_date).total_seconds()
+                task_durations.append({
+                    "task_id": ti.task_id,
+                    "duration": duration,
+                    "state": ti.state
+                })
+        
+        # Sort by duration, descending
+        task_durations.sort(key=lambda x: x["duration"] if x["duration"] is not None else 0, reverse=True)
+        top_tasks_by_duration = task_durations[:5]  # keep the top 5
+        
+        # Collect the failed tasks
+        failed_tasks = []
+        for ti in task_instances:
+            if ti.state in [State.FAILED, State.UPSTREAM_FAILED]:
+                failed_task = {
+                    "task_id": ti.task_id,
+                    "state": ti.state,
+                    "try_number": ti.try_number,
+                }
+                if ti.start_date and ti.end_date:
+                    failed_task["duration"] = (ti.end_date - ti.start_date).total_seconds()
+                failed_tasks.append(failed_task)
+        
+        # Assemble the statistics result
+        stats = {
+            "exec_date": exec_date,
+            "dag_id": target_dag_id,
+            "dag_execution_date": dag_execution_date.isoformat() if dag_execution_date else None,
+            "dag_run_id": dag_run_id,
+            "status": dag_state,
+            "total_tasks": total_tasks,
+            "success_count": success_count,
+            "fail_count": fail_count,
+            "skipped_count": skipped_count,
+            "upstream_failed_count": upstream_failed_count,
+            "resource_task_count": resource_task_count,
+            "model_task_count": model_task_count,
+            "duration": dag_duration,
+            "start_time": dag_start_time_str,
+            "end_time": dag_end_time_str,
+            "top_tasks_by_duration": top_tasks_by_duration,
+            "failed_tasks": failed_tasks
+        }
+        
+        # Save the statistics to XCom
+        kwargs['ti'].xcom_push(key='pipeline_stats', value=stats)
+        
+        logger.info(f"Pipeline execution statistics collected: total={total_tasks}, succeeded={success_count}, failed={fail_count}")
+        return stats
+    except Exception as e:
+        logger.error(f"收集管道执行统计信息时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回一个基本的错误信息
+        error_stats = {
+            "exec_date": exec_date,
+            "dag_id": target_dag_id,
+            "status": "ERROR",
+            "error": str(e),
+            "total_tasks": 0,
+            "success_count": 0,
+            "fail_count": 0
+        }
+        kwargs['ti'].xcom_push(key='pipeline_stats', value=error_stats)
+        return error_stats
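+# Note: PythonOperator also pushes the callable's return value to XCom as
+# 'return_value', which is what xcom_pull(task_ids='collect_pipeline_stats')
+# reads below; the explicit pipeline_stats key is kept for other consumers.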
+
+def generate_execution_report(**kwargs):
+    """
+    Generate an execution report from the collected statistics
+    """
+    try:
+        # Pull the statistics from XCom
+        ti = kwargs['ti']
+        stats = ti.xcom_pull(task_ids='collect_pipeline_stats')
+        
+        if not stats:
+            logger.warning("No pipeline execution statistics found; cannot generate a report")
+            report = "No pipeline execution statistics found; cannot generate a report."
+            ti.xcom_push(key='execution_report', value=report)
+            return report
+        
+        # Build the report
+        report = []
+        report.append("\n========== Data pipeline execution report ==========")
+        report.append(f"Execution date: {stats['exec_date']}")
+        report.append(f"DAG ID: {stats['dag_id']}")
+        report.append(f"Run ID: {stats.get('dag_run_id', 'N/A')}")
+        report.append(f"Status: {stats['status']}")
+        report.append(f"Total tasks: {stats['total_tasks']}")
+        
+        # Task state statistics
+        report.append("\n--- Task state statistics ---")
+        report.append(f"Succeeded: {stats['success_count']}")
+        report.append(f"Failed: {stats['fail_count']}")
+        report.append(f"Skipped: {stats.get('skipped_count', 0)}")
+        report.append(f"Upstream failed: {stats.get('upstream_failed_count', 0)}")
+        
+        # Task type statistics
+        report.append("\n--- Task type statistics ---")
+        report.append(f"Resource tasks: {stats.get('resource_task_count', 0)}")
+        report.append(f"Model tasks: {stats.get('model_task_count', 0)}")
+        
+        # Execution time statistics
+        report.append("\n--- Execution time statistics ---")
+        if stats.get('duration') is not None:
+            hours, remainder = divmod(stats['duration'], 3600)
+            minutes, seconds = divmod(remainder, 60)
+            report.append(f"Total execution time: {int(hours)}h {int(minutes)}m {int(seconds)}s")
+        else:
+            report.append("Total execution time: N/A")
+            
+        report.append(f"Start time (Beijing time): {stats.get('start_time', 'N/A')}")
+        report.append(f"End time (Beijing time): {stats.get('end_time', 'N/A')}")
+        
+        # Longest-running tasks
+        top_tasks = stats.get('top_tasks_by_duration', [])
+        if top_tasks:
+            report.append("\n--- Longest-running tasks ---")
+            for i, task in enumerate(top_tasks, 1):
+                duration_secs = task.get('duration', 0)
+                minutes, seconds = divmod(duration_secs, 60)
+                report.append(f"{i}. {task['task_id']}: {int(minutes)}m {int(seconds)}s ({task['state']})")
+        
+        # Details of failed tasks
+        failed_tasks = stats.get('failed_tasks', [])
+        if failed_tasks:
+            report.append("\n--- Failed task details ---")
+            for i, task in enumerate(failed_tasks, 1):
+                report.append(f"{i}. Task ID: {task['task_id']}")
+                report.append(f"   State: {task['state']}")
+                report.append(f"   Tries: {task.get('try_number', 'N/A')}")
+                
+                if 'duration' in task:
+                    minutes, seconds = divmod(task['duration'], 60)
+                    report.append(f"   Execution time: {int(minutes)}m {int(seconds)}s")
+                else:
+                    report.append("   Execution time: N/A")
+        
+        # Summary
+        success_rate = 0
+        if stats['total_tasks'] > 0:
+            success_rate = (stats['success_count'] / stats['total_tasks']) * 100
+            
+        report.append("\n--- Summary ---")
+        report.append(f"Task success rate: {success_rate:.2f}%")
+        
+        if stats['status'] == 'success':
+            report.append("Pipeline completed successfully!")
+        elif stats['status'] == 'failed':
+            report.append(f"Pipeline failed; {stats['fail_count']} tasks failed.")
+        else:
+            report.append(f"Current pipeline state: {stats['status']}")
+        
+        report.append("\n========== End of report ==========")
+        
+        # Join the report into a single string
+        report_str = "\n".join(report)
+        
+        # Write it to the log
+        logger.info("\n" + report_str)
+        
+        # Save it to XCom
+        ti.xcom_push(key='execution_report', value=report_str)
+        
+        return report_str
+    except Exception as e:
+        logger.error(f"Error while generating the execution report: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # Return a simple error report
+        error_report = f"Error while generating the execution report: {str(e)}"
+        kwargs['ti'].xcom_push(key='execution_report', value=error_report)
+        return error_report
+
+# Create the DAG
+with DAG(
+    "dag_dataops_pipeline_summary_scheduler", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily", 
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # Log information at DAG instantiation time
+    now = datetime.now()
+    # Use localize() rather than replace(): replace() with a pytz zone yields the wrong (LMT) offset
+    now_with_tz = pytz.timezone('Asia/Shanghai').localize(now)
+    default_exec_date = get_today_date()
+    logger.info(f"[DAG init] current time: {now} / {now_with_tz}, default execution date: {default_exec_date}")
+    
+    #############################################
+    # Wait phase: wait for the main DAG to finish
+    #############################################
+    wait_for_pipeline_completion = ExternalTaskSensor(
+        task_id="wait_for_pipeline_completion",
+        external_dag_id="dag_dataops_pipeline_data_scheduler",
+        external_task_id="data_processing_phase.processing_completed",
+        mode="reschedule",  # reschedule mode frees the worker slot between pokes
+        timeout=7200,  # give up after 2 hours
+        poke_interval=30,  # check every 30 seconds
+        allowed_states=["success", "failed", "skipped"],  # accept success, failure and skips
+        failed_states=None,  # no failed states, so the summary runs whatever the main DAG's outcome
+        execution_date_fn=print_target_date,  # date-logging helper for debugging
+        dag=dag
+    )
+    
+    #############################################
+    # Statistics phase: collect stats and build the report
+    #############################################
+    collect_stats = PythonOperator(
+        task_id="collect_pipeline_stats",
+        python_callable=collect_pipeline_stats,
+        provide_context=True,
+        dag=dag
+    )
+    
+    generate_report = PythonOperator(
+        task_id="generate_execution_report",
+        python_callable=generate_execution_report,
+        provide_context=True,
+        dag=dag
+    )
+    
+    #############################################
+    # Completion phase: mark the summary as done
+    #############################################
+    summary_completed = EmptyOperator(
+        task_id="summary_completed",
+        dag=dag
+    )
+    
+    # Set task dependencies
+    wait_for_pipeline_completion >> collect_stats >> generate_report >> summary_completed
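
For reference, a minimal sketch of how another task could consume the report this DAG pushes to XCom under the key 'execution_report'. The send_report task id and the wiring below are hypothetical, not part of this commit:

    def send_report(**kwargs):
        # Pull the report string pushed by generate_execution_report
        ti = kwargs['ti']
        report = ti.xcom_pull(task_ids='generate_execution_report', key='execution_report')
        # Forward the report to a notification channel; here it is just logged
        logging.getLogger(__name__).info(report)

    # send_report_task = PythonOperator(
    #     task_id="send_report",
    #     python_callable=send_report,
    #     provide_context=True,
    #     dag=dag
    # )
    # generate_report >> send_report_task >> summary_completed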