
I'm going to give up on the single-file DAG; the unified three-file version is currently working correctly.

wangxq 1 month ago
parent
revision
5e51972be7
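
The commit message refers to the unified three-file design: a prepare-scheduler DAG builds the execution plan, and the data-scheduler DAG below blocks on it through an ExternalTaskSensor (wait_for_prepare waiting on preparation_completed). For reference, a minimal standalone sketch of that cross-DAG hand-off pattern; the DAG and task ids mirror the ones in this commit, while the schedule, start date and everything else are illustrative assumptions rather than code from this repository:

from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    "downstream_data_dag_sketch",        # illustrative id, not this repo's DAG
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Block until the prepare DAG's completion marker succeeds for the same logical date.
    wait_for_prepare = ExternalTaskSensor(
        task_id="wait_for_prepare",
        external_dag_id="dag_dataops_unified_prepare_scheduler",
        external_task_id="preparation_completed",
        mode="reschedule",    # release the worker slot between pokes
        poke_interval=30,
        timeout=3600,
    )
    processing_completed = EmptyOperator(task_id="processing_completed")
    wait_for_prepare >> processing_completed

By default the sensor matches on the logical date, so the two DAGs need aligned schedules (or an execution_date_fn); the diff below additionally sets allowed_states=["success", "skipped"] and soft_fail=True on the real sensor.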

+ 1 - 0
dags/config.py

@@ -31,6 +31,7 @@ TASK_RETRY_CONFIG = {
 
 # 脚本文件基础路径配置
 # 部署到 Airflow 环境时使用此路径
+AIRFLOW_BASE_PATH='/opt/airflow'
 SCRIPTS_BASE_PATH = "/opt/airflow/dataops/scripts"
 
 # 本地开发环境脚本路径(如果需要区分环境)

+ 249 - 183
dags/dag_dataops_unified_data_scheduler.py

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta, date
 import logging
 import networkx as nx
 import json
+import os
 from common import (
     get_pg_conn, 
     get_neo4j_driver,
@@ -49,6 +50,15 @@ def get_all_tasks(exec_date):
     conn = get_pg_conn()
     cursor = conn.cursor()
     try:
+        # 查询数据表中记录总数
+        cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s
+        """, (exec_date,))
+        total_count = cursor.fetchone()[0]
+        logger.info(f"执行日期 {exec_date} 在airflow_dag_schedule表中共有 {total_count} 条记录")
+        
         # 查询所有资源表任务
         cursor.execute("""
             SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
@@ -56,6 +66,7 @@ def get_all_tasks(exec_date):
             WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
         """, (exec_date,))
         resource_results = cursor.fetchall()
+        logger.info(f"查询到 {len(resource_results)} 条DataResource记录")
         
         # 查询所有模型表任务
         cursor.execute("""
@@ -64,6 +75,7 @@ def get_all_tasks(exec_date):
             WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
         """, (exec_date,))
         model_results = cursor.fetchall()
+        logger.info(f"查询到 {len(model_results)} 条DataModel记录")
         
         # 整理资源表信息
         resource_tasks = []
@@ -165,9 +177,27 @@ def prepare_unified_execution_plan(**kwargs):
         "dependencies": dependencies
     }
     
+    # 记录资源任务和模型任务的名称,便于调试
+    resource_names = [task["target_table"] for task in resource_tasks]
+    model_names = [task["target_table"] for task in model_tasks]
+    logger.info(f"资源表任务: {resource_names}")
+    logger.info(f"模型表任务: {model_names}")
+    
     # 将执行计划保存到XCom,使用自定义序列化器处理日期对象
-    kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
-    logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
+    try:
+        kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
+        logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
+    except Exception as e:
+        logger.error(f"将执行计划保存到XCom时出错: {str(e)}")
+        
+        # 保存执行计划到文件以备后用
+        try:
+            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+            with open(plan_path, 'w') as f:
+                json.dump(execution_plan, f, default=json_serial)
+            logger.info(f"将执行计划保存到文件: {plan_path}")
+        except Exception as file_e:
+            logger.error(f"保存执行计划到文件时出错: {str(file_e)}")
     
     return len(resource_tasks) + len(model_tasks)
 
@@ -184,12 +214,20 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    return execute_with_monitoring(
-        target_table=target_table,
-        script_name=script_name,
-        script_exec_mode=script_exec_mode,
-        exec_date=exec_date
-    )
+    try:
+        # 正常调用执行监控函数
+        result = execute_with_monitoring(
+            target_table=target_table,
+            script_name=script_name,
+            script_exec_mode=script_exec_mode,
+            exec_date=exec_date
+        )
+        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
+        return result
+    except Exception as e:
+        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+        # 确保即使出错也返回结果,不会阻塞DAG
+        return False
 
 def process_model(target_table, script_name, script_exec_mode, exec_date):
     """处理单个模型表"""
@@ -204,12 +242,54 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    return execute_with_monitoring(
-        target_table=target_table,
-        script_name=script_name,
-        script_exec_mode=script_exec_mode,
-        exec_date=exec_date
-    )
+    try:
+        result = execute_with_monitoring(
+            target_table=target_table,
+            script_name=script_name,
+            script_exec_mode=script_exec_mode,
+            exec_date=exec_date
+        )
+        logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
+        return result
+    except Exception as e:
+        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
+        # 确保即使出错也返回结果,不会阻塞DAG
+        return False
+
+# 预先加载数据以创建任务
+try:
+    logger.info("预先加载执行计划数据用于构建DAG")
+    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}
+    
+    if os.path.exists(plan_path):
+        try:
+            with open(plan_path, 'r') as f:
+                execution_plan_json = f.read()
+                execution_plan = json.loads(execution_plan_json)
+                logger.info(f"从文件加载执行计划: {plan_path}")
+        except Exception as e:
+            logger.warning(f"读取执行计划文件出错: {str(e)}")
+    else:
+        logger.warning(f"执行计划文件不存在: {plan_path},将创建基础DAG结构")
+        
+    # 提取信息
+    exec_date = execution_plan.get("exec_date", get_today_date())
+    resource_tasks = execution_plan.get("resource_tasks", [])
+    model_tasks = execution_plan.get("model_tasks", [])
+    dependencies = execution_plan.get("dependencies", {})
+    
+    logger.info(f"预加载执行计划: exec_date={exec_date}, resource_tasks数量={len(resource_tasks)}, model_tasks数量={len(model_tasks)}")
+except Exception as e:
+    logger.error(f"预加载执行计划数据时出错: {str(e)}")
+    exec_date = get_today_date()
+    resource_tasks = []
+    model_tasks = []
+    dependencies = {}
+
+# 定义处理DAG失败的回调函数
+def handle_dag_failure(context):
+    logger.error(f"DAG执行失败: {context.get('exception')}")
 
 # 创建DAG
 with DAG(
@@ -224,7 +304,8 @@ with DAG(
         'email_on_retry': False,
         'retries': 1,
         'retry_delay': timedelta(minutes=5)
-    }
+    },
+    on_failure_callback=handle_dag_failure  # 在这里设置回调函数
 ) as dag:
     
     # 等待准备DAG完成
@@ -232,9 +313,12 @@ with DAG(
         task_id="wait_for_prepare",
         external_dag_id="dag_dataops_unified_prepare_scheduler",
         external_task_id="preparation_completed",
-        mode="poke",
+        mode="reschedule",  # 改为reschedule模式,减少资源占用
         timeout=3600,
         poke_interval=30,
+        execution_timeout=timedelta(hours=1),
+        soft_fail=True,
+        allowed_states=["success", "skipped"],
         dag=dag
     )
     
@@ -249,192 +333,174 @@ with DAG(
     # 处理完成标记
     processing_completed = EmptyOperator(
         task_id="processing_completed",
+        trigger_rule="none_failed_min_one_success",  # 只要有一个任务成功且没有失败的任务就标记为完成
         dag=dag
     )
     
     # 设置初始任务依赖
     wait_for_prepare >> prepare_plan
     
-    # 从执行计划JSON中获取信息
-    execution_plan_json = '''{"exec_date": "2025-04-12", "resource_tasks": [], "model_tasks": [], "dependencies": {}}'''
+    # 任务字典,用于设置依赖关系
+    task_dict = {}
+    
+    # 1. 预先创建资源表任务
+    for task_info in resource_tasks:
+        table_name = task_info["target_table"]
+        script_name = task_info["script_name"]
+        exec_mode = task_info.get("script_exec_mode", "append")
+        
+        # 创建安全的任务ID
+        safe_table_name = table_name.replace(".", "_").replace("-", "_")
+        task_id = f"resource_{safe_table_name}"
+        
+        resource_task = PythonOperator(
+            task_id=task_id,
+            python_callable=process_resource,
+            op_kwargs={
+                "target_table": table_name,
+                "script_name": script_name,
+                "script_exec_mode": exec_mode,
+                "exec_date": exec_date  # 直接使用解析出的exec_date,不使用XCom
+            },
+            retries=TASK_RETRY_CONFIG["retries"],
+            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
+            dag=dag
+        )
+        
+        # 将任务添加到字典
+        task_dict[table_name] = resource_task
+        
+        # 设置与prepare_plan的依赖 - 直接依赖,不需要其他条件
+        prepare_plan >> resource_task
+        logger.info(f"预先设置基本依赖: prepare_plan >> {task_id}")
+    
+    # 创建有向图,用于检测模型表之间的依赖关系
+    G = nx.DiGraph()
+    
+    # 将所有模型表添加为节点
+    for task_info in model_tasks:
+        table_name = task_info["target_table"]
+        G.add_node(table_name)
+    
+    # 添加模型表之间的依赖边
+    for source, deps in dependencies.items():
+        for dep in deps:
+            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
+                G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
     
+    # 检测循环依赖并处理
+    cycles = list(nx.simple_cycles(G))
+    if cycles:
+        logger.warning(f"检测到循环依赖: {cycles}")
+        for cycle in cycles:
+            G.remove_edge(cycle[-1], cycle[0])
+            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+    
+    # 生成拓扑排序,确定执行顺序
+    execution_order = []
     try:
-        # 从文件或数据库中获取执行计划作为默认值
-        import os
-        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-        if os.path.exists(plan_path):
-            with open(plan_path, 'r') as f:
-                execution_plan_json = f.read()
+        execution_order = list(nx.topological_sort(G))
+        logger.info(f"预加载计算的执行顺序: {execution_order}")
     except Exception as e:
-        logger.warning(f"读取执行计划默认值时出错: {str(e)}")
+        logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
+        execution_order = [task_info["target_table"] for task_info in model_tasks]
     
-    # 解析执行计划获取任务信息
-    try:
-        execution_plan = json.loads(execution_plan_json)
-        exec_date = execution_plan.get("exec_date", get_today_date())
-        resource_tasks = execution_plan.get("resource_tasks", [])
-        model_tasks = execution_plan.get("model_tasks", [])
-        dependencies = execution_plan.get("dependencies", {})
-        
-        # 任务字典,用于设置依赖关系
-        task_dict = {}
-        
-        # 1. 创建资源表任务
-        for task_info in resource_tasks:
-            table_name = task_info["target_table"]
-            script_name = task_info["script_name"]
-            exec_mode = task_info.get("script_exec_mode", "append")
-            
-            # 创建安全的任务ID
-            safe_table_name = table_name.replace(".", "_").replace("-", "_")
-            task_id = f"resource_{safe_table_name}"
-            
-            resource_task = PythonOperator(
-                task_id=task_id,
-                python_callable=process_resource,
-                op_kwargs={
-                    "target_table": table_name,
-                    "script_name": script_name,
-                    "script_exec_mode": exec_mode,
-                    "exec_date": """{{ ti.xcom_pull(task_ids='prepare_execution_plan', key='execution_plan') }}"""
-                },
-                retries=TASK_RETRY_CONFIG["retries"],
-                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
-                dag=dag
-            )
+    # 2. 预先创建模型表任务
+    for table_name in execution_order:
+        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+        if not task_info:
+            continue
             
-            # 将任务添加到字典
-            task_dict[table_name] = resource_task
-            
-            # 设置与prepare_plan的依赖
-            prepare_plan >> resource_task
-        
-        # 创建有向图,用于检测模型表之间的依赖关系
-        G = nx.DiGraph()
+        script_name = task_info["script_name"]
+        exec_mode = task_info.get("script_exec_mode", "append")
         
-        # 将所有模型表添加为节点
-        for task_info in model_tasks:
-            table_name = task_info["target_table"]
-            G.add_node(table_name)
+        # 创建安全的任务ID
+        safe_table_name = table_name.replace(".", "_").replace("-", "_")
+        task_id = f"model_{safe_table_name}"
         
-        # 添加模型表之间的依赖边
-        for source, deps in dependencies.items():
-            for dep in deps:
-                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                    G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
+        model_task = PythonOperator(
+            task_id=task_id,
+            python_callable=process_model,
+            op_kwargs={
+                "target_table": table_name,
+                "script_name": script_name,
+                "script_exec_mode": exec_mode,
+                "exec_date": exec_date  # 直接使用解析出的exec_date,不使用XCom
+            },
+            retries=TASK_RETRY_CONFIG["retries"],
+            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
+            dag=dag
+        )
         
-        # 检测循环依赖并处理
-        cycles = list(nx.simple_cycles(G))
-        if cycles:
-            logger.warning(f"检测到循环依赖: {cycles}")
-            for cycle in cycles:
-                G.remove_edge(cycle[-1], cycle[0])
-                logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+        # 将任务添加到字典
+        task_dict[table_name] = model_task
         
-        # 生成拓扑排序,确定执行顺序
-        try:
-            execution_order = list(nx.topological_sort(G))
-            logger.info(f"计算出的执行顺序: {execution_order}")
-        except Exception as e:
-            logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
-            execution_order = [task_info["target_table"] for task_info in model_tasks]
+        # 设置依赖关系
+        deps = dependencies.get(table_name, [])
+        has_dependency = False
         
-        # 2. 按拓扑排序顺序创建模型表任务
-        for table_name in execution_order:
-            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-            if not task_info:
-                continue
-                
-            script_name = task_info["script_name"]
-            exec_mode = task_info.get("script_exec_mode", "append")
-            
-            # 创建安全的任务ID
-            safe_table_name = table_name.replace(".", "_").replace("-", "_")
-            task_id = f"model_{safe_table_name}"
-            
-            model_task = PythonOperator(
-                task_id=task_id,
-                python_callable=process_model,
-                op_kwargs={
-                    "target_table": table_name,
-                    "script_name": script_name,
-                    "script_exec_mode": exec_mode,
-                    "exec_date": """{{ ti.xcom_pull(task_ids='prepare_execution_plan', key='execution_plan') }}"""
-                },
-                retries=TASK_RETRY_CONFIG["retries"],
-                retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
-                dag=dag
-            )
-            
-            # 将任务添加到字典
-            task_dict[table_name] = model_task
-            
-            # 设置依赖关系
-            deps = dependencies.get(table_name, [])
-            has_dependency = False
+        # 处理模型表之间的依赖
+        for dep in deps:
+            dep_table = dep.get("table_name")
+            dep_type = dep.get("table_type")
             
-            # 处理模型表之间的依赖
-            for dep in deps:
-                dep_table = dep.get("table_name")
-                dep_type = dep.get("table_type")
-                
-                if dep_table in task_dict:
-                    task_dict[dep_table] >> model_task
-                    has_dependency = True
-                    logger.info(f"设置依赖: {dep_table} >> {table_name}")
+            if dep_table in task_dict:
+                task_dict[dep_table] >> model_task
+                has_dependency = True
+                logger.info(f"预先设置依赖: {dep_table} >> {table_name}")
+        
+        # 如果没有依赖,则依赖于准备任务和所有资源表任务
+        if not has_dependency:
+            # 从prepare_plan任务直接连接
+            prepare_plan >> model_task
+            logger.info(f"预先设置基本依赖: prepare_plan >> {task_id}")
             
-            # 如果没有依赖,则依赖于准备任务和所有资源表任务
-            if not has_dependency:
-                # 从prepare_plan任务直接连接
-                prepare_plan >> model_task
+            # 同时从所有资源表任务连接 - 限制每个模型表最多依赖5个资源表,避免过度复杂的依赖关系
+            resource_count = 0
+            for resource_table in resource_tasks:
+                if resource_count >= 5:
+                    break
                 
-                # 同时从所有资源表任务连接
-                for resource_table in resource_tasks:
-                    resource_name = resource_table["target_table"]
-                    if resource_name in task_dict:
-                        task_dict[resource_name] >> model_task
-                        logger.info(f"设置资源依赖: {resource_name} >> {table_name}")
-        
-        # 找出所有终端任务(没有下游依赖的任务)
-        terminal_tasks = []
-        
-        # 检查所有模型表任务
-        for table_name in execution_order:
-            # 检查是否有下游任务
-            has_downstream = False
-            for source, deps in dependencies.items():
-                if source == table_name:  # 跳过自身
-                    continue
-                for dep in deps:
-                    if dep.get("table_name") == table_name:
-                        has_downstream = True
-                        break
-                if has_downstream:
+                resource_name = resource_table["target_table"]
+                if resource_name in task_dict:
+                    task_dict[resource_name] >> model_task
+                    logger.info(f"预先设置资源依赖: {resource_name} >> {table_name}")
+                    resource_count += 1
+    
+    # 找出所有终端任务(没有下游依赖的任务)
+    terminal_tasks = []
+    
+    # 检查所有模型表任务
+    for table_name in execution_order:
+        # 检查是否有下游任务
+        has_downstream = False
+        for source, deps in dependencies.items():
+            if source == table_name:  # 跳过自身
+                continue
+            for dep in deps:
+                if dep.get("table_name") == table_name:
+                    has_downstream = True
                     break
-            
-            # 如果没有下游任务,添加到终端任务列表
-            if not has_downstream and table_name in task_dict:
-                terminal_tasks.append(table_name)
+            if has_downstream:
+                break
         
-        # 如果没有模型表任务,将所有资源表任务视为终端任务
-        if not model_tasks and resource_tasks:
-            terminal_tasks = [task["target_table"] for task in resource_tasks]
-        
-        # 如果既没有模型表任务也没有资源表任务,直接连接准备任务到完成标记
-        if not terminal_tasks:
-            prepare_plan >> processing_completed
-            logger.warning("未找到任何任务,直接连接准备任务到完成标记")
-        else:
-            # 将所有终端任务连接到完成标记
-            for table_name in terminal_tasks:
-                if table_name in task_dict:
-                    task_dict[table_name] >> processing_completed
-                    logger.info(f"设置终端任务: {table_name} >> processing_completed")
+        # 如果没有下游任务,添加到终端任务列表
+        if not has_downstream and table_name in task_dict:
+            terminal_tasks.append(table_name)
     
-    except Exception as e:
-        logger.error(f"构建任务DAG时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        
-        # 确保即使出错,DAG也能正常完成
-        prepare_plan >> processing_completed
+    # 如果没有模型表任务,将所有资源表任务视为终端任务
+    if not model_tasks and resource_tasks:
+        terminal_tasks = [task["target_table"] for task in resource_tasks]
+    
+    # 如果既没有模型表任务也没有资源表任务,直接连接准备任务到完成标记
+    if not terminal_tasks:
+        prepare_plan >> processing_completed
+        logger.warning("未找到任何任务,直接连接准备任务到完成标记")
+    else:
+        # 将所有终端任务连接到完成标记
+        for table_name in terminal_tasks:
+            if table_name in task_dict:
+                task_dict[table_name] >> processing_completed
+                logger.info(f"设置终端任务: {table_name} >> processing_completed")
+
+logger.info(f"DAG dag_dataops_unified_data_scheduler 定义完成,预创建了 {len(task_dict)} 个任务")

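The data-scheduler rewrite above pre-builds its tasks at DAG-parse time: it loads last_execution_plan.json, adds every model table as a node in a networkx DiGraph, breaks any detected cycles, and creates the PythonOperators in topological order. A minimal standalone sketch of that ordering logic, using made-up table names and the same dependency shape as the plan file ({target: [{"table_name": ..., "table_type": ...}]}):

import networkx as nx

model_tasks = [{"target_table": t} for t in ("model_a", "model_b", "model_c")]
dependencies = {
    "model_b": [{"table_name": "model_a", "table_type": "DataModel"}],
    "model_c": [{"table_name": "model_b", "table_type": "DataModel"}],
}

G = nx.DiGraph()
G.add_nodes_from(task["target_table"] for task in model_tasks)
for target, deps in dependencies.items():
    for dep in deps:
        if dep.get("table_type") == "DataModel" and dep.get("table_name") in G:
            G.add_edge(dep["table_name"], target)  # edge direction: dependency -> target

# Break each detected cycle by dropping one edge so topological_sort cannot raise.
for cycle in list(nx.simple_cycles(G)):
    G.remove_edge(cycle[-1], cycle[0])

execution_order = list(nx.topological_sort(G))
print(execution_order)  # ['model_a', 'model_b', 'model_c']

Dropping one edge per cycle is a pragmatic tie-breaker: it guarantees an execution order exists, at the cost of silently ignoring one declared dependency in each cycle.
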
+ 85 - 4
dags/dag_dataops_unified_prepare_scheduler.py

@@ -1,11 +1,12 @@
 # dag_dataops_unified_prepare_scheduler.py
 from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
 from airflow.operators.empty import EmptyOperator
 from datetime import datetime, timedelta
 import logging
 import networkx as nx
 import json
+import os
 from common import (
     get_pg_conn, 
     get_neo4j_driver,
@@ -381,17 +382,90 @@ def prepare_unified_dag_schedule(**kwargs):
         }
         
         # 保存执行计划到文件
-        import os
         plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
         with open(plan_path, 'w') as f:
             json.dump(execution_plan, f, indent=2)
             
         logger.info(f"保存执行计划到文件: {plan_path}")
+        
+        # 验证文件是否成功生成并可读
+        if not os.path.exists(plan_path):
+            raise Exception(f"执行计划文件未成功生成: {plan_path}")
+        
+        # 尝试读取文件验证内容
+        try:
+            with open(plan_path, 'r') as f:
+                validation_data = json.load(f)
+                # 验证执行计划内容
+                if not isinstance(validation_data, dict):
+                    raise Exception("执行计划格式无效,应为JSON对象")
+                if "exec_date" not in validation_data:
+                    raise Exception("执行计划缺少exec_date字段")
+                if not isinstance(validation_data.get("resource_tasks", []), list):
+                    raise Exception("执行计划的resource_tasks字段无效")
+                if not isinstance(validation_data.get("model_tasks", []), list):
+                    raise Exception("执行计划的model_tasks字段无效")
+        except json.JSONDecodeError as je:
+            raise Exception(f"执行计划文件内容无效,非法的JSON格式: {str(je)}")
+        except Exception as ve:
+            raise Exception(f"执行计划文件验证失败: {str(ve)}")
+            
     except Exception as e:
-        logger.error(f"保存执行计划时出错: {str(e)}")
+        error_msg = f"保存或验证执行计划文件时出错: {str(e)}"
+        logger.error(error_msg)
+        # 强制抛出异常,确保任务失败,阻止下游DAG执行
+        raise Exception(error_msg)
     
     return inserted_count
 
+def check_execution_plan_file(**kwargs):
+    """
+    检查执行计划文件是否存在且有效
+    返回False将阻止所有下游任务执行
+    """
+    logger.info("检查执行计划文件是否存在且有效")
+    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+    
+    # 检查文件是否存在
+    if not os.path.exists(plan_path):
+        logger.error(f"执行计划文件不存在: {plan_path}")
+        return False
+    
+    # 检查文件是否可读且内容有效
+    try:
+        with open(plan_path, 'r') as f:
+            data = json.load(f)
+            
+            # 检查必要字段
+            if "exec_date" not in data:
+                logger.error("执行计划缺少exec_date字段")
+                return False
+                
+            if not isinstance(data.get("resource_tasks", []), list):
+                logger.error("执行计划的resource_tasks字段无效")
+                return False
+                
+            if not isinstance(data.get("model_tasks", []), list):
+                logger.error("执行计划的model_tasks字段无效")
+                return False
+            
+            # 检查是否有任务数据
+            resource_tasks = data.get("resource_tasks", [])
+            model_tasks = data.get("model_tasks", [])
+            if not resource_tasks and not model_tasks:
+                logger.warning("执行计划不包含任何任务,但文件格式有效")
+                # 注意:即使没有任务,我们仍然允许流程继续
+            
+            logger.info(f"执行计划文件验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
+            return True
+            
+    except json.JSONDecodeError as je:
+        logger.error(f"执行计划文件不是有效的JSON: {str(je)}")
+        return False
+    except Exception as e:
+        logger.error(f"检查执行计划文件时出错: {str(e)}")
+        return False
+
 # 创建DAG
 with DAG(
     "dag_dataops_unified_prepare_scheduler",
@@ -422,6 +496,13 @@ with DAG(
         dag=dag
     )
     
+    # 检查执行计划文件
+    check_plan_file = ShortCircuitOperator(
+        task_id="check_execution_plan_file",
+        python_callable=check_execution_plan_file,
+        dag=dag
+    )
+    
     # 准备完成标记
     preparation_completed = EmptyOperator(
         task_id="preparation_completed",
@@ -429,4 +510,4 @@ with DAG(
     )
     
     # 设置任务依赖
-    start_preparation >> prepare_task >> preparation_completed
+    start_preparation >> prepare_task >> check_plan_file >> preparation_completed

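The prepare-scheduler change above gates preparation_completed behind a ShortCircuitOperator: when check_execution_plan_file returns a falsy value, the downstream marker is skipped instead of failed, which the data DAG's sensor then accepts via allowed_states=["success", "skipped"]. A minimal sketch of that gating pattern, assuming the same /opt/airflow/dags/last_execution_plan.json location used in this commit; the DAG id and validation details here are illustrative:

import json
import os
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator

PLAN_PATH = "/opt/airflow/dags/last_execution_plan.json"

def plan_file_is_valid(path: str = PLAN_PATH) -> bool:
    """Return True only if the execution plan file exists and parses into the expected shape."""
    if not os.path.exists(path):
        return False  # falsy -> every downstream task is skipped
    try:
        with open(path) as f:
            plan = json.load(f)
    except (OSError, json.JSONDecodeError):
        return False
    return isinstance(plan, dict) and "exec_date" in plan

with DAG(
    "prepare_gate_sketch",
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    check_plan_file = ShortCircuitOperator(
        task_id="check_execution_plan_file",
        python_callable=plan_file_is_valid,
    )
    preparation_completed = EmptyOperator(task_id="preparation_completed")
    check_plan_file >> preparation_completed
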
+ 809 - 304
dags/dag_dataops_unified_scheduler.py

@@ -8,6 +8,7 @@ from datetime import datetime, timedelta, date
 import logging
 import networkx as nx
 import json
+import os
 from decimal import Decimal
 from common import (
     get_pg_conn, 
@@ -15,7 +16,7 @@ from common import (
     execute_with_monitoring,
     get_today_date
 )
-from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG
+from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG, AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH
 
 # 创建日志记录器
 logger = logging.getLogger(__name__)
@@ -305,7 +306,7 @@ def write_to_airflow_dag_schedule(exec_date, tables_info):
         conn.close()
 
 def prepare_dag_schedule(**kwargs):
-    """准备DAG调度任务的主函数"""
+    """准备调度任务的主函数"""
     exec_date = kwargs.get('ds') or get_today_date()
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
     
@@ -409,8 +410,393 @@ def prepare_dag_schedule(**kwargs):
     
     return inserted_count
 
+def create_execution_plan(**kwargs):
+    """准备执行计划的函数,使用从prepare_phase传递的数据,并生成JSON文件"""
+    try:
+        # 从prepare_dag_schedule获取执行计划
+        execution_plan_json = kwargs['ti'].xcom_pull(task_ids='prepare_dag_schedule', key='execution_plan')
+        
+        if not execution_plan_json:
+            # 如果没有获取到,可能是因为推送到XCom失败,尝试从数据库获取
+            exec_date = kwargs.get('ds') or get_today_date()
+            logger.info(f"未从XCom获取到执行计划,尝试从数据库构建。使用执行日期: {exec_date}")
+            
+            # 获取所有任务
+            resource_tasks, model_tasks = get_all_tasks(exec_date)
+            
+            if not resource_tasks and not model_tasks:
+                logger.warning(f"执行日期 {exec_date} 没有找到任务")
+                # 创建空执行计划
+                execution_plan = {
+                    "exec_date": exec_date,
+                    "resource_tasks": [],
+                    "model_tasks": [],
+                    "dependencies": {}
+                }
+            else:
+                # 为所有模型表获取依赖关系
+                model_table_names = [task["target_table"] for task in model_tasks]
+                dependencies = get_table_dependencies_for_data_phase(model_table_names)
+                
+                # 创建执行计划
+                execution_plan = {
+                    "exec_date": exec_date,
+                    "resource_tasks": resource_tasks,
+                    "model_tasks": model_tasks,
+                    "dependencies": dependencies
+                }
+            
+            # 转换为JSON
+            execution_plan_json = json.dumps(execution_plan, default=json_serial)
+        else:
+            # 如果是字符串,解析一下确保格式正确
+            if isinstance(execution_plan_json, str):
+                execution_plan = json.loads(execution_plan_json)
+            else:
+                execution_plan = execution_plan_json
+                execution_plan_json = json.dumps(execution_plan, default=json_serial)
+        
+        # 将执行计划保存为JSON文件,使用临时文件确保写入完整
+        try:
+            import os
+            import time
+            import tempfile
+            from datetime import datetime
+            
+            # 设置文件路径
+            plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
+            plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
+            temp_plan_path = os.path.join(plan_dir, f'temp_last_execution_plan_{int(time.time())}.json')
+            ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
+            
+            logger.info(f"=== 开始创建执行计划文件 - 时间戳: {datetime.now().isoformat()} ===")
+            logger.info(f"计划目录: {plan_dir}")
+            logger.info(f"最终文件路径: {plan_path}")
+            logger.info(f"临时文件路径: {temp_plan_path}")
+            logger.info(f"就绪标志文件路径: {ready_flag_path}")
+            
+            # 获取目录中的现有文件
+            existing_files = os.listdir(plan_dir)
+            plan_related_files = [f for f in existing_files if 'execution_plan' in f or f.endswith('.ready')]
+            logger.info(f"创建前目录中相关文件数: {len(plan_related_files)}")
+            for f in plan_related_files:
+                file_path = os.path.join(plan_dir, f)
+                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
+                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
+                logger.info(f"已存在文件: {f} (大小: {file_size}字节, 修改时间: {file_time})")
+            
+            # 首先写入临时文件
+            with open(temp_plan_path, 'w') as f:
+                if isinstance(execution_plan_json, str):
+                    f.write(execution_plan_json)
+                else:
+                    json.dump(execution_plan_json, f, indent=2, default=json_serial)
+                f.flush()
+                os.fsync(f.fileno())  # 确保写入磁盘
+            
+            # 验证临时文件
+            temp_size = os.path.getsize(temp_plan_path)
+            temp_time = datetime.fromtimestamp(os.path.getmtime(temp_plan_path)).isoformat()
+            logger.info(f"已创建临时文件: {temp_plan_path} (大小: {temp_size}字节, 修改时间: {temp_time})")
+            
+            with open(temp_plan_path, 'r') as f:
+                test_content = json.load(f)  # 测试是否能正确读取
+                logger.info(f"临时文件验证成功,内容可正确解析为JSON")
+            
+            # 重命名为正式文件
+            if os.path.exists(plan_path):
+                old_size = os.path.getsize(plan_path)
+                old_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
+                logger.info(f"删除已有文件: {plan_path} (大小: {old_size}字节, 修改时间: {old_time})")
+                os.remove(plan_path)  # 先删除已有文件
+            
+            logger.info(f"重命名临时文件: {temp_plan_path} -> {plan_path}")
+            os.rename(temp_plan_path, plan_path)
+            
+            # 确认正式文件
+            if os.path.exists(plan_path):
+                final_size = os.path.getsize(plan_path)
+                final_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
+                logger.info(f"正式文件创建成功: {plan_path} (大小: {final_size}字节, 修改时间: {final_time})")
+            else:
+                logger.error(f"正式文件未成功创建: {plan_path}")
+            
+            # 写入就绪标志文件
+            with open(ready_flag_path, 'w') as f:
+                flag_content = f"Generated at {datetime.now().isoformat()}"
+                f.write(flag_content)
+                f.flush()
+                os.fsync(f.fileno())  # 确保写入磁盘
+            
+            # 确认就绪标志文件
+            if os.path.exists(ready_flag_path):
+                flag_size = os.path.getsize(ready_flag_path)
+                flag_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
+                logger.info(f"就绪标志文件创建成功: {ready_flag_path} (大小: {flag_size}字节, 修改时间: {flag_time}, 内容: {flag_content})")
+            else:
+                logger.error(f"就绪标志文件未成功创建: {ready_flag_path}")
+            
+            # 再次检查目录
+            final_files = os.listdir(plan_dir)
+            final_plan_files = [f for f in final_files if 'execution_plan' in f or f.endswith('.ready')]
+            logger.info(f"创建完成后目录中相关文件数: {len(final_plan_files)}")
+            for f in final_plan_files:
+                file_path = os.path.join(plan_dir, f)
+                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
+                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
+                logger.info(f"最终文件: {f} (大小: {file_size}字节, 修改时间: {file_time})")
+            
+            logger.info(f"=== 执行计划文件创建完成 - 时间戳: {datetime.now().isoformat()} ===")
+        except Exception as e:
+            logger.error(f"保存执行计划到文件时出错: {str(e)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            raise  # 抛出异常,确保任务失败
+        
+        return execution_plan_json
+    except Exception as e:
+        logger.error(f"创建执行计划时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        raise  # 抛出异常,确保任务失败
+
+def bridge_prepare_to_data_func(**kwargs):
+    """桥接prepare和data阶段,确保执行计划文件已就绪"""
+    import os
+    import time
+    from datetime import datetime
+    
+    logger.info(f"=== 开始验证执行计划文件 - 时间戳: {datetime.now().isoformat()} ===")
+    
+    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
+    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
+    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
+    
+    logger.info(f"计划目录: {plan_dir}")
+    logger.info(f"计划文件路径: {plan_path}")
+    logger.info(f"就绪标志文件路径: {ready_flag_path}")
+    
+    # 获取目录中的文件列表
+    all_files = os.listdir(plan_dir)
+    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
+    logger.info(f"目录中的相关文件总数: {len(related_files)}")
+    for idx, file in enumerate(related_files, 1):
+        file_path = os.path.join(plan_dir, file)
+        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
+        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
+        logger.info(f"相关文件{idx}: {file} (大小: {file_size}字节, 修改时间: {file_time})")
+    
+    # 等待就绪标志文件出现
+    logger.info(f"开始等待就绪标志文件: {ready_flag_path}")
+    waiting_start = datetime.now()
+    max_attempts = 30  # 最多等待5分钟
+    for attempt in range(max_attempts):
+        if os.path.exists(ready_flag_path):
+            wait_duration = (datetime.now() - waiting_start).total_seconds()
+            file_size = os.path.getsize(ready_flag_path)
+            file_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
+            
+            # 读取就绪文件内容
+            try:
+                with open(ready_flag_path, 'r') as f:
+                    ready_content = f.read()
+            except Exception as e:
+                ready_content = f"[读取错误: {str(e)}]"
+            
+            logger.info(f"发现执行计划就绪标志: {ready_flag_path} (尝试次数: {attempt+1}, 等待时间: {wait_duration:.2f}秒, 大小: {file_size}字节, 修改时间: {file_time}, 内容: {ready_content})")
+            break
+        
+        logger.info(f"等待执行计划就绪 (尝试: {attempt+1}/{max_attempts}, 已等待: {(datetime.now() - waiting_start).total_seconds():.2f}秒)...")
+        time.sleep(10)  # 等待10秒
+    
+    if not os.path.exists(ready_flag_path):
+        error_msg = f"执行计划就绪标志文件不存在: {ready_flag_path},等待超时 (等待时间: {(datetime.now() - waiting_start).total_seconds():.2f}秒)"
+        logger.error(error_msg)
+        raise Exception(error_msg)
+    
+    # 验证执行计划文件
+    logger.info(f"开始验证执行计划文件: {plan_path}")
+    if not os.path.exists(plan_path):
+        error_msg = f"执行计划文件不存在: {plan_path}"
+        logger.error(error_msg)
+        raise Exception(error_msg)
+    
+    try:
+        file_size = os.path.getsize(plan_path)
+        file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
+        logger.info(f"准备读取执行计划文件: {plan_path} (大小: {file_size}字节, 修改时间: {file_time})")
+        
+        with open(plan_path, 'r') as f:
+            execution_plan = json.load(f)
+            logger.info(f"成功读取并解析执行计划文件 JSON 内容")
+        
+        # 验证基本结构
+        if not isinstance(execution_plan, dict):
+            logger.error(f"执行计划格式错误: 不是有效的字典,而是 {type(execution_plan)}")
+            raise ValueError("执行计划不是有效的字典")
+        else:
+            logger.info(f"执行计划基本结构验证: 是有效的字典对象")
+        
+        # 验证关键字段
+        required_fields = ["exec_date", "resource_tasks", "model_tasks"]
+        missing_fields = [field for field in required_fields if field not in execution_plan]
+        
+        if missing_fields:
+            error_msg = f"执行计划缺少必要字段: {missing_fields}"
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+        else:
+            logger.info(f"执行计划必要字段验证通过: 包含所有必要字段 {required_fields}")
+        
+        # 记录执行计划基本信息
+        resource_tasks = execution_plan.get("resource_tasks", [])
+        model_tasks = execution_plan.get("model_tasks", [])
+        exec_date = execution_plan.get("exec_date", "未知")
+        
+        logger.info(f"执行计划内容摘要: 日期={exec_date}, 资源任务数={len(resource_tasks)}, 模型任务数={len(model_tasks)}")
+        
+        # 如果任务很少,记录具体内容
+        if len(resource_tasks) + len(model_tasks) < 10:
+            for idx, task in enumerate(resource_tasks, 1):
+                logger.info(f"资源任务{idx}: 表={task.get('target_table')}, 脚本={task.get('script_name')}")
+            
+            for idx, task in enumerate(model_tasks, 1):
+                logger.info(f"模型任务{idx}: 表={task.get('target_table')}, 脚本={task.get('script_name')}")
+        
+        # 如果没有任何任务,发出警告
+        if not resource_tasks and not model_tasks:
+            logger.warning(f"执行计划不包含任何任务,可能导致数据处理阶段没有实际工作")
+        
+        logger.info(f"=== 执行计划文件验证成功 - 时间戳: {datetime.now().isoformat()} ===")
+        return True
+    except Exception as e:
+        error_msg = f"验证执行计划文件时出错: {str(e)}"
+        logger.error(error_msg)
+        import traceback
+        logger.error(traceback.format_exc())
+        logger.info(f"=== 执行计划文件验证失败 - 时间戳: {datetime.now().isoformat()} ===")
+        raise Exception(error_msg)
+
+def init_data_processing_phase(**kwargs):
+    """数据处理阶段的初始化函数,重新加载执行计划文件"""
+    import os
+    from datetime import datetime
+    
+    logger.info(f"=== 开始数据处理阶段初始化 - 时间戳: {datetime.now().isoformat()} ===")
+    
+    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
+    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
+    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
+    
+    logger.info(f"计划目录: {plan_dir}")
+    logger.info(f"计划文件路径: {plan_path}")
+    logger.info(f"就绪标志文件路径: {ready_flag_path}")
+    
+    # 检查目录中的文件
+    all_files = os.listdir(plan_dir)
+    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
+    logger.info(f"目录中的相关文件总数: {len(related_files)}")
+    for idx, file in enumerate(related_files, 1):
+        file_path = os.path.join(plan_dir, file)
+        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
+        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
+        logger.info(f"相关文件{idx}: {file} (大小: {file_size}字节, 修改时间: {file_time})")
+    
+    # 验证文件是否存在
+    if not os.path.exists(plan_path):
+        error_msg = f"执行计划文件不存在: {plan_path}"
+        logger.error(error_msg)
+        logger.info(f"=== 数据处理阶段初始化失败 - 时间戳: {datetime.now().isoformat()} ===")
+        raise Exception(error_msg)
+    
+    file_size = os.path.getsize(plan_path)
+    file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
+    logger.info(f"准备读取执行计划文件: {plan_path} (大小: {file_size}字节, 修改时间: {file_time})")
+    
+    try:
+        # 记录读取开始时间
+        read_start = datetime.now()
+        
+        with open(plan_path, 'r') as f:
+            file_content = f.read()
+            logger.info(f"成功读取文件内容,大小为 {len(file_content)} 字节")
+            
+            # 解析JSON
+            parse_start = datetime.now()
+            execution_plan = json.loads(file_content)
+            parse_duration = (datetime.now() - parse_start).total_seconds()
+            logger.info(f"成功解析JSON内容,耗时 {parse_duration:.4f} 秒")
+        
+        read_duration = (datetime.now() - read_start).total_seconds()
+        logger.info(f"文件读取和解析总耗时: {read_duration:.4f} 秒")
+        
+        # 验证执行计划基本结构
+        if not isinstance(execution_plan, dict):
+            error_msg = f"执行计划不是有效的字典,实际类型: {type(execution_plan)}"
+            logger.error(error_msg)
+            raise ValueError(error_msg)
+        
+        # 存储到XCom中,以便后续任务使用
+        push_start = datetime.now()
+        
+        # 先序列化为JSON字符串
+        execution_plan_json = json.dumps(execution_plan, default=json_serial)
+        logger.info(f"序列化执行计划为JSON字符串,大小为 {len(execution_plan_json)} 字节")
+        
+        # 推送到XCom
+        kwargs['ti'].xcom_push(key='data_phase_execution_plan', value=execution_plan_json)
+        push_duration = (datetime.now() - push_start).total_seconds()
+        logger.info(f"成功推送执行计划到XCom,耗时 {push_duration:.4f} 秒")
+        
+        # 记录执行计划基本信息
+        resource_tasks = execution_plan.get("resource_tasks", [])
+        model_tasks = execution_plan.get("model_tasks", [])
+        exec_date = execution_plan.get("exec_date", "未知")
+        
+        logger.info(f"执行计划内容摘要: 日期={exec_date}, 资源任务数={len(resource_tasks)}, 模型任务数={len(model_tasks)}")
+        
+        # 如果任务较少,记录详细信息
+        if len(resource_tasks) + len(model_tasks) < 10:
+            for idx, task in enumerate(resource_tasks, 1):
+                logger.info(f"资源任务{idx}: 表={task.get('target_table')}, 脚本={task.get('script_name')}")
+            
+            for idx, task in enumerate(model_tasks, 1):
+                logger.info(f"模型任务{idx}: 表={task.get('target_table')}, 脚本={task.get('script_name')}")
+        
+        result = {
+            "exec_date": exec_date,
+            "resource_count": len(resource_tasks),
+            "model_count": len(model_tasks)
+        }
+        
+        logger.info(f"=== 数据处理阶段初始化完成 - 时间戳: {datetime.now().isoformat()} ===")
+        return result
+    except json.JSONDecodeError as e:
+        error_msg = f"执行计划文件JSON解析失败: {str(e)}"
+        logger.error(error_msg)
+        
+        # 记录文件内容摘要以帮助调试
+        try:
+            with open(plan_path, 'r') as f:
+                content = f.read(1000)  # 只读取前1000个字符
+                logger.error(f"文件内容前1000个字符: {content}...")
+        except Exception as read_error:
+            logger.error(f"尝试读取文件内容时出错: {str(read_error)}")
+        
+        import traceback
+        logger.error(traceback.format_exc())
+        logger.info(f"=== 数据处理阶段初始化失败 - 时间戳: {datetime.now().isoformat()} ===")
+        raise Exception(error_msg)
+    except Exception as e:
+        error_msg = f"数据处理阶段初始化失败: {str(e)}"
+        logger.error(error_msg)
+        import traceback
+        logger.error(traceback.format_exc())
+        logger.info(f"=== 数据处理阶段初始化失败 - 时间戳: {datetime.now().isoformat()} ===")
+        raise Exception(error_msg)
+
 #############################################
-# 第二阶段: 数据处理阶段(Data Processing Phase)的函数
+# 第二阶段: 数据处理阶段(Data Processing Phase)
 #############################################
 
 def get_latest_date():
@@ -529,56 +915,6 @@ def get_table_dependencies_for_data_phase(table_names):
     
     return dependency_dict
 
-def create_execution_plan(**kwargs):
-    """准备执行计划的函数,使用从准备阶段传递的数据"""
-    try:
-        # 从XCom获取执行计划
-        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
-        
-        # 如果找不到执行计划,则从数据库获取
-        if not execution_plan:
-            # 获取执行日期
-            exec_date = get_latest_date()
-            logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
-            
-            # 获取所有任务
-            resource_tasks, model_tasks = get_all_tasks(exec_date)
-            
-            if not resource_tasks and not model_tasks:
-                logger.warning(f"执行日期 {exec_date} 没有找到任务")
-                return 0
-            
-            # 为所有模型表获取依赖关系
-            model_table_names = [task["target_table"] for task in model_tasks]
-            dependencies = get_table_dependencies_for_data_phase(model_table_names)
-            
-            # 创建执行计划
-            new_execution_plan = {
-                "exec_date": exec_date,
-                "resource_tasks": resource_tasks,
-                "model_tasks": model_tasks,
-                "dependencies": dependencies
-            }
-            
-            # 保存执行计划
-            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
-            logger.info(f"创建新的执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
-            
-            return json.dumps(new_execution_plan, default=json_serial)
-        
-        logger.info(f"成功获取执行计划")
-        return execution_plan
-    except Exception as e:
-        logger.error(f"创建执行计划时出错: {str(e)}")
-        # 返回空执行计划
-        empty_plan = {
-            "exec_date": get_today_date(),
-            "resource_tasks": [],
-            "model_tasks": [],
-            "dependencies": {}
-        }
-        return json.dumps(empty_plan, default=json_serial)
-
 def process_resource(target_table, script_name, script_exec_mode, exec_date):
     """处理单个资源表"""
     logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
@@ -592,12 +928,144 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    return execute_with_monitoring(
-        target_table=target_table,
-        script_name=script_name,
-        script_exec_mode=script_exec_mode,
-        exec_date=exec_date
-    )
+    try:
+        # 直接调用执行监控函数,确保脚本得到执行
+        result = execute_with_monitoring(
+            target_table=target_table,
+            script_name=script_name,
+            script_exec_mode=script_exec_mode,
+            exec_date=exec_date
+        )
+        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
+        return f"处理资源表 {target_table} 完成,结果: {result}"
+    except Exception as e:
+        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回错误信息,但不抛出异常,确保DAG可以继续执行
+        return f"处理资源表 {target_table} 失败: {str(e)}"
+
+def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
+    """执行脚本并监控执行情况"""
+    from pathlib import Path
+    import importlib.util
+    import sys
+    
+    logger.info(f"=== 开始执行任务 {target_table} 的脚本 {script_name} - 时间戳: {datetime.now().isoformat()} ===")
+
+    # 检查script_name是否为空
+    if not script_name:
+        logger.error(f"表 {target_table} 的script_name为空,无法执行")
+        # 记录执行失败到数据库
+        now = datetime.now()
+        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
+        return False
+    
+    # 记录执行开始时间
+    start_time = datetime.now()
+    update_task_start_time(exec_date, target_table, script_name, start_time)
+    logger.info(f"任务开始时间: {start_time.isoformat()}")
+    
+    try:
+        # 执行实际脚本
+        script_path = Path(SCRIPTS_BASE_PATH) / script_name
+        logger.info(f"脚本完整路径: {script_path}")
+        
+        if not script_path.exists():
+            logger.error(f"脚本文件不存在: {script_path}")
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
+            return False
+        
+        try:
+            # 动态导入模块
+            module_name = f"dynamic_module_{abs(hash(script_name))}"
+            spec = importlib.util.spec_from_file_location(module_name, script_path)
+            module = importlib.util.module_from_spec(spec)
+            sys.modules[module_name] = module
+            spec.loader.exec_module(module)
+            
+            # 使用标准入口函数run
+            if hasattr(module, "run"):
+                logger.info(f"执行脚本 {script_name} 的标准入口函数 run()")
+                result = module.run(table_name=target_table, execution_mode=script_exec_mode)
+                logger.info(f"脚本 {script_name} 执行结果: {result}")
+                success = True if result else False
+            else:
+                logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),尝试使用main函数")
+                if hasattr(module, "main"):
+                    logger.info(f"执行脚本 {script_name} 的main函数")
+                    result = module.main(table_name=target_table, execution_mode=script_exec_mode)
+                    logger.info(f"脚本 {script_name} 执行结果: {result}")
+                    success = True if result else False
+                else:
+                    logger.error(f"脚本 {script_name} 没有定义标准入口函数 run() 或 main()")
+                    success = False
+        except Exception as script_e:
+            logger.error(f"执行脚本 {script_name} 时出错: {str(script_e)}")
+            import traceback
+            logger.error(traceback.format_exc())
+            success = False
+        
+        # 记录结束时间和结果
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
+        
+        logger.info(f"任务结束时间: {end_time.isoformat()}, 执行时长: {duration:.2f}秒, 结果: {success}")
+        logger.info(f"=== 完成执行任务 {target_table} 的脚本 {script_name} - 时间戳: {datetime.now().isoformat()} ===")
+        
+        return success
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行任务出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
+        
+        logger.info(f"=== 执行任务 {target_table} 的脚本 {script_name} 失败 - 时间戳: {datetime.now().isoformat()} ===")
+        return False
+
+def update_task_start_time(exec_date, target_table, script_name, start_time):
+    """更新任务开始时间"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            UPDATE airflow_dag_schedule 
+            SET exec_start_time = %s
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (start_time, exec_date, target_table, script_name))
+        conn.commit()
+        logger.info(f"已更新表 {target_table} 的脚本 {script_name} 的开始时间: {start_time}")
+    except Exception as e:
+        logger.error(f"更新任务开始时间失败: {str(e)}")
+        conn.rollback()
+    finally:
+        cursor.close()
+        conn.close()
+
+def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
+    """更新任务完成信息"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            UPDATE airflow_dag_schedule 
+            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (success, end_time, duration, exec_date, target_table, script_name))
+        conn.commit()
+        logger.info(f"已更新表 {target_table} 的脚本 {script_name} 的完成状态: 结果={success}, 结束时间={end_time}, 耗时={duration}秒")
+    except Exception as e:
+        logger.error(f"更新任务完成信息失败: {str(e)}")
+        conn.rollback()
+    finally:
+        cursor.close()
+        conn.close()
 
 def process_model(target_table, script_name, script_exec_mode, exec_date):
     """处理单个模型表"""
@@ -612,12 +1080,22 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    return execute_with_monitoring(
-        target_table=target_table,
-        script_name=script_name,
-        script_exec_mode=script_exec_mode,
-        exec_date=exec_date
-    ) 
+    try:
+        # 直接调用执行监控函数,确保脚本得到执行
+        result = execute_with_monitoring(
+            target_table=target_table,
+            script_name=script_name,
+            script_exec_mode=script_exec_mode,
+            exec_date=exec_date
+        )
+        logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
+        return f"处理模型表 {target_table} 完成,结果: {result}"
+    except Exception as e:
+        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回错误信息,但不抛出异常,确保DAG可以继续执行
+        return f"处理模型表 {target_table} 失败: {str(e)}"
 
 #############################################
 # 第三阶段: 汇总阶段(Summary Phase)的函数
@@ -842,73 +1320,127 @@ def generate_unified_execution_report(exec_date, stats):
     
     
     return report_str
 
 
-def summarize_execution(**kwargs):
-    """汇总执行情况的主函数"""
+def summarize_execution(**context):
+    """
+    汇总执行计划的执行情况,生成报告
+    """
+    logger.info(f"=== 开始汇总执行情况 - 时间戳: {datetime.now().isoformat()} ===")
     try:
-        exec_date = kwargs.get('ds') or get_today_date()
-        logger.info(f"开始汇总执行日期 {exec_date} 的统一执行情况")
+        # 获取执行日期
+        execution_date = context.get('execution_date', datetime.now())
+        exec_date = execution_date.strftime('%Y-%m-%d')
         
         
-        # 1. 更新缺失的执行结果
-        try:
-            update_count = update_missing_results(exec_date)
-            logger.info(f"更新了 {update_count} 个缺失的执行结果")
-        except Exception as e:
-            logger.error(f"更新缺失执行结果时出错: {str(e)}")
-            update_count = 0
+        # 从本地文件加载执行计划
+        plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
+        plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
         
         
-        # 2. 获取执行统计信息
-        try:
-            stats = get_execution_stats(exec_date)
-            if not stats:
-                logger.warning("未能获取执行统计信息,将使用默认值")
-                stats = {
-                    "exec_date": exec_date,
-                    "total_tasks": 0,
-                    "type_counts": {},
-                    "success_count": 0,
-                    "fail_count": 0,
-                    "pending_count": 0,
-                    "success_rate": 0,
-                    "avg_duration": None,
-                    "min_duration": None,
-                    "max_duration": None,
-                    "failed_tasks": []
-                }
-        except Exception as e:
-            logger.error(f"获取执行统计信息时出错: {str(e)}")
-            stats = {
-                "exec_date": exec_date,
-                "total_tasks": 0,
-                "type_counts": {},
-                "success_count": 0,
-                "fail_count": 0,
-                "pending_count": 0,
-                "success_rate": 0,
-                "avg_duration": None,
-                "min_duration": None,
-                "max_duration": None,
-                "failed_tasks": []
+        if not os.path.exists(plan_path):
+            logger.warning(f"执行计划文件不存在: {plan_path}")
+            return "执行计划文件不存在,无法生成汇总报告"
+        
+        with open(plan_path, 'r') as f:
+            execution_plan = json.loads(f.read())
+        
+        # 获取任务列表
+        resource_tasks = execution_plan.get("resource_tasks", [])
+        model_tasks = execution_plan.get("model_tasks", [])
+        all_tasks = resource_tasks + model_tasks
+        
+        # 连接数据库,获取任务执行状态
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        # 分析任务执行状态
+        successful_tasks = []
+        failed_tasks = []
+        skipped_tasks = []
+        
+        for task in all_tasks:
+            table_name = task["target_table"]
+            table_type = "资源表" if task in resource_tasks else "模型表"
+            
+            # 查询任务执行状态
+            cursor.execute("""
+                SELECT status FROM airflow_task_execution 
+                WHERE table_name = %s AND exec_date = %s
+                ORDER BY execution_time DESC LIMIT 1
+            """, (table_name, exec_date))
+            
+            result = cursor.fetchone()
+            status = result[0] if result else "未执行"
+            
+            task_info = {
+                "table_name": table_name,
+                "table_type": table_type,
+                "script_name": task["script_name"],
+                "status": status
             }
+            
+            if status == "成功":
+                successful_tasks.append(task_info)
+            elif status == "失败":
+                failed_tasks.append(task_info)
+            else:
+                skipped_tasks.append(task_info)
         
         
-        # 3. 生成执行报告
-        try:
-            report = generate_unified_execution_report(exec_date, stats)
-        except Exception as e:
-            logger.error(f"生成执行报告时出错: {str(e)}")
-            report = f"生成执行报告时出错: {str(e)}\n基础统计: 总任务数: {stats.get('total_tasks', 0)}, 成功: {stats.get('success_count', 0)}, 失败: {stats.get('fail_count', 0)}"
+        # 生成汇总报告
+        total_tasks = len(all_tasks)
+        success_count = len(successful_tasks)
+        fail_count = len(failed_tasks)
+        skip_count = len(skipped_tasks)
         
         
-        # 将报告和统计信息传递给下一个任务
-        try:
-            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
-            kwargs['ti'].xcom_push(key='execution_report', value=report)
-        except Exception as e:
-            logger.error(f"保存报告到XCom时出错: {str(e)}")
+        summary = f"""
+执行日期: {exec_date}
+总任务数: {total_tasks}
+成功任务数: {success_count}
+失败任务数: {fail_count}
+跳过任务数: {skip_count}
+
+=== 成功任务 ===
+"""
+        
+        for task in successful_tasks:
+            summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
         
         
-        return report
+        if failed_tasks:
+            summary += "\n=== 失败任务 ===\n"
+            for task in failed_tasks:
+                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
+        
+        if skipped_tasks:
+            summary += "\n=== 跳过任务 ===\n"
+            for task in skipped_tasks:
+                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
+        
+        # 更新汇总表
+        cursor.execute("""
+            INSERT INTO airflow_execution_summary
+            (exec_date, total_tasks, success_count, fail_count, skip_count, summary_text, created_at)
+            VALUES (%s, %s, %s, %s, %s, %s, %s)
+            ON CONFLICT (exec_date) 
+            DO UPDATE SET
+                total_tasks = EXCLUDED.total_tasks,
+                success_count = EXCLUDED.success_count,
+                fail_count = EXCLUDED.fail_count,
+                skip_count = EXCLUDED.skip_count,
+                summary_text = EXCLUDED.summary_text,
+                updated_at = CURRENT_TIMESTAMP
+        """, (
+            exec_date, total_tasks, success_count, fail_count, skip_count, 
+            summary, datetime.now()
+        ))
+        
+        conn.commit()
+        cursor.close()
+        conn.close()
+        
+        logger.info(f"=== 执行情况汇总完成 - 时间戳: {datetime.now().isoformat()} ===")
+        return summary
     except Exception as e:
         logger.error(f"汇总执行情况时出现未处理的错误: {str(e)}")
         # 返回一个简单的错误报告,确保任务不会失败
-        return f"执行汇总时出现错误: {str(e)}" 
+        return f"执行汇总时出现错误: {str(e)}"
+
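Two assumptions in summarize_execution above are worth noting: AIRFLOW_BASE_PATH must be importable from config, and the ON CONFLICT (exec_date) upsert only works if airflow_execution_summary has a unique or primary-key constraint on exec_date. A hedged sketch of a table shape that would satisfy the upsert (column names inferred from the INSERT above, not from an actual migration):

from common import get_pg_conn

def ensure_summary_table():
    # Hypothetical DDL; the real table definition may differ.
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS airflow_execution_summary (
                exec_date date PRIMARY KEY,   -- required for ON CONFLICT (exec_date)
                total_tasks integer,
                success_count integer,
                fail_count integer,
                skip_count integer,
                summary_text text,
                created_at timestamp,
                updated_at timestamp
            )
        """)
        conn.commit()
    finally:
        cursor.close()
        conn.close()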
 
 
 # 创建DAG
 with DAG(
@@ -926,202 +1458,175 @@ with DAG(
     }
 ) as dag:
     
     
-    #############################################
-    # 阶段1: 准备阶段(Prepare Phase)
-    #############################################
-    with TaskGroup("prepare_phase") as prepare_group:
-        # 任务开始标记
-        start_preparation = EmptyOperator(
-            task_id="start_preparation"
-        )
+    # 初始化全局变量,避免在DAG解析时出现未定义错误
+    globals()['_resource_tasks'] = []
+    globals()['_task_dict'] = {}
+    
+    # DAG开始任务
+    dag_start = EmptyOperator(task_id="dag_start")
+    
+    # DAG结束任务
+    dag_end = EmptyOperator(
+        task_id="dag_end",
+        trigger_rule="all_done"  # 确保DAG无论上游任务成功与否都能完成
+    )
+    
+    # 准备阶段任务
+    prepare_task = PythonOperator(
+        task_id="prepare_dag_schedule",
+        python_callable=prepare_dag_schedule,
+        provide_context=True
+    )
+    
+    # 汇总执行情况任务
+    summarize_task = PythonOperator(
+        task_id='summarize_execution',
+        python_callable=summarize_execution,
+        provide_context=True,
+        trigger_rule='all_done',  # 无论之前的任务成功还是失败都执行
+        retries=2,  # 增加重试次数
+        retry_delay=timedelta(minutes=1)  # 重试延迟
+    )
+    
+    # 数据处理阶段
+    # 获取所有需要执行的任务(实际任务,不是TaskGroup包装的任务)
+    exec_date = get_latest_date()
+    resource_tasks, model_tasks = get_all_tasks(exec_date)
+    
+    # 创建任务字典,用于设置依赖关系
+    task_dict = {}
+    
+    # 创建资源表任务
+    for task_info in resource_tasks:
+        table_name = task_info["target_table"]
+        script_name = task_info["script_name"]
+        exec_mode = task_info.get("script_exec_mode", "append")
         
         
-        # 准备调度任务
-        prepare_task = PythonOperator(
-            task_id="prepare_dag_schedule",
-            python_callable=prepare_dag_schedule,
-            provide_context=True
-        )
+        # 创建安全的任务ID
+        safe_table_name = table_name.replace(".", "_").replace("-", "_")
+        task_id = f"resource_{safe_table_name}"
         
         
-        # 创建执行计划 - 从data_processing_phase移至这里
-        create_plan = PythonOperator(
-            task_id="create_execution_plan",
-            python_callable=create_execution_plan,
-            provide_context=True
+        # 直接使用 execute_with_monitoring 函数,确保执行脚本
+        resource_task = PythonOperator(
+            task_id=task_id,
+            python_callable=execute_with_monitoring,
+            op_kwargs={
+                "target_table": table_name,
+                "script_name": script_name,
+                "script_exec_mode": exec_mode,
+                "exec_date": exec_date
+            },
+            retries=2,
+            retry_delay=timedelta(minutes=1),
+            trigger_rule="all_done"  # 确保无论上游任务成功或失败都会执行
         )
         
         
-        # 准备完成标记
-        preparation_completed = EmptyOperator(
-            task_id="preparation_completed"
-        )
+        # 将任务添加到字典
+        task_dict[table_name] = resource_task
         
         
-        # 设置任务依赖 - 调整为包含create_plan
-        start_preparation >> prepare_task >> create_plan >> preparation_completed
+        # 设置依赖关系:prepare_task -> resource_task
+        prepare_task >> resource_task
+        
+    # 为所有模型表获取依赖关系
+    model_table_names = [task["target_table"] for task in model_tasks]
+    dependencies = get_table_dependencies_for_data_phase(model_table_names)
     
     
-    #############################################
-    # 阶段2: 数据处理阶段(Data Processing Phase)
-    #############################################
-    with TaskGroup("data_processing_phase") as data_group:
-        # 过程完成标记
-        processing_completed = EmptyOperator(
-            task_id="processing_completed"
-        )
+    # 创建有向图,用于确定执行顺序
+    G = nx.DiGraph()
     
     
-    #############################################
-    # 阶段3: 汇总阶段(Summary Phase)
-    #############################################
-    with TaskGroup("summary_phase") as summary_group:
-        # 汇总执行情况
-        summarize_task = PythonOperator(
-            task_id="summarize_execution",
-            python_callable=summarize_execution,
-            provide_context=True
-        )
-        
-        # 总结完成标记
-        summary_completed = EmptyOperator(
-            task_id="summary_completed"
-        )
-        
-        # 设置任务依赖
-        summarize_task >> summary_completed
+    # 将所有模型表添加为节点
+    for task_info in model_tasks:
+        G.add_node(task_info["target_table"])
     
     
-    # 设置三个阶段之间的依赖关系 - 使用简单的TaskGroup依赖
-    prepare_group >> data_group >> summary_group
-
-    # 实际数据处理任务的动态创建逻辑
-    # 这部分代码在DAG对象定义时执行,而不是在DAG运行时执行
+    # 添加依赖边
+    for source, deps in dependencies.items():
+        for dep in deps:
+            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
+                G.add_edge(dep.get("table_name"), source)
+    
+    # 处理循环依赖
+    cycles = list(nx.simple_cycles(G))
+    if cycles:
+        for cycle in cycles:
+            G.remove_edge(cycle[-1], cycle[0])
+    
+    # 获取执行顺序
+    try:
+        execution_order = list(nx.topological_sort(G))
+    except Exception as e:
+        execution_order = [task["target_table"] for task in model_tasks]
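A standalone sketch of the cycle handling used above: each detected cycle is broken by dropping one edge before topological sorting (table names here are illustrative only):

import networkx as nx

G = nx.DiGraph()
G.add_edge("dim_date", "fact_sales")    # fact_sales depends on dim_date
G.add_edge("fact_sales", "agg_sales")
G.add_edge("agg_sales", "fact_sales")   # artificial cycle for the example

cycles = list(nx.simple_cycles(G))
for cycle in cycles:
    # Same strategy as above: remove the closing edge of each cycle.
    G.remove_edge(cycle[-1], cycle[0])

print(list(nx.topological_sort(G)))     # e.g. ['dim_date', 'fact_sales', 'agg_sales']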
     
     
-    # 创建一个DynamicTaskMapper用于动态创建任务
-    class DynamicTaskMapper(PythonOperator):
-        """用于动态映射任务的特殊PythonOperator。
-        该类作为一个普通任务执行,但会在运行时动态创建下游任务。"""
+    # 创建模型表任务
+    for table_name in execution_order:
+        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+        if not task_info:
+            continue
         
         
-        def execute(self, context):
-            """在DAG运行时动态创建和执行任务"""
-            try:
-                logger.info("开始动态任务映射...")
-                
-                # 从XCom获取执行计划
-                ti = context['ti']
-                execution_plan_json = ti.xcom_pull(
-                    task_ids='prepare_phase.create_execution_plan'
-                )
-                
-                if not execution_plan_json:
-                    logger.info("尝试从prepare_phase.prepare_dag_schedule获取执行计划")
-                    execution_plan_tmp = ti.xcom_pull(
-                        task_ids='prepare_phase.prepare_dag_schedule', 
-                        key='execution_plan'
-                    )
-                    
-                    if execution_plan_tmp:
-                        execution_plan_json = execution_plan_tmp
-                    else:
-                        logger.error("无法从XCom获取执行计划")
-                        raise ValueError("执行计划未找到")
-                
-                # 解析执行计划
-                if isinstance(execution_plan_json, str):
-                    execution_plan = json.loads(execution_plan_json)
-                else:
-                    execution_plan = execution_plan_json
-                
-                # 获取资源任务和模型任务
-                resource_tasks = execution_plan.get("resource_tasks", [])
-                model_tasks = execution_plan.get("model_tasks", [])
-                dependencies = execution_plan.get("dependencies", {})
-                
-                logger.info(f"获取到执行计划: {len(resource_tasks)}个资源任务, {len(model_tasks)}个模型任务")
-                
-                # 处理资源任务
-                logger.info("处理资源任务...")
-                for task_info in resource_tasks:
-                    target_table = task_info["target_table"]
-                    script_name = task_info["script_name"]
-                    script_exec_mode = task_info.get("script_exec_mode", "append")
-                    
-                    logger.info(f"执行资源表任务: {target_table}, 脚本: {script_name}")
-                    try:
-                        process_resource(
-                            target_table=target_table,
-                            script_name=script_name,
-                            script_exec_mode=script_exec_mode,
-                            exec_date=context.get('ds')
-                        )
-                    except Exception as e:
-                        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
-                
-                # 构建模型表依赖图,确定执行顺序
-                G = nx.DiGraph()
-                
-                # 添加所有模型表节点
-                for task_info in model_tasks:
-                    G.add_node(task_info["target_table"])
-                
-                # 添加依赖边
-                for source, deps in dependencies.items():
-                    for dep in deps:
-                        if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                            G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
-                
-                # 检测并处理循环依赖
-                cycles = list(nx.simple_cycles(G))
-                if cycles:
-                    logger.warning(f"检测到循环依赖: {cycles}")
-                    for cycle in cycles:
-                        G.remove_edge(cycle[-1], cycle[0])
-                        logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-                
-                # 生成拓扑排序,确定执行顺序
-                try:
-                    execution_order = list(nx.topological_sort(G))
-                    logger.info(f"计算出的执行顺序: {execution_order}")
-                except Exception as e:
-                    logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
-                    execution_order = [task_info["target_table"] for task_info in model_tasks]
-                
-                # 按顺序执行模型表任务
-                for table_name in execution_order:
-                    task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-                    if not task_info:
-                        continue
-                        
-                    target_table = task_info["target_table"]
-                    script_name = task_info["script_name"]
-                    script_exec_mode = task_info.get("script_exec_mode", "append")
-                    
-                    logger.info(f"执行模型表任务: {target_table}, 脚本: {script_name}")
-                    try:
-                        process_model(
-                            target_table=target_table,
-                            script_name=script_name,
-                            script_exec_mode=script_exec_mode,
-                            exec_date=context.get('ds')
-                        )
-                    except Exception as e:
-                        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
-                
-                return f"成功执行 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务"
-                
-            except Exception as e:
-                logger.error(f"动态任务映射失败: {str(e)}")
-                import traceback
-                logger.error(traceback.format_exc())
-                raise
-    
-    # 在data_processing_phase中使用修改后的DynamicTaskMapper
-    with data_group:
-        dynamic_task_mapper = DynamicTaskMapper(
-            task_id="dynamic_task_mapper",
-            python_callable=lambda **kwargs: "Dynamic task mapping placeholder",
-            provide_context=True
+        script_name = task_info["script_name"]
+        exec_mode = task_info.get("script_exec_mode", "append")
+        
+        # 创建安全的任务ID
+        safe_table_name = table_name.replace(".", "_").replace("-", "_")
+        task_id = f"model_{safe_table_name}"
+        
+        # 直接使用 execute_with_monitoring 函数执行脚本
+        model_task = PythonOperator(
+            task_id=task_id,
+            python_callable=execute_with_monitoring,
+            op_kwargs={
+                "target_table": table_name,
+                "script_name": script_name,
+                "script_exec_mode": exec_mode,
+                "exec_date": exec_date
+            },
+            retries=2,
+            retry_delay=timedelta(minutes=1),
+            trigger_rule="all_done"  # 确保无论上游任务成功或失败都会执行
         )
         
         
+        # 将任务添加到字典
+        task_dict[table_name] = model_task
+        
         # 设置依赖关系
-        preparation_completed >> dynamic_task_mapper >> processing_completed
+        deps = dependencies.get(table_name, [])
+        has_dependency = False
+        
+        # 处理模型表之间的依赖
+        for dep in deps:
+            dep_table = dep.get("table_name")
+            if dep_table in task_dict:
+                task_dict[dep_table] >> model_task
+                has_dependency = True
+        
+        # 如果没有依赖,则依赖于所有资源表任务
+        if not has_dependency and resource_tasks:
+            for resource_task_info in resource_tasks:
+                resource_name = resource_task_info["target_table"]
+                if resource_name in task_dict:
+                    task_dict[resource_name] >> model_task
+        
+        # 如果没有依赖,也没有资源表,则直接依赖于prepare_task
+        if not has_dependency and not resource_tasks:
+            prepare_task >> model_task
+    
+    # 所有处理任务都是summarize_task的上游
+    for task in task_dict.values():
+        task >> summarize_task
+    
+    # 设置主要流程
+    dag_start >> prepare_task
+    
+    # 创建执行计划文件任务
+    create_plan_task = PythonOperator(
+        task_id="create_execution_plan",
+        python_callable=create_execution_plan,
+        provide_context=True
+    )
+    
+    # 设置依赖关系
+    prepare_task >> create_plan_task >> summarize_task >> dag_end
     
     
-    # 注意:以下原始代码不再需要,因为我们现在使用DynamicTaskMapper来动态创建和执行任务
-    # 保留的原始try-except块的结尾,确保代码结构完整
+    logger.info(f"DAG dag_dataops_unified_scheduler 定义完成,创建了 {len(task_dict)} 个脚本执行任务")
     
     
     # 尝试从数据库获取最新的执行计划,用于WebUI展示
     try:

+ 1 - 1
dags/dag_manual_dependency_trigger.py

@@ -636,7 +636,7 @@ with DAG(
     description='手动触发指定表的依赖链执行,支持三种依赖级别:self(仅本表)、resource(到Resource层但不执行Resource)、source(完整依赖到Source层)',
     description='手动触发指定表的依赖链执行,支持三种依赖级别:self(仅本表)、resource(到Resource层但不执行Resource)、source(完整依赖到Source层)',
     schedule_interval=None,  # 设置为None表示只能手动触发
     catchup=False,
-    is_paused_upon_creation=False,  # 添加这一行,使DAG创建时不处于暂停状态
+    is_paused_upon_creation=True,  # 修改这一行,使DAG创建时处于暂停状态,需手动启用
     params={
         'TABLE_NAME': '',
         'DEPENDENCY_LEVEL': {

+ 0 - 145
dags/dag_manual_unified_dependency_trigger.py

@@ -1,145 +0,0 @@
-from airflow import DAG
-from airflow.operators.empty import EmptyOperator
-from airflow.operators.python import PythonOperator
-from airflow.decorators import task
-from datetime import datetime, timedelta
-from config import NEO4J_CONFIG
-from utils import execute_script
-import logging
-from neo4j import GraphDatabase
-import networkx as nx
-
-logger = logging.getLogger(__name__)
-
-default_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email_on_failure': False,
-    'email_on_retry': False,
-    'retries': 1,
-    'retry_delay': timedelta(minutes=5),
-}
-
-def build_dependency_chain_nx(start_table, dependency_level="resource"):
-    uri = NEO4J_CONFIG['uri']
-    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-    driver = GraphDatabase.driver(uri, auth=auth)
-    logger.info(f"构建表 {start_table} 的依赖链(层级: {dependency_level})")
-
-    G = nx.DiGraph()
-    node_info = {}
-
-    with driver.session() as session:
-        query = f"""
-        MATCH path=(target:Table {{en_name: $start_table}})<-[:DERIVED_FROM*0..]-(source)
-        WHERE (all(n in nodes(path) WHERE n:DataModel OR n:DataResource))
-        WITH collect(DISTINCT source.en_name) + '{start_table}' AS tables
-        UNWIND tables AS tname
-        MATCH (t:Table {{en_name: tname}})
-        OPTIONAL MATCH (t)-[r:DERIVED_FROM]->(up)
-        RETURN t.en_name AS table_name,
-               labels(t) AS labels,
-               r.script_name AS script_name,
-               r.script_exec_mode AS script_exec_mode,
-               up.en_name AS upstream
-        """
-        records = session.run(query, start_table=start_table)
-        for record in records:
-            name = record['table_name']
-            script_name = record.get('script_name')
-            script_exec_mode = record.get('script_exec_mode') or 'append'
-            upstream = record.get('upstream')
-            node_type = None
-            for label in record.get('labels', []):
-                if label in ['DataModel', 'DataResource']:
-                    node_type = label
-
-            if name not in node_info:
-                node_info[name] = {
-                    'table_name': name,
-                    'script_name': script_name or f"{name}.py",
-                    'execution_mode': script_exec_mode,
-                    'table_type': node_type,
-                    'upstream_tables': []
-                }
-            if upstream:
-                G.add_edge(upstream, name)
-                node_info[name]['upstream_tables'].append(upstream)
-
-    driver.close()
-    execution_order = list(nx.topological_sort(G))
-    logger.info(f"拓扑排序执行顺序: {execution_order}")
-
-    dependency_chain = []
-    for table in execution_order:
-        if table in node_info:
-            dependency_chain.append(node_info[table])
-    return dependency_chain
-
-with DAG(
-    dag_id='dag_manual_dependency_unified_trigger',
-    start_date=datetime(2024, 1, 1),
-    schedule_interval=None,
-    catchup=False,
-    default_args=default_args,
-    description='运行时构建任务,支持conf参数,展示拓扑依赖图'
-) as dag:
-
-    start = EmptyOperator(task_id='start')
-    end = EmptyOperator(task_id='end')
-
-    @task()
-    def get_dependency_chain(**context):
-        conf = context['dag_run'].conf if context.get('dag_run') else {}
-        table_name = conf.get("TABLE_NAME", "book_sale_amt_2yearly")
-        dependency_level = conf.get("DEPENDENCY_LEVEL", "resource")
-        logger.info(f"手动传入参数: TABLE_NAME={table_name}, DEPENDENCY_LEVEL={dependency_level}")
-        return build_dependency_chain_nx(table_name, dependency_level)
-
-    def create_task_callable(table_name, script_name, execution_mode):
-        def _inner_callable():
-            logger.info(f"执行任务:{table_name} using {script_name} mode={execution_mode}")
-            if not execute_script(script_name, table_name, execution_mode):
-                raise Exception(f"脚本 {script_name} 执行失败")
-        return _inner_callable
-
-    def create_runtime_tasks(chain, dag):
-        task_dict = {}
-        for item in chain:
-            table = item['table_name']
-            script = item['script_name']
-            mode = item['execution_mode']
-            task = PythonOperator(
-                task_id=f"run_{table}",
-                python_callable=create_task_callable(table, script, mode),
-                dag=dag
-            )
-            task_dict[table] = task
-
-        for item in chain:
-            downstream = item['table_name']
-            upstreams = item.get('upstream_tables', [])
-            if not upstreams:
-                start >> task_dict[downstream]
-            else:
-                for up in upstreams:
-                    if up in task_dict:
-                        task_dict[up] >> task_dict[downstream]
-
-        for task in task_dict.values():
-            task >> end
-
-    from airflow.operators.python import PythonOperator
-
-    def wrapper(**context):
-        chain = context['ti'].xcom_pull(task_ids='get_dependency_chain')
-        create_runtime_tasks(chain, dag)
-
-    chain_task = get_dependency_chain()
-    build_tasks = PythonOperator(
-        task_id='build_runtime_tasks',
-        python_callable=wrapper,
-        provide_context=True
-    )
-
-    start >> chain_task >> build_tasks >> end