Sfoglia il codice sorgente

完成脚本编排的定期调度版本

wangxq 3 settimane fa
parent
commit
af9c8c987d

+ 636 - 0
dags/dataops_productline_execute_dag.py

@@ -0,0 +1,636 @@
+"""
+统一数据产品线执行器 DAG
+
+功能:
+1. 面向脚本的作业编排,不再是面向表
+2. 基于dataops_productline_prepare_dag生成的执行计划执行脚本
+3. 支持对脚本执行顺序的优化
+4. 提供详细的执行日志和错误处理
+"""
+from airflow import DAG
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.utils.task_group import TaskGroup
+from datetime import datetime, timedelta, date
+import logging
+import networkx as nx
+import json
+import os
+import pendulum
+from decimal import Decimal
+from common import (
+    get_pg_conn, 
+    get_neo4j_driver,
+    get_today_date
+)
+from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, PG_CONFIG, NEO4J_CONFIG
+import pytz
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+# 开启详细诊断日志记录
+ENABLE_DEBUG_LOGGING = True
+
+def log_debug(message):
+    """记录调试日志,但只在启用调试模式时"""
+    if ENABLE_DEBUG_LOGGING:
+        logger.info(f"[DEBUG] {message}")
+
+# 在DAG启动时输出诊断信息
+log_debug("======== 诊断信息 ========")
+log_debug(f"当前工作目录: {os.getcwd()}")
+log_debug(f"SCRIPTS_BASE_PATH: {SCRIPTS_BASE_PATH}")
+log_debug(f"导入的common模块路径: {get_pg_conn.__module__}")
+
+#############################################
+# 通用工具函数
+#############################################
+
+def json_serial(obj):
+    """将日期对象序列化为ISO格式字符串的JSON序列化器"""
+    if isinstance(obj, (datetime, date)):
+        return obj.isoformat()
+    raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
+
+# 添加自定义JSON编码器解决Decimal序列化问题
+class DecimalEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, Decimal):
+            return float(obj)
+        # 处理日期类型
+        elif isinstance(obj, (datetime, date)):
+            return obj.isoformat()
+        # 让父类处理其他类型
+        return super(DecimalEncoder, self).default(obj)
+
+#############################################
+# 脚本执行函数
+#############################################
+
+def execute_script(script_id, script_name, target_table, exec_date, script_exec_mode='append', **kwargs):
+    """
+    执行单个脚本并返回执行结果
+    
+    参数:
+        script_id: 脚本ID
+        script_name: 脚本文件名
+        target_table: 目标表名
+        exec_date: 执行日期
+        script_exec_mode: 执行模式
+        **kwargs: 其他参数,如source_tables、target_type等
+    
+    返回:
+        bool: 脚本执行结果
+    """
+    # 添加详细日志
+    logger.info(f"===== 开始执行脚本 {script_id} =====")
+    logger.info(f"script_id: {script_id}, 类型: {type(script_id)}")
+    logger.info(f"script_name: {script_name}, 类型: {type(script_name)}")
+    logger.info(f"target_table: {target_table}, 类型: {type(target_table)}")
+    logger.info(f"script_exec_mode: {script_exec_mode}, 类型: {type(script_exec_mode)}")
+    logger.info(f"exec_date: {exec_date}, 类型: {type(exec_date)}")
+
+    # 记录额外参数
+    for key, value in kwargs.items():
+        logger.info(f"额外参数 - {key}: {value}, 类型: {type(value)}")
+
+    # 检查script_name是否为空
+    if not script_name:
+        logger.error(f"脚本ID {script_id} 的script_name为空,无法执行")
+        return False
+        
+    # 记录执行开始时间
+    start_time = datetime.now()
+    
+    try:
+        # 导入和执行脚本模块
+        import importlib.util
+        import sys
+        script_path = os.path.join(SCRIPTS_BASE_PATH, script_name)
+        
+        if not os.path.exists(script_path):
+            logger.error(f"脚本文件不存在: {script_path}")
+            return False
+            
+        # 动态导入模块
+        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
+        
+        # 检查并调用标准入口函数run
+        if hasattr(module, "run"):
+            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
+            # 构建完整的参数字典
+            run_params = {
+                "table_name": target_table,
+                "execution_mode": script_exec_mode,
+                "exec_date": exec_date
+            }
+
+            ## 添加可能的额外参数
+            for key in ['target_type', 'storage_location', 'frequency', 'source_tables']:
+                if key in kwargs and kwargs[key] is not None:
+                    run_params[key] = kwargs[key] 
+
+            # 调用脚本的run函数
+            logger.info(f"调用run函数并传递参数: {run_params}")
+            result = module.run(**run_params)
+            logger.info(f"脚本执行完成,原始返回值: {result}, 类型: {type(result)}")
+            
+            # 确保result是布尔值
+            if result is None:
+                logger.warning(f"脚本返回值为None,转换为False")
+                result = False
+            elif not isinstance(result, bool):
+                original_result = result
+                result = bool(result)
+                logger.warning(f"脚本返回非布尔值 {original_result},转换为布尔值: {result}")
+            
+            # 记录结束时间和结果
+            end_time = datetime.now()
+            duration = (end_time - start_time).total_seconds()
+            logger.info(f"脚本 {script_name} 执行完成,结果: {result}, 耗时: {duration:.2f}秒")
+            
+            return result
+        else:
+            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
+            return False
+    except Exception as e:
+        # a处理异常
+        logger.error(f"执行脚本 {script_id} 出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        logger.error(f"脚本 {script_name} 执行失败,耗时: {duration:.2f}秒")
+        logger.info(f"===== 脚本执行异常结束 =====")
+        import traceback
+        logger.error(traceback.format_exc())
+        
+        # 确保不会阻塞DAG
+        return False
+
+#############################################
+# 执行计划获取和处理函数
+#############################################
+
+def get_execution_plan_from_db(ds):
+    """
+    从数据库获取产品线执行计划
+    
+    参数:
+        ds (str): 执行日期,格式为'YYYY-MM-DD'
+        
+    返回:
+        dict: 执行计划字典,如果找不到则返回None
+    """
+    # 记录输入参数详细信息
+    if isinstance(ds, datetime):
+        if ds.tzinfo:
+            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 带时区: {ds.tzinfo}")
+        else:
+            logger.debug(f"【执行日期】get_execution_plan_from_db接收到datetime对象: {ds}, 无时区")
+    else:
+        logger.debug(f"【执行日期】get_execution_plan_from_db接收到: {ds}, 类型: {type(ds)}")
+    
+    logger.info(f"尝试从数据库获取执行日期 {ds} 的产品线执行计划")
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    execution_plan = None
+    
+    try:
+        # 查询条件a: 当前日期=表的exec_date,如果有多条记录,取logical_date最大的一条
+        cursor.execute("""
+            SELECT plan
+            FROM airflow_exec_plans
+            WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date = %s
+            ORDER BY logical_date DESC
+            LIMIT 1
+        """, (ds,))
+        result = cursor.fetchone()
+        
+        if result:
+            # 获取计划
+            plan_json = result[0]
+            
+            # 处理plan_json可能已经是dict的情况
+            if isinstance(plan_json, dict):
+                execution_plan = plan_json
+            else:
+                execution_plan = json.loads(plan_json)
+                
+            logger.info(f"找到当前日期 exec_date={ds} 的执行计划记录")
+            return execution_plan
+        
+        # 查询条件b: 找不到当前日期的记录,查找exec_date<当前ds的最新记录
+        logger.info(f"未找到当前日期 exec_date={ds} 的执行计划记录,尝试查找历史记录")
+        cursor.execute("""
+            SELECT plan, exec_date
+            FROM airflow_exec_plans
+            WHERE dag_id = 'dataops_productline_prepare_dag' AND exec_date < %s
+            ORDER BY exec_date DESC, logical_date DESC
+            LIMIT 1
+        """, (ds,))
+        result = cursor.fetchone()
+        
+        if result:
+            # 获取计划和exec_date
+            plan_json, plan_ds = result
+            
+            # 处理plan_json可能已经是dict的情况
+            if isinstance(plan_json, dict):
+                execution_plan = plan_json
+            else:
+                execution_plan = json.loads(plan_json)
+                
+            logger.info(f"找到历史执行计划记录,exec_date: {plan_ds}")
+            return execution_plan
+        
+        # 找不到任何执行计划记录
+        logger.error(f"在数据库中未找到任何执行计划记录,当前DAG exec_date={ds}")
+        return None
+        
+    except Exception as e:
+        logger.error(f"从数据库获取执行计划时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return None
+    finally:
+        cursor.close()
+        conn.close()
+
+def check_execution_plan(**kwargs):
+    """
+    检查执行计划是否存在且有效
+    返回False将阻止所有下游任务执行
+    """
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    
+    # 检查是否是手动触发
+    is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
+    if is_manual_trigger:
+        logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
+    
+    # 记录重要的时间参数
+    logger.info(f"【时间参数】check_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+    logger.info("检查数据库中的执行计划是否存在且有效")
+    
+    # 从数据库获取执行计划
+    execution_plan = get_execution_plan_from_db(exec_date)
+    
+    # 检查是否成功获取到执行计划
+    if not execution_plan:
+        logger.error(f"未找到执行日期 {exec_date} 的执行计划")
+        return False
+    
+    # 检查执行计划是否包含必要字段
+    if "exec_date" not in execution_plan:
+        logger.error("执行计划缺少exec_date字段")
+        return False
+        
+    if not isinstance(execution_plan.get("scripts", []), list):
+        logger.error("执行计划的scripts字段无效")
+        return False
+        
+    if not isinstance(execution_plan.get("script_dependencies", {}), dict):
+        logger.error("执行计划的script_dependencies字段无效")
+        return False
+    
+    # 检查是否有脚本数据
+    scripts = execution_plan.get("scripts", [])
+    
+    if not scripts:
+        logger.warning("执行计划不包含任何脚本")
+        # 如果没有脚本,则阻止下游任务执行
+        return False
+    
+    logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本")
+    
+    # 保存执行计划到XCom以便下游任务使用
+    kwargs['ti'].xcom_push(key='execution_plan', value=execution_plan)
+    
+    return True
+
+def optimize_execution_order(scripts, script_dependencies):
+    """
+    使用NetworkX优化脚本执行顺序
+    
+    参数:
+        scripts (list): 脚本信息列表
+        script_dependencies (dict): 脚本依赖关系字典
+        
+    返回:
+        list: 优化后的脚本执行顺序(脚本ID列表)
+    """
+    logger.info("开始使用NetworkX优化脚本执行顺序")
+    
+    # 构建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有脚本作为节点
+    for script in scripts:
+        script_id = script['script_id']
+        G.add_node(script_id)
+    
+    # 添加依赖边
+    for script_id, dependencies in script_dependencies.items():
+        for dep_id in dependencies:
+            # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
+            G.add_edge(script_id, dep_id)
+            logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
+    
+    # 检查是否有循环依赖
+    try:
+        cycles = list(nx.simple_cycles(G))
+        if cycles:
+            logger.warning(f"检测到循环依赖: {cycles}")
+            # 处理循环依赖,可以通过删除一些边来打破循环
+            for cycle in cycles:
+                # 选择一条边删除,这里简单地选择第一条边
+                if len(cycle) > 1:
+                    G.remove_edge(cycle[0], cycle[1])
+                    logger.warning(f"删除边 {cycle[0]} -> {cycle[1]} 以打破循环")
+    except Exception as e:
+        logger.error(f"检测循环依赖时出错: {str(e)}")
+    
+    # 使用拓扑排序获取执行顺序
+    try:
+        # 反转图,因为我们的边表示"依赖于"关系,而拓扑排序需要"优先于"关系
+        reverse_G = G.reverse()
+        execution_order = list(nx.topological_sort(reverse_G))
+        
+        # 反转结果,使上游任务先执行
+        execution_order.reverse()
+        
+        logger.info(f"NetworkX优化后的脚本执行顺序: {execution_order}")
+        return execution_order
+    except Exception as e:
+        logger.error(f"生成脚本执行顺序时出错: {str(e)}")
+        # 出错时返回原始脚本ID列表,不进行优化
+        return [script['script_id'] for script in scripts]
+
+def create_execution_plan(**kwargs):
+    """
+    创建或获取执行计划
+    """
+    try:
+        dag_run = kwargs.get('dag_run')
+        logical_date = dag_run.logical_date
+        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+        exec_date = local_logical_date.strftime('%Y-%m-%d')
+        
+        # 检查是否是手动触发
+        is_manual_trigger = dag_run.conf.get('MANUAL_TRIGGER', False) if dag_run.conf else False
+        if is_manual_trigger:
+            logger.info(f"【手动触发】当前DAG是手动触发的,使用传入的logical_date: {logical_date}")
+        
+        # 记录重要的时间参数
+        logger.info(f"【时间参数】create_execution_plan: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
+        
+        # 从XCom获取执行计划
+        execution_plan = kwargs['ti'].xcom_pull(task_ids='check_execution_plan', key='execution_plan')
+        
+        # 如果找不到执行计划,则从数据库获取
+        if not execution_plan:
+            logger.info(f"未找到执行计划,从数据库获取。使用执行日期: {exec_date}")
+            execution_plan = get_execution_plan_from_db(exec_date)
+            
+            if not execution_plan:
+                logger.error(f"执行日期 {exec_date} 没有找到执行计划")
+                return None
+        
+        # 验证执行计划结构
+        scripts = execution_plan.get("scripts", [])
+        script_dependencies = execution_plan.get("script_dependencies", {})
+        execution_order = execution_plan.get("execution_order", [])
+        
+        # 如果执行计划中没有execution_order或为空,使用NetworkX优化
+        if not execution_order:
+            logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
+            execution_order = optimize_execution_order(scripts, script_dependencies)
+            execution_plan["execution_order"] = execution_order
+        
+        # 保存完整的执行计划到XCom
+        kwargs['ti'].xcom_push(key='full_execution_plan', value=execution_plan)
+        
+        logger.info(f"成功处理执行计划,包含 {len(scripts)} 个脚本")
+        return execution_plan
+    except Exception as e:
+        logger.error(f"创建执行计划时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return None
+
+# 创建DAG
+with DAG(
+    "dataops_productline_execute_dag", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily",  # 设置为每日调度
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    },
+    params={
+        'MANUAL_TRIGGER': False, 
+    }
+) as dag:
+    
+    # 记录DAG实例化时的重要信息
+    now = datetime.now()
+    now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
+    default_exec_date = get_today_date()
+    logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期: {default_exec_date}")
+    
+    #############################################
+    # 准备阶段: 检查并创建执行计划
+    #############################################
+    with TaskGroup("prepare_phase") as prepare_group:
+        # 检查执行计划是否存在
+        check_plan = ShortCircuitOperator(
+            task_id="check_execution_plan",
+            python_callable=check_execution_plan,
+            provide_context=True
+        )
+        
+        # 创建执行计划
+        create_plan = PythonOperator(
+            task_id="create_execution_plan",
+            python_callable=create_execution_plan,
+            provide_context=True
+        )
+        
+        # 设置任务依赖
+        check_plan >> create_plan
+    
+    #############################################
+    # 执行阶段: 按依赖关系执行脚本
+    #############################################
+    with TaskGroup("execution_phase") as execution_group:
+        try:
+            # 获取当前DAG的执行日期
+            exec_date = get_today_date()  # 使用当天日期作为默认值
+            logger.info(f"当前DAG执行日期 ds={exec_date},尝试从数据库获取执行计划")
+            
+            # 从数据库获取执行计划
+            execution_plan = get_execution_plan_from_db(exec_date)
+            
+            # 检查是否成功获取到执行计划
+            if execution_plan is None:
+                error_msg = f"无法从数据库获取有效的执行计划,当前DAG exec_date={exec_date}"
+                logger.error(error_msg)
+                # 使用全局变量而不是异常来强制DAG失败
+                raise ValueError(error_msg)
+            
+            # 提取信息
+            exec_date = execution_plan.get("exec_date", exec_date)
+            scripts = execution_plan.get("scripts", [])
+            script_dependencies = execution_plan.get("script_dependencies", {})
+            execution_order = execution_plan.get("execution_order", [])
+            
+            # 如果执行计划中没有execution_order或为空,使用NetworkX优化
+            if not execution_order:
+                logger.info("执行计划中没有execution_order,使用NetworkX进行优化")
+                execution_order = optimize_execution_order(scripts, script_dependencies)
+            
+            logger.info(f"执行计划: exec_date={exec_date}, scripts数量={len(scripts)}")
+            
+            # 如果执行计划为空(没有脚本),也应该失败
+            if not scripts:
+                error_msg = f"执行计划中没有任何脚本,当前DAG exec_date={exec_date}"
+                logger.error(error_msg)
+                raise ValueError(error_msg)
+            
+            # 1. 创建开始和结束任务
+            start_execution = EmptyOperator(
+                task_id="start_execution"
+            )
+            
+            execution_completed = EmptyOperator(
+                task_id="execution_completed",
+                trigger_rule="none_failed_min_one_success"  # 只要有一个任务成功且没有失败的任务就标记为完成
+            )
+            
+            # 创建脚本任务字典,用于管理任务依赖
+            task_dict = {}
+            
+            # 2. 先创建所有脚本任务,不设置依赖关系
+            for script in scripts:
+                script_id = script['script_id']
+                script_name = script.get("script_name")
+                target_table = script.get("target_table")
+                script_type = script.get("script_type", "python")
+                script_exec_mode = script.get("script_exec_mode", "append")
+                source_tables = script.get("source_tables", [])
+                
+                # 使用描述性的任务ID,包含脚本名称和目标表
+                # 提取文件名
+                if "/" in script_name:
+                    script_file = script_name.split("/")[-1]  # 获取文件名部分
+                else:
+                    script_file = script_name
+                
+                # 确保任务ID不包含不允许的特殊字符
+                safe_script_name = script_file.replace(" ", "_")
+                safe_target_table = target_table.replace("-", "_").replace(" ", "_")
+                
+                # 按照指定格式创建任务ID
+                task_id = f"{safe_script_name}-TO-{safe_target_table}"
+                
+                # 构建op_kwargs参数
+                op_kwargs = {
+                    "script_id": script_id,
+                    "script_name": script_name,
+                    "target_table": target_table,
+                    "exec_date": str(exec_date),
+                    "script_exec_mode": script_exec_mode,
+                    "source_tables": source_tables
+                }
+                
+                # 添加特殊参数(如果有)
+                for key in ['target_type', 'storage_location', 'frequency']:
+                    if key in script and script[key] is not None:
+                        op_kwargs[key] = script[key]
+                
+                # 创建任务
+                script_task = PythonOperator(
+                    task_id=task_id,
+                    python_callable=execute_script,
+                    op_kwargs=op_kwargs,
+                    retries=TASK_RETRY_CONFIG["retries"],
+                    retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
+                )
+                
+                # 将任务添加到字典
+                task_dict[script_id] = script_task
+            
+            # 3. 设置开始任务与所有无依赖的脚本任务的关系
+            no_dep_scripts = []
+            for script_id, dependencies in script_dependencies.items():
+                if not dependencies:  # 如果没有依赖
+                    if script_id in task_dict:
+                        no_dep_scripts.append(script_id)
+                        start_execution >> task_dict[script_id]
+                        logger.info(f"设置无依赖脚本: start_execution >> {script_id}")
+            
+            # 4. 设置脚本间的依赖关系
+            for script_id, dependencies in script_dependencies.items():
+                for dep_id in dependencies:
+                    if script_id in task_dict and dep_id in task_dict:
+                        # 正确的依赖关系:依赖任务 >> 当前任务
+                        task_dict[dep_id] >> task_dict[script_id]
+                        logger.info(f"设置脚本依赖: {dep_id} >> {script_id}")
+            
+            # 5. 找出所有叶子节点(没有下游任务的节点)并连接到execution_completed
+            # 首先,构建一个下游节点集合
+            has_downstream = set()
+            for script_id, dependencies in script_dependencies.items():
+                for dep_id in dependencies:
+                    has_downstream.add(dep_id)
+            
+            # 然后,找出没有下游节点的任务
+            leaf_nodes = []
+            for script_id in task_dict:
+                if script_id not in has_downstream:
+                    leaf_nodes.append(script_id)
+                    task_dict[script_id] >> execution_completed
+                    logger.info(f"将叶子节点连接到completion: {script_id} >> execution_completed")
+            
+            # 如果没有找到叶子节点,则将所有任务都连接到completion
+            if not leaf_nodes:
+                logger.warning("未找到叶子节点,将所有任务连接到completion")
+                for script_id, task in task_dict.items():
+                    task >> execution_completed
+            
+            # 设置TaskGroup与prepare_phase的依赖关系
+            prepare_group >> start_execution
+            
+            logger.info(f"成功创建 {len(task_dict)} 个脚本执行任务")
+            
+        except Exception as e:
+            logger.error(f"加载执行计划或创建任务时出错: {str(e)}")
+            import traceback
+            logger.error(traceback.format_exc())
+
+    # 添加触发finalize DAG的任务
+    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+    
+    trigger_finalize_dag = TriggerDagRunOperator(
+        task_id="trigger_finalize_dag",
+        trigger_dag_id="dataops_productline_finalize_dag",
+        conf={"execution_date": "{{ ds }}", "parent_execution_date": "{{ execution_date }}", "parent_run_id": "{{ run_id }}"},
+        reset_dag_run=True,
+        wait_for_completion=False,
+        poke_interval=60,
+    )
+    
+    # 设置依赖关系,确保执行阶段完成后触发finalize DAG
+    execution_group >> trigger_finalize_dag
+
+logger.info(f"DAG dataops_productline_execute_dag 定义完成") 

+ 430 - 0
dags/dataops_productline_finalize_dag.py

@@ -0,0 +1,430 @@
+"""
+统一数据产品线完成器 DAG
+
+功能:
+1. 由dataops_productline_execute_dag触发,不自行调度
+2. 收集执行DAG的执行统计信息
+3. 生成执行报告
+4. 无论execute DAG执行成功与否都会运行
+"""
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from datetime import datetime, timedelta, date
+import logging
+import json
+import pendulum
+import pytz
+from airflow.models import DagRun, TaskInstance
+from airflow.utils.state import State
+from sqlalchemy import desc
+from airflow import settings
+from common import get_today_date
+from decimal import Decimal
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+# 开启详细日志记录
+ENABLE_DEBUG_LOGGING = True
+
+def log_debug(message):
+    """记录调试日志,但只在启用调试模式时"""
+    if ENABLE_DEBUG_LOGGING:
+        logger.info(f"[DEBUG] {message}")
+
+# 在DAG启动时输出诊断信息
+log_debug("======== 诊断信息 ========")
+log_debug(f"DAG dataops_productline_finalize_dag 初始化")
+log_debug("======== 诊断信息结束 ========")
+
+#############################################
+# 通用工具函数
+#############################################
+
+def json_serial(obj):
+    """将日期对象序列化为ISO格式字符串的JSON序列化器"""
+    if isinstance(obj, (datetime, date)):
+        return obj.isoformat()
+    raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
+
+# 添加自定义JSON编码器解决Decimal序列化问题
+class DecimalEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, Decimal):
+            return float(obj)
+        # 处理日期类型
+        elif isinstance(obj, (datetime, date)):
+            return obj.isoformat()
+        # 让父类处理其他类型
+        return super(DecimalEncoder, self).default(obj)
+
+#############################################
+# 统计和报告生成函数
+#############################################
+
+def collect_execution_stats(**kwargs):
+    """
+    从Airflow元数据收集执行DAG的执行统计信息
+    """
+    # 获取当前执行的日期和时间信息
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    
+    # 获取触发此DAG的配置信息(如果有)
+    conf = dag_run.conf or {}
+    parent_exec_date = conf.get('execution_date', exec_date)
+    parent_run_id = conf.get('parent_run_id')
+    
+    # 记录完整的conf内容
+    logger.info(f"【从上游DAG接收的配置】complete conf: {conf}")
+    
+    # 记录重要的时间参数
+    logger.info(f"【时间参数】collect_execution_stats: exec_date={exec_date}, logical_date={logical_date}, parent_exec_date={parent_exec_date}")
+    logger.info(f"【上游DAG信息】parent_run_id={parent_run_id}")
+    logger.info(f"开始收集执行日期 {parent_exec_date} 的脚本执行统计信息")
+    
+    # 执行DAG的ID
+    target_dag_id = "dataops_productline_execute_dag"
+    
+    try:
+        # 创建数据库会话
+        session = settings.Session()
+        
+        # 首先通过run_id查询(如果提供了)
+        dag_runs = []
+        if parent_run_id:
+            logger.info(f"使用parent_run_id={parent_run_id}查询DAG运行记录")
+            dag_runs = session.query(DagRun).filter(
+                DagRun.dag_id == target_dag_id,
+                DagRun.run_id == parent_run_id
+            ).all()
+            
+            if dag_runs:
+                logger.info(f"通过run_id找到匹配的DAG运行记录")
+            else:
+                logger.warning(f"未通过run_id找到匹配的DAG运行记录,尝试使用执行日期")
+        
+        # 如果通过run_id未找到记录,尝试使用执行日期
+        if not dag_runs and parent_exec_date:
+            # 尝试解析父执行日期为datetime对象
+            if isinstance(parent_exec_date, str):
+                try:
+                    parent_exec_date_dt = pendulum.parse(parent_exec_date)
+                except:
+                    parent_exec_date_dt = None
+            else:
+                parent_exec_date_dt = parent_exec_date
+            
+            # 记录解析结果
+            logger.info(f"解析后的父执行日期: {parent_exec_date_dt}")
+            
+            # 如果成功解析,使用它查询
+            if parent_exec_date_dt:
+                logger.info(f"使用父执行日期 {parent_exec_date_dt} 查询DAG运行")
+                dag_runs = session.query(DagRun).filter(
+                    DagRun.dag_id == target_dag_id,
+                    DagRun.execution_date == parent_exec_date_dt
+                ).order_by(desc(DagRun.execution_date)).limit(1).all()
+                
+                if dag_runs:
+                    logger.info(f"通过执行日期找到匹配的DAG运行记录")
+                else:
+                    logger.error(f"未通过执行日期找到匹配的DAG运行记录")
+            else:
+                logger.error(f"无法解析父执行日期 {parent_exec_date}")
+        
+        # 如果两种方法都无法找到记录,则报错
+        if not dag_runs:
+            error_msg = f"无法找到DAG {target_dag_id} 的相关运行记录。提供的run_id: {parent_run_id}, 执行日期: {parent_exec_date}"
+            logger.error(error_msg)
+            session.close()
+            raise ValueError(error_msg)
+        
+        # 获取DAG运行记录
+        dag_run = dag_runs[0]
+        dag_run_id = dag_run.run_id
+        dag_start_time = dag_run.start_date
+        dag_end_time = dag_run.end_date
+        dag_state = dag_run.state
+        dag_execution_date = dag_run.execution_date
+        
+        # 记录匹配方式
+        if parent_run_id and dag_run_id == parent_run_id:
+            match_method = "run_id精确匹配"
+        else:
+            match_method = "执行日期匹配"
+        
+        logger.info(f"【匹配方式】成功通过{match_method}找到DAG运行记录")
+        logger.info(f"【匹配结果】run_id={dag_run_id}, 执行日期={dag_execution_date}, 状态={dag_state}")
+        
+        # 计算DAG运行时间
+        dag_duration = None
+        if dag_start_time and dag_end_time:
+            dag_duration = (dag_end_time - dag_start_time).total_seconds()
+        
+        # 时区转换
+        if dag_start_time:
+            dag_start_time_local = pendulum.instance(dag_start_time).in_timezone('Asia/Shanghai')
+            dag_start_time_str = dag_start_time_local.strftime('%Y-%m-%d %H:%M:%S')
+        else:
+            dag_start_time_str = 'N/A'
+            
+        if dag_end_time:
+            dag_end_time_local = pendulum.instance(dag_end_time).in_timezone('Asia/Shanghai')
+            dag_end_time_str = dag_end_time_local.strftime('%Y-%m-%d %H:%M:%S')
+        else:
+            dag_end_time_str = 'N/A'
+        
+        # 获取所有相关的任务实例
+        task_instances = session.query(TaskInstance).filter(
+            TaskInstance.dag_id == target_dag_id,
+            TaskInstance.run_id == dag_run_id
+        ).all()
+        
+        # 关闭会话
+        session.close()
+        
+        # 统计任务状态信息
+        total_tasks = len(task_instances)
+        success_count = sum(1 for ti in task_instances if ti.state == State.SUCCESS)
+        fail_count = sum(1 for ti in task_instances if ti.state == State.FAILED)
+        skipped_count = sum(1 for ti in task_instances if ti.state == State.SKIPPED)
+        upstream_failed_count = sum(1 for ti in task_instances if ti.state == State.UPSTREAM_FAILED)
+        
+        # 统计脚本类型任务数量
+        script_task_count = sum(1 for ti in task_instances if "-TO-" in ti.task_id)
+        
+        # 获取执行时间最长的几个任务
+        task_durations = []
+        for ti in task_instances:
+            if ti.start_date and ti.end_date:
+                duration = (ti.end_date - ti.start_date).total_seconds()
+                task_durations.append({
+                    "task_id": ti.task_id,
+                    "duration": duration,
+                    "state": ti.state
+                })
+        
+        # 按持续时间降序排序
+        task_durations.sort(key=lambda x: x["duration"] if x["duration"] is not None else 0, reverse=True)
+        top_tasks_by_duration = task_durations[:5]  # 取前5个
+        
+        # 获取失败的任务
+        failed_tasks = []
+        for ti in task_instances:
+            if ti.state in [State.FAILED, State.UPSTREAM_FAILED]:
+                failed_task = {
+                    "task_id": ti.task_id,
+                    "state": ti.state,
+                    "try_number": ti.try_number,
+                }
+                if ti.start_date and ti.end_date:
+                    failed_task["duration"] = (ti.end_date - ti.start_date).total_seconds()
+                failed_tasks.append(failed_task)
+        
+        # 构建统计结果
+        stats = {
+            "exec_date": exec_date,
+            "dag_id": target_dag_id,
+            "dag_execution_date": dag_execution_date.isoformat() if dag_execution_date else None,
+            "dag_run_id": dag_run_id,
+            "status": dag_state,
+            "total_tasks": total_tasks,
+            "success_count": success_count,
+            "fail_count": fail_count,
+            "skipped_count": skipped_count,
+            "upstream_failed_count": upstream_failed_count,
+            "script_task_count": script_task_count,
+            "duration": dag_duration,
+            "start_time": dag_start_time_str,
+            "end_time": dag_end_time_str,
+            "top_tasks_by_duration": top_tasks_by_duration,
+            "failed_tasks": failed_tasks
+        }
+        
+        # 将统计结果保存到XCom
+        kwargs['ti'].xcom_push(key='execution_stats', value=stats)
+        
+        logger.info(f"成功收集脚本执行统计信息: 总任务数={total_tasks}, 成功={success_count}, 失败={fail_count}")
+        return stats
+    except Exception as e:
+        logger.error(f"收集脚本执行统计信息时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回一个基本的错误信息
+        error_stats = {
+            "exec_date": exec_date,
+            "dag_id": target_dag_id,
+            "status": "ERROR",
+            "error": str(e),
+            "total_tasks": 0,
+            "success_count": 0,
+            "fail_count": 0
+        }
+        kwargs['ti'].xcom_push(key='execution_stats', value=error_stats)
+        return error_stats
+
+def generate_execution_report(**kwargs):
+    """
+    基于收集的统计信息生成执行报告
+    """
+    try:
+        # 从XCom获取统计信息
+        ti = kwargs['ti']
+        stats = ti.xcom_pull(task_ids='collect_execution_stats')
+        
+        if not stats:
+            logger.warning("未找到脚本执行统计信息,无法生成报告")
+            report = "未找到脚本执行统计信息,无法生成报告。"
+            ti.xcom_push(key='execution_report', value=report)
+            return report
+        
+        # 构建报告
+        report = []
+        report.append(f"\n========== 脚本执行报告 ==========")
+        report.append(f"执行日期: {stats['exec_date']}")
+        report.append(f"DAG ID: {stats['dag_id']}")
+        report.append(f"Run ID: {stats.get('dag_run_id', 'N/A')}")
+        report.append(f"状态: {stats['status']}")
+        report.append(f"总任务数: {stats['total_tasks']}")
+        
+        # 任务状态统计
+        report.append("\n--- 任务状态统计 ---")
+        report.append(f"成功任务: {stats['success_count']} 个")
+        report.append(f"失败任务: {stats['fail_count']} 个")
+        report.append(f"跳过任务: {stats.get('skipped_count', 0)} 个")
+        report.append(f"上游失败任务: {stats.get('upstream_failed_count', 0)} 个")
+        
+        # 任务类型统计
+        report.append("\n--- 任务类型统计 ---")
+        report.append(f"脚本执行任务: {stats.get('script_task_count', 0)} 个")
+        
+        # 执行时间统计
+        report.append("\n--- 执行时间统计 ---")
+        if stats.get('duration') is not None:
+            hours, remainder = divmod(stats['duration'], 3600)
+            minutes, seconds = divmod(remainder, 60)
+            report.append(f"总执行时间: {int(hours)}小时 {int(minutes)}分钟 {int(seconds)}秒")
+        else:
+            report.append("总执行时间: N/A")
+            
+        report.append(f"开始时间(北京时间): {stats.get('start_time', 'N/A')}")
+        report.append(f"结束时间(北京时间): {stats.get('end_time', 'N/A')}")
+        
+        # 执行时间最长的任务
+        top_tasks = stats.get('top_tasks_by_duration', [])
+        if top_tasks:
+            report.append("\n--- 执行时间最长的任务 ---")
+            for i, task in enumerate(top_tasks, 1):
+                duration_secs = task.get('duration', 0)
+                minutes, seconds = divmod(duration_secs, 60)
+                report.append(f"{i}. {task['task_id']}: {int(minutes)}分钟 {int(seconds)}秒 ({task['state']})")
+        
+        # 失败任务详情
+        failed_tasks = stats.get('failed_tasks', [])
+        if failed_tasks:
+            report.append("\n--- 失败任务详情 ---")
+            for i, task in enumerate(failed_tasks, 1):
+                report.append(f"{i}. 任务ID: {task['task_id']}")
+                report.append(f"   状态: {task['state']}")
+                report.append(f"   尝试次数: {task.get('try_number', 'N/A')}")
+                
+                if 'duration' in task:
+                    minutes, seconds = divmod(task['duration'], 60)
+                    report.append(f"   执行时间: {int(minutes)}分钟 {int(seconds)}秒")
+                else:
+                    report.append("   执行时间: N/A")
+        
+        # 总结
+        success_rate = 0
+        if stats['total_tasks'] > 0:
+            success_rate = (stats['success_count'] / stats['total_tasks']) * 100
+            
+        report.append("\n--- 总结 ---")
+        report.append(f"任务成功率: {success_rate:.2f}%")
+        
+        if stats['status'] == 'success':
+            report.append("所有脚本执行成功完成!")
+        elif stats['status'] == 'failed':
+            report.append(f"脚本执行过程中出现失败。有 {stats['fail_count']} 个任务失败。")
+        else:
+            report.append(f"当前状态: {stats['status']}")
+        
+        report.append("\n========== 报告结束 ==========")
+        
+        # 将报告转换为字符串
+        report_str = "\n".join(report)
+        
+        # 记录到日志
+        logger.info("\n" + report_str)
+        
+        # 保存到XCom
+        ti.xcom_push(key='execution_report', value=report_str)
+        
+        return report_str
+    except Exception as e:
+        logger.error(f"生成执行报告时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 返回一个简单的错误报告
+        error_report = f"生成执行报告时出错: {str(e)}"
+        kwargs['ti'].xcom_push(key='execution_report', value=error_report)
+        return error_report
+
+#############################################
+# 创建DAG
+#############################################
+with DAG(
+    "dataops_productline_finalize_dag", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval=None,  # 不自行调度,由dataops_productline_execute_dag触发
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # 记录DAG实例化时的信息
+    now = datetime.now()
+    now_with_tz = now.replace(tzinfo=pytz.timezone('Asia/Shanghai'))
+    default_exec_date = get_today_date()
+    logger.info(f"【DAG初始化】当前时间: {now} / {now_with_tz}, 默认执行日期: {default_exec_date}")
+    
+    #############################################
+    # 收集统计信息
+    #############################################
+    collect_stats = PythonOperator(
+        task_id="collect_execution_stats",
+        python_callable=collect_execution_stats,
+        provide_context=True,
+        dag=dag
+    )
+    
+    #############################################
+    # 生成执行报告
+    #############################################
+    generate_report = PythonOperator(
+        task_id="generate_execution_report",
+        python_callable=generate_execution_report,
+        provide_context=True,
+        dag=dag
+    )
+    
+    #############################################
+    # 完成标记
+    #############################################
+    finalize_completed = EmptyOperator(
+        task_id="finalize_completed",
+        dag=dag
+    )
+    
+    # 设置任务依赖
+    collect_stats >> generate_report >> finalize_completed 

+ 1358 - 0
dags/dataops_productline_prepare_dag.py

@@ -0,0 +1,1358 @@
+# dataops_productline_prepare_dag.py
+from airflow import DAG
+from airflow.operators.python import PythonOperator, ShortCircuitOperator
+from airflow.operators.empty import EmptyOperator
+from datetime import datetime, timedelta
+import logging
+import networkx as nx
+import json
+import os
+import re
+import glob
+from pathlib import Path
+import hashlib
+import pendulum
+from common import (
+    get_pg_conn, 
+    get_neo4j_driver,
+    get_today_date
+)
+from config import PG_CONFIG, NEO4J_CONFIG
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+def get_enabled_tables():
+    """获取所有启用的表"""
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        try:
+            cursor.execute("""
+                SELECT owner_id, table_name 
+                FROM schedule_status 
+                WHERE schedule_is_enabled = TRUE
+            """)
+            result = cursor.fetchall()
+            return [row[1] for row in result]  # 只返回表名
+        except Exception as e:
+            logger.error(f"获取启用表失败: {str(e)}")
+            raise Exception(f"PostgreSQL数据库查询失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def check_table_directly_subscribed(table_name):
+    """检查表是否在schedule_status表中直接调度"""
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        try:
+            cursor.execute("""
+                SELECT schedule_is_enabled
+                FROM schedule_status 
+                WHERE table_name = %s
+            """, (table_name,))
+            result = cursor.fetchone()
+            return result and result[0] is True
+        except Exception as e:
+            logger.error(f"检查表订阅状态失败: {str(e)}")
+            raise Exception(f"PostgreSQL查询表订阅状态失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def should_execute_today(table_name, frequency, exec_date):
+    """
+    判断指定频率的表在给定执行日期是否应该执行
+    
+    参数:
+        table_name (str): 表名,用于日志记录
+        frequency (str): 调度频率,如'daily'、'weekly'、'monthly'、'yearly',为None时默认为'daily'
+        exec_date (str): 执行日期,格式为'YYYY-MM-DD'
+    
+    返回:
+        bool: 如果该表应该在执行日期执行,则返回True,否则返回False
+    """
+    # 将执行日期字符串转换为pendulum日期对象
+    try:
+        exec_date_obj = pendulum.parse(exec_date)
+    except Exception as e:
+        logger.error(f"解析执行日期 {exec_date} 出错: {str(e)},使用当前日期")
+        exec_date_obj = pendulum.today()
+    
+    # 计算下一个日期,用于判断是否是月初、周初等
+    next_date = exec_date_obj.add(days=1)
+    
+    # 如果频率为None或空字符串,默认为daily
+    if not frequency:
+        logger.info(f"表 {table_name} 未指定调度频率,默认为daily")
+        return True
+    
+    frequency = frequency.lower() if isinstance(frequency, str) else 'daily'
+    
+    if frequency == 'daily':
+        # 日任务每天都执行
+        return True
+    elif frequency == 'weekly':
+        # 周任务只在周日执行(因为exec_date+1是周一时才执行)
+        is_sunday = next_date.day_of_week == 1  # 1表示周一
+        logger.info(f"表 {table_name} 是weekly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否周日: {is_sunday}")
+        return is_sunday
+    elif frequency == 'monthly':
+        # 月任务只在每月最后一天执行(因为exec_date+1是月初时才执行)
+        is_month_end = next_date.day == 1
+        logger.info(f"表 {table_name} 是monthly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否月末: {is_month_end}")
+        return is_month_end
+    elif frequency == 'quarterly':
+        # 季度任务只在每季度最后一天执行(因为exec_date+1是季度初时才执行)
+        is_quarter_end = next_date.day == 1 and next_date.month in [1, 4, 7, 10]
+        logger.info(f"表 {table_name} 是quarterly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否季末: {is_quarter_end}")
+        return is_quarter_end
+    elif frequency == 'yearly':
+        # 年任务只在每年最后一天执行(因为exec_date+1是年初时才执行)
+        is_year_end = next_date.day == 1 and next_date.month == 1
+        logger.info(f"表 {table_name} 是yearly任务,exec_date={exec_date},next_date={next_date.to_date_string()},是否年末: {is_year_end}")
+        return is_year_end
+    else:
+        # 未知频率,默认执行
+        logger.warning(f"表 {table_name} 使用未知的调度频率: {frequency},默认执行")
+        return True
+
+def get_table_info_from_neo4j(table_name):
+    """从Neo4j获取表的详细信息,保留完整的scripts_info并确保正确获取源表依赖"""
+    try:
+        driver = get_neo4j_driver()
+    except Exception as e:
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+        
+    # 检查表是否直接订阅
+    is_directly_schedule = check_table_directly_subscribed(table_name)
+
+    table_info = {
+        'target_table': table_name,
+        'is_directly_schedule': is_directly_schedule,  # 初始值设为True,从schedule_status表获取
+    }
+    
+    try:
+        with driver.session() as session:
+            # 尝试执行一个简单查询来验证连接
+            try:
+                test_result = session.run("RETURN 1 as test")
+                test_record = test_result.single()
+                if not test_record or test_record.get("test") != 1:
+                    logger.error("Neo4j连接测试失败")
+                    raise Exception("Neo4j连接测试失败")
+            except Exception as e:
+                logger.error(f"Neo4j连接测试失败: {str(e)}")
+                raise Exception(f"Neo4j连接测试失败: {str(e)}")
+            
+            # 查询表标签和状态
+            query_table = """
+                MATCH (t {en_name: $table_name})
+                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency,
+                       t.type AS type, t.storage_location AS storage_location
+            """
+            try:
+                result = session.run(query_table, table_name=table_name)
+                record = result.single()
+            except Exception as e:
+                logger.error(f"Neo4j查询表信息失败: {str(e)}")
+                raise Exception(f"Neo4j查询表信息失败: {str(e)}")
+            
+            if record:
+                labels = record.get("labels", [])
+                table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
+                table_info['target_table_status'] = record.get("status", True)  # 默认为True
+                table_info['frequency'] = record.get("frequency")
+                table_info['target_type'] = record.get("type")  # 获取type属性
+                table_info['storage_location'] = record.get("storage_location")  # 获取storage_location属性
+                
+                # 根据标签类型查询关系和脚本信息
+                if "DataResource" in labels:
+                    # 检查是否为structure类型
+                    if table_info.get('target_type') == "structure":
+                        # 对于structure类型,设置默认值,不查询关系
+                        table_info['source_tables'] = []  # 使用空数组表示无源表
+                        table_info['script_name'] = "load_file.py"
+                        table_info['script_type'] = "python"
+                        
+                        # csv类型的DataResource没有上游,使用默认的append模式
+                        table_info['script_exec_mode'] = "append"
+                        logger.info(f"表 {table_name} 为structure类型,使用默认执行模式: append")
+
+                        # 添加脚本信息
+                        table_info['scripts_info'] = {
+                            "load_file.py": {
+                                "sources": [],
+                                "script_type": "python",
+                                "script_exec_mode": "append"
+                            }
+                        }
+
+                        return table_info
+                    else:
+                        # 查询源表关系和脚本信息
+                        query_rel = """
+                            MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                            WITH source, rel, 
+                                 CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                                 CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                            RETURN source.en_name AS source_table, script_name AS script_name,
+                                   script_type AS script_type, 'append' AS script_exec_mode
+                        """
+                elif "DataModel" in labels:
+                    # 查询源表关系和脚本信息
+                    query_rel = """
+                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                        WITH source, rel, 
+                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                        RETURN source.en_name AS source_table, script_name AS script_name,
+                               script_type AS script_type, 'append' AS script_exec_mode
+                    """
+                else:
+                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
+                    # 即使不是这两种类型,也尝试查询其源表依赖关系
+                    query_rel = """
+                        MATCH (target {en_name: $table_name})-[rel]->(source)
+                        WITH source, rel, 
+                             CASE WHEN rel.script_name IS NULL THEN target.en_name + '_script.py' ELSE rel.script_name END AS script_name,
+                             CASE WHEN rel.script_type IS NULL THEN 'python' ELSE rel.script_type END AS script_type
+                        RETURN source.en_name AS source_table, script_name AS script_name,
+                               script_type AS script_type, 'append' AS script_exec_mode
+                    """
+                
+                # 收集所有关系记录
+                result = session.run(query_rel, table_name=table_name)
+                # 检查result对象是否有collect方法,否则使用data方法或list直接转换
+                try:
+                    if hasattr(result, 'collect'):
+                        records = result.collect()  # 使用collect()获取所有记录
+                    else:
+                        # 尝试使用其他方法获取记录
+                        logger.info(f"表 {table_name} 的查询结果不支持collect方法,尝试使用其他方法")
+                        try:
+                            records = list(result)  # 直接转换为列表
+                        except Exception as e1:
+                            logger.warning(f"尝试列表转换失败: {str(e1)},尝试使用data方法")
+                            try:
+                                records = result.data()  # 使用data()方法
+                            except Exception as e2:
+                                logger.warning(f"所有方法都失败,使用空列表: {str(e2)}")
+                                records = []
+                except Exception as e:
+                    logger.warning(f"获取查询结果时出错: {str(e)},使用空列表")
+                    records = []
+                
+                # 记录查询到的原始记录
+                logger.info(f"表 {table_name} 查询到 {len(records)} 条关系记录")
+                for idx, rec in enumerate(records):
+                    logger.info(f"关系记录[{idx}]: source_table={rec.get('source_table')}, script_name={rec.get('script_name')}, " 
+                                f"script_type={rec.get('script_type')}, script_exec_mode={rec.get('script_exec_mode')}")
+                
+                if records:
+                    # 按脚本名称分组源表
+                    scripts_info = {}
+                    for record in records:
+                        script_name = record.get("script_name")
+                        source_table = record.get("source_table")
+                        script_type = record.get("script_type", "python")
+                        script_exec_mode = record.get("script_exec_mode", "append")
+                        
+                        logger.info(f"处理记录: source_table={source_table}, script_name={script_name}")
+                        
+                        # 如果script_name为空,生成默认的脚本名
+                        if not script_name:
+                            script_name = f"{table_name}_script.py"
+                            logger.warning(f"表 {table_name} 的关系中没有script_name属性,使用默认值: {script_name}")
+                            
+                        if script_name not in scripts_info:
+                            scripts_info[script_name] = {
+                                "sources": [],
+                                "script_type": script_type,
+                                "script_exec_mode": script_exec_mode
+                            }
+                        
+                        # 确保source_table有值且不为None才添加到sources列表中
+                        if source_table and source_table not in scripts_info[script_name]["sources"]:
+                            scripts_info[script_name]["sources"].append(source_table)
+                            logger.debug(f"为表 {table_name} 的脚本 {script_name} 添加源表: {source_table}")
+                    
+                    # 处理分组信息
+                    if scripts_info:
+                        # 存储完整的脚本信息
+                        table_info['scripts_info'] = scripts_info
+                        
+                        # 如果只有一个脚本,直接使用它
+                        if len(scripts_info) == 1:
+                            script_name = list(scripts_info.keys())[0]
+                            script_info = scripts_info[script_name]
+                            
+                            table_info['source_tables'] = script_info["sources"]  # 使用数组
+                            table_info['script_name'] = script_name
+                            table_info['script_type'] = script_info["script_type"]
+                            table_info['script_exec_mode'] = script_info["script_exec_mode"]
+                            logger.info(f"表 {table_name} 有单个脚本 {script_name},源表: {script_info['sources']}")
+                        else:
+                            # 如果有多个不同脚本,记录多脚本信息
+                            logger.info(f"表 {table_name} 有多个不同脚本: {list(scripts_info.keys())}")
+                            # 暂时使用第一个脚本的信息作为默认值
+                            first_script = list(scripts_info.keys())[0]
+                            table_info['source_tables'] = scripts_info[first_script]["sources"]
+                            table_info['script_name'] = first_script
+                            table_info['script_type'] = scripts_info[first_script]["script_type"]
+                            table_info['script_exec_mode'] = scripts_info[first_script]["script_exec_mode"]
+                    else:
+                        logger.warning(f"表 {table_name} 未找到有效的脚本信息")
+                        table_info['source_tables'] = []  # 使用空数组
+                else:
+                    # 如果没有找到关系记录,则尝试直接查询表的上游依赖
+                    logger.info(f"表 {table_name} 未通过关系查询找到源表,尝试直接查询依赖...")
+                    
+                    # 查询任何类型的上游依赖关系
+                    query_deps = """
+                        MATCH (target {en_name: $table_name})-[rel]->(source)
+                        RETURN source.en_name AS source_table
+                    """
+                    
+                    try:
+                        deps_result = session.run(query_deps, table_name=table_name)
+                        deps_records = list(deps_result)
+                        
+                        source_tables = []
+                        for rec in deps_records:
+                            src_table = rec.get("source_table")
+                            if src_table and src_table not in source_tables:
+                                source_tables.append(src_table)
+                                logger.info(f"直接依赖查询: 表 {table_name} 依赖于 {src_table}")
+                        
+                        # 设置默认的脚本名和源表
+                        script_name = f"{table_name}_script.py"
+                        table_info['source_tables'] = source_tables
+                        table_info['script_name'] = script_name
+                        table_info['script_type'] = "python"
+                        table_info['script_exec_mode'] = "append"
+                        
+                        # 创建scripts_info
+                        table_info['scripts_info'] = {
+                            script_name: {
+                                "sources": source_tables,
+                                "script_type": "python",
+                                "script_exec_mode": "append"
+                            }
+                        }
+                        
+                        logger.info(f"为表 {table_name} 设置默认脚本 {script_name},源表: {source_tables}")
+                    except Exception as e:
+                        logger.warning(f"直接查询表 {table_name} 的依赖关系时出错: {str(e)}")
+                        table_info['source_tables'] = []  # 使用空数组
+                        table_info['script_name'] = f"{table_name}_script.py"
+                        table_info['script_type'] = "python"
+                        table_info['script_exec_mode'] = "append"
+                        
+                        # 创建空的scripts_info
+                        table_info['scripts_info'] = {
+                            table_info['script_name']: {
+                                "sources": [],
+                                "script_type": "python",
+                                "script_exec_mode": "append"
+                            }
+                        }
+            else:
+                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息,设置默认值")
+                table_info['source_tables'] = []
+                table_info['script_name'] = f"{table_name}_script.py"
+                table_info['script_type'] = "python"
+                table_info['script_exec_mode'] = "append"
+                
+                # 创建空的scripts_info
+                table_info['scripts_info'] = {
+                    table_info['script_name']: {
+                        "sources": [],
+                        "script_type": "python",
+                        "script_exec_mode": "append"
+                    }
+                }
+    except Exception as e:
+        if "Neo4j连接" in str(e) or "Neo4j查询" in str(e):
+            # 这是我们已经处理过的错误,直接抛出
+            raise
+        else:
+            logger.error(f"处理表 {table_name} 的信息时出错: {str(e)}")
+            raise Exception(f"Neo4j数据处理失败: {str(e)}")
+    finally:
+        driver.close()
+    
+    return table_info
+
+def process_dependencies(tables_info):
+    """处理表间依赖关系,添加被动调度的表"""
+    # 存储所有表信息的字典
+    all_tables = {t['target_table']: t for t in tables_info}
+    try:
+        driver = get_neo4j_driver()
+    except Exception as e:
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
+    try:
+        with driver.session() as session:
+            # 验证连接
+            try:
+                test_result = session.run("RETURN 1 as test")
+                test_record = test_result.single()
+                if not test_record or test_record.get("test") != 1:
+                    logger.error("Neo4j连接测试失败")
+                    raise Exception("Neo4j连接测试失败")
+            except Exception as e:
+                logger.error(f"Neo4j连接测试失败: {str(e)}")
+                raise Exception(f"Neo4j连接测试失败: {str(e)}")
+                
+            for table_name, table_info in list(all_tables.items()):
+                if table_info.get('target_table_label') == 'DataModel':
+                    # 查询其依赖表
+                    query = """
+                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
+                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
+                               dep.status AS dep_status, dep.frequency AS dep_frequency
+                    """
+                    try:
+                        result = session.run(query, table_name=table_name)
+                    except Exception as e:
+                        logger.error(f"Neo4j查询依赖关系失败: {str(e)}")
+                        raise Exception(f"Neo4j查询依赖关系失败: {str(e)}")
+                    
+                    for record in result:
+                        dep_name = record.get("dep_name")
+                        dep_labels = record.get("dep_labels", [])
+                        dep_status = record.get("dep_status", True)
+                        dep_frequency = record.get("dep_frequency")
+                        
+                        # 处理未被直接调度的依赖表
+                        if dep_name and dep_name not in all_tables:
+                            logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
+                            
+                            # 获取依赖表详细信息
+                            dep_info = get_table_info_from_neo4j(dep_name)
+                            dep_info['is_directly_schedule'] = False
+                            
+                            # 处理调度频率继承
+                            if not dep_info.get('frequency'):
+                                dep_info['frequency'] = table_info.get('frequency')
+                            
+                            all_tables[dep_name] = dep_info
+    except Exception as e:
+        if "Neo4j" in str(e):
+            # 已经处理过的错误,直接抛出
+            raise
+        else:
+            logger.error(f"处理依赖关系时出错: {str(e)}")
+            raise Exception(f"处理依赖关系时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    return list(all_tables.values())
+
+def filter_invalid_tables(tables_info):
+    """过滤无效表及其依赖,使用NetworkX构建依赖图"""
+    # 构建表名到索引的映射
+    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
+    
+    # 找出无效表
+    invalid_tables = set()
+    for table in tables_info:
+        if table.get('target_table_status') is False:
+            invalid_tables.add(table['target_table'])
+            logger.info(f"表 {table['target_table']} 的状态为无效")
+    
+    # 构建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有节点
+    for table in tables_info:
+        G.add_node(table['target_table'])
+    
+    # 查询并添加依赖边
+    try:
+        driver = get_neo4j_driver()
+    except Exception as e:
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
+    try:
+        with driver.session() as session:
+            # 验证连接
+            try:
+                test_result = session.run("RETURN 1 as test")
+                test_record = test_result.single()
+                if not test_record or test_record.get("test") != 1:
+                    logger.error("Neo4j连接测试失败")
+                    raise Exception("Neo4j连接测试失败")
+            except Exception as e:
+                logger.error(f"Neo4j连接测试失败: {str(e)}")
+                raise Exception(f"Neo4j连接测试失败: {str(e)}")
+                
+            for table in tables_info:
+                if table.get('target_table_label') == 'DataModel':
+                    query = """
+                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
+                        RETURN target.en_name AS target_name
+                    """
+                    try:
+                        result = session.run(query, table_name=table['target_table'])
+                    except Exception as e:
+                        logger.error(f"Neo4j查询表依赖关系失败: {str(e)}")
+                        raise Exception(f"Neo4j查询表依赖关系失败: {str(e)}")
+                    
+                    for record in result:
+                        target_name = record.get("target_name")
+                        if target_name and target_name in table_dict:
+                            # 添加从目标到源的边,表示目标依赖于源
+                            G.add_edge(table['target_table'], target_name)
+                            logger.debug(f"添加依赖边: {table['target_table']} -> {target_name}")
+    except Exception as e:
+        if "Neo4j" in str(e):
+            # 已经处理过的错误,直接抛出
+            raise
+        else:
+            logger.error(f"构建依赖图时出错: {str(e)}")
+            raise Exception(f"构建依赖图时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    # 找出依赖于无效表的所有表
+    downstream_invalid = set()
+    for invalid_table in invalid_tables:
+        # 获取可从无效表到达的所有节点
+        try:
+            descendants = nx.descendants(G, invalid_table)
+            downstream_invalid.update(descendants)
+            logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
+        except Exception as e:
+            logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
+            raise Exception(f"处理下游依赖失败: {str(e)}")
+    
+    # 合并所有无效表
+    all_invalid = invalid_tables.union(downstream_invalid)
+    logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
+    
+    # 过滤出有效表
+    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
+    logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
+    
+    return valid_tables
+
+def touch_product_scheduler_file():
+    """
+    更新产品线调度器DAG文件的修改时间,触发重新解析
+    
+    返回:
+        bool: 是否成功更新
+    """
+    data_scheduler_path = os.path.join(os.path.dirname(__file__), 'dataops_productline_execute_dag.py')
+    
+    success = False
+    try:
+        if os.path.exists(data_scheduler_path):
+            # 更新文件修改时间,触发Airflow重新解析
+            os.utime(data_scheduler_path, None)
+            logger.info(f"已触发产品线执行DAG重新解析: {data_scheduler_path}")
+            success = True
+        else:
+            logger.warning(f"产品线执行DAG文件不存在: {data_scheduler_path}")
+                
+        return success
+    except Exception as e:
+        logger.error(f"触发DAG重新解析时出错: {str(e)}")
+        return False
+
+def get_subscription_state_hash():
+    """获取订阅表状态的哈希值"""
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        try:
+            cursor.execute("""
+                SELECT table_name, schedule_is_enabled
+                FROM schedule_status
+                ORDER BY table_name
+            """)
+            rows = cursor.fetchall()
+            # 将所有行拼接成一个字符串,然后计算哈希值
+            data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
+            return hashlib.md5(data_str.encode()).hexdigest()
+        except Exception as e:
+            logger.error(f"计算订阅表状态哈希值时出错: {str(e)}")
+            raise Exception(f"PostgreSQL查询订阅表状态失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def check_execution_plan_in_db(**kwargs):
+    """
+    检查当天的执行计划是否存在于数据库中
+    返回False将阻止所有下游任务执行
+    """
+    # 获取执行日期
+    dag_run = kwargs.get('dag_run')
+    logical_date = dag_run.logical_date
+    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+    exec_date = local_logical_date.strftime('%Y-%m-%d')
+    logger.info(f"logical_date: {logical_date} ")
+    logger.info(f"local_logical_date {local_logical_date} ")
+    logger.info(f"检查执行日期 exec_date {exec_date} 的执行计划是否存在于数据库中")
+   
+    # 检查数据库中是否存在执行计划
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        try:
+            cursor.execute("""
+                SELECT plan
+                FROM airflow_exec_plans
+                WHERE exec_date = %s AND dag_id = 'dataops_productline_prepare_dag'
+                ORDER BY logical_date DESC
+                LIMIT 1
+            """, (exec_date,))
+            
+            result = cursor.fetchone()
+            if not result:
+                logger.error(f"数据库中不存在执行日期 {exec_date} 的执行计划")
+                return False
+            
+            # 检查执行计划内容是否有效
+            try:
+                # PostgreSQL的jsonb类型会被psycopg2自动转换为Python字典,无需再使用json.loads
+                plan_data = result[0]            
+                # 检查必要字段
+                if "exec_date" not in plan_data:
+                    logger.error("执行计划缺少exec_date字段")
+                    return False
+                    
+                if not isinstance(plan_data.get("scripts", []), list):
+                    logger.error("执行计划的scripts字段无效")
+                    return False
+                    
+                if not isinstance(plan_data.get("resource_scripts", []), list):
+                    logger.error("执行计划的resource_scripts字段无效")
+                    return False
+
+                if not isinstance(plan_data.get("model_scripts", []), list):
+                    logger.error("执行计划的model_scripts字段无效")
+                    return False
+                
+                # 检查是否有脚本数据
+                scripts = plan_data.get("scripts", [])
+                resource_scripts = plan_data.get("resource_scripts", [])
+                model_scripts = plan_data.get("model_scripts", [])
+                
+                logger.info(f"执行计划验证成功: 包含 {len(scripts)} 个脚本,{len(resource_scripts)} 个资源脚本和 {len(model_scripts)} 个模型脚本")
+                return True
+                
+            except Exception as je:
+                logger.error(f"处理执行计划数据时出错: {str(je)}")
+                return False
+            
+        except Exception as e:
+            logger.error(f"检查数据库中执行计划时出错: {str(e)}")
+            raise Exception(f"PostgreSQL查询执行计划失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def save_execution_plan_to_db(execution_plan, dag_id, run_id, logical_date, ds):
+    """
+    将执行计划保存到airflow_exec_plans表
+    
+    参数:
+        execution_plan (dict): 执行计划字典
+        dag_id (str): DAG的ID
+        run_id (str): DAG运行的ID
+        logical_date (datetime): 逻辑日期
+        ds (str): 日期字符串,格式为YYYY-MM-DD
+    
+    返回:
+        bool: 操作是否成功
+    """
+    try:
+        conn = get_pg_conn()
+        cursor = conn.cursor()
+        
+        try:
+            # 将执行计划转换为JSON字符串
+            plan_json = json.dumps(execution_plan)
+            
+            # 获取本地时间
+            local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+            
+            # 插入记录
+            cursor.execute("""
+                INSERT INTO airflow_exec_plans
+                (dag_id, run_id, logical_date, local_logical_date, exec_date, plan)
+                VALUES (%s, %s, %s, %s, %s, %s)
+            """, (dag_id, run_id, logical_date, local_logical_date, ds, plan_json))
+            
+            conn.commit()
+            logger.info(f"成功将执行计划保存到airflow_exec_plans表,dag_id={dag_id}, run_id={run_id}, exec_date={ds}")
+            return True
+        except Exception as e:
+            logger.error(f"保存执行计划到数据库时出错: {str(e)}")
+            conn.rollback()
+            raise Exception(f"PostgreSQL保存执行计划失败: {str(e)}")
+        finally:
+            cursor.close()
+            conn.close()
+    except Exception as e:
+        logger.error(f"连接PostgreSQL数据库失败: {str(e)}")
+        raise Exception(f"无法连接PostgreSQL数据库: {str(e)}")
+
+def generate_task_id(script_name, source_tables, target_table):
+    """
+    根据脚本名和表名生成唯一任务ID
+    
+    参数:
+        script_name (str): 脚本文件名
+        source_tables (list): 源表列表
+        target_table (str): 目标表名
+        
+    返回:
+        str: 唯一的任务ID
+    """
+    # 移除脚本名的文件扩展名
+    script_base = os.path.splitext(script_name)[0]
+    
+    # 对于特殊脚本如load_file.py,直接使用目标表名
+    if script_name.lower() in ['load_file.py']:
+        return f"{script_base}_{target_table}"
+    
+    # 处理源表部分
+    if source_tables:
+        # 将所有源表按字母顺序排序并连接
+        source_part = "_".join(sorted(source_tables))
+        # 生成任务ID: 脚本名_源表_to_目标表
+        return f"{script_base}_{source_part}_to_{target_table}"
+    else:
+        # 没有源表时,只使用脚本名和目标表
+        return f"{script_base}_{target_table}"
+
+def prepare_scripts_from_tables(tables_info):
+    """
+    将表信息转换为脚本信息
+    
+    参数:
+        tables_info (list): 表信息列表
+        
+    返回:
+        list: 脚本信息列表
+    """
+    scripts = []
+    
+    for table in tables_info:
+        target_table = table['target_table']
+        target_table_label = table.get('target_table_label')
+        frequency = table.get('frequency')
+        
+        # 处理表的脚本信息
+        if 'scripts_info' in table and table['scripts_info']:
+            # 表有多个脚本
+            for script_name, script_info in table['scripts_info'].items():
+                source_tables = script_info.get('sources', [])
+                script_type = script_info.get('script_type', 'python')
+                script_exec_mode = script_info.get('script_exec_mode', 'append')
+                
+                # 生成任务ID
+                task_id = generate_task_id(script_name, source_tables, target_table)
+                
+                # 创建脚本信息
+                script = {
+                    "script_id": task_id,
+                    "script_name": script_name,
+                    "source_tables": source_tables,
+                    "target_table": target_table,
+                    "target_table_label": target_table_label,
+                    "script_type": script_type,
+                    "script_exec_mode": script_exec_mode,
+                    "frequency": frequency,
+                    "task_id": task_id
+                }
+                
+                # 为structure类型添加特殊属性
+                if table.get('target_type') == "structure":
+                    script["target_type"] = "structure"
+                    script["storage_location"] = table.get('storage_location')
+                
+                scripts.append(script)
+                logger.info(f"为表 {target_table} 创建脚本 {script_name},任务ID: {task_id}")
+        else:
+            # 表只有单个脚本或没有明确指定脚本信息
+            script_name = table.get('script_name')
+            
+            # 如果没有script_name,使用默认值
+            if not script_name:
+                script_name = f"{target_table}_script.py"
+                logger.warning(f"表 {target_table} 没有指定脚本名,使用默认值: {script_name}")
+            
+            source_tables = table.get('source_tables', [])
+            script_type = table.get('script_type', 'python')
+            script_exec_mode = table.get('script_exec_mode', 'append')
+            
+            # 生成任务ID
+            task_id = generate_task_id(script_name, source_tables, target_table)
+            
+            # 创建脚本信息
+            script = {
+                "script_id": task_id,
+                "script_name": script_name,
+                "source_tables": source_tables,
+                "target_table": target_table,
+                "target_table_label": target_table_label,
+                "script_type": script_type,
+                "script_exec_mode": script_exec_mode,
+                "frequency": frequency,
+                "task_id": task_id
+            }
+            
+            # 为structure类型添加特殊属性
+            if table.get('target_type') == "structure":
+                script["target_type"] = "structure"
+                script["storage_location"] = table.get('storage_location')
+            
+            scripts.append(script)
+            logger.info(f"为表 {target_table} 创建脚本 {script_name},任务ID: {task_id}")
+    
+    return scripts
+
+def build_script_dependency_graph(scripts):
+    """
+    处理脚本间的依赖关系
+    
+    参数:
+        scripts (list): 脚本信息列表
+        
+    返回:
+        tuple: (依赖关系字典, 图对象)
+    """
+    # 打印所有脚本的源表信息,用于调试
+    logger.info("构建脚本依赖图,当前脚本信息:")
+    for script in scripts:
+        script_id = script['script_id']
+        script_name = script['script_name']
+        target_table = script['target_table']
+        source_tables = script['source_tables']
+        logger.info(f"脚本: {script_id} ({script_name}), 目标表: {target_table}, 源表: {source_tables}")
+    
+    # 创建目标表到脚本ID的映射
+    table_to_scripts = {}
+    for script in scripts:
+        target_table = script['target_table']
+        if target_table not in table_to_scripts:
+            table_to_scripts[target_table] = []
+        table_to_scripts[target_table].append(script['script_id'])
+    
+    # 记录表到脚本的映射关系
+    logger.info("表到脚本的映射关系:")
+    for table, script_ids in table_to_scripts.items():
+        logger.info(f"表 {table} 由脚本 {script_ids} 生成")
+    
+    # 创建脚本依赖关系
+    script_dependencies = {}
+    for script in scripts:
+        script_id = script['script_id']
+        source_tables = script['source_tables']
+        target_table = script['target_table']
+        
+        # 初始化依赖列表
+        script_dependencies[script_id] = []
+        
+        # 查找源表对应的脚本
+        if source_tables:
+            logger.info(f"处理脚本 {script_id} 的依赖关系,源表: {source_tables}")
+            for source_table in source_tables:
+                if source_table in table_to_scripts:
+                    # 添加所有生成源表的脚本作为依赖
+                    for source_script_id in table_to_scripts[source_table]:
+                        if source_script_id != script_id:  # 避免自我依赖
+                            script_dependencies[script_id].append(source_script_id)
+                            logger.info(f"添加依赖: {script_id} 依赖于 {source_script_id} (表 {target_table} 依赖于表 {source_table})")
+                else:
+                    logger.warning(f"源表 {source_table} 没有对应的脚本,无法为脚本 {script_id} 创建依赖")
+        else:
+            logger.info(f"脚本 {script_id} 没有源表依赖")
+    
+    # 尝试从Neo4j额外查询依赖关系(如果脚本没有显式的source_tables)
+    try:
+        driver = get_neo4j_driver()
+    except Exception as e:
+        logger.error(f"连接Neo4j数据库失败: {str(e)}")
+        raise Exception(f"无法连接Neo4j数据库: {str(e)}")
+    
+    try:
+        with driver.session() as session:
+            # 验证连接
+            try:
+                test_result = session.run("RETURN 1 as test")
+                test_record = test_result.single()
+                if not test_record or test_record.get("test") != 1:
+                    logger.error("Neo4j连接测试失败")
+                    raise Exception("Neo4j连接测试失败")
+            except Exception as e:
+                logger.error(f"Neo4j连接测试失败: {str(e)}")
+                raise Exception(f"Neo4j连接测试失败: {str(e)}")
+                
+            for script in scripts:
+                script_id = script['script_id']
+                target_table = script['target_table']
+                
+                # 只处理没有源表的脚本
+                if not script['source_tables'] and not script_dependencies[script_id]:
+                    logger.info(f"脚本 {script_id} 没有源表,尝试从Neo4j直接查询表 {target_table} 的依赖")
+                    
+                    # 查询表的直接依赖
+                    query = """
+                        MATCH (target {en_name: $table_name})-[rel]->(dep)
+                        RETURN dep.en_name AS dep_name
+                    """
+                    
+                    try:
+                        result = session.run(query, table_name=target_table)
+                        records = list(result)
+                        
+                        for record in records:
+                            dep_name = record.get("dep_name")
+                            if dep_name and dep_name in table_to_scripts:
+                                for dep_script_id in table_to_scripts[dep_name]:
+                                    if dep_script_id != script_id:  # 避免自我依赖
+                                        script_dependencies[script_id].append(dep_script_id)
+                                        logger.info(f"从Neo4j添加额外依赖: {script_id} 依赖于 {dep_script_id} (表 {target_table} 依赖于表 {dep_name})")
+                    except Exception as e:
+                        logger.warning(f"从Neo4j查询表 {target_table} 依赖时出错: {str(e)}")
+                        raise Exception(f"Neo4j查询表依赖失败: {str(e)}")
+    except Exception as e:
+        if "Neo4j" in str(e):
+            # 已经处理过的错误,直接抛出
+            raise
+        else:
+            logger.error(f"访问Neo4j获取额外依赖时出错: {str(e)}")
+            raise Exception(f"Neo4j依赖查询失败: {str(e)}")
+    finally:
+        driver.close()
+    
+    # 构建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有脚本作为节点
+    for script in scripts:
+        G.add_node(script['script_id'])
+    
+    # 添加依赖边
+    for script_id, dependencies in script_dependencies.items():
+        if dependencies:
+            for dep_id in dependencies:
+                # 添加从script_id到dep_id的边,表示script_id依赖于dep_id
+                G.add_edge(script_id, dep_id)
+                logger.debug(f"添加依赖边: {script_id} -> {dep_id}")
+        else:
+            logger.info(f"脚本 {script_id} 没有依赖的上游脚本")
+    
+    # 确保所有脚本ID都在依赖关系字典中
+    for script in scripts:
+        script_id = script['script_id']
+        if script_id not in script_dependencies:
+            script_dependencies[script_id] = []
+    
+    # 记录每个脚本的依赖数量
+    for script_id, deps in script_dependencies.items():
+        logger.info(f"脚本 {script_id} 有 {len(deps)} 个依赖: {deps}")
+    
+    return script_dependencies, G
+
+def optimize_script_execution_order(scripts, script_dependencies, G):
+    """
+    使用NetworkX优化脚本执行顺序
+    
+    参数:
+        scripts (list): 脚本信息列表
+        script_dependencies (dict): 脚本依赖关系字典
+        G (nx.DiGraph): 依赖图对象
+        
+    返回:
+        list: 优化后的脚本执行顺序(脚本ID列表)
+    """
+    # 检查是否有循环依赖
+    try:
+        cycles = list(nx.simple_cycles(G))
+        if cycles:
+            logger.warning(f"检测到循环依赖: {cycles}")
+            # 处理循环依赖,可以通过删除一些边来打破循环
+            for cycle in cycles:
+                # 选择一条边删除,这里简单地选择第一条边
+                if len(cycle) > 1:
+                    G.remove_edge(cycle[0], cycle[1])
+                    logger.warning(f"删除边 {cycle[0]} -> {cycle[1]} 以打破循环")
+    except Exception as e:
+        logger.error(f"检测循环依赖时出错: {str(e)}")
+    
+    # 使用拓扑排序获取执行顺序
+    try:
+        # 反转图,因为我们的边表示"依赖于"关系,而拓扑排序需要"优先于"关系
+        reverse_G = G.reverse()
+        execution_order = list(nx.topological_sort(reverse_G))
+        
+        # 反转结果,使上游任务先执行
+        execution_order.reverse()
+        
+        logger.info(f"生成优化的脚本执行顺序: {execution_order}")
+        return execution_order
+    except Exception as e:
+        logger.error(f"生成脚本执行顺序时出错: {str(e)}")
+        # 出错时返回原始脚本ID列表,不进行优化
+        return [script['script_id'] for script in scripts] 
+
+def prepare_productline_dag_schedule(**kwargs):
+    """准备产品线DAG调度任务的主函数"""
+    # 添加更严格的异常处理
+    try:
+        # 检查是否是手动触发模式
+        is_manual_trigger = False
+        params = kwargs.get('params', {})
+        if params and 'MANUAL_TRIGGER' in params:
+            is_manual_trigger = params.get('MANUAL_TRIGGER', False)
+            if is_manual_trigger:
+                logger.info(f"接收到手动触发参数: MANUAL_TRIGGER={is_manual_trigger}")
+        
+        # 获取执行日期
+        dag_run = kwargs.get('dag_run')
+        logical_date = dag_run.logical_date
+        local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+        exec_date = local_logical_date.strftime('%Y-%m-%d')
+        logger.info(f"开始准备执行日期 {exec_date} 的产品线调度任务")
+        
+        # 检查是否需要创建新的执行计划
+        need_create_plan = False
+        
+        # 条件1: 数据库中不存在当天的执行计划
+        try:
+            has_plan_in_db = check_execution_plan_in_db(**kwargs)
+            if not has_plan_in_db:
+                logger.info(f"数据库中不存在执行日期exec_date {exec_date} 的执行计划,需要创建新的执行计划")
+                need_create_plan = True
+        except Exception as e:
+            # 如果执行计划查询失败,直接报错
+            logger.error(f"检查执行计划失败: {str(e)}")
+            raise Exception(f"检查执行计划失败,可能是数据库连接问题: {str(e)}")
+        
+        # 条件2: schedule_status表中的数据发生了变更
+        if not need_create_plan:
+            # 计算当前哈希值
+            current_hash = get_subscription_state_hash()
+            # 读取上次记录的哈希值
+            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+            last_hash = None
+            if os.path.exists(hash_file):
+                try:
+                    with open(hash_file, 'r') as f:
+                        last_hash = f.read().strip()
+                except Exception as e:
+                    logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
+            
+            # 如果哈希值不同,表示数据发生了变更
+            if current_hash != last_hash:
+                logger.info(f"检测到schedule_status表数据变更。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
+                need_create_plan = True
+        
+        # 手动触发模式覆盖以上判断
+        if is_manual_trigger:
+            logger.info("手动触发模式,将创建新的执行计划")
+            need_create_plan = True
+        
+        # 如果不需要创建新的执行计划,直接返回
+        if not need_create_plan:
+            logger.info("无需创建新的执行计划")
+            return 0
+        
+        # 继续处理,创建新的执行计划
+        # 1. 获取启用的表
+        enabled_tables = get_enabled_tables()
+        logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
+        
+        if not enabled_tables:
+            logger.warning("没有找到启用的表,准备工作结束")
+            return 0
+        
+        # 2. 获取表的详细信息
+        tables_info = []
+        for table_name in enabled_tables:
+            table_info = get_table_info_from_neo4j(table_name)
+            if table_info:
+                tables_info.append(table_info)
+        
+        logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
+        
+        # 2.1 根据调度频率过滤表
+        filtered_tables_info = []
+        for table_info in tables_info:
+            table_name = table_info['target_table']
+            frequency = table_info.get('frequency')
+            
+            if should_execute_today(table_name, frequency, exec_date):
+                filtered_tables_info.append(table_info)
+                logger.info(f"表 {table_name} (频率: {frequency}) 将在今天{exec_date}执行")
+            else:
+                logger.info(f"表 {table_name} (频率: {frequency}) 今天{exec_date}不执行,已过滤")
+        
+        logger.info(f"按调度频率过滤后,今天{exec_date}需要执行的表有 {len(filtered_tables_info)} 个")
+
+        # 3. 处理依赖关系,添加被动调度的表
+        enriched_tables = process_dependencies(filtered_tables_info)
+        logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
+        
+        # 4. 过滤无效表及其依赖
+        valid_tables = filter_invalid_tables(enriched_tables)
+        logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
+        
+        # 5. 将表信息转换为脚本信息
+        scripts = prepare_scripts_from_tables(valid_tables)
+        logger.info(f"生成了 {len(scripts)} 个脚本信息")
+        
+        # 检查所有脚本的source_tables字段
+        scripts_without_sources = [s['script_id'] for s in scripts if not s.get('source_tables')]
+        if scripts_without_sources:
+            logger.warning(f"有 {len(scripts_without_sources)} 个脚本没有源表信息: {scripts_without_sources}")
+        
+        # 6. 处理脚本依赖关系
+        script_dependencies, dependency_graph = build_script_dependency_graph(scripts)
+        logger.info(f"构建了脚本依赖关系图,包含 {len(script_dependencies)} 个节点")
+        
+        # 检查依赖关系是否为空
+        empty_deps = {k: v for k, v in script_dependencies.items() if not v}
+        if len(empty_deps) == len(script_dependencies):
+            logger.warning(f"所有脚本的依赖关系为空,这可能表示Neo4j查询未能正确获取表之间的关系")
+            
+            # 尝试从Neo4j额外构建顶层表间依赖关系
+            logger.info("尝试通过Neo4j直接查询表间依赖关系...")
+            driver = get_neo4j_driver()
+            try:
+                with driver.session() as session:
+                    # 查询所有表之间的依赖关系
+                    query = """
+                        MATCH (a)-[r]->(b)
+                        WHERE a.en_name IS NOT NULL AND b.en_name IS NOT NULL
+                        RETURN a.en_name AS source, b.en_name AS target
+                    """
+                    
+                    try:
+                        result = session.run(query)
+                        records = list(result)
+                        
+                        # 创建表之间的依赖映射
+                        table_dependencies = {}
+                        for record in records:
+                            source = record.get("source")
+                            target = record.get("target")
+                            
+                            if source and target:
+                                if source not in table_dependencies:
+                                    table_dependencies[source] = []
+                                
+                                if target not in table_dependencies[source]:
+                                    table_dependencies[source].append(target)
+                                    logger.info(f"从Neo4j发现表依赖: {source} 依赖于 {target}")
+                        
+                        # 创建表到脚本的映射
+                        table_to_script_map = {}
+                        for script in scripts:
+                            target_table = script['target_table']
+                            script_id = script['script_id']
+                            
+                            if target_table not in table_to_script_map:
+                                table_to_script_map[target_table] = []
+                            
+                            table_to_script_map[target_table].append(script_id)
+                        
+                        # 基于表依赖关系构建脚本依赖关系
+                        updated_dependencies = False
+                        for table, deps in table_dependencies.items():
+                            if table in table_to_script_map:
+                                for table_script_id in table_to_script_map[table]:
+                                    for dep_table in deps:
+                                        if dep_table in table_to_script_map:
+                                            for dep_script_id in table_to_script_map[dep_table]:
+                                                if table_script_id != dep_script_id and dep_script_id not in script_dependencies[table_script_id]:
+                                                    script_dependencies[table_script_id].append(dep_script_id)
+                                                    logger.info(f"添加额外脚本依赖: {table_script_id} 依赖于 {dep_script_id}")
+                                                    updated_dependencies = True
+                        
+                        if updated_dependencies:
+                            # 如果依赖关系有更新,重新构建依赖图并优化执行顺序
+                            G = nx.DiGraph()
+                            
+                            # 添加所有脚本作为节点
+                            for script in scripts:
+                                G.add_node(script['script_id'])
+                            
+                            # 添加依赖边
+                            for script_id, deps in script_dependencies.items():
+                                for dep_id in deps:
+                                    G.add_edge(script_id, dep_id)
+                            
+                            dependency_graph = G
+                            logger.info("依赖图已基于表依赖关系重新构建")
+                    except Exception as e:
+                        logger.warning(f"查询Neo4j获取表级依赖时出错: {str(e)}")
+            except Exception as e:
+                logger.error(f"尝试直接查询表依赖关系时出错: {str(e)}")
+            finally:
+                driver.close()
+        
+        # 7. 优化脚本执行顺序
+        execution_order = optimize_script_execution_order(scripts, script_dependencies, dependency_graph)
+        logger.info(f"生成优化的执行顺序,包含 {len(execution_order)} 个脚本")
+        
+        # 8. 分类脚本
+        resource_scripts = []
+        model_scripts = []
+        
+        for script in scripts:
+            script_id = script['script_id']
+            if script['target_table_label'] == 'DataResource':
+                resource_scripts.append(script_id)
+            elif script['target_table_label'] == 'DataModel':
+                model_scripts.append(script_id)
+        
+        logger.info(f"分类完成: {len(resource_scripts)} 个资源脚本, {len(model_scripts)} 个模型脚本")
+        
+        # 构建执行计划并保存到数据库
+        try:
+            # 9. 创建最终执行计划
+            execution_plan = {
+                "exec_date": exec_date,
+                "version": "2.0",
+                "scripts": scripts,
+                "script_dependencies": script_dependencies,
+                "resource_scripts": resource_scripts,
+                "model_scripts": model_scripts,
+                "execution_order": execution_order
+            }
+            
+            # 10. 更新订阅表状态哈希值
+            current_hash = get_subscription_state_hash()
+            hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
+            with open(hash_file, 'w') as f:
+                f.write(current_hash)
+            logger.info(f"已更新订阅表状态哈希值: {current_hash}")
+            
+            # 11. 触发产品线执行DAG重新解析
+            touch_product_scheduler_file()
+            
+            # 12. 保存执行计划到数据库表
+            try:
+                # 获取DAG运行信息
+                dag_run = kwargs.get('dag_run')
+                if dag_run:
+                    dag_id = dag_run.dag_id
+                    run_id = dag_run.run_id
+                    logical_date = dag_run.logical_date
+                    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
+                else:
+                    # 如果无法获取dag_run,使用默认值
+                    dag_id = kwargs.get('dag').dag_id if 'dag' in kwargs else "dataops_productline_prepare_dag"
+                    run_id = f"manual_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+                    logical_date = datetime.now()
+                
+                # 保存到数据库
+                save_result = save_execution_plan_to_db(
+                    execution_plan=execution_plan,
+                    dag_id=dag_id,
+                    run_id=run_id,
+                    logical_date=local_logical_date,
+                    ds=exec_date
+                )
+                
+                if save_result:
+                    logger.info("执行计划已成功保存到数据库")
+                else:
+                    raise Exception("执行计划保存到数据库失败")
+                
+            except Exception as db_e:
+                # 捕获数据库保存错误
+                error_msg = f"保存执行计划到数据库时出错: {str(db_e)}"
+                logger.error(error_msg)
+                raise Exception(error_msg)
+            
+        except Exception as e:
+            error_msg = f"创建或保存执行计划时出错: {str(e)}"
+            logger.error(error_msg)
+            # 强制抛出异常,确保任务失败,阻止下游DAG执行
+            raise Exception(error_msg)
+        
+        return len(scripts)  # 返回脚本数量
+    except Exception as e:
+        error_msg = f"产品线DAG调度任务准备失败: {str(e)}"
+        logger.error(error_msg)
+        # 强制抛出异常,确保任务失败
+        raise Exception(error_msg)
+
+# 创建DAG
+with DAG(
+    "dataops_productline_prepare_dag",
+    start_date=datetime(2024, 1, 1),
+    # 每小时执行一次
+    schedule_interval="0 * * * *",
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    },
+    params={
+        'MANUAL_TRIGGER': False, 
+    },
+) as dag:
+    
+    # 任务开始标记
+    start_preparation = EmptyOperator(
+        task_id="start_preparation",
+        dag=dag
+    )
+    
+    # 准备调度任务
+    prepare_task = PythonOperator(
+        task_id="prepare_productline_dag_schedule",
+        python_callable=prepare_productline_dag_schedule,
+        provide_context=True,
+        dag=dag
+    )
+    
+    # 检查执行计划是否存在于数据库中
+    check_plan_in_db = ShortCircuitOperator(
+        task_id="check_execution_plan_in_db",
+        python_callable=check_execution_plan_in_db,
+        provide_context=True,
+        dag=dag
+    )
+    
+    # 准备完成标记
+    preparation_completed = EmptyOperator(
+        task_id="preparation_completed",
+        dag=dag
+    )
+    
+    # 设置任务依赖
+    start_preparation >> prepare_task >> check_plan_in_db >> preparation_completed