
Prepare to modify the trigger mechanism of the three unified files

wangxq 1 month ago
parent
commit
2965e0e4ee
1 file changed, 388 additions and 828 deletions
      dags/dag_dataops_unified_scheduler.py

+ 388 - 828
dags/dag_dataops_unified_scheduler.py

@@ -8,7 +8,6 @@ from datetime import datetime, timedelta, date
 import logging
 import networkx as nx
 import json
-import os
 from decimal import Decimal
 from common import (
     get_pg_conn, 
@@ -16,7 +15,8 @@ from common import (
     execute_with_monitoring,
     get_today_date
 )
-from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG, AIRFLOW_BASE_PATH, SCRIPTS_BASE_PATH
+from config import TASK_RETRY_CONFIG, PG_CONFIG, NEO4J_CONFIG
+import os
 
 # Create the logger
 logger = logging.getLogger(__name__)
@@ -306,7 +306,7 @@ def write_to_airflow_dag_schedule(exec_date, tables_info):
         conn.close()
 
 def prepare_dag_schedule(**kwargs):
-    """准备调度任务的主函数"""
+    """准备DAG调度任务的主函数"""
     exec_date = kwargs.get('ds') or get_today_date()
     logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
     
@@ -408,395 +408,19 @@ def prepare_dag_schedule(**kwargs):
     kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(execution_plan, default=json_serial))
     logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
     
-    return inserted_count
-
-def create_execution_plan(**kwargs):
-    """Prepare the execution plan using data passed from prepare_phase, and generate a JSON file"""
-    try:
-        # Get the execution plan from prepare_dag_schedule
-        execution_plan_json = kwargs['ti'].xcom_pull(task_ids='prepare_dag_schedule', key='execution_plan')
-        
-        if not execution_plan_json:
-            # If nothing was retrieved, the XCom push may have failed; try to rebuild from the database
-            exec_date = kwargs.get('ds') or get_today_date()
-            logger.info(f"No execution plan found in XCom, rebuilding from the database. Using execution date: {exec_date}")
-            
-            # Get all tasks
-            resource_tasks, model_tasks = get_all_tasks(exec_date)
-            
-            if not resource_tasks and not model_tasks:
-                logger.warning(f"No tasks found for execution date {exec_date}")
-                # Create an empty execution plan
-                execution_plan = {
-                    "exec_date": exec_date,
-                    "resource_tasks": [],
-                    "model_tasks": [],
-                    "dependencies": {}
-                }
-            else:
-                # Get dependencies for all model tables
-                model_table_names = [task["target_table"] for task in model_tasks]
-                dependencies = get_table_dependencies_for_data_phase(model_table_names)
-                
-                # Create the execution plan
-                execution_plan = {
-                    "exec_date": exec_date,
-                    "resource_tasks": resource_tasks,
-                    "model_tasks": model_tasks,
-                    "dependencies": dependencies
-                }
-            
-            # Convert to JSON
-            execution_plan_json = json.dumps(execution_plan, default=json_serial)
-        else:
-            # If it is a string, parse it to make sure the format is valid
-            if isinstance(execution_plan_json, str):
-                execution_plan = json.loads(execution_plan_json)
-            else:
-                execution_plan = execution_plan_json
-                execution_plan_json = json.dumps(execution_plan, default=json_serial)
-        
-        # Save the execution plan as a JSON file, using a temp file to guarantee a complete write
-        try:
-            import os
-            import time
-            import tempfile
-            from datetime import datetime
-            
-            # Set the file paths
-            plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
-            plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
-            temp_plan_path = os.path.join(plan_dir, f'temp_last_execution_plan_{int(time.time())}.json')
-            ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
-            
-            logger.info(f"=== Start creating execution plan file - timestamp: {datetime.now().isoformat()} ===")
-            logger.info(f"Plan directory: {plan_dir}")
-            logger.info(f"Final file path: {plan_path}")
-            logger.info(f"Temp file path: {temp_plan_path}")
-            logger.info(f"Ready flag file path: {ready_flag_path}")
-            
-            # List the existing files in the directory
-            existing_files = os.listdir(plan_dir)
-            plan_related_files = [f for f in existing_files if 'execution_plan' in f or f.endswith('.ready')]
-            logger.info(f"Related files in directory before creation: {len(plan_related_files)}")
-            for f in plan_related_files:
-                file_path = os.path.join(plan_dir, f)
-                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
-                logger.info(f"Existing file: {f} (size: {file_size} bytes, modified: {file_time})")
-            
-            # Write to the temp file first
-            with open(temp_plan_path, 'w') as f:
-                if isinstance(execution_plan_json, str):
-                    f.write(execution_plan_json)
-                else:
-                    json.dump(execution_plan_json, f, indent=2, default=json_serial)
-                f.flush()
-                os.fsync(f.fileno())  # make sure the bytes reach disk
-            
-            # Verify the temp file
-            temp_size = os.path.getsize(temp_plan_path)
-            temp_time = datetime.fromtimestamp(os.path.getmtime(temp_plan_path)).isoformat()
-            logger.info(f"Created temp file: {temp_plan_path} (size: {temp_size} bytes, modified: {temp_time})")
-            
-            with open(temp_plan_path, 'r') as f:
-                test_content = json.load(f)  # check that it parses correctly
-                logger.info(f"Temp file verified, content parses as valid JSON")
-            
-            # Rename to the final file
-            if os.path.exists(plan_path):
-                old_size = os.path.getsize(plan_path)
-                old_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
-                logger.info(f"Deleting existing file: {plan_path} (size: {old_size} bytes, modified: {old_time})")
-                os.remove(plan_path)  # remove the existing file first
-            
-            logger.info(f"Renaming temp file: {temp_plan_path} -> {plan_path}")
-            os.rename(temp_plan_path, plan_path)
-            
-            # Confirm the final file
-            if os.path.exists(plan_path):
-                final_size = os.path.getsize(plan_path)
-                final_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
-                logger.info(f"Final file created successfully: {plan_path} (size: {final_size} bytes, modified: {final_time})")
-            else:
-                logger.error(f"Final file was not created: {plan_path}")
-            
-            # Write the ready flag file
-            with open(ready_flag_path, 'w') as f:
-                flag_content = f"Generated at {datetime.now().isoformat()}"
-                f.write(flag_content)
-                f.flush()
-                os.fsync(f.fileno())  # make sure the bytes reach disk
-            
-            # Confirm the ready flag file
-            if os.path.exists(ready_flag_path):
-                flag_size = os.path.getsize(ready_flag_path)
-                flag_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
-                logger.info(f"Ready flag file created successfully: {ready_flag_path} (size: {flag_size} bytes, modified: {flag_time}, content: {flag_content})")
-            else:
-                logger.error(f"Ready flag file was not created: {ready_flag_path}")
-            
-            # Check the directory again
-            final_files = os.listdir(plan_dir)
-            final_plan_files = [f for f in final_files if 'execution_plan' in f or f.endswith('.ready')]
-            logger.info(f"Related files in directory after creation: {len(final_plan_files)}")
-            for f in final_plan_files:
-                file_path = os.path.join(plan_dir, f)
-                file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-                file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
-                logger.info(f"Final file: {f} (size: {file_size} bytes, modified: {file_time})")
-            
-            logger.info(f"=== Finished creating execution plan file - timestamp: {datetime.now().isoformat()} ===")
-        except Exception as e:
-            logger.error(f"Error saving execution plan to file: {str(e)}")
-            import traceback
-            logger.error(traceback.format_exc())
-            raise  # re-raise so the task fails
-        
-        return execution_plan_json
-    except Exception as e:
-        logger.error(f"Error creating execution plan: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        raise  # re-raise so the task fails
-
-def bridge_prepare_to_data_func(**kwargs):
-    """Bridge the prepare and data phases, making sure the execution plan file is ready"""
-    import os
-    import time
-    from datetime import datetime
-    
-    logger.info(f"=== Start verifying execution plan file - timestamp: {datetime.now().isoformat()} ===")
-    
-    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
-    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
-    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
-    
-    logger.info(f"Plan directory: {plan_dir}")
-    logger.info(f"Plan file path: {plan_path}")
-    logger.info(f"Ready flag file path: {ready_flag_path}")
-    
-    # List the files in the directory
-    all_files = os.listdir(plan_dir)
-    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
-    logger.info(f"Total related files in directory: {len(related_files)}")
-    for idx, file in enumerate(related_files, 1):
-        file_path = os.path.join(plan_dir, file)
-        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
-        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")
-    
-    # Wait for the ready flag file to appear
-    logger.info(f"Start waiting for ready flag file: {ready_flag_path}")
-    waiting_start = datetime.now()
-    max_attempts = 30  # wait at most 5 minutes
-    for attempt in range(max_attempts):
-        if os.path.exists(ready_flag_path):
-            wait_duration = (datetime.now() - waiting_start).total_seconds()
-            file_size = os.path.getsize(ready_flag_path)
-            file_time = datetime.fromtimestamp(os.path.getmtime(ready_flag_path)).isoformat()
-            
-            # Read the ready file content
-            try:
-                with open(ready_flag_path, 'r') as f:
-                    ready_content = f.read()
-            except Exception as e:
-                ready_content = f"[read error: {str(e)}]"
-            
-            logger.info(f"Found execution plan ready flag: {ready_flag_path} (attempts: {attempt+1}, waited: {wait_duration:.2f}s, size: {file_size} bytes, modified: {file_time}, content: {ready_content})")
-            break
-        
-        logger.info(f"Waiting for execution plan to be ready (attempt: {attempt+1}/{max_attempts}, waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)...")
-        time.sleep(10)  # wait 10 seconds
-    
-    if not os.path.exists(ready_flag_path):
-        error_msg = f"Execution plan ready flag file does not exist: {ready_flag_path}, wait timed out (waited: {(datetime.now() - waiting_start).total_seconds():.2f}s)"
-        logger.error(error_msg)
-        raise Exception(error_msg)
-    
-    # Verify the execution plan file
-    logger.info(f"Start verifying execution plan file: {plan_path}")
-    if not os.path.exists(plan_path):
-        error_msg = f"Execution plan file does not exist: {plan_path}"
-        logger.error(error_msg)
-        raise Exception(error_msg)
-    
-    try:
-        file_size = os.path.getsize(plan_path)
-        file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
-        logger.info(f"About to read execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")
-        
-        with open(plan_path, 'r') as f:
-            execution_plan = json.load(f)
-            logger.info(f"Successfully read and parsed the execution plan file JSON content")
-        
-        # Validate the basic structure
-        if not isinstance(execution_plan, dict):
-            logger.error(f"Execution plan format error: not a valid dict but {type(execution_plan)}")
-            raise ValueError("Execution plan is not a valid dict")
-        else:
-            logger.info(f"Execution plan basic structure check: it is a valid dict object")
-        
-        # Validate the required fields
-        required_fields = ["exec_date", "resource_tasks", "model_tasks"]
-        missing_fields = [field for field in required_fields if field not in execution_plan]
-        
-        if missing_fields:
-            error_msg = f"Execution plan is missing required fields: {missing_fields}"
-            logger.error(error_msg)
-            raise ValueError(error_msg)
-        else:
-            logger.info(f"Execution plan required-field check passed: all required fields {required_fields} present")
-        
-        # Log basic information about the execution plan
-        resource_tasks = execution_plan.get("resource_tasks", [])
-        model_tasks = execution_plan.get("model_tasks", [])
-        exec_date = execution_plan.get("exec_date", "unknown")
-        
-        logger.info(f"Execution plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")
-        
-        # If there are only a few tasks, log their details
-        if len(resource_tasks) + len(model_tasks) < 10:
-            for idx, task in enumerate(resource_tasks, 1):
-                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
-            
-            for idx, task in enumerate(model_tasks, 1):
-                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
-        
-        # Warn if there are no tasks at all
-        if not resource_tasks and not model_tasks:
-            logger.warning(f"Execution plan contains no tasks; the data processing phase may have no real work")
-        
-        logger.info(f"=== Execution plan file verified successfully - timestamp: {datetime.now().isoformat()} ===")
-        return True
-    except Exception as e:
-        error_msg = f"Error verifying execution plan file: {str(e)}"
-        logger.error(error_msg)
-        import traceback
-        logger.error(traceback.format_exc())
-        logger.info(f"=== Execution plan file verification failed - timestamp: {datetime.now().isoformat()} ===")
-        raise Exception(error_msg)
-
-def init_data_processing_phase(**kwargs):
-    """Initialization function for the data processing phase; reloads the execution plan file"""
-    import os
-    from datetime import datetime
-    
-    logger.info(f"=== Start data processing phase initialization - timestamp: {datetime.now().isoformat()} ===")
-    
-    plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
-    plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
-    ready_flag_path = os.path.join(plan_dir, 'last_execution_plan.json.ready')
-    
-    logger.info(f"Plan directory: {plan_dir}")
-    logger.info(f"Plan file path: {plan_path}")
-    logger.info(f"Ready flag file path: {ready_flag_path}")
-    
-    # Check the files in the directory
-    all_files = os.listdir(plan_dir)
-    related_files = [f for f in all_files if 'execution_plan' in f or f.endswith('.ready')]
-    logger.info(f"Total related files in directory: {len(related_files)}")
-    for idx, file in enumerate(related_files, 1):
-        file_path = os.path.join(plan_dir, file)
-        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0
-        file_time = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat() if os.path.exists(file_path) else 'unknown'
-        logger.info(f"Related file {idx}: {file} (size: {file_size} bytes, modified: {file_time})")
-    
-    # Verify that the file exists
-    if not os.path.exists(plan_path):
-        error_msg = f"Execution plan file does not exist: {plan_path}"
-        logger.error(error_msg)
-        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
-        raise Exception(error_msg)
-    
-    file_size = os.path.getsize(plan_path)
-    file_time = datetime.fromtimestamp(os.path.getmtime(plan_path)).isoformat()
-    logger.info(f"About to read execution plan file: {plan_path} (size: {file_size} bytes, modified: {file_time})")
-    
+    # Save the execution plan to a file
     try:
-        # Record the read start time
-        read_start = datetime.now()
-        
-        with open(plan_path, 'r') as f:
-            file_content = f.read()
-            logger.info(f"Successfully read file content, size {len(file_content)} bytes")
-            
-            # Parse the JSON
-            parse_start = datetime.now()
-            execution_plan = json.loads(file_content)
-            parse_duration = (datetime.now() - parse_start).total_seconds()
-            logger.info(f"Successfully parsed JSON content in {parse_duration:.4f}s")
-        
-        read_duration = (datetime.now() - read_start).total_seconds()
-        logger.info(f"Total file read and parse time: {read_duration:.4f}s")
-        
-        # Validate the basic structure of the execution plan
-        if not isinstance(execution_plan, dict):
-            error_msg = f"Execution plan is not a valid dict, actual type: {type(execution_plan)}"
-            logger.error(error_msg)
-            raise ValueError(error_msg)
-        
-        # Store in XCom so downstream tasks can use it
-        push_start = datetime.now()
-        
-        # Serialize to a JSON string first
-        execution_plan_json = json.dumps(execution_plan, default=json_serial)
-        logger.info(f"Serialized execution plan to JSON string, size {len(execution_plan_json)} bytes")
-        
-        # Push to XCom
-        kwargs['ti'].xcom_push(key='data_phase_execution_plan', value=execution_plan_json)
-        push_duration = (datetime.now() - push_start).total_seconds()
-        logger.info(f"Successfully pushed execution plan to XCom in {push_duration:.4f}s")
-        
-        # Log basic information about the execution plan
-        resource_tasks = execution_plan.get("resource_tasks", [])
-        model_tasks = execution_plan.get("model_tasks", [])
-        exec_date = execution_plan.get("exec_date", "unknown")
-        
-        logger.info(f"Execution plan summary: date={exec_date}, resource tasks={len(resource_tasks)}, model tasks={len(model_tasks)}")
-        
-        # If there are only a few tasks, log their details
-        if len(resource_tasks) + len(model_tasks) < 10:
-            for idx, task in enumerate(resource_tasks, 1):
-                logger.info(f"Resource task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
-            
-            for idx, task in enumerate(model_tasks, 1):
-                logger.info(f"Model task {idx}: table={task.get('target_table')}, script={task.get('script_name')}")
-        
-        result = {
-            "exec_date": exec_date,
-            "resource_count": len(resource_tasks),
-            "model_count": len(model_tasks)
-        }
-        
-        logger.info(f"=== Data processing phase initialization finished - timestamp: {datetime.now().isoformat()} ===")
-        return result
-    except json.JSONDecodeError as e:
-        error_msg = f"Failed to parse execution plan file JSON: {str(e)}"
-        logger.error(error_msg)
-        
-        # Log a content excerpt to help debugging
-        try:
-            with open(plan_path, 'r') as f:
-                content = f.read(1000)  # read only the first 1000 characters
-                logger.error(f"First 1000 characters of file content: {content}...")
-        except Exception as read_error:
-            logger.error(f"Error while trying to read file content: {str(read_error)}")
-        
-        import traceback
-        logger.error(traceback.format_exc())
-        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
-        raise Exception(error_msg)
-    except Exception as e:
-        error_msg = f"Data processing phase initialization failed: {str(e)}"
-        logger.error(error_msg)
-        import traceback
-        logger.error(traceback.format_exc())
-        logger.info(f"=== Data processing phase initialization failed - timestamp: {datetime.now().isoformat()} ===")
-        raise Exception(error_msg)
+        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+        with open(plan_path, 'w') as f:
+            json.dump(execution_plan, f, default=json_serial, indent=2)
+        logger.info(f"Saved execution plan to file: {plan_path}")
+    except Exception as file_e:
+        logger.error(f"Error saving execution plan to file: {str(file_e)}")
+    
+    return inserted_count
 
 #############################################
-# Phase 2: Data Processing Phase
+# Phase 2: Functions for the Data Processing Phase
 #############################################
 
 def get_latest_date():
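
Note: the deleted create_execution_plan above persisted the plan through a temp-file write, fsync, rename, plus a .ready flag, while the replacement code is a plain json.dump. For reference, a minimal sketch of that atomic-write pattern, with a hypothetical save_plan_atomically helper that is not part of this commit:

import json
import os
import tempfile

def save_plan_atomically(plan: dict, final_path: str) -> None:
    """Write JSON to a temp file in the target directory, fsync, then swap it in.

    os.replace() is atomic on POSIX when source and target are on the same
    volume, so a concurrent reader never observes a half-written file.
    """
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(final_path), suffix='.json')
    try:
        with os.fdopen(fd, 'w') as f:
            json.dump(plan, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # force the bytes to disk before the swap
        os.replace(tmp_path, final_path)  # atomic swap; overwrites any old file
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # clean up the orphaned temp file
        raise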
@@ -915,6 +539,85 @@ def get_table_dependencies_for_data_phase(table_names):
     
     return dependency_dict
 
+def create_execution_plan(**kwargs):
+    """Prepare the execution plan, using data passed from the prepare phase"""
+    try:
+        # Get the execution plan from XCom
+        execution_plan = kwargs['ti'].xcom_pull(task_ids='prepare_phase.prepare_dag_schedule', key='execution_plan')
+        
+        # If no execution plan is found, fetch from the database
+        if not execution_plan:
+            # Get the execution date
+            exec_date = get_latest_date()
+            logger.info(f"No execution plan found, fetching from the database. Using execution date: {exec_date}")
+            
+            # Get all tasks
+            resource_tasks, model_tasks = get_all_tasks(exec_date)
+            
+            if not resource_tasks and not model_tasks:
+                logger.warning(f"No tasks found for execution date {exec_date}")
+                return 0
+            
+            # Get dependencies for all model tables
+            model_table_names = [task["target_table"] for task in model_tasks]
+            dependencies = get_table_dependencies_for_data_phase(model_table_names)
+            
+            # Create the execution plan
+            new_execution_plan = {
+                "exec_date": exec_date,
+                "resource_tasks": resource_tasks,
+                "model_tasks": model_tasks,
+                "dependencies": dependencies
+            }
+            
+            # Save the execution plan
+            kwargs['ti'].xcom_push(key='execution_plan', value=json.dumps(new_execution_plan, default=json_serial))
+            logger.info(f"Created a new execution plan with {len(resource_tasks)} resource-table tasks and {len(model_tasks)} model-table tasks")
+            
+            # Save the execution plan to a file
+            try:
+                plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+                with open(plan_path, 'w') as f:
+                    json.dump(new_execution_plan, f, default=json_serial, indent=2)
+                logger.info(f"Saved execution plan to file: {plan_path}")
+            except Exception as file_e:
+                logger.error(f"Error saving execution plan to file: {str(file_e)}")
+            
+            return json.dumps(new_execution_plan, default=json_serial)
+        
+        # If an execution plan was retrieved from XCom, save it to the file as well
+        try:
+            plan_json = json.loads(execution_plan) if isinstance(execution_plan, str) else execution_plan
+            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+            with open(plan_path, 'w') as f:
+                json.dump(plan_json, f, default=json_serial, indent=2)
+            logger.info(f"Saved the execution plan retrieved from XCom to file: {plan_path}")
+        except Exception as file_e:
+            logger.error(f"Error saving the execution plan retrieved from XCom to file: {str(file_e)}")
+        
+        logger.info(f"Successfully retrieved the execution plan")
+        return execution_plan
+    except Exception as e:
+        logger.error(f"Error creating execution plan: {str(e)}")
+        # Return an empty execution plan
+        empty_plan = {
+            "exec_date": get_today_date(),
+            "resource_tasks": [],
+            "model_tasks": [],
+            "dependencies": {}
+        }
+        
+        # Try to save the empty execution plan to the file
+        try:
+            plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+            with open(plan_path, 'w') as f:
+                json.dump(empty_plan, f, default=json_serial, indent=2)
+            logger.info(f"Saved empty execution plan to file: {plan_path}")
+        except Exception as file_e:
+            logger.error(f"Error saving empty execution plan to file: {str(file_e)}")
+            
+        return json.dumps(empty_plan, default=json_serial)
+
 def process_resource(target_table, script_name, script_exec_mode, exec_date):
     """处理单个资源表"""
     logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
@@ -928,144 +631,12 @@ def process_resource(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    try:
-        # Call the monitored execution function directly to make sure the script runs
-        result = execute_with_monitoring(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
-        logger.info(f"Resource table {target_table} processed, result: {result}")
-        return f"Processing resource table {target_table} finished, result: {result}"
-    except Exception as e:
-        logger.error(f"Error processing resource table {target_table}: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # Return the error message instead of raising, so the DAG can keep running
-        return f"Processing resource table {target_table} failed: {str(e)}"
-
-def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
-    """Execute the script and monitor its execution"""
-    from pathlib import Path
-    import importlib.util
-    import sys
-    
-    logger.info(f"=== Start executing script {script_name} for task {target_table} - timestamp: {datetime.now().isoformat()} ===")
-
-    # Check whether script_name is empty
-    if not script_name:
-        logger.error(f"script_name for table {target_table} is empty, cannot execute")
-        # Record the execution failure in the database
-        now = datetime.now()
-        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
-        return False
-    
-    # Record the execution start time
-    start_time = datetime.now()
-    update_task_start_time(exec_date, target_table, script_name, start_time)
-    logger.info(f"Task start time: {start_time.isoformat()}")
-    
-    try:
-        # Execute the actual script
-        script_path = Path(SCRIPTS_BASE_PATH) / script_name
-        logger.info(f"Full script path: {script_path}")
-        
-        if not script_path.exists():
-            logger.error(f"Script file does not exist: {script_path}")
-            end_time = datetime.now()
-            duration = (end_time - start_time).total_seconds()
-            update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
-            return False
-        
-        try:
-            # Import the module dynamically
-            module_name = f"dynamic_module_{abs(hash(script_name))}"
-            spec = importlib.util.spec_from_file_location(module_name, script_path)
-            module = importlib.util.module_from_spec(spec)
-            sys.modules[module_name] = module
-            spec.loader.exec_module(module)
-            
-            # Use the standard entry point run
-            if hasattr(module, "run"):
-                logger.info(f"Executing standard entry point run() of script {script_name}")
-                result = module.run(table_name=target_table, execution_mode=script_exec_mode)
-                logger.info(f"Script {script_name} execution result: {result}")
-                success = True if result else False
-            else:
-                logger.warning(f"Script {script_name} does not define the standard entry point run(); trying main()")
-                if hasattr(module, "main"):
-                    logger.info(f"Executing main() of script {script_name}")
-                    result = module.main(table_name=target_table, execution_mode=script_exec_mode)
-                    logger.info(f"Script {script_name} execution result: {result}")
-                    success = True if result else False
-                else:
-                    logger.error(f"Script {script_name} defines neither the standard entry point run() nor main()")
-                    success = False
-        except Exception as script_e:
-            logger.error(f"Error executing script {script_name}: {str(script_e)}")
-            import traceback
-            logger.error(traceback.format_exc())
-            success = False
-        
-        # Record the end time and result
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
-        
-        logger.info(f"Task end time: {end_time.isoformat()}, duration: {duration:.2f}s, result: {success}")
-        logger.info(f"=== Finished executing script {script_name} for task {target_table} - timestamp: {datetime.now().isoformat()} ===")
-        
-        return success
-    except Exception as e:
-        # Handle the exception
-        logger.error(f"Error executing task: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        end_time = datetime.now()
-        duration = (end_time - start_time).total_seconds()
-        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
-        
-        logger.info(f"=== Executing script {script_name} for task {target_table} failed - timestamp: {datetime.now().isoformat()} ===")
-        return False
-
-def update_task_start_time(exec_date, target_table, script_name, start_time):
-    """Update the task start time"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            UPDATE airflow_dag_schedule 
-            SET exec_start_time = %s
-            WHERE exec_date = %s AND target_table = %s AND script_name = %s
-        """, (start_time, exec_date, target_table, script_name))
-        conn.commit()
-        logger.info(f"Updated start time of script {script_name} for table {target_table}: {start_time}")
-    except Exception as e:
-        logger.error(f"Failed to update task start time: {str(e)}")
-        conn.rollback()
-    finally:
-        cursor.close()
-        conn.close()
-
-def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
-    """Update the task completion info"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            UPDATE airflow_dag_schedule 
-            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
-            WHERE exec_date = %s AND target_table = %s AND script_name = %s
-        """, (success, end_time, duration, exec_date, target_table, script_name))
-        conn.commit()
-        logger.info(f"Updated completion status of script {script_name} for table {target_table}: result={success}, end time={end_time}, duration={duration}s")
-    except Exception as e:
-        logger.error(f"Failed to update task completion info: {str(e)}")
-        conn.rollback()
-    finally:
-        cursor.close()
-        conn.close()
+    return execute_with_monitoring(
+        target_table=target_table,
+        script_name=script_name,
+        script_exec_mode=script_exec_mode,
+        exec_date=exec_date
+    )
 
 def process_model(target_table, script_name, script_exec_mode, exec_date):
     """处理单个模型表"""
@@ -1080,22 +651,12 @@ def process_model(target_table, script_name, script_exec_mode, exec_date):
         except Exception as e:
             logger.error(f"解析exec_date JSON时出错: {str(e)}")
     
-    try:
-        # Call the monitored execution function directly to make sure the script runs
-        result = execute_with_monitoring(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
-        logger.info(f"Model table {target_table} processed, result: {result}")
-        return f"Processing model table {target_table} finished, result: {result}"
-    except Exception as e:
-        logger.error(f"Error processing model table {target_table}: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # Return the error message instead of raising, so the DAG can keep running
-        return f"Processing model table {target_table} failed: {str(e)}"
+    return execute_with_monitoring(
+        target_table=target_table,
+        script_name=script_name,
+        script_exec_mode=script_exec_mode,
+        exec_date=exec_date
+    ) 
 
 #############################################
 # Phase 3: Functions for the Summary Phase
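
Note: process_resource and process_model receive exec_date through a templated xcom_pull (see the op_kwargs further down), so the value may arrive either as a plain date string or as the full execution-plan JSON. A sketch of the normalization the try/except blocks above appear to perform, with a hypothetical helper name:

import json

def normalize_exec_date(raw):
    """Accept '2025-04-12' or '{"exec_date": "2025-04-12", ...}' and
    return just the date string."""
    if isinstance(raw, str) and raw.lstrip().startswith('{'):
        try:
            parsed = json.loads(raw)
            if isinstance(parsed, dict) and "exec_date" in parsed:
                return parsed["exec_date"]
        except json.JSONDecodeError:
            pass  # not JSON after all; treat it as a literal date string
    return raw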
@@ -1320,127 +881,73 @@ def generate_unified_execution_report(exec_date, stats):
     
     return report_str
 
-def summarize_execution(**context):
-    """
-    Summarize how the execution plan ran and generate a report
-    """
-    logger.info(f"=== Start summarizing execution - timestamp: {datetime.now().isoformat()} ===")
+def summarize_execution(**kwargs):
+    """Main function for summarizing the execution"""
     try:
-        # Get the execution date
-        execution_date = context.get('execution_date', datetime.now())
-        exec_date = execution_date.strftime('%Y-%m-%d')
-        
-        # Load the execution plan from the local file
-        plan_dir = os.path.join(AIRFLOW_BASE_PATH, 'dags')
-        plan_path = os.path.join(plan_dir, 'last_execution_plan.json')
-        
-        if not os.path.exists(plan_path):
-            logger.warning(f"Execution plan file does not exist: {plan_path}")
-            return "Execution plan file does not exist, cannot generate a summary report"
-        
-        with open(plan_path, 'r') as f:
-            execution_plan = json.loads(f.read())
-        
-        # Get the task lists
-        resource_tasks = execution_plan.get("resource_tasks", [])
-        model_tasks = execution_plan.get("model_tasks", [])
-        all_tasks = resource_tasks + model_tasks
-        
-        # Connect to the database to fetch task execution status
-        conn = get_pg_conn()
-        cursor = conn.cursor()
+        exec_date = kwargs.get('ds') or get_today_date()
+        logger.info(f"Start summarizing unified execution for execution date {exec_date}")
         
-        # Analyze task execution status
-        successful_tasks = []
-        failed_tasks = []
-        skipped_tasks = []
+        # 1. Update missing execution results
+        try:
+            update_count = update_missing_results(exec_date)
+            logger.info(f"Updated {update_count} missing execution results")
+        except Exception as e:
+            logger.error(f"Error updating missing execution results: {str(e)}")
+            update_count = 0
         
-        for task in all_tasks:
-            table_name = task["target_table"]
-            table_type = "resource table" if task in resource_tasks else "model table"
-            
-            # Query the task execution status
-            cursor.execute("""
-                SELECT status FROM airflow_task_execution 
-                WHERE table_name = %s AND exec_date = %s
-                ORDER BY execution_time DESC LIMIT 1
-            """, (table_name, exec_date))
-            
-            result = cursor.fetchone()
-            status = result[0] if result else "not executed"
-            
-            task_info = {
-                "table_name": table_name,
-                "table_type": table_type,
-                "script_name": task["script_name"],
-                "status": status
+        # 2. Get the execution statistics
+        try:
+            stats = get_execution_stats(exec_date)
+            if not stats:
+                logger.warning("Could not get execution statistics, falling back to defaults")
+                stats = {
+                    "exec_date": exec_date,
+                    "total_tasks": 0,
+                    "type_counts": {},
+                    "success_count": 0,
+                    "fail_count": 0,
+                    "pending_count": 0,
+                    "success_rate": 0,
+                    "avg_duration": None,
+                    "min_duration": None,
+                    "max_duration": None,
+                    "failed_tasks": []
+                }
+        except Exception as e:
+            logger.error(f"Error getting execution statistics: {str(e)}")
+            stats = {
+                "exec_date": exec_date,
+                "total_tasks": 0,
+                "type_counts": {},
+                "success_count": 0,
+                "fail_count": 0,
+                "pending_count": 0,
+                "success_rate": 0,
+                "avg_duration": None,
+                "min_duration": None,
+                "max_duration": None,
+                "failed_tasks": []
             }
-            
-            if status == "Success":
-                successful_tasks.append(task_info)
-            elif status == "Failed":
-                failed_tasks.append(task_info)
-            else:
-                skipped_tasks.append(task_info)
-        
-        # Generate the summary report
-        total_tasks = len(all_tasks)
-        success_count = len(successful_tasks)
-        fail_count = len(failed_tasks)
-        skip_count = len(skipped_tasks)
-        
-        summary = f"""
-Execution date: {exec_date}
-Total tasks: {total_tasks}
-Successful tasks: {success_count}
-Failed tasks: {fail_count}
-Skipped tasks: {skip_count}
-
-=== Successful tasks ===
-"""
-        
-        for task in successful_tasks:
-            summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
-        
-        if failed_tasks:
-            summary += "\n=== Failed tasks ===\n"
-            for task in failed_tasks:
-                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
         
-        if skipped_tasks:
-            summary += "\n=== Skipped tasks ===\n"
-            for task in skipped_tasks:
-                summary += f"{task['table_type']} {task['table_name']} ({task['script_name']})\n"
-        
-        # Update the summary table
-        cursor.execute("""
-            INSERT INTO airflow_execution_summary
-            (exec_date, total_tasks, success_count, fail_count, skip_count, summary_text, created_at)
-            VALUES (%s, %s, %s, %s, %s, %s, %s)
-            ON CONFLICT (exec_date) 
-            DO UPDATE SET
-                total_tasks = EXCLUDED.total_tasks,
-                success_count = EXCLUDED.success_count,
-                fail_count = EXCLUDED.fail_count,
-                skip_count = EXCLUDED.skip_count,
-                summary_text = EXCLUDED.summary_text,
-                updated_at = CURRENT_TIMESTAMP
-        """, (
-            exec_date, total_tasks, success_count, fail_count, skip_count, 
-            summary, datetime.now()
-        ))
+        # 3. Generate the execution report
+        try:
+            report = generate_unified_execution_report(exec_date, stats)
+        except Exception as e:
+            logger.error(f"Error generating execution report: {str(e)}")
+            report = f"Error generating execution report: {str(e)}\nBasic stats: total tasks: {stats.get('total_tasks', 0)}, success: {stats.get('success_count', 0)}, failed: {stats.get('fail_count', 0)}"
         
-        conn.commit()
-        cursor.close()
-        conn.close()
+        # Pass the report and statistics on to the next task
+        try:
+            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
+            kwargs['ti'].xcom_push(key='execution_report', value=report)
+        except Exception as e:
+            logger.error(f"Error saving report to XCom: {str(e)}")
         
-        logger.info(f"=== Execution summary finished - timestamp: {datetime.now().isoformat()} ===")
-        return summary
+        return report
     except Exception as e:
         logger.error(f"Unhandled error while summarizing execution: {str(e)}")
         # Return a simple error report so the task itself does not fail
-        return f"Error during execution summary: {str(e)}"
-
+        return f"Error during execution summary: {str(e)}"
 
 # Create the DAG
 with DAG(
@@ -1458,197 +965,250 @@ with DAG(
     }
 ) as dag:
     
-    # Initialize globals to avoid undefined-name errors during DAG parsing
-    globals()['_resource_tasks'] = []
-    globals()['_task_dict'] = {}
-    
-    # DAG start task
-    dag_start = EmptyOperator(task_id="dag_start")
-    
-    # DAG end task
-    dag_end = EmptyOperator(
-        task_id="dag_end",
-        trigger_rule="all_done"  # make sure the DAG finishes whether upstream tasks succeed or not
-    )
-    
-    # Prepare phase task
-    prepare_task = PythonOperator(
-        task_id="prepare_dag_schedule",
-        python_callable=prepare_dag_schedule,
-        provide_context=True
-    )
-    
-    # Execution summary task
-    summarize_task = PythonOperator(
-        task_id='summarize_execution',
-        python_callable=summarize_execution,
-        provide_context=True,
-        trigger_rule='all_done',  # run whether the previous tasks succeeded or failed
-        retries=2,  # extra retries
-        retry_delay=timedelta(minutes=1)  # retry delay
-    )
-    
-    # Data processing phase
-    # Get all tasks to execute (actual tasks, not TaskGroup-wrapped ones)
-    exec_date = get_latest_date()
-    resource_tasks, model_tasks = get_all_tasks(exec_date)
-    
-    # Task dict used for setting up dependencies
-    task_dict = {}
-    
-    # Create resource table tasks
-    for task_info in resource_tasks:
-        table_name = task_info["target_table"]
-        script_name = task_info["script_name"]
-        exec_mode = task_info.get("script_exec_mode", "append")
-        
-        # Build a safe task ID
-        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-        task_id = f"resource_{safe_table_name}"
+    #############################################
+    # Phase 1: Prepare Phase
+    #############################################
+    with TaskGroup("prepare_phase") as prepare_group:
+        # Start-of-phase marker
+        start_preparation = EmptyOperator(
+            task_id="start_preparation"
+        )
         
-        # Use execute_with_monitoring directly to make sure the script runs
-        resource_task = PythonOperator(
-            task_id=task_id,
-            python_callable=execute_with_monitoring,
-            op_kwargs={
-                "target_table": table_name,
-                "script_name": script_name,
-                "script_exec_mode": exec_mode,
-                "exec_date": exec_date
-            },
-            retries=2,
-            retry_delay=timedelta(minutes=1),
-            trigger_rule="all_done"  # run whether upstream tasks succeed or fail
+        # Prepare scheduling task
+        prepare_task = PythonOperator(
+            task_id="prepare_dag_schedule",
+            python_callable=prepare_dag_schedule,
+            provide_context=True
         )
         
-        # Add the task to the dict
-        task_dict[table_name] = resource_task
+        # Create the execution plan - moved here from data_processing_phase
+        create_plan = PythonOperator(
+            task_id="create_execution_plan",
+            python_callable=create_execution_plan,
+            provide_context=True
+        )
         
-        # Set the dependency: prepare_task -> resource_task
-        prepare_task >> resource_task
+        # End-of-preparation marker
+        preparation_completed = EmptyOperator(
+            task_id="preparation_completed"
+        )
         
-    # Get dependencies for all model tables
-    model_table_names = [task["target_table"] for task in model_tasks]
-    dependencies = get_table_dependencies_for_data_phase(model_table_names)
-    
-    # Build a directed graph to determine the execution order
-    G = nx.DiGraph()
-    
-    # Add all model tables as nodes
-    for task_info in model_tasks:
-        G.add_node(task_info["target_table"])
-    
-    # Add dependency edges
-    for source, deps in dependencies.items():
-        for dep in deps:
-            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                G.add_edge(dep.get("table_name"), source)
-    
-    # Handle circular dependencies
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        for cycle in cycles:
-            G.remove_edge(cycle[-1], cycle[0])
+        # Set task dependencies - adjusted to include create_plan
+        start_preparation >> prepare_task >> create_plan >> preparation_completed
     
-    # Get the execution order
-    try:
-        execution_order = list(nx.topological_sort(G))
-    except Exception as e:
-        execution_order = [task["target_table"] for task in model_tasks]
+    #############################################
+    # Phase 2: Data Processing Phase
+    #############################################
+    with TaskGroup("data_processing_phase") as data_group:
+        # End-of-processing marker
+        processing_completed = EmptyOperator(
+            task_id="processing_completed"
+        )
     
-    # Create model table tasks
-    for table_name in execution_order:
-        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-        if not task_info:
-            continue
-        
-        script_name = task_info["script_name"]
-        exec_mode = task_info.get("script_exec_mode", "append")
-        
-        # Build a safe task ID
-        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-        task_id = f"model_{safe_table_name}"
-        
-        # Use execute_with_monitoring directly to run the script
-        model_task = PythonOperator(
-            task_id=task_id,
-            python_callable=execute_with_monitoring,
-            op_kwargs={
-                "target_table": table_name,
-                "script_name": script_name,
-                "script_exec_mode": exec_mode,
-                "exec_date": exec_date
-            },
-            retries=2,
-            retry_delay=timedelta(minutes=1),
-            trigger_rule="all_done"  # run whether upstream tasks succeed or fail
+    #############################################
+    # Phase 3: Summary Phase
+    #############################################
+    with TaskGroup("summary_phase") as summary_group:
+        # Summarize the execution
+        summarize_task = PythonOperator(
+            task_id="summarize_execution",
+            python_callable=summarize_execution,
+            provide_context=True
         )
         
-        # Add the task to the dict
-        task_dict[table_name] = model_task
-        
-        # Set the dependencies
-        deps = dependencies.get(table_name, [])
-        has_dependency = False
-        
-        # Handle dependencies between model tables
-        for dep in deps:
-            dep_table = dep.get("table_name")
-            if dep_table in task_dict:
-                task_dict[dep_table] >> model_task
-                has_dependency = True
-        
-        # If there are no dependencies, depend on all resource table tasks
-        if not has_dependency and resource_tasks:
-            for resource_task_info in resource_tasks:
-                resource_name = resource_task_info["target_table"]
-                if resource_name in task_dict:
-                    task_dict[resource_name] >> model_task
+        # End-of-summary marker
+        summary_completed = EmptyOperator(
+            task_id="summary_completed"
+        )
         
-        # If there are no dependencies and no resource tables, depend directly on prepare_task
-        if not has_dependency and not resource_tasks:
-            prepare_task >> model_task
-    
-    # All processing tasks are upstream of summarize_task
-    for task in task_dict.values():
-        task >> summarize_task
-    
-    # Set up the main flow
-    dag_start >> prepare_task
+        # Set task dependencies
+        summarize_task >> summary_completed
     
-    # Execution plan file creation task
-    create_plan_task = PythonOperator(
-        task_id="create_execution_plan",
-        python_callable=create_execution_plan,
-        provide_context=True
-    )
+    # Set the dependencies between the three phases - simple TaskGroup dependencies
+    prepare_group >> data_group >> summary_group
+
+    # Dynamic creation logic for the actual data processing tasks
+    # This part of the code runs when the DAG is loaded, creating tasks dynamically from database data and the execution plan
     
-    # Set the dependencies
-    prepare_task >> create_plan_task >> summarize_task >> dag_end
+    # Read information from the execution plan JSON
+    execution_plan_json = '''{"exec_date": "2025-04-12", "resource_tasks": [], "model_tasks": [], "dependencies": {}}'''
     
-    logger.info(f"DAG dag_dataops_unified_scheduler 定义完成,创建了 {len(task_dict)} 个脚本执行任务")
+    try:
+        # 尝试从文件中读取最新的执行计划,仅用于构建DAG视图
+        import os
+        plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
+        if os.path.exists(plan_path):
+            with open(plan_path, 'r') as f:
+                execution_plan_json = f.read()
+    except Exception as e:
+        logger.warning(f"读取执行计划默认值时出错: {str(e)}")
     
-    # Try to get the latest execution plan from the database for Web UI display
+    # Parse the execution plan to get the task information
     try:
-        # Use a simple query that runs only once at DAG load time to fetch table info
-        # This is only for UI display and does not affect actual execution
-        conn = get_pg_conn()
-        cursor = conn.cursor()
+        execution_plan = json.loads(execution_plan_json)
+        exec_date = execution_plan.get("exec_date", get_today_date())
+        resource_tasks = execution_plan.get("resource_tasks", [])
+        model_tasks = execution_plan.get("model_tasks", [])
+        dependencies = execution_plan.get("dependencies", {})
+        
+        # Task dict used for setting up dependencies
+        task_dict = {}
+        
+        # 1. Create resource table tasks
+        for task_info in resource_tasks:
+            table_name = task_info["target_table"]
+            script_name = task_info["script_name"]
+            exec_mode = task_info.get("script_exec_mode", "append")
+            
+            # Build a safe task ID - use the table name directly, shorter and easier to read
+            safe_table_name = table_name.replace(".", "_").replace("-", "_")
+            
+            # Make sure every task is part of data_processing_phase
+            with data_group:
+                resource_task = PythonOperator(
+                    task_id=f"resource_{safe_table_name}",  # no prefix needed, the TaskGroup adds it automatically
+                    python_callable=process_resource,
+                    op_kwargs={
+                        "target_table": table_name,
+                        "script_name": script_name,
+                        "script_exec_mode": exec_mode,
+                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
+                    },
+                    retries=TASK_RETRY_CONFIG["retries"],
+                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                )
+            
+            # Add the task to the dict
+            task_dict[table_name] = resource_task
+            
+            # Set task dependencies - using the correct reference
+            preparation_completed >> resource_task
+        
+        # Build a directed graph to detect dependencies between model tables
+        G = nx.DiGraph()
+        
+        # Add all model tables as nodes
+        for task_info in model_tasks:
+            table_name = task_info["target_table"]
+            G.add_node(table_name)
+        
+        # Add dependency edges between model tables
+        for source, deps in dependencies.items():
+            for dep in deps:
+                if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
+                    G.add_edge(dep.get("table_name"), source)  # edge direction: dependency -> target
+        
+        # Detect and handle circular dependencies
+        cycles = list(nx.simple_cycles(G))
+        if cycles:
+            logger.warning(f"Circular dependencies detected: {cycles}")
+            for cycle in cycles:
+                G.remove_edge(cycle[-1], cycle[0])
+                logger.info(f"Breaking circular dependency: removed edge {cycle[-1]} -> {cycle[0]}")
+        
+        # Generate a topological sort to determine the execution order
         try:
-            cursor.execute("""
-                SELECT COUNT(*) FROM airflow_dag_schedule
-            """)
-            count = cursor.fetchone()
-            if count and count[0] > 0:
-                logger.info(f"There are {count[0]} task records in the database available for scheduling")
-            else:
-                logger.info("No task records found in the database; the DAG's first run will create the initial plan")
+            execution_order = list(nx.topological_sort(G))
+            logger.info(f"Computed execution order: {execution_order}")
         except Exception as e:
-            logger.warning(f"查询数据库时出错: {str(e)}, 这不会影响DAG的实际执行")
-        finally:
-            cursor.close()
-            conn.close()
+            logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
+            execution_order = [task_info["target_table"] for task_info in model_tasks]
+        
+        # 2. Create model table tasks in topological order
+        for table_name in execution_order:
+            task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+            if not task_info:
+                continue
+                
+            script_name = task_info["script_name"]
+            exec_mode = task_info.get("script_exec_mode", "append")
+            
+            # Build a safe task ID
+            safe_table_name = table_name.replace(".", "_").replace("-", "_")
+            
+            # Make sure every task is part of data_processing_phase
+            with data_group:
+                model_task = PythonOperator(
+                    task_id=f"model_{safe_table_name}", # shorter ID
+                    python_callable=process_model,
+                    op_kwargs={
+                        "target_table": table_name,
+                        "script_name": script_name,
+                        "script_exec_mode": exec_mode,
+                        "exec_date": """{{ ti.xcom_pull(task_ids='prepare_phase.create_execution_plan') }}"""
+                    },
+                    retries=TASK_RETRY_CONFIG["retries"],
+                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                )
+            
+            # Add the task to the dict
+            task_dict[table_name] = model_task
+            
+            # Set the dependencies
+            deps = dependencies.get(table_name, [])
+            has_dependency = False
+            
+            # Handle dependencies between model tables
+            for dep in deps:
+                dep_table = dep.get("table_name")
+                dep_type = dep.get("table_type")
+                
+                if dep_table in task_dict:
+                    task_dict[dep_table] >> model_task
+                    has_dependency = True
+                    logger.info(f"Setting dependency: {dep_table} >> {table_name}")
+            
+            # If there are no dependencies, depend on the resource table tasks
+            if not has_dependency:
+                # Depend on the completion of prepare_phase
+                preparation_completed >> model_task
+                
+                # Also connect from all resource table tasks
+                for resource_table in resource_tasks:
+                    resource_name = resource_table["target_table"]
+                    if resource_name in task_dict:
+                        task_dict[resource_name] >> model_task
+                        logger.info(f"Setting resource dependency: {resource_name} >> {table_name}")
+
+        # If there are no model table tasks, treat all resource table tasks as terminal tasks
+        if not model_tasks and resource_tasks:
+            terminal_tasks = [task["target_table"] for task in resource_tasks]
+        else:
+            # Find all terminal tasks (tasks with no downstream dependencies)
+            terminal_tasks = []
+            
+            # Check every model table task
+            for table_name in execution_order:
+                # Check whether it has downstream tasks
+                has_downstream = False
+                for source, deps in dependencies.items():
+                    if source == table_name:  # skip itself
+                        continue
+                    for dep in deps:
+                        if dep.get("table_name") == table_name:
+                            has_downstream = True
+                            break
+                    if has_downstream:
+                        break
+                
+                # If it has no downstream tasks, add it to the terminal task list
+                if not has_downstream and table_name in task_dict:
+                    terminal_tasks.append(table_name)

+        # If there are neither model table tasks nor resource table tasks, the default dependency chain already exists
+        if not terminal_tasks:
+            logger.warning("No tasks found, using the default dependency chain")
+        else:
+            # Connect all terminal tasks to the completion marker
+            for table_name in terminal_tasks:
+                if table_name in task_dict:
+                    task_dict[table_name] >> processing_completed
+                    logger.info(f"Setting terminal task: {table_name} >> processing_completed")
+
     except Exception as e:
-        logger.warning(f"初始化DAG时发生错误: {str(e)}, 这不会影响DAG的实际执行")
+        logger.error(f"构建任务DAG时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        
         # 确保即使出错,也有清晰的执行路径
         # 已经有默认依赖链,不需要额外添加
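
Note: the cycle-breaking and ordering logic above (nx.simple_cycles to drop one edge per cycle, then nx.topological_sort) can be exercised standalone. A small sketch with made-up table names; edges point from dependency to dependent, as in the DAG-building code:

import networkx as nx

G = nx.DiGraph()
G.add_edges_from([
    ("ods_orders", "dim_customer"),
    ("dim_customer", "fact_sales"),
    ("fact_sales", "ods_orders"),  # artificial cycle for the demo
])

# Break each detected cycle by removing its closing edge.
for cycle in list(nx.simple_cycles(G)):
    G.remove_edge(cycle[-1], cycle[0])

print(list(nx.topological_sort(G)))
# one possible output: ['ods_orders', 'dim_customer', 'fact_sales']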