
Begin project code optimization and consolidation

wangxq 1 month ago
parent
commit 89fbff1ca9

+ 0 - 90
dags/dag_data_model_daily.py

@@ -1,90 +0,0 @@
-# dag_data_model_daily.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime
-from utils import (
-    get_enabled_tables, is_data_model_table, run_model_script, 
-    get_model_dependency_graph, process_model_tables
-)
-from config import NEO4J_CONFIG
-import pendulum
-import logging
-import networkx as nx
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def generate_optimized_execution_order(table_names: list) -> list:
-    """
-    生成优化的执行顺序,可处理循环依赖    
-    参数:
-        table_names: 表名列表    
-    返回:
-        list: 优化后的执行顺序列表
-    """
-    # 创建依赖图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table_name in table_names:
-        G.add_node(table_name)
-    
-    # 添加依赖边
-    dependency_dict = get_model_dependency_graph(table_names)
-    for target, upstreams in dependency_dict.items():
-        for upstream in upstreams:
-            if upstream in table_names:  # 确保只考虑目标表集合中的表
-                G.add_edge(upstream, target)
-    
-    # 检测循环依赖
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
-        # 打破循环依赖(简单策略:移除每个循环中的一条边)
-        for cycle in cycles:
-            # 移除循环中的最后一条边
-            G.remove_edge(cycle[-1], cycle[0])
-            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-    
-    # 生成拓扑排序
-    try:
-        execution_order = list(nx.topological_sort(G))
-        return execution_order
-    except Exception as e:
-        logger.error(f"生成执行顺序失败: {str(e)}")
-        # 返回原始列表作为备选
-        return table_names
-
-with DAG("dag_data_model_daily", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
-    logger.info("初始化 dag_data_model_daily DAG")
-    
-    # 等待资源表 DAG 完成
-    wait_for_resource = ExternalTaskSensor(
-        task_id="wait_for_data_resource",
-        external_dag_id="dag_data_resource",
-        external_task_id=None,
-        mode="poke",
-        timeout=3600,
-        poke_interval=30
-    )
-    logger.info("创建资源表等待任务 - wait_for_data_resource")
-
-    # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
-    daily_completed = EmptyOperator(
-        task_id="daily_processing_completed",
-        dag=dag
-    )
-    logger.info("创建任务完成标记 - daily_processing_completed")
-
-    # 获取启用的 daily 模型表
-    try:
-        enabled_tables = get_enabled_tables("daily")
-        # 使用公共函数处理模型表
-        process_model_tables(enabled_tables, "daily", wait_for_resource, daily_completed, dag)
-    except Exception as e:
-        logger.error(f"获取 daily 模型表时出错: {str(e)}")
-        # 出错时也要确保完成标记被触发
-        wait_for_resource >> daily_completed
-        raise

+ 0 - 123
dags/dag_data_model_monthly.py

@@ -1,123 +0,0 @@
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-from utils import (
-    get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph, 
-    check_table_relationship, process_model_tables
-)
-from config import NEO4J_CONFIG, TASK_RETRY_CONFIG
-import pendulum
-import logging
-import networkx as nx
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def generate_optimized_execution_order(table_names: list) -> list:
-    """
-    生成优化的执行顺序,可处理循环依赖    
-    参数:
-        table_names: 表名列表    
-    返回:
-        list: 优化后的执行顺序列表
-    """
-    # 创建依赖图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table_name in table_names:
-        G.add_node(table_name)
-    
-    # 添加依赖边
-    dependency_dict = get_model_dependency_graph(table_names)
-    for target, upstreams in dependency_dict.items():
-        for upstream in upstreams:
-            if upstream in table_names:  # 确保只考虑目标表集合中的表
-                G.add_edge(upstream, target)
-    
-    # 检测循环依赖
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
-        # 打破循环依赖(简单策略:移除每个循环中的一条边)
-        for cycle in cycles:
-            # 移除循环中的最后一条边
-            G.remove_edge(cycle[-1], cycle[0])
-            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-    
-    # 生成拓扑排序
-    try:
-        execution_order = list(nx.topological_sort(G))
-        return execution_order
-    except Exception as e:
-        logger.error(f"生成执行顺序失败: {str(e)}")
-        # 返回原始列表作为备选
-        return table_names
-
-def is_first_day():
-    return True
-    # 生产环境中应使用实际判断
-    # return pendulum.now().day == 1
-
-with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
-    logger.info("初始化 dag_data_model_monthly DAG")
-    
-    # 修改依赖关系:直接依赖于daily.py而不是weekly.py
-    wait_for_daily = ExternalTaskSensor(
-        task_id="wait_for_daily_model",
-        external_dag_id="dag_data_model_daily",
-        external_task_id="daily_processing_completed",  # 指定完成标记任务
-        mode="poke",
-        timeout=3600,
-        poke_interval=30
-    )
-    logger.info("创建日模型等待任务 - wait_for_daily_model")
-    
-    # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
-    monthly_completed = EmptyOperator(
-        task_id="monthly_processing_completed",
-        dag=dag
-    )
-    logger.info("创建任务完成标记 - monthly_processing_completed")
-    
-    # 检查今天是否是月初
-    if is_first_day():
-        logger.info("今天是月初,开始处理月模型")
-        # 获取启用的 monthly 模型表
-        try:
-            enabled_tables = get_enabled_tables("monthly")
-            
-            # 特别检查两个表之间的关系(这是monthly.py特有的)
-            table_names = [t['table_name'] for t in enabled_tables if is_data_model_table(t['table_name'])]
-            if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
-                logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
-                relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
-                logger.info(f"关系检查结果: {relationship}")
-            
-            # 定义monthly特有的任务选项
-            task_options = {
-                'default': {
-                    'retries': TASK_RETRY_CONFIG["retries"],
-                    'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
-                },
-                'book_sale_amt_monthly': {
-                    'trigger_rule': "none_failed",
-                    'retries': TASK_RETRY_CONFIG["retries"],
-                    'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
-                }
-            }
-            
-            # 使用公共函数处理模型表 - 修改依赖任务名称
-            process_model_tables(enabled_tables, "monthly", wait_for_daily, monthly_completed, dag, **task_options)
-            
-        except Exception as e:
-            logger.error(f"获取 monthly 模型表时出错: {str(e)}")
-            # 出错时也要确保完成标记被触发 - 修改依赖任务名称
-            wait_for_daily >> monthly_completed
-            raise
-    else:
-        # 如果不是月初,直接将等待任务与完成标记相连接,跳过处理 - 修改依赖任务名称
-        logger.info("今天不是月初,跳过月模型处理")
-        wait_for_daily >> monthly_completed

+ 0 - 288
dags/dag_data_model_scheduler.py

@@ -1,288 +0,0 @@
-# dag_data_model_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-import pendulum
-import logging
-import networkx as nx
-from utils import (
-    get_enabled_tables,
-    is_data_model_table,
-    run_model_script,
-    get_model_dependency_graph,
-    check_script_exists,
-    get_script_name_from_neo4j
-)
-from config import TASK_RETRY_CONFIG
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_all_enabled_tables_for_today():
-    """
-    根据当前日期获取所有需要处理的表
-    
-    返回:
-        list: 需要处理的表配置列表
-    """
-    today = pendulum.today()
-    # 原始代码(注释)
-    # is_monday = today.day_of_week == 0
-    # is_first_day_of_month = today.day == 1
-    # is_first_day_of_year = today.month == 1 and today.day == 1
-    
-    # 测试用:所有条件设为True
-    is_monday = True
-    is_first_day_of_month = True
-    is_first_day_of_year = True
-    
-    logger.info(f"今日日期: {today.to_date_string()}")
-    logger.info(f"日期特性: 是否周一={is_monday}, 是否月初={is_first_day_of_month}, 是否年初={is_first_day_of_year}")
-    
-    all_tables = []
-    
-    # 每天都处理daily表
-    daily_tables = get_enabled_tables("daily")
-    all_tables.extend(daily_tables)
-    logger.info(f"添加daily表: {len(daily_tables)}个")
-    
-    # 周一处理weekly表
-    if is_monday:
-        weekly_tables = get_enabled_tables("weekly")
-        all_tables.extend(weekly_tables)
-        logger.info(f"今天是周一,添加weekly表: {len(weekly_tables)}个")
-    
-    # 月初处理monthly表
-    if is_first_day_of_month:
-        monthly_tables = get_enabled_tables("monthly")
-        all_tables.extend(monthly_tables)
-        logger.info(f"今天是月初,添加monthly表: {len(monthly_tables)}个")
-    
-    # 年初处理yearly表
-    if is_first_day_of_year:
-        yearly_tables = get_enabled_tables("yearly")
-        all_tables.extend(yearly_tables)
-        logger.info(f"今天是年初,添加yearly表: {len(yearly_tables)}个")
-    
-    # 去重
-    unique_tables = {}
-    for item in all_tables:
-        table_name = item["table_name"]
-        if table_name not in unique_tables:
-            unique_tables[table_name] = item
-        else:
-            # 如果存在重复,优先保留execution_mode为full_refresh的配置
-            if item["execution_mode"] == "full_refresh":
-                unique_tables[table_name] = item
-    
-    result_tables = list(unique_tables.values())
-    logger.info(f"去重后,共 {len(result_tables)} 个表需要处理")
-    
-    # 记录所有需要处理的表
-    for idx, item in enumerate(result_tables, 1):
-        logger.info(f"表[{idx}]: {item['table_name']}, 执行模式: {item['execution_mode']}")
-    
-    return result_tables
-
-def optimize_execution_plan(tables):
-    """
-    优化表的执行计划
-    
-    参数:
-        tables (list): 表配置列表
-        
-    返回:
-        tuple: (优化后的表执行顺序, 依赖关系图)
-    """
-    logger.info("开始优化执行计划...")
-    
-    # 筛选出DataModel类型的表
-    model_tables = []
-    for table in tables:
-        table_name = table["table_name"]
-        if is_data_model_table(table_name):
-            model_tables.append(table)
-    
-    logger.info(f"筛选出 {len(model_tables)} 个DataModel类型的表")
-    
-    if not model_tables:
-        logger.warning("没有找到DataModel类型的表,无需优化执行计划")
-        return [], {}
-    
-    # 获取表名列表
-    table_names = [t["table_name"] for t in model_tables]
-    
-    # 创建有向图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table_name in table_names:
-        G.add_node(table_name)
-    
-    # 获取依赖关系
-    dependency_dict = get_model_dependency_graph(table_names)
-    logger.info(f"获取到 {len(dependency_dict)} 个表的依赖关系")
-    
-    # 添加依赖边
-    edge_count = 0
-    for target, upstreams in dependency_dict.items():
-        for upstream in upstreams:
-            if upstream in table_names:  # 确保只考虑当前处理的表
-                G.add_edge(upstream, target)  # 从上游指向下游
-                edge_count += 1
-    
-    logger.info(f"依赖图中添加了 {edge_count} 条边")
-    
-    # 检测循环依赖
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        logger.warning(f"检测到 {len(cycles)} 个循环依赖,将尝试打破循环")
-        # 打破循环依赖(简单策略:移除每个循环中的最后一条边)
-        for cycle in cycles:
-            # 移除循环中的最后一条边
-            G.remove_edge(cycle[-1], cycle[0])
-            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-    
-    # 生成拓扑排序
-    try:
-        # 拓扑排序会从没有入边的节点开始,这些节点是上游节点,因此排序结果是从上游到下游
-        execution_order = list(nx.topological_sort(G))
-        logger.info(f"成功生成执行顺序,按从上游到下游顺序共 {len(execution_order)} 个表")
-        
-        # 创建结果依赖字典,包含所有表(即使没有依赖)
-        result_dependency_dict = {name: [] for name in table_names}
-        
-        # 添加实际依赖关系
-        for target, upstreams in dependency_dict.items():
-            if target in table_names:  # 确保只考虑当前处理的表
-                result_dependency_dict[target] = [u for u in upstreams if u in table_names]
-        
-        return execution_order, result_dependency_dict
-    except Exception as e:
-        logger.error(f"生成执行顺序失败: {str(e)}")
-        # 如果拓扑排序失败,返回原始表名列表和空依赖图
-        return table_names, {name: [] for name in table_names}
-
-with DAG(
-    "dag_data_model_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False
-) as dag:
-    logger.info("初始化 dag_data_model_scheduler DAG")
-    
-    # 等待资源表 DAG 完成
-    wait_for_resource = ExternalTaskSensor(
-        task_id="wait_for_resource_loading",
-        external_dag_id="dag_data_resource_scheduler",
-        external_task_id="resource_loading_completed",
-        mode="poke",
-        timeout=3600,
-        poke_interval=30
-    )
-    logger.info("创建资源表等待任务 - wait_for_resource_loading")
-
-    # 创建一个完成标记任务
-    model_processing_completed = EmptyOperator(
-        task_id="model_processing_completed",
-        dag=dag
-    )
-    logger.info("创建模型处理完成标记 - model_processing_completed")
-
-    try:
-        # 获取今日需要处理的所有表
-        all_enabled_tables = get_all_enabled_tables_for_today()
-        
-        if not all_enabled_tables:
-            logger.info("今天没有需要处理的表,直接连接开始和结束任务")
-            wait_for_resource >> model_processing_completed
-        else:
-            # 优化执行计划
-            execution_order, dependency_dict = optimize_execution_plan(all_enabled_tables)
-            
-            if not execution_order:
-                logger.info("执行计划为空,直接连接开始和结束任务")
-                wait_for_resource >> model_processing_completed
-            else:
-                # 创建任务字典
-                task_dict = {}
-                
-                # 为每个表创建处理任务
-                for table_name in execution_order:
-                    # 查找表配置
-                    table_config = next((t for t in all_enabled_tables if t["table_name"] == table_name), None)
-                    
-                    if table_config:
-                        logger.info(f"为表 {table_name} 创建处理任务,执行模式: {table_config['execution_mode']}")
-                        
-                        # 创建任务
-                        task = PythonOperator(
-                            task_id=f"process_{table_name}",
-                            python_callable=run_model_script,
-                            op_kwargs={
-                                "table_name": table_name,
-                                "execution_mode": table_config["execution_mode"]
-                            },
-                            retries=TASK_RETRY_CONFIG["retries"],
-                            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
-                            dag=dag
-                        )
-                        
-                        # 将任务添加到字典
-                        task_dict[table_name] = task
-                
-                # 设置任务间的依赖关系
-                for table_name, task in task_dict.items():
-                    # 获取上游依赖
-                    upstream_tables = dependency_dict.get(table_name, [])
-                    
-                    if not upstream_tables:
-                        # 如果没有上游依赖,直接连接到资源表等待任务
-                        logger.info(f"表 {table_name} 没有上游依赖,连接到资源表等待任务")
-                        wait_for_resource >> task
-                    else:
-                        # 设置与上游表的依赖关系
-                        for upstream_table in upstream_tables:
-                            if upstream_table in task_dict:
-                                logger.info(f"设置依赖: {upstream_table} >> {table_name}")
-                                task_dict[upstream_table] >> task
-                    
-                    # 检查是否是末端节点(没有下游节点)
-                    is_terminal = True
-                    for target, upstreams in dependency_dict.items():
-                        if table_name in upstreams:
-                            is_terminal = False
-                            break
-                    
-                    # 如果是末端节点,连接到模型处理完成标记
-                    if is_terminal:
-                        logger.info(f"表 {table_name} 是末端节点,连接到模型处理完成标记")
-                        task >> model_processing_completed
-                
-                # 处理特殊情况:检查是否有任务连接到完成标记
-                has_connection_to_completed = False
-                for task in task_dict.values():
-                    for downstream in task.downstream_list:
-                        if downstream.task_id == model_processing_completed.task_id:
-                            has_connection_to_completed = True
-                            break
-                
-                # 如果没有任务连接到完成标记,连接所有任务到完成标记
-                if not has_connection_to_completed and task_dict:
-                    logger.info("没有发现连接到完成标记的任务,连接所有任务到完成标记")
-                    for task in task_dict.values():
-                        task >> model_processing_completed
-                
-                # 处理特殊情况:如果资源等待任务没有下游任务,直接连接到完成标记
-                if not wait_for_resource.downstream_list:
-                    logger.info("资源等待任务没有下游任务,直接连接到完成标记")
-                    wait_for_resource >> model_processing_completed
-    
-    except Exception as e:
-        logger.error(f"构建DAG时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # 确保出错时也有完整的执行流
-        wait_for_resource >> model_processing_completed

+ 0 - 101
dags/dag_data_model_weekly.py

@@ -1,101 +0,0 @@
-# dag_data_model_weekly.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime
-from utils import (
-    get_enabled_tables, is_data_model_table, run_model_script, 
-    get_model_dependency_graph, process_model_tables
-)
-from config import NEO4J_CONFIG
-import pendulum
-import logging
-import networkx as nx
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def generate_optimized_execution_order(table_names: list) -> list:
-    """
-    生成优化的执行顺序,可处理循环依赖    
-    参数:
-        table_names: 表名列表    
-    返回:
-        list: 优化后的执行顺序列表
-    """
-    # 创建依赖图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table_name in table_names:
-        G.add_node(table_name)
-    
-    # 添加依赖边
-    dependency_dict = get_model_dependency_graph(table_names)
-    for target, upstreams in dependency_dict.items():
-        for upstream in upstreams:
-            if upstream in table_names:  # 确保只考虑目标表集合中的表
-                G.add_edge(upstream, target)
-    
-    # 检测循环依赖
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
-        # 打破循环依赖(简单策略:移除每个循环中的一条边)
-        for cycle in cycles:
-            # 移除循环中的最后一条边
-            G.remove_edge(cycle[-1], cycle[0])
-            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-    
-    # 生成拓扑排序
-    try:
-        execution_order = list(nx.topological_sort(G))
-        return execution_order
-    except Exception as e:
-        logger.error(f"生成执行顺序失败: {str(e)}")
-        # 返回原始列表作为备选
-        return table_names
-
-def is_monday():
-    return True
-    #return pendulum.now().day_of_week == 0
-
-with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
-    logger.info("初始化 dag_data_model_weekly DAG")
-    
-    # 等待日模型 DAG 完成
-    wait_for_daily = ExternalTaskSensor(
-        task_id="wait_for_daily_model",
-        external_dag_id="dag_data_model_daily",
-        external_task_id="daily_processing_completed",  # 指定完成标记任务
-        mode="poke",
-        timeout=3600,
-        poke_interval=30
-    )
-    logger.info("创建日模型等待任务 - wait_for_daily_model")
-    
-    # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
-    weekly_completed = EmptyOperator(
-        task_id="weekly_processing_completed",
-        dag=dag
-    )
-    logger.info("创建任务完成标记 - weekly_processing_completed")
-    
-    # 检查今天是否是周一
-    if is_monday():
-        logger.info("今天是周一,开始处理周模型")
-        # 获取启用的 weekly 模型表
-        try:
-            enabled_tables = get_enabled_tables("weekly")
-            # 使用公共函数处理模型表
-            process_model_tables(enabled_tables, "weekly", wait_for_daily, weekly_completed, dag)
-        except Exception as e:
-            logger.error(f"获取 weekly 模型表时出错: {str(e)}")
-            # 出错时也要确保完成标记被触发
-            wait_for_daily >> weekly_completed
-            raise
-    else:
-        # 如果不是周一,直接将等待任务与完成标记相连接,跳过处理
-        logger.info("今天不是周一,跳过周模型处理")
-        wait_for_daily >> weekly_completed

+ 0 - 60
dags/dag_data_model_yearly.py

@@ -1,60 +0,0 @@
-# dag_data_model_yearly.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime
-from utils import (
-    get_enabled_tables, is_data_model_table, run_model_script, 
-    get_model_dependency_graph, process_model_tables
-)
-from config import NEO4J_CONFIG
-import pendulum
-import logging
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def is_first_day_of_year():
-    return True
-    # 生产环境中应使用实际判断
-    # return pendulum.now().month == 1 and pendulum.now().day == 1
-
-with DAG("dag_data_model_yearly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
-    logger.info("初始化 dag_data_model_yearly DAG")
-    
-    # 等待月模型 DAG 完成
-    wait_for_monthly = ExternalTaskSensor(
-        task_id="wait_for_monthly_model",
-        external_dag_id="dag_data_model_monthly",
-        external_task_id="monthly_processing_completed",  # 指定完成标记任务
-        mode="poke",
-        timeout=3600,
-        poke_interval=30
-    )
-    logger.info("创建月模型等待任务 - wait_for_monthly_model")
-    
-    # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
-    yearly_completed = EmptyOperator(
-        task_id="yearly_processing_completed",
-        dag=dag
-    )
-    logger.info("创建任务完成标记 - yearly_processing_completed")
-    
-    # 检查今天是否是年初
-    if is_first_day_of_year():
-        logger.info("今天是年初,开始处理年模型")
-        # 获取启用的 yearly 模型表
-        try:
-            enabled_tables = get_enabled_tables("yearly")
-            # 使用公共函数处理模型表
-            process_model_tables(enabled_tables, "yearly", wait_for_monthly, yearly_completed, dag)
-        except Exception as e:
-            logger.error(f"获取 yearly 模型表时出错: {str(e)}")
-            # 出错时也要确保完成标记被触发
-            wait_for_monthly >> yearly_completed
-            raise
-    else:
-        # 如果不是年初,直接将等待任务与完成标记相连接,跳过处理
-        logger.info("今天不是年初,跳过年模型处理")
-        wait_for_monthly >> yearly_completed 

+ 0 - 155
dags/dag_data_resource.py

@@ -1,155 +0,0 @@
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from datetime import datetime
-from utils import (
-    get_enabled_tables,
-    get_resource_subscribed_tables,
-    get_dependency_resource_tables,
-    check_script_exists
-)
-import pendulum
-import logging
-import sys
-from airflow.operators.empty import EmptyOperator
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
-from neo4j import GraphDatabase
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_resource_script_name_from_neo4j(table_name):
-    from neo4j import GraphDatabase
-    from config import NEO4J_CONFIG
-    
-    # 正确处理Neo4j连接参数
-    uri = NEO4J_CONFIG['uri']
-    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-    driver = GraphDatabase.driver(uri, auth=auth)
-    
-    query = """
-        MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
-        RETURN rel.script_name AS script_name
-    """
-    try:
-        with driver.session() as session:
-            result = session.run(query, table_name=table_name)
-            record = result.single()
-            if record:
-                logger.info(f"找到表 {table_name} 的完整记录: {record}")
-                try:
-                    script_name = record['script_name']
-                    logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
-                    return script_name
-                except (KeyError, TypeError) as e:
-                    logger.warning(f"记录中不包含script_name字段: {e}")
-                    return None
-            else:
-                logger.warning(f"未找到表 {table_name} 的记录")
-                return None
-    except Exception as e:
-        logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
-        return None
-    finally:
-        driver.close()
-
-def load_table_data(table_name, execution_mode):
-    import os
-    import importlib.util
-
-    script_name = get_resource_script_name_from_neo4j(table_name)
-    if not script_name:
-        logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
-        return
-    
-    logger.info(f"从Neo4j获取到表 {table_name} 的脚本名称: {script_name}")
-    
-    # 检查脚本文件是否存在
-    exists, script_path = check_script_exists(script_name)
-    if not exists:
-        logger.error(f"表 {table_name} 的脚本文件 {script_name} 不存在,跳过处理")
-        return False
-    
-    # 执行脚本
-    logger.info(f"开始执行脚本: {script_path}")
-    try:
-        # 动态导入模块
-        import importlib.util
-        import sys
-        
-        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
-        module = importlib.util.module_from_spec(spec)
-        spec.loader.exec_module(module)
-        
-        # 检查并调用标准入口函数run
-        if hasattr(module, "run"):
-            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
-            module.run(table_name=table_name, execution_mode=execution_mode)
-            logger.info(f"脚本 {script_name} 执行成功")
-            return True
-        else:
-            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
-            return False
-    except Exception as e:
-        logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        return False
-
-with DAG(
-    "dag_data_resource", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False,
-    # 添加DAG级别的并行度控制,每个DAG运行最多同时执行2个任务
-    # max_active_tasks=8
-    # 无论有多少个DAG运行实例(比如昨天、今天的运行),这个DAG定义的所有实例总共最多有5个任务同时运行
-    # concurrency=5
-) as dag:
-    today = pendulum.today()
-    is_monday = today.day_of_week == 0
-    is_first_day_of_month = today.day == 1
-    is_first_day_of_year = today.month == 1 and today.day == 1
-
-    all_resource_tables = []
-
-    # 使用循环处理不同频率的表
-    frequency_configs = [
-        {"name": "daily", "condition": True},
-        {"name": "weekly", "condition": is_monday},
-        {"name": "monthly", "condition": is_first_day_of_month},
-        {"name": "yearly", "condition": is_first_day_of_year}
-    ]
-
-    for config in frequency_configs:
-        frequency = config["name"]
-        if config["condition"]:
-            enabled_tables = get_enabled_tables(frequency)
-            all_resource_tables.extend(get_resource_subscribed_tables(enabled_tables))
-            all_resource_tables.extend(get_dependency_resource_tables(enabled_tables))
-            logger.info(f"已添加 {frequency} 频率的资源表")
-
-    # 去重(按表名)
-    unique_resources = {}
-    for item in all_resource_tables:
-        name = item["table_name"]
-        if name not in unique_resources:
-            unique_resources[name] = item
-
-    resource_tables = list(unique_resources.values())
-    
-    # 创建开始任务
-    start_loading = EmptyOperator(task_id="start_resource_loading")
-    
-    # 创建结束任务
-    end_loading = EmptyOperator(task_id="finish_resource_loading")
-    
-    
-    for item in resource_tables:
-        task = PythonOperator(
-            task_id=f"load_{item['table_name']}",
-            python_callable=load_table_data,
-            op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
-        )
-        
-        # 设置依赖
-        start_loading >> task >> end_loading

+ 0 - 184
dags/dag_data_resource_scheduler.py

@@ -1,184 +0,0 @@
-# dag_data_resource_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from datetime import datetime
-import pendulum
-import logging
-import os
-from pathlib import Path
-from neo4j import GraphDatabase
-from utils import (
-    get_enabled_tables,
-    get_resource_subscribed_tables,
-    get_dependency_resource_tables,
-    check_script_exists
-)
-from config import NEO4J_CONFIG, SCRIPTS_BASE_PATH
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_resource_script_name_from_neo4j(table_name):
-    """
-    从Neo4j数据库中查询DataResource表对应的脚本名称
-    这个函数直接在当前文件中实现,而不是从utils导入
-    
-    参数:
-        table_name (str): 数据资源表名
-        
-    返回:
-        str: 脚本名称,如果未找到则返回None
-    """
-    # 使用导入的Neo4j配置
-    uri = NEO4J_CONFIG['uri']
-    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
-    driver = GraphDatabase.driver(uri, auth=auth)
-    
-    query = """
-        MATCH (dr:DataResource {en_name: $table_name})-[rel:ORIGINATES_FROM]->(ds:DataSource)
-        RETURN rel.script_name AS script_name
-    """
-    try:
-        with driver.session() as session:
-            result = session.run(query, table_name=table_name)
-            record = result.single()
-            if record:
-                logger.info(f"找到表 {table_name} 的完整记录: {record}")
-                try:
-                    script_name = record['script_name']
-                    logger.info(f"找到表 {table_name} 的 script_name: {script_name}")
-                    return script_name
-                except (KeyError, TypeError) as e:
-                    logger.warning(f"记录中不包含script_name字段: {e}")
-                    return None
-            else:
-                logger.warning(f"未找到表 {table_name} 的记录")
-                return None
-    except Exception as e:
-        logger.error(f"查询表 {table_name} 的脚本名称时出错: {str(e)}")
-        return None
-    finally:
-        driver.close()
-
-def load_table_data(table_name, execution_mode):
-    """执行数据资源表加载脚本的函数"""
-    script_name = get_resource_script_name_from_neo4j(table_name)
-    if not script_name:
-        logger.warning(f"未找到表 {table_name} 的 script_name,跳过")
-        return False
-    
-    logger.info(f"从Neo4j获取到表 {table_name} 的脚本名称: {script_name}")
-    
-    # 检查脚本文件是否存在
-    exists, script_path = check_script_exists(script_name)
-    if not exists:
-        logger.error(f"表 {table_name} 的脚本文件 {script_name} 不存在,跳过处理")
-        return False
-    
-    # 执行脚本
-    logger.info(f"开始执行脚本: {script_path}")
-    try:
-        # 动态导入模块
-        import importlib.util
-        
-        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
-        module = importlib.util.module_from_spec(spec)
-        spec.loader.exec_module(module)
-        
-        # 检查并调用标准入口函数run
-        if hasattr(module, "run"):
-            logger.info(f"调用脚本 {script_name} 的标准入口函数 run()")
-            module.run(table_name=table_name, execution_mode=execution_mode)
-            logger.info(f"脚本 {script_name} 执行成功")
-            return True
-        else:
-            logger.error(f"脚本 {script_name} 中未定义标准入口函数 run(),无法执行")
-            return False
-    except Exception as e:
-        logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        return False
-
-with DAG(
-    "dag_data_resource_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False,
-) as dag:
-    # 获取当前日期信息
-    today = pendulum.today()
-    # 原始代码(注释)
-    # is_monday = today.day_of_week == 0
-    # is_first_day_of_month = today.day == 1
-    # is_first_day_of_year = today.month == 1 and today.day == 1
-    
-    # 测试用:所有条件设为True
-    is_monday = True
-    is_first_day_of_month = True
-    is_first_day_of_year = True
-
-    all_resource_tables = []
-
-    # 使用循环处理不同频率的表
-    frequency_configs = [
-        {"name": "daily", "condition": True},
-        {"name": "weekly", "condition": is_monday},
-        {"name": "monthly", "condition": is_first_day_of_month},
-        {"name": "yearly", "condition": is_first_day_of_year}
-    ]
-
-    # 记录日期信息
-    logger.info(f"今日日期: {today}, 是否周一: {is_monday}, 是否月初: {is_first_day_of_month}, 是否年初: {is_first_day_of_year}")
-    logger.info(f"脚本基础路径: {SCRIPTS_BASE_PATH}")
-
-    # 收集所有需要处理的资源表
-    for config in frequency_configs:
-        frequency = config["name"]
-        if config["condition"]:
-            logger.info(f"今天需要处理 {frequency} 频率的资源表")
-            enabled_tables = get_enabled_tables(frequency)
-            resource_tables = get_resource_subscribed_tables(enabled_tables)
-            dependency_tables = get_dependency_resource_tables(enabled_tables)
-            
-            all_resource_tables.extend(resource_tables)
-            all_resource_tables.extend(dependency_tables)
-            logger.info(f"已添加 {frequency} 频率的资源表,共 {len(resource_tables) + len(dependency_tables)} 个")
-        else:
-            logger.info(f"今天不需要处理 {frequency} 频率的资源表")
-
-    # 去重(按表名)
-    unique_resources = {}
-    for item in all_resource_tables:
-        name = item["table_name"]
-        if name not in unique_resources:
-            unique_resources[name] = item
-
-    resource_tables = list(unique_resources.values())
-    logger.info(f"去重后,共需处理 {len(resource_tables)} 个资源表")
-    
-    # 创建开始任务
-    start_loading = EmptyOperator(task_id="start_resource_loading")
-    
-    # 创建结束任务
-    end_loading = EmptyOperator(task_id="resource_loading_completed")
-    
-    if resource_tables:
-        for item in resource_tables:
-            table_name = item['table_name']
-            execution_mode = item['execution_mode']
-            
-            logger.info(f"为资源表 {table_name} 创建加载任务")
-            task = PythonOperator(
-                task_id=f"load_{table_name}",
-                python_callable=load_table_data,
-                op_kwargs={"table_name": table_name, "execution_mode": execution_mode},
-            )
-            
-            # 设置依赖
-            start_loading >> task >> end_loading
-    else:
-        logger.info("没有资源表需要处理,直接连接开始和结束任务")
-        # 如果没有任务,确保开始和结束任务相连
-        start_loading >> end_loading

+ 0 - 226
dags/dag_dataops_model_scheduler.py

@@ -1,226 +0,0 @@
-# dag_dataops_model_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-import logging
-from common import (
-    get_pg_conn, execute_with_monitoring, get_datamodel_dependency_from_neo4j,
-    generate_optimized_execution_order, get_today_date
-)
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_latest_date_with_models():
-    """
-    获取数据库中包含DataModel记录的最近日期
-    
-    用于查找数据库中最近的日期,以确保能够获取到数据
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT DISTINCT exec_date
-            FROM airflow_dag_schedule 
-            WHERE target_table_label = 'DataModel'
-            ORDER BY exec_date DESC
-            LIMIT 1
-        """)
-        result = cursor.fetchone()
-        if result:
-            latest_date = result[0]
-            logger.info(f"找到最近的包含DataModel记录的日期: {latest_date}")
-            return latest_date
-        else:
-            logger.warning("未找到包含DataModel记录的日期,将使用当前日期")
-            return get_today_date()
-    except Exception as e:
-        logger.error(f"查找最近日期时出错: {str(e)}")
-        return get_today_date()
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_datamodel_tasks(exec_date):
-    """从airflow_dag_schedule表获取DataModel任务"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT target_table, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataModel'
-        """, (exec_date,))
-        
-        results = cursor.fetchall()
-        
-        tasks = []
-        for row in results:
-            target_table, script_name, script_exec_mode = row
-            tasks.append({
-                "target_table": target_table,
-                "script_name": script_name,
-                "script_exec_mode": script_exec_mode or "append"  # 默认为append
-            })
-        
-        logger.info(f"使用日期 {exec_date} 获取到 {len(tasks)} 个DataModel任务")
-        return tasks
-    except Exception as e:
-        logger.error(f"获取DataModel任务时出错: {str(e)}")
-        return []
-    finally:
-        cursor.close()
-        conn.close()
-
-# 创建DAG
-with DAG(
-    "dag_dataops_model_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # 等待resource DAG完成
-    wait_for_resource = ExternalTaskSensor(
-        task_id="wait_for_resource",
-        external_dag_id="dag_dataops_resource_scheduler",
-        external_task_id="resource_processing_completed",
-        mode="poke",
-        timeout=3600,
-        poke_interval=30,
-        dag=dag
-    )
-    
-    # 处理完成标记
-    model_processing_completed = EmptyOperator(
-        task_id="model_processing_completed",
-        dag=dag
-    )
-    
-    try:
-        # 获取最近的日期
-        latest_date = get_latest_date_with_models()
-        logger.info(f"使用最近的日期 {latest_date} 查询模型任务")
-        
-        # 获取所有DataModel任务
-        model_tasks = get_datamodel_tasks(latest_date)
-        
-        if model_tasks:
-            # 获取表名列表
-            table_names = [task["target_table"] for task in model_tasks]
-            
-            # 获取依赖关系
-            dependency_dict = get_datamodel_dependency_from_neo4j(table_names)
-            
-            # 生成优化的执行顺序
-            execution_order = generate_optimized_execution_order(table_names, dependency_dict)
-            logger.info(f"生成的优化执行顺序: {execution_order}")
-            
-            # 创建任务字典
-            task_dict = {}
-            
-            # 为每个表创建处理任务
-            for table_name in execution_order:
-                # 查找表任务信息
-                task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-                
-                if task_info and task_info.get("script_name"):
-                    process_task = PythonOperator(
-                        task_id=f"process_model_{table_name.replace('.', '_')}",
-                        python_callable=execute_with_monitoring,
-                        op_kwargs={
-                            "target_table": table_name,
-                            "script_name": task_info["script_name"],
-                            "script_exec_mode": task_info.get("script_exec_mode", "append"), 
-                            "exec_date": latest_date  # 使用从数据库获取的最近日期
-                        },
-                        dag=dag
-                    )
-                    task_dict[table_name] = process_task
-                    logger.info(f"创建处理任务: {table_name}")
-                else:
-                    logger.warning(f"表 {table_name} 没有script_name,跳过任务创建")
-            
-            # 设置任务间的依赖关系
-            for target_table, task in task_dict.items():
-                # 获取上游依赖
-                upstream_tables = dependency_dict.get(target_table, [])
-                upstream_tables = [t for t in upstream_tables if t in task_dict]
-                
-                if not upstream_tables:
-                    # 如果没有上游依赖,直接连接到wait_for_resource
-                    logger.info(f"表 {target_table} 没有上游依赖,连接到wait_for_resource")
-                    wait_for_resource >> task
-                else:
-                    # 设置与上游表的依赖关系
-                    for upstream_table in upstream_tables:
-                        logger.info(f"设置依赖: {upstream_table} >> {target_table}")
-                        task_dict[upstream_table] >> task
-                
-                # 检查是否是末端节点(没有下游任务)
-                is_terminal = True
-                for downstream, upstreams in dependency_dict.items():
-                    if target_table in upstreams and downstream in task_dict:
-                        is_terminal = False
-                        break
-                
-                # 如果是末端节点,连接到model_processing_completed
-                if is_terminal:
-                    logger.info(f"表 {target_table} 是末端节点,连接到model_processing_completed")
-                    task >> model_processing_completed
-            
-            # 处理特殊情况
-            # 检查是否有任务连接到model_processing_completed
-            has_connection_to_completed = False
-            for task in task_dict.values():
-                for downstream in task.downstream_list:
-                    if downstream.task_id == model_processing_completed.task_id:
-                        has_connection_to_completed = True
-                        break
-                
-                if has_connection_to_completed:
-                    break
-            
-            # 如果没有任务连接到model_processing_completed,连接所有任务到完成标记
-            if not has_connection_to_completed and task_dict:
-                logger.info("没有任务连接到model_processing_completed,连接所有任务到完成标记")
-                for task in task_dict.values():
-                    task >> model_processing_completed
-            
-            # 检查是否有任务连接到wait_for_resource
-            has_connection_from_wait = False
-            for task in task_dict.values():
-                for upstream in task.upstream_list:
-                    if upstream.task_id == wait_for_resource.task_id:
-                        has_connection_from_wait = True
-                        break
-                
-                if has_connection_from_wait:
-                    break
-            
-            # 如果没有任务连接到wait_for_resource,连接wait_for_resource到所有任务
-            if not has_connection_from_wait and task_dict:
-                logger.info("没有任务连接到wait_for_resource,连接wait_for_resource到所有任务")
-                for task in task_dict.values():
-                    wait_for_resource >> task
-        else:
-            # 如果没有任务,直接将等待节点连接到完成
-            wait_for_resource >> model_processing_completed
-            logger.warning("没有找到DataModel任务,直接将等待节点连接到完成")
-    except Exception as e:
-        logger.error(f"创建模型处理DAG时出错: {str(e)}")
-        import traceback
-        logger.error(traceback.format_exc())
-        # 确保在出错时也有完整的执行流程
-        wait_for_resource >> model_processing_completed

+ 0 - 355
dags/dag_dataops_prepare_scheduler.py

@@ -1,355 +0,0 @@
-# dag_dataops_prepare_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from datetime import datetime, timedelta
-import pendulum
-import logging
-from common import get_pg_conn, get_neo4j_driver, get_today_date
-from config import PG_CONFIG, NEO4J_CONFIG
-import networkx as nx
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_enabled_tables():
-    """获取所有启用的表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT owner_id, table_name 
-            FROM schedule_status 
-            WHERE schedule_is_enabled = TRUE
-        """)
-        result = cursor.fetchall()
-        return [row[1] for row in result]  # 只返回表名
-    except Exception as e:
-        logger.error(f"获取启用表失败: {str(e)}")
-        return []
-    finally:
-        cursor.close()
-        conn.close()
-
-def check_table_directly_subscribed(table_name):
-    """检查表是否在schedule_status表中直接订阅"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT schedule_is_enabled
-            FROM schedule_status 
-            WHERE table_name = %s
-        """, (table_name,))
-        result = cursor.fetchone()
-        return result and result[0] is True
-    except Exception as e:
-        logger.error(f"检查表订阅状态失败: {str(e)}")
-        return False
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_table_info_from_neo4j(table_name):
-    """从Neo4j获取表的详细信息"""
-    driver = get_neo4j_driver()
-     # 检查表是否直接订阅
-    is_directly_schedule = check_table_directly_subscribed(table_name)
-
-    table_info = {
-        'target_table': table_name,
-        'is_directly_schedule': is_directly_schedule,  # 初始值设为True,从schedule_status表获取
-    }
-    
-    try:
-        with driver.session() as session:
-            # 查询表标签和状态
-            query_table = """
-                MATCH (t {en_name: $table_name})
-                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
-            """
-            result = session.run(query_table, table_name=table_name)
-            record = result.single()
-            
-            if record:
-                labels = record.get("labels", [])
-                table_info['target_table_label'] = [label for label in labels if label in ["DataResource", "DataModel", "DataSource"]][0] if labels else None
-                table_info['target_table_status'] = record.get("status", True)  # 默认为True
-                table_info['default_update_frequency'] = record.get("frequency")
-                
-                # 根据标签类型查询关系和脚本信息
-                if "DataResource" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
-                elif "DataModel" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
-                else:
-                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
-                    return table_info
-                
-                result = session.run(query_rel, table_name=table_name)
-                record = result.single()
-                
-                if record:
-                    table_info['source_table'] = record.get("source_table")     
-
-                    # 检查script_name是否为空
-                    script_name = record.get("script_name")
-                    if not script_name:
-                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
-                    table_info['script_name'] = script_name
-                    
-                    # 设置默认值,确保即使属性为空也有默认值
-                    table_info['script_type'] = record.get("script_type", "python")  # 默认为python
-                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # 默认为append
-                else:
-                    logger.warning(f"未找到表 {table_name} 的关系信息")
-            else:
-                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
-    except Exception as e:
-        logger.error(f"获取表 {table_name} 的信息时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    return table_info
-
-def process_dependencies(tables_info):
-    """处理表间依赖关系,添加被动调度的表"""
-    # 存储所有表信息的字典
-    all_tables = {t['target_table']: t for t in tables_info}
-    driver = get_neo4j_driver()
-    
-    try:
-        with driver.session() as session:
-            for table_name, table_info in list(all_tables.items()):
-                if table_info.get('target_table_label') == 'DataModel':
-                    # 查询其依赖表
-                    query = """
-                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
-                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
-                               dep.status AS dep_status, dep.frequency AS dep_frequency
-                    """
-                    result = session.run(query, table_name=table_name)
-                    
-                    for record in result:
-                        dep_name = record.get("dep_name")
-                        dep_labels = record.get("dep_labels", [])
-                        dep_status = record.get("dep_status", True)
-                        dep_frequency = record.get("dep_frequency")
-                        
-                        # 处理未被直接调度的依赖表
-                        if dep_name and dep_name not in all_tables:
-                            logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
-                            
-                            # 获取依赖表详细信息
-                            dep_info = get_table_info_from_neo4j(dep_name)
-                            dep_info['is_directly_schedule'] = False
-                            
-                            # 处理调度频率继承
-                            if not dep_info.get('default_update_frequency'):
-                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
-                            
-                            all_tables[dep_name] = dep_info
-    except Exception as e:
-        logger.error(f"处理依赖关系时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    return list(all_tables.values())
-
-def filter_invalid_tables(tables_info):
-    """过滤无效表及其依赖,使用NetworkX构建依赖图"""
-    # 构建表名到索引的映射
-    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
-    
-    # 找出无效表
-    invalid_tables = set()
-    for table in tables_info:
-        if table.get('target_table_status') is False:
-            invalid_tables.add(table['target_table'])
-            logger.info(f"表 {table['target_table']} 的状态为无效")
-    
-    # 构建依赖图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table in tables_info:
-        G.add_node(table['target_table'])
-    
-    # 查询并添加依赖边
-    driver = get_neo4j_driver()
-    try:
-        with driver.session() as session:
-            for table in tables_info:
-                if table.get('target_table_label') == 'DataModel':
-                    query = """
-                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
-                        RETURN target.en_name AS target_name
-                    """
-                    result = session.run(query, table_name=table['target_table'])
-                    
-                    for record in result:
-                        target_name = record.get("target_name")
-                        if target_name and target_name in table_dict:
-                            # 添加从目标到源的边,表示目标依赖于源
-                            G.add_edge(table['target_table'], target_name)
-                            logger.debug(f"添加依赖边: {table['target_table']} -> {target_name}")
-    except Exception as e:
-        logger.error(f"构建依赖图时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    # 找出依赖于无效表的所有表
-    downstream_invalid = set()
-    for invalid_table in invalid_tables:
-        # 获取可从无效表到达的所有节点
-        try:
-            descendants = nx.descendants(G, invalid_table)
-            downstream_invalid.update(descendants)
-            logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
-        except Exception as e:
-            logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
-    
-    # 合并所有无效表
-    all_invalid = invalid_tables.union(downstream_invalid)
-    logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
-    
-    # 过滤出有效表
-    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
-    logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
-    
-    return valid_tables
-
-def write_to_airflow_dag_schedule(exec_date, tables_info):
-    """将表信息写入airflow_dag_schedule表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    
-    try:
-        # 清理当日数据,避免重复
-        cursor.execute("""
-            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        logger.info(f"已清理执行日期 {exec_date} 的现有数据")
-        
-        # 批量插入新数据
-        inserted_count = 0
-        for table in tables_info:
-            cursor.execute("""
-                INSERT INTO airflow_dag_schedule (
-                    exec_date, source_table, target_table, target_table_label,
-                    target_table_status, is_directly_schedule, default_update_frequency,
-                    script_name, script_type, script_exec_mode
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
-            """, (
-                exec_date,
-                table.get('source_table'),
-                table['target_table'],
-                table.get('target_table_label'),
-                table.get('target_table_status', True),
-                table.get('is_directly_schedule', False),
-                table.get('default_update_frequency'),
-                table.get('script_name'),
-                table.get('script_type', 'python'),
-                table.get('script_exec_mode', 'append')
-            ))
-            inserted_count += 1
-        
-        conn.commit()
-        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
-        return inserted_count
-    except Exception as e:
-        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
-        conn.rollback()
-        # 不要返回0,而是重新抛出异常,确保错误被正确传播
-        raise
-    finally:
-        cursor.close()
-        conn.close()
-
-def prepare_dag_schedule(**kwargs):
-    """准备DAG调度任务的主函数"""
-    exec_date = kwargs.get('ds') or get_today_date()
-    logger.info(f"开始准备执行日期 {exec_date} 的调度任务")
-    
-    # 1. 获取启用的表
-    enabled_tables = get_enabled_tables()
-    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
-    
-    if not enabled_tables:
-        logger.warning("没有找到启用的表,准备工作结束")
-        return 0
-    
-    # 2. 获取表的详细信息
-    tables_info = []
-    for table_name in enabled_tables:
-        table_info = get_table_info_from_neo4j(table_name)
-        if table_info:
-            tables_info.append(table_info)
-    
-    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
-    
-    # 3. 处理依赖关系,添加被动调度的表
-    enriched_tables = process_dependencies(tables_info)
-    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
-    
-    # 4. 过滤无效表及其依赖
-    valid_tables = filter_invalid_tables(enriched_tables)
-    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
-    
-    # 5. 写入airflow_dag_schedule表
-    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
-    
-    # 6. 检查插入操作是否成功,如果失败则抛出异常
-    if inserted_count == 0 and valid_tables:
-        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
-        logger.error(error_msg)
-        raise Exception(error_msg)
-    
-    return inserted_count
-
-# 创建DAG
-with DAG(
-    "dag_dataops_prepare_scheduler",
-    start_date=datetime(2024, 1, 1),
-    schedule_interval="@daily",
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # 任务开始标记
-    start_preparation = EmptyOperator(
-        task_id="start_preparation",
-        dag=dag
-    )
-    
-    # 准备调度任务
-    prepare_task = PythonOperator(
-        task_id="prepare_dag_schedule",
-        python_callable=prepare_dag_schedule,
-        provide_context=True,
-        dag=dag
-    )
-    
-    # 准备完成标记
-    preparation_completed = EmptyOperator(
-        task_id="preparation_completed",
-        dag=dag
-    )
-    
-    # 设置任务依赖
-    start_preparation >> prepare_task >> preparation_completed

+ 0 - 185
dags/dag_dataops_resource_scheduler.py

@@ -1,185 +0,0 @@
-# dag_dataops_resource_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-import logging
-from common import (
-    get_pg_conn, execute_with_monitoring, get_today_date
-)
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-# 添加获取最近日期的函数
-def get_latest_date_with_resources():
-    """
-    获取数据库中包含DataResource记录的最近日期
-    
-    用于查找数据库中最近的日期,以确保能够获取到数据
-    """
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT DISTINCT exec_date
-            FROM airflow_dag_schedule 
-            WHERE target_table_label = 'DataResource'
-            ORDER BY exec_date DESC
-            LIMIT 1
-        """)
-        result = cursor.fetchone()
-        if result:
-            latest_date = result[0]
-            logger.info(f"找到最近的包含DataResource记录的日期: {latest_date}")
-            return latest_date
-        else:
-            logger.warning("未找到包含DataResource记录的日期,将使用当前日期")
-            return get_today_date()
-    except Exception as e:
-        logger.error(f"查找最近日期时出错: {str(e)}")
-        return get_today_date()
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_dataresource_tasks(exec_date):
-    """从airflow_dag_schedule表获取DataResource任务"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 构建SQL查询
-        sql = """
-            SELECT source_table, target_table, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataResource'
-        """
-        # 记录查询信息
-        logger.info(f"查询资源任务,使用日期: {exec_date}")
-        
-        # 执行查询
-        cursor.execute(sql, (exec_date,))
-        results = cursor.fetchall()
-        logger.info(f"使用日期 {exec_date} 查询到 {len(results)} 个DataResource任务")
-        
-        # 处理去重
-        unique_tasks = {}
-        for row in results:
-            source_table, target_table, script_name, script_exec_mode = row
-            # 使用目标表名作为键进行去重
-            if target_table not in unique_tasks:
-                unique_tasks[target_table] = {
-                    "source_table": source_table,
-                    "target_table": target_table,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode or "append"  # 默认值
-                }
-        
-        logger.info(f"获取到 {len(results)} 个DataResource任务,去重后剩余 {len(unique_tasks)} 个")
-        return list(unique_tasks.values())
-    except Exception as e:
-        logger.error(f"获取DataResource任务时出错: {str(e)}")
-        return []
-    finally:
-        cursor.close()
-        conn.close()
-
-def process_resource(target_table, script_name, script_exec_mode, **kwargs):
-    """处理单个资源表的函数"""
-    exec_date = kwargs.get('ds')
-    logger.info(f"开始处理资源表: {target_table}, 脚本: {script_name}")
-    
-    try:
-        # 调用执行函数
-        result = execute_with_monitoring(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
-        logger.info(f"资源表 {target_table} 处理完成")
-        return result
-    except Exception as e:
-        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
-        raise
-
-def generate_no_task_message(**kwargs):
-    """当没有任务时执行的函数"""
-    logger.info("没有资源需要处理")
-    return "没有资源需要处理"
-
-# 创建DAG
-with DAG(
-    "dag_dataops_resource_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # 等待prepare DAG完成
-    wait_for_prepare = ExternalTaskSensor(
-        task_id="wait_for_prepare",
-        external_dag_id="dag_dataops_prepare_scheduler",
-        external_task_id="preparation_completed",
-        mode="poke",
-        timeout=3600,
-        poke_interval=30,
-        dag=dag
-    )
-    
-    # 处理完成标记
-    resource_processing_completed = EmptyOperator(
-        task_id="resource_processing_completed",
-        dag=dag
-    )
-    
-    # 在DAG运行时获取最近日期和资源任务
-    latest_date = get_latest_date_with_resources()
-    logger.info(f"使用最近的日期 {latest_date} 查询资源任务")
-    
-    # 获取资源任务
-    resource_tasks = get_dataresource_tasks(latest_date)
-    
-    if resource_tasks:
-        for i, task_info in enumerate(resource_tasks):
-            target_table = task_info["target_table"]
-            script_name = task_info["script_name"]
-            script_exec_mode = task_info["script_exec_mode"]
-            
-            if not script_name:
-                logger.warning(f"资源表 {target_table} 没有关联脚本,跳过")
-                continue
-            
-            # 为每个资源表创建单独的处理任务
-            task_id = f"process_resource_{target_table.replace('.', '_')}"
-            process_task = PythonOperator(
-                task_id=task_id,
-                python_callable=process_resource,
-                op_kwargs={
-                    "target_table": target_table,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode
-                },
-                provide_context=True,
-                dag=dag
-            )
-            
-            # 设置依赖 - 直接从wait_for_prepare连接到处理任务
-            wait_for_prepare >> process_task >> resource_processing_completed
-    else:
-        # 如果没有任务,添加一个空任务
-        empty_task = PythonOperator(
-            task_id="no_resources_to_process",
-            python_callable=generate_no_task_message,
-            dag=dag
-        )
-        wait_for_prepare >> empty_task >> resource_processing_completed
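
This scheduler builds its per-table tasks at DAG parse time from whatever the query returns, and wires every task between the upstream sensor and a single completion marker. A minimal sketch of that fan-out wiring, assuming Airflow 2.x and a process callable like process_resource above:

from airflow.operators.python import PythonOperator

def wire_resource_tasks(dag, upstream, downstream, resource_tasks, process_callable):
    """Create one PythonOperator per resource table and chain it between upstream and downstream."""
    for info in resource_tasks:
        task = PythonOperator(
            task_id=f"process_resource_{info['target_table'].replace('.', '_')}",
            python_callable=process_callable,
            op_kwargs={
                "target_table": info["target_table"],
                "script_name": info["script_name"],
                "script_exec_mode": info.get("script_exec_mode", "append"),
            },
            dag=dag,
        )
        upstream >> task >> downstream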

+ 0 - 287
dags/dag_dataops_summary_scheduler.py

@@ -1,287 +0,0 @@
-# dag_dataops_summary_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-import logging
-import json
-from decimal import Decimal
-from common import get_pg_conn, get_today_date
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-# 添加自定义JSON编码器解决Decimal序列化问题
-class DecimalEncoder(json.JSONEncoder):
-    def default(self, obj):
-        if isinstance(obj, Decimal):
-            return float(obj)
-        # 处理日期类型
-        elif isinstance(obj, datetime):
-            return obj.isoformat()
-        # 让父类处理其他类型
-        return super().default(obj)
-
-def get_execution_stats(exec_date):
-    """获取当日执行统计信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询总任务数
-        cursor.execute("""
-            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        total_tasks = cursor.fetchone()[0]
-        
-        # 查询每种类型的任务数
-        cursor.execute("""
-            SELECT target_table_label, COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s 
-            GROUP BY target_table_label
-        """, (exec_date,))
-        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
-        
-        # 查询执行结果统计
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS TRUE
-        """, (exec_date,))
-        success_count = cursor.fetchone()[0]
-        
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        fail_count = cursor.fetchone()[0]
-        
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        pending_count = cursor.fetchone()[0]
-        
-        # 计算执行时间统计
-        cursor.execute("""
-            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_duration IS NOT NULL
-        """, (exec_date,))
-        time_stats = cursor.fetchone()
-        avg_duration, min_duration, max_duration = time_stats if time_stats else (None, None, None)
-        
-        # 将Decimal转换为float
-        if avg_duration is not None:
-            avg_duration = float(avg_duration)
-        if min_duration is not None:
-            min_duration = float(min_duration)
-        if max_duration is not None:
-            max_duration = float(max_duration)
-        
-        # 查询失败任务详情
-        cursor.execute("""
-            SELECT target_table, script_name, target_table_label, exec_duration
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        failed_tasks = [
-            {
-                "target_table": row[0],
-                "script_name": row[1],
-                "target_table_label": row[2],
-                "exec_duration": float(row[3]) if row[3] is not None else None
-            }
-            for row in cursor.fetchall()
-        ]
-        
-        # 汇总统计信息
-        stats = {
-            "exec_date": exec_date,
-            "total_tasks": total_tasks,
-            "type_counts": type_counts,
-            "success_count": success_count,
-            "fail_count": fail_count,
-            "pending_count": pending_count,
-            "success_rate": (success_count / total_tasks * 100) if total_tasks > 0 else 0,
-            "avg_duration": avg_duration,
-            "min_duration": min_duration,
-            "max_duration": max_duration,
-            "failed_tasks": failed_tasks
-        }
-        
-        return stats
-    except Exception as e:
-        logger.error(f"获取执行统计信息时出错: {str(e)}")
-        return {}
-    finally:
-        cursor.close()
-        conn.close()
-
-def update_missing_results(exec_date):
-    """更新缺失的执行结果信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询所有缺失执行结果的任务
-        cursor.execute("""
-            SELECT target_table, script_name
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        missing_results = cursor.fetchall()
-        
-        update_count = 0
-        for row in missing_results:
-            target_table, script_name = row
-            
-            # 如果有开始时间但没有结束时间,假设执行失败
-            cursor.execute("""
-                SELECT exec_start_time
-                FROM airflow_dag_schedule
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            
-            start_time = cursor.fetchone()
-            
-            if start_time and start_time[0]:
-                # 有开始时间但无结果,标记为失败
-                now = datetime.now()
-                duration = (now - start_time[0]).total_seconds()
-                
-                cursor.execute("""
-                    UPDATE airflow_dag_schedule
-                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
-                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                """, (now, duration, exec_date, target_table, script_name))
-                
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 标记为失败,开始时间: {start_time[0]}")
-                update_count += 1
-            else:
-                # 没有开始时间且无结果,假设未执行
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 未执行")
-        
-        conn.commit()
-        logger.info(f"更新了 {update_count} 个缺失结果的任务")
-        return update_count
-    except Exception as e:
-        logger.error(f"更新缺失执行结果时出错: {str(e)}")
-        conn.rollback()
-        return 0
-    finally:
-        cursor.close()
-        conn.close()
-
-def generate_execution_report(exec_date, stats):
-    """生成执行报告"""
-    # 构建报告
-    report = []
-    report.append(f"========== 数据运维系统执行报告 ==========")
-    report.append(f"执行日期: {exec_date}")
-    report.append(f"总任务数: {stats['total_tasks']}")
-    
-    # 任务类型分布
-    report.append("\n--- 任务类型分布 ---")
-    for label, count in stats.get('type_counts', {}).items():
-        report.append(f"{label} 任务: {count} 个")
-    
-    # 执行结果统计
-    report.append("\n--- 执行结果统计 ---")
-    report.append(f"成功任务: {stats.get('success_count', 0)} 个")
-    report.append(f"失败任务: {stats.get('fail_count', 0)} 个")
-    report.append(f"未执行任务: {stats.get('pending_count', 0)} 个")
-    report.append(f"成功率: {stats.get('success_rate', 0):.2f}%")
-    
-    # 执行时间统计
-    report.append("\n--- 执行时间统计 (秒) ---")
-    report.append(f"平均执行时间: {stats.get('avg_duration', 0):.2f}")
-    report.append(f"最短执行时间: {stats.get('min_duration', 0):.2f}")
-    report.append(f"最长执行时间: {stats.get('max_duration', 0):.2f}")
-    
-    # 失败任务详情
-    failed_tasks = stats.get('failed_tasks', [])
-    if failed_tasks:
-        report.append("\n--- 失败任务详情 ---")
-        for i, task in enumerate(failed_tasks, 1):
-            report.append(f"{i}. 表名: {task['target_table']}")
-            report.append(f"   脚本: {task['script_name']}")
-            report.append(f"   类型: {task['target_table_label']}")
-            report.append(f"   执行时间: {task.get('exec_duration', 'N/A'):.2f} 秒")
-    
-    report.append("\n========== 报告结束 ==========")
-    
-    # 将报告转换为字符串
-    report_str = "\n".join(report)
-    
-    # 记录到日志
-    logger.info("\n" + report_str)
-    
-    return report_str
-
-def summarize_execution(**kwargs):
-    """汇总执行情况的主函数"""
-    exec_date = kwargs.get('ds') or get_today_date()
-    logger.info(f"开始汇总执行日期 {exec_date} 的执行情况")
-    
-    # 1. 更新缺失的执行结果
-    update_count = update_missing_results(exec_date)
-    logger.info(f"更新了 {update_count} 个缺失的执行结果")
-    
-    # 2. 获取执行统计信息
-    stats = get_execution_stats(exec_date)
-    
-    # 3. 生成执行报告
-    report = generate_execution_report(exec_date, stats)
-    
-    # 将报告和统计信息传递给下一个任务
-    kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
-    kwargs['ti'].xcom_push(key='execution_report', value=report)
-    
-    return report
-
-# 创建DAG
-with DAG(
-    "dag_dataops_summary_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    schedule_interval="@daily", 
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # 等待model DAG完成
-    wait_for_model = ExternalTaskSensor(
-        task_id="wait_for_model",
-        external_dag_id="dag_dataops_model_scheduler",
-        external_task_id="model_processing_completed",
-        mode="poke",
-        timeout=3600,
-        poke_interval=30,
-        dag=dag
-    )
-    
-    # 汇总执行情况
-    summarize_task = PythonOperator(
-        task_id="summarize_execution",
-        python_callable=summarize_execution,
-        provide_context=True,
-        dag=dag
-    )
-    
-    # 总结完成标记
-    summary_completed = EmptyOperator(
-        task_id="summary_completed",
-        dag=dag
-    )
-    
-    # 设置任务依赖
-    wait_for_model >> summarize_task >> summary_completed
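
The DecimalEncoder defined in this file is the usual way to make Decimal and datetime values returned by psycopg2 JSON-serializable before pushing them to XCom. A small, self-contained usage sketch:

import json
from datetime import datetime
from decimal import Decimal

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        return super().default(obj)

stats = {"avg_duration": Decimal("12.50"), "generated_at": datetime(2024, 1, 1, 8, 0)}
print(json.dumps(stats, cls=DecimalEncoder))
# prints {"avg_duration": 12.5, "generated_at": "2024-01-01T08:00:00"}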

+ 0 - 530
dags/dag_dataops_unified_data_scheduler.py

@@ -1,530 +0,0 @@
-# dag_dataops_unified_data_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from datetime import datetime, timedelta, date
-import logging
-import networkx as nx
-import json
-import os
-from common import (
-    get_pg_conn, 
-    get_neo4j_driver,
-    execute_with_monitoring,
-    get_today_date
-)
-from config import TASK_RETRY_CONFIG, SCRIPTS_BASE_PATH, AIRFLOW_BASE_PATH
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_latest_date():
-    """获取数据库中包含记录的最近日期"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT DISTINCT exec_date
-            FROM airflow_dag_schedule 
-            ORDER BY exec_date DESC
-            LIMIT 1
-        """)
-        result = cursor.fetchone()
-        if result:
-            latest_date = result[0]
-            logger.info(f"找到最近的包含记录的日期: {latest_date}")
-            return latest_date
-        else:
-            logger.warning("未找到包含记录的日期,将使用当前日期")
-            return get_today_date()
-    except Exception as e:
-        logger.error(f"查找最近日期时出错: {str(e)}")
-        return get_today_date()
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_all_tasks(exec_date):
-    """获取所有需要执行的任务(DataResource和DataModel)"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # 查询数据表中记录总数
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s
-        """, (exec_date,))
-        total_count = cursor.fetchone()[0]
-        logger.info(f"执行日期 {exec_date} 在airflow_dag_schedule表中共有 {total_count} 条记录")
-        
-        # 查询所有资源表任务
-        cursor.execute("""
-            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataResource' AND script_name IS NOT NULL
-        """, (exec_date,))
-        resource_results = cursor.fetchall()
-        logger.info(f"查询到 {len(resource_results)} 条DataResource记录")
-        
-        # 查询所有模型表任务
-        cursor.execute("""
-            SELECT source_table, target_table, target_table_label, script_name, script_exec_mode
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND target_table_label = 'DataModel' AND script_name IS NOT NULL
-        """, (exec_date,))
-        model_results = cursor.fetchall()
-        logger.info(f"查询到 {len(model_results)} 条DataModel记录")
-        
-        # 整理资源表信息
-        resource_tasks = []
-        for row in resource_results:
-            source_table, target_table, target_table_label, script_name, script_exec_mode = row
-            if script_name:  # 确保脚本名称不为空
-                resource_tasks.append({
-                    "source_table": source_table,
-                    "target_table": target_table,
-                    "target_table_label": target_table_label,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode or "append"
-                })
-        
-        # 整理模型表信息
-        model_tasks = []
-        for row in model_results:
-            source_table, target_table, target_table_label, script_name, script_exec_mode = row
-            if script_name:  # 确保脚本名称不为空
-                model_tasks.append({
-                    "source_table": source_table,
-                    "target_table": target_table,
-                    "target_table_label": target_table_label,
-                    "script_name": script_name,
-                    "script_exec_mode": script_exec_mode or "append"
-                })
-        
-        logger.info(f"获取到 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
-        return resource_tasks, model_tasks
-    except Exception as e:
-        logger.error(f"获取任务信息时出错: {str(e)}")
-        return [], []
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_table_dependencies(table_names):
-    """获取表之间的依赖关系"""
-    driver = get_neo4j_driver()
-    dependency_dict = {name: [] for name in table_names}
-    
-    try:
-        with driver.session() as session:
-            # 获取所有模型表之间的依赖关系
-            query = """
-                MATCH (source:DataModel)-[:DERIVED_FROM]->(target)
-                WHERE source.en_name IN $table_names
-                RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
-            """
-            result = session.run(query, table_names=table_names)
-            
-            for record in result:
-                source = record.get("source")
-                target = record.get("target")
-                target_labels = record.get("target_labels", [])
-                
-                if source and target:
-                    # 将目标表添加到源表的依赖列表中
-                    dependency_dict[source].append({
-                        "table_name": target,
-                        "table_type": next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
-                    })
-                    logger.debug(f"依赖关系: {source} 依赖于 {target}")
-    except Exception as e:
-        logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    return dependency_dict
-
-def json_serial(obj):
-    """将日期对象序列化为ISO格式字符串的JSON序列化器"""
-    if isinstance(obj, (datetime, date)):
-        return obj.isoformat()
-    raise TypeError(f"类型 {type(obj)} 不能被序列化为JSON")
-
-def prepare_unified_execution_plan(**kwargs):
-    """准备统一执行计划的主函数"""
-    # 获取执行日期
-    exec_date = get_latest_date()
-    logger.info(f"使用执行日期: {exec_date}")
-    
-    # 获取所有任务
-    resource_tasks, model_tasks = get_all_tasks(exec_date)
-    
-    if not resource_tasks and not model_tasks:
-        logger.warning(f"执行日期 {exec_date} 没有找到任务")
-        return 0
-    
-    # 为所有模型表获取依赖关系
-    model_table_names = [task["target_table"] for task in model_tasks]
-    dependencies = get_table_dependencies(model_table_names)
-    
-    # 创建执行计划
-    execution_plan = {
-        "exec_date": exec_date,
-        "resource_tasks": resource_tasks,
-        "model_tasks": model_tasks,
-        "dependencies": dependencies
-    }
-    
-    # 记录资源任务和模型任务的名称,便于调试
-    resource_names = [task["target_table"] for task in resource_tasks]
-    model_names = [task["target_table"] for task in model_tasks]
-    logger.info(f"资源表任务: {resource_names}")
-    logger.info(f"模型表任务: {model_names}")
-    
-    # 已经不需要推送到XCom,因为我们直接从文件读取
-    # 这里仅用于验证执行计划与文件中的是否一致
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    ready_path = f"{plan_path}.ready"
-    
-    if os.path.exists(plan_path) and os.path.exists(ready_path):
-        try:
-            with open(plan_path, 'r') as f:
-                existing_plan = json.load(f)
-            
-            # 比较执行计划是否有变化
-            existing_resources = sorted([t.get("target_table") for t in existing_plan.get("resource_tasks", [])])
-            current_resources = sorted(resource_names)
-            
-            existing_models = sorted([t.get("target_table") for t in existing_plan.get("model_tasks", [])])
-            current_models = sorted(model_names)
-            
-            if existing_resources == current_resources and existing_models == current_models:
-                logger.info("执行计划无变化,继续使用现有任务结构")
-            else:
-                logger.warning("执行计划与现有文件不一致,但DAG结构已固定,需等待下次解析")
-                logger.warning(f"现有资源表: {existing_resources}")
-                logger.warning(f"当前资源表: {current_resources}")
-                logger.warning(f"现有模型表: {existing_models}")
-                logger.warning(f"当前模型表: {current_models}")
-        except Exception as e:
-            logger.error(f"比较执行计划时出错: {str(e)}")
-    
-    logger.info(f"准备了执行计划,包含 {len(resource_tasks)} 个资源表任务和 {len(model_tasks)} 个模型表任务")
-    return len(resource_tasks) + len(model_tasks)
-
-def process_resource(target_table, script_name, script_exec_mode, exec_date):
-    """处理单个资源表"""
-    logger.info(f"执行资源表 {target_table} 的脚本 {script_name}")
-    # 检查exec_date是否是JSON字符串
-    if isinstance(exec_date, str) and exec_date.startswith('{'):
-        try:
-            # 尝试解析JSON字符串
-            exec_date_data = json.loads(exec_date)
-            exec_date = exec_date_data.get("exec_date")
-            logger.info(f"从JSON中提取执行日期: {exec_date}")
-        except Exception as e:
-            logger.error(f"解析exec_date JSON时出错: {str(e)}")
-    
-    try:
-        # 正常调用执行监控函数
-        result = execute_with_monitoring(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
-        logger.info(f"资源表 {target_table} 处理完成,结果: {result}")
-        return result
-    except Exception as e:
-        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
-        # 确保即使出错也返回结果,不会阻塞DAG
-        return False
-
-def process_model(target_table, script_name, script_exec_mode, exec_date):
-    """处理单个模型表"""
-    logger.info(f"执行模型表 {target_table} 的脚本 {script_name}")
-    # 检查exec_date是否是JSON字符串
-    if isinstance(exec_date, str) and exec_date.startswith('{'):
-        try:
-            # 尝试解析JSON字符串
-            exec_date_data = json.loads(exec_date)
-            exec_date = exec_date_data.get("exec_date")
-            logger.info(f"从JSON中提取执行日期: {exec_date}")
-        except Exception as e:
-            logger.error(f"解析exec_date JSON时出错: {str(e)}")
-    
-    try:
-        result = execute_with_monitoring(
-            target_table=target_table,
-            script_name=script_name,
-            script_exec_mode=script_exec_mode,
-            exec_date=exec_date
-        )
-        logger.info(f"模型表 {target_table} 处理完成,结果: {result}")
-        return result
-    except Exception as e:
-        logger.error(f"处理模型表 {target_table} 时出错: {str(e)}")
-        # 确保即使出错也返回结果,不会阻塞DAG
-        return False
-
-# 修改预先加载数据以创建任务的逻辑
-try:
-    logger.info("预先加载执行计划数据用于构建DAG")
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    ready_path = f"{plan_path}.ready"
-    execution_plan = {"exec_date": get_today_date(), "resource_tasks": [], "model_tasks": [], "dependencies": {}}
-    
-    # 首先检查ready文件是否存在,确保JSON文件已完整生成
-    if os.path.exists(ready_path) and os.path.exists(plan_path):
-        try:
-            # 读取ready文件中的时间戳
-            with open(ready_path, 'r') as f:
-                ready_timestamp = f.read().strip()
-                logger.info(f"执行计划ready标记时间: {ready_timestamp}")
-            
-            # 读取执行计划文件
-            with open(plan_path, 'r') as f:
-                execution_plan_json = f.read()
-                execution_plan = json.loads(execution_plan_json)
-                logger.info(f"从文件加载执行计划: {plan_path}")
-        except Exception as e:
-            logger.warning(f"读取执行计划文件出错: {str(e)}")
-    else:
-        if not os.path.exists(ready_path):
-            logger.warning(f"执行计划ready标记文件不存在: {ready_path}")
-        if not os.path.exists(plan_path):
-            logger.warning(f"执行计划文件不存在: {plan_path}")
-        logger.warning("将创建基础DAG结构")
-    
-    # 提取信息
-    exec_date = execution_plan.get("exec_date", get_today_date())
-    resource_tasks = execution_plan.get("resource_tasks", [])
-    model_tasks = execution_plan.get("model_tasks", [])
-    dependencies = execution_plan.get("dependencies", {})
-    
-    logger.info(f"预加载执行计划: exec_date={exec_date}, resource_tasks数量={len(resource_tasks)}, model_tasks数量={len(model_tasks)}")
-except Exception as e:
-    logger.error(f"预加载执行计划数据时出错: {str(e)}")
-    exec_date = get_today_date()
-    resource_tasks = []
-    model_tasks = []
-    dependencies = {}
-
-# 定义处理DAG失败的回调函数
-def handle_dag_failure(context):
-    logger.error(f"DAG执行失败: {context.get('exception')}")
-
-# 创建DAG
-with DAG(
-    "dag_dataops_unified_data_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    # 修改调度间隔为每10分钟检查一次,以便及时响应执行计划变化
-    # schedule_interval="*/10 * * * *",
-    # 修改调度间隔为每日0点执行一次
-    schedule_interval="0 0 * * *",
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    },
-    on_failure_callback=handle_dag_failure,
-    # 添加DAG级别参数,确保任务运行时有正确的环境
-    params={
-        "scripts_path": SCRIPTS_BASE_PATH,
-        "airflow_base_path": AIRFLOW_BASE_PATH
-    }
-) as dag:
-    
-    # 准备执行计划
-    prepare_plan = PythonOperator(
-        task_id="prepare_execution_plan",
-        python_callable=prepare_unified_execution_plan,
-        provide_context=True,
-        dag=dag
-    )
-    
-    # 处理完成标记
-    processing_completed = EmptyOperator(
-        task_id="processing_completed",
-        trigger_rule="none_failed_min_one_success",  # 只要有一个任务成功且没有失败的任务就标记为完成
-        dag=dag
-    )
-    
-    # 任务字典,用于设置依赖关系
-    task_dict = {}
-    
-    # 添加一个空任务作为下游任务的起始点,确保即使没有资源表和模型表,DAG也能正常执行
-    start_processing = EmptyOperator(
-        task_id="start_processing",
-        dag=dag
-    )
-    
-    # 设置基本依赖
-    prepare_plan >> start_processing
-    
-    # 1. 预先创建资源表任务
-    for task_info in resource_tasks:
-        table_name = task_info["target_table"]
-        script_name = task_info["script_name"]
-        exec_mode = task_info.get("script_exec_mode", "append")
-        
-        # 创建安全的任务ID
-        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-        task_id = f"resource_{safe_table_name}"
-        
-        resource_task = PythonOperator(
-            task_id=task_id,
-            python_callable=process_resource,
-            op_kwargs={
-                "target_table": table_name,
-                "script_name": script_name,
-                "script_exec_mode": exec_mode,
-                "exec_date": exec_date  # 直接使用解析出的exec_date,不使用XCom
-            },
-            retries=TASK_RETRY_CONFIG["retries"],
-            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
-            dag=dag
-        )
-        
-        # 将任务添加到字典
-        task_dict[table_name] = resource_task
-        
-        # 设置与start_processing的依赖
-        start_processing >> resource_task
-        logger.info(f"设置基本依赖: start_processing >> {task_id}")
-    
-    # 创建有向图,用于检测模型表之间的依赖关系
-    G = nx.DiGraph()
-    
-    # 将所有模型表添加为节点
-    for task_info in model_tasks:
-        table_name = task_info["target_table"]
-        G.add_node(table_name)
-    
-    # 添加模型表之间的依赖边
-    for source, deps in dependencies.items():
-        for dep in deps:
-            if dep.get("table_type") == "DataModel" and dep.get("table_name") in G.nodes():
-                G.add_edge(dep.get("table_name"), source)  # 依赖方向:依赖项 -> 目标
-    
-    # 检测循环依赖并处理
-    cycles = list(nx.simple_cycles(G))
-    if cycles:
-        logger.warning(f"检测到循环依赖: {cycles}")
-        for cycle in cycles:
-            G.remove_edge(cycle[-1], cycle[0])
-            logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
-    
-    # 生成拓扑排序,确定执行顺序
-    execution_order = []
-    try:
-        execution_order = list(nx.topological_sort(G))
-        logger.info(f"预加载计算的执行顺序: {execution_order}")
-    except Exception as e:
-        logger.error(f"生成拓扑排序失败: {str(e)}, 使用原始顺序")
-        execution_order = [task_info["target_table"] for task_info in model_tasks]
-    
-    # 2. 预先创建模型表任务
-    for table_name in execution_order:
-        task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
-        if not task_info:
-            continue
-            
-        script_name = task_info["script_name"]
-        exec_mode = task_info.get("script_exec_mode", "append")
-        
-        # 创建安全的任务ID
-        safe_table_name = table_name.replace(".", "_").replace("-", "_")
-        task_id = f"model_{safe_table_name}"
-        
-        model_task = PythonOperator(
-            task_id=task_id,
-            python_callable=process_model,
-            op_kwargs={
-                "target_table": table_name,
-                "script_name": script_name,
-                "script_exec_mode": exec_mode,
-                "exec_date": exec_date  # 直接使用解析出的exec_date,不使用XCom
-            },
-            retries=TASK_RETRY_CONFIG["retries"],
-            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
-            dag=dag
-        )
-        
-        # 将任务添加到字典
-        task_dict[table_name] = model_task
-        
-        # 设置依赖关系
-        deps = dependencies.get(table_name, [])
-        has_dependency = False
-        
-        # 处理模型表之间的依赖
-        for dep in deps:
-            dep_table = dep.get("table_name")
-            dep_type = dep.get("table_type")
-            
-            if dep_table in task_dict:
-                task_dict[dep_table] >> model_task
-                has_dependency = True
-                logger.info(f"预先设置依赖: {dep_table} >> {table_name}")
-        
-        # 如果没有依赖,则依赖于start_processing和资源表任务
-        if not has_dependency:
-            # 从start_processing任务直接连接
-            start_processing >> model_task
-            logger.info(f"设置基本依赖: start_processing >> {task_id}")
-            
-            # 同时从所有资源表任务连接 - 限制每个模型表最多依赖5个资源表,避免过度复杂的依赖关系
-            resource_count = 0
-            for resource_table in resource_tasks:
-                if resource_count >= 5:
-                    break
-                
-                resource_name = resource_table["target_table"]
-                if resource_name in task_dict:
-                    task_dict[resource_name] >> model_task
-                    logger.info(f"预先设置资源依赖: {resource_name} >> {table_name}")
-                    resource_count += 1
-    
-    # 找出所有终端任务(没有下游依赖的任务)
-    terminal_tasks = []
-    
-    # 检查所有模型表任务
-    for table_name in execution_order:
-        # 检查是否有下游任务
-        has_downstream = False
-        for source, deps in dependencies.items():
-            if source == table_name:  # 跳过自身
-                continue
-            for dep in deps:
-                if dep.get("table_name") == table_name:
-                    has_downstream = True
-                    break
-            if has_downstream:
-                break
-        
-        # 如果没有下游任务,添加到终端任务列表
-        if not has_downstream and table_name in task_dict:
-            terminal_tasks.append(table_name)
-    
-    # 如果没有模型表任务,将所有资源表任务视为终端任务
-    if not model_tasks and resource_tasks:
-        terminal_tasks = [task["target_table"] for task in resource_tasks]
-    
-    # 如果既没有模型表任务也没有资源表任务,直接连接start_processing到完成标记
-    if not terminal_tasks:
-        start_processing >> processing_completed
-        logger.warning("未找到任何任务,直接连接start_processing到完成标记")
-    else:
-        # 将所有终端任务连接到完成标记
-        for table_name in terminal_tasks:
-            if table_name in task_dict:
-                task_dict[table_name] >> processing_completed
-                logger.info(f"设置终端任务: {table_name} >> processing_completed")
-
-logger.info(f"DAG dag_dataops_unified_data_scheduler 定义完成,预创建了 {len(task_dict)} 个任务")

+ 0 - 588
dags/dag_dataops_unified_prepare_scheduler.py

@@ -1,588 +0,0 @@
-# dag_dataops_unified_prepare_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator, ShortCircuitOperator
-from airflow.operators.empty import EmptyOperator
-from datetime import datetime, timedelta
-import logging
-import networkx as nx
-import json
-import os
-from common import (
-    get_pg_conn, 
-    get_neo4j_driver,
-    get_today_date
-)
-from config import PG_CONFIG, NEO4J_CONFIG
-
-# 创建日志记录器
-logger = logging.getLogger(__name__)
-
-def get_enabled_tables():
-    """获取所有启用的表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT owner_id, table_name 
-            FROM schedule_status 
-            WHERE schedule_is_enabled = TRUE
-        """)
-        result = cursor.fetchall()
-        return [row[1] for row in result]  # 只返回表名
-    except Exception as e:
-        logger.error(f"获取启用表失败: {str(e)}")
-        return []
-    finally:
-        cursor.close()
-        conn.close()
-
-def check_table_directly_subscribed(table_name):
-    """检查表是否在schedule_status表中直接订阅"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        cursor.execute("""
-            SELECT schedule_is_enabled
-            FROM schedule_status 
-            WHERE table_name = %s
-        """, (table_name,))
-        result = cursor.fetchone()
-        return result and result[0] is True
-    except Exception as e:
-        logger.error(f"检查表订阅状态失败: {str(e)}")
-        return False
-    finally:
-        cursor.close()
-        conn.close()
-
-def get_table_info_from_neo4j(table_name):
-    """从Neo4j获取表的详细信息"""
-    driver = get_neo4j_driver()
-    # 检查表是否直接订阅
-    is_directly_schedule = check_table_directly_subscribed(table_name)
-
-    table_info = {
-        'target_table': table_name,
-        'is_directly_schedule': is_directly_schedule,  # 来自schedule_status表的直接调度标志
-    }
-    
-    try:
-        with driver.session() as session:
-            # 查询表标签和状态
-            query_table = """
-                MATCH (t {en_name: $table_name})
-                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
-            """
-            result = session.run(query_table, table_name=table_name)
-            record = result.single()
-            
-            if record:
-                labels = record.get("labels", [])
-                table_info['target_table_label'] = next((label for label in labels if label in ["DataResource", "DataModel", "DataSource"]), None)
-                table_info['target_table_status'] = record.get("status", True)  # 默认为True
-                table_info['default_update_frequency'] = record.get("frequency")
-                
-                # 根据标签类型查询关系和脚本信息
-                if "DataResource" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
-                elif "DataModel" in labels:
-                    query_rel = """
-                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
-                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
-                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
-                    """
-                else:
-                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
-                    return table_info
-                
-                result = session.run(query_rel, table_name=table_name)
-                record = result.single()
-                
-                if record:
-                    table_info['source_table'] = record.get("source_table")     
-
-                    # 检查script_name是否为空
-                    script_name = record.get("script_name")
-                    if not script_name:
-                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
-                    table_info['script_name'] = script_name
-                    
-                    # 设置默认值,确保即使属性为空也有默认值
-                    table_info['script_type'] = record.get("script_type") or "python"  # 为空时默认为python
-                    table_info['script_exec_mode'] = record.get("script_exec_mode") or "append"  # 为空时默认为append
-                else:
-                    logger.warning(f"未找到表 {table_name} 的关系信息")
-            else:
-                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
-    except Exception as e:
-        logger.error(f"获取表 {table_name} 的信息时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    return table_info
-
-def process_dependencies(tables_info):
-    """处理表间依赖关系,添加被动调度的表"""
-    # 存储所有表信息的字典
-    all_tables = {t['target_table']: t for t in tables_info}
-    driver = get_neo4j_driver()
-    
-    try:
-        with driver.session() as session:
-            for table_name, table_info in list(all_tables.items()):
-                if table_info.get('target_table_label') == 'DataModel':
-                    # 查询其依赖表
-                    query = """
-                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
-                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
-                               dep.status AS dep_status, dep.frequency AS dep_frequency
-                    """
-                    result = session.run(query, table_name=table_name)
-                    
-                    for record in result:
-                        dep_name = record.get("dep_name")
-                        dep_labels = record.get("dep_labels", [])
-                        dep_status = record.get("dep_status", True)
-                        dep_frequency = record.get("dep_frequency")
-                        
-                        # 处理未被直接调度的依赖表
-                        if dep_name and dep_name not in all_tables:
-                            logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
-                            
-                            # 获取依赖表详细信息
-                            dep_info = get_table_info_from_neo4j(dep_name)
-                            dep_info['is_directly_schedule'] = False
-                            
-                            # 处理调度频率继承
-                            if not dep_info.get('default_update_frequency'):
-                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
-                            
-                            all_tables[dep_name] = dep_info
-    except Exception as e:
-        logger.error(f"处理依赖关系时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    return list(all_tables.values())
-
-def filter_invalid_tables(tables_info):
-    """过滤无效表及其依赖,使用NetworkX构建依赖图"""
-    # 构建表名到索引的映射
-    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
-    
-    # 找出无效表
-    invalid_tables = set()
-    for table in tables_info:
-        if table.get('target_table_status') is False:
-            invalid_tables.add(table['target_table'])
-            logger.info(f"表 {table['target_table']} 的状态为无效")
-    
-    # 构建依赖图
-    G = nx.DiGraph()
-    
-    # 添加所有节点
-    for table in tables_info:
-        G.add_node(table['target_table'])
-    
-    # 查询并添加依赖边
-    driver = get_neo4j_driver()
-    try:
-        with driver.session() as session:
-            for table in tables_info:
-                if table.get('target_table_label') == 'DataModel':
-                    query = """
-                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
-                        RETURN target.en_name AS target_name
-                    """
-                    result = session.run(query, table_name=table['target_table'])
-                    
-                    for record in result:
-                        target_name = record.get("target_name")
-                        if target_name and target_name in table_dict:
-                            # 添加从目标到源的边,表示目标依赖于源
-                            G.add_edge(table['target_table'], target_name)
-                            logger.debug(f"添加依赖边: {table['target_table']} -> {target_name}")
-    except Exception as e:
-        logger.error(f"构建依赖图时出错: {str(e)}")
-    finally:
-        driver.close()
-    
-    # 找出依赖于无效表的所有表
-    downstream_invalid = set()
-    for invalid_table in invalid_tables:
-        # 获取可从无效表到达的所有节点
-        try:
-            descendants = nx.descendants(G, invalid_table)
-            downstream_invalid.update(descendants)
-            logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
-        except Exception as e:
-            logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
-    
-    # 合并所有无效表
-    all_invalid = invalid_tables.union(downstream_invalid)
-    logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
-    
-    # 过滤出有效表
-    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
-    logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
-    
-    return valid_tables
-
-def write_to_airflow_dag_schedule(exec_date, tables_info):
-    """将表信息写入airflow_dag_schedule表"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    
-    try:
-        # 清理当日数据,避免重复
-        cursor.execute("""
-            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        logger.info(f"已清理执行日期 {exec_date} 的现有数据")
-        
-        # 批量插入新数据
-        inserted_count = 0
-        for table in tables_info:
-            cursor.execute("""
-                INSERT INTO airflow_dag_schedule (
-                    exec_date, source_table, target_table, target_table_label,
-                    target_table_status, is_directly_schedule, default_update_frequency,
-                    script_name, script_type, script_exec_mode
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
-            """, (
-                exec_date,
-                table.get('source_table'),
-                table['target_table'],
-                table.get('target_table_label'),
-                table.get('target_table_status', True),
-                table.get('is_directly_schedule', False),
-                table.get('default_update_frequency'),
-                table.get('script_name'),
-                table.get('script_type', 'python'),
-                table.get('script_exec_mode', 'append')
-            ))
-            inserted_count += 1
-        
-        conn.commit()
-        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
-        return inserted_count
-    except Exception as e:
-        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
-        conn.rollback()
-        # 不要返回0,而是重新抛出异常,确保错误被正确传播
-        raise
-    finally:
-        cursor.close()
-        conn.close()
-
-def prepare_unified_dag_schedule(**kwargs):
-    """准备统一DAG调度任务的主函数"""
-    import hashlib
-    
-    exec_date = kwargs.get('ds') or get_today_date()
-    logger.info(f"开始准备执行日期 {exec_date} 的统一调度任务")
-    
-    # 检查执行计划文件和ready文件是否存在
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    ready_path = f"{plan_path}.ready"
-    files_exist = os.path.exists(plan_path) and os.path.exists(ready_path)
-    
-    if not files_exist:
-        logger.info("执行计划文件或ready标记文件不存在,将重新生成执行计划")
-    
-    # 1. 计算当前订阅表状态的哈希值,用于检测变化
-    def get_subscription_state_hash():
-        """获取订阅表状态的哈希值"""
-        conn = get_pg_conn()
-        cursor = conn.cursor()
-        try:
-            cursor.execute("""
-                SELECT table_name, schedule_is_enabled
-                FROM schedule_status
-                ORDER BY table_name
-            """)
-            rows = cursor.fetchall()
-            # 将所有行拼接成一个字符串,然后计算哈希值
-            data_str = '|'.join(f"{row[0]}:{row[1]}" for row in rows)
-            return hashlib.md5(data_str.encode()).hexdigest()
-        except Exception as e:
-            logger.error(f"计算订阅表状态哈希值时出错: {str(e)}")
-            return None
-        finally:
-            cursor.close()
-            conn.close()
-    
-    # 获取当前订阅表状态哈希值
-    current_hash = get_subscription_state_hash()
-    if not current_hash:
-        logger.error("无法获取订阅表状态,将中止处理")
-        return 0
-    
-    # 2. 读取上次记录的哈希值
-    hash_file = os.path.join(os.path.dirname(__file__), '.subscription_state')
-    last_hash = None
-    if os.path.exists(hash_file):
-        try:
-            with open(hash_file, 'r') as f:
-                last_hash = f.read().strip()
-        except Exception as e:
-            logger.warning(f"读取上次订阅状态哈希值失败: {str(e)}")
-    
-    # 3. 如果哈希值相同且文件存在,说明订阅表未变化且执行计划存在,可以提前退出
-    if last_hash == current_hash and files_exist:
-        logger.info("订阅表状态未变化且执行计划文件存在,无需更新执行计划")
-        return 0
-    
-    # 记录重新生成原因
-    if not files_exist:
-        logger.info("执行计划文件或ready标记文件不存在,需要重新生成")
-    else:
-        logger.info(f"检测到订阅表状态变化。旧哈希值: {last_hash}, 新哈希值: {current_hash}")
-    
-    # 4. 获取启用的表
-    enabled_tables = get_enabled_tables()
-    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
-    
-    if not enabled_tables:
-        logger.warning("没有找到启用的表,准备工作结束")
-        return 0
-    
-    # 5. 获取表的详细信息
-    tables_info = []
-    for table_name in enabled_tables:
-        table_info = get_table_info_from_neo4j(table_name)
-        if table_info:
-            tables_info.append(table_info)
-    
-    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
-    
-    # 6. 处理依赖关系,添加被动调度的表
-    enriched_tables = process_dependencies(tables_info)
-    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
-    
-    # 7. 过滤无效表及其依赖
-    valid_tables = filter_invalid_tables(enriched_tables)
-    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
-    
-    # 8. 写入airflow_dag_schedule表
-    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
-    
-    # 9. 检查插入操作是否成功,如果失败则抛出异常
-    if inserted_count == 0 and valid_tables:
-        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
-        logger.error(error_msg)
-        raise Exception(error_msg)
-    
-    # 10. 保存最新执行计划,供DAG读取使用
-    try:
-        # 构建执行计划
-        resource_tasks = []
-        model_tasks = []
-        
-        for table in valid_tables:
-            if table.get('target_table_label') == 'DataResource':
-                resource_tasks.append({
-                    "source_table": table.get('source_table'),
-                    "target_table": table['target_table'],
-                    "target_table_label": "DataResource",
-                    "script_name": table.get('script_name'),
-                    "script_exec_mode": table.get('script_exec_mode', 'append')
-                })
-            elif table.get('target_table_label') == 'DataModel':
-                model_tasks.append({
-                    "source_table": table.get('source_table'),
-                    "target_table": table['target_table'],
-                    "target_table_label": "DataModel",
-                    "script_name": table.get('script_name'),
-                    "script_exec_mode": table.get('script_exec_mode', 'append')
-                })
-        
-        # 获取依赖关系
-        model_table_names = [t['target_table'] for t in model_tasks]
-        dependencies = {}
-        
-        driver = get_neo4j_driver()
-        try:
-            with driver.session() as session:
-                for table_name in model_table_names:
-                    query = """
-                        MATCH (source:DataModel {en_name: $table_name})-[:DERIVED_FROM]->(target)
-                        RETURN source.en_name AS source, target.en_name AS target, labels(target) AS target_labels
-                    """
-                    result = session.run(query, table_name=table_name)
-                    
-                    deps = []
-                    for record in result:
-                        target = record.get("target")
-                        target_labels = record.get("target_labels", [])
-                        
-                        if target:
-                            table_type = next((label for label in target_labels if label in ["DataModel", "DataResource"]), None)
-                            deps.append({
-                                "table_name": target,
-                                "table_type": table_type
-                            })
-                    
-                    dependencies[table_name] = deps
-        finally:
-            driver.close()
-        
-        # 创建执行计划
-        execution_plan = {
-            "exec_date": exec_date,
-            "resource_tasks": resource_tasks,
-            "model_tasks": model_tasks,
-            "dependencies": dependencies
-        }
-        
-        # 使用临时文件先写入内容,再原子替换,确保写入过程不会被中断
-        temp_plan_path = f"{plan_path}.temp"
-        
-        try:
-            # 10.1 写入临时文件
-            with open(temp_plan_path, 'w') as f:
-                json.dump(execution_plan, f, indent=2)
-            logger.info(f"已保存执行计划到临时文件: {temp_plan_path}")
-            
-            # 10.2 原子替换正式文件
-            os.replace(temp_plan_path, plan_path)
-            logger.info(f"已替换执行计划文件: {plan_path}")
-            
-            # 10.3 创建ready文件,标记执行计划就绪
-            with open(ready_path, 'w') as f:
-                f.write(datetime.now().isoformat())
-            logger.info(f"已创建ready标记文件: {ready_path}")
-            
-            # 10.4 更新订阅表状态哈希值
-            with open(hash_file, 'w') as f:
-                f.write(current_hash)
-            logger.info(f"已更新订阅表状态哈希值: {current_hash}")
-            
-            # 10.5 触发data_scheduler DAG重新解析
-            data_scheduler_path = os.path.join(os.path.dirname(__file__), 'dag_dataops_unified_data_scheduler.py')
-            if os.path.exists(data_scheduler_path):
-                # 更新文件修改时间,触发Airflow重新解析
-                os.utime(data_scheduler_path, None)
-                logger.info(f"已触发数据调度器DAG重新解析: {data_scheduler_path}")
-            else:
-                logger.warning(f"数据调度器DAG文件不存在: {data_scheduler_path}")
-        except Exception as e:
-            logger.error(f"保存执行计划文件或触发DAG重新解析时出错: {str(e)}")
-            # 出错时清理临时文件
-            if os.path.exists(temp_plan_path):
-                try:
-                    os.remove(temp_plan_path)
-                    logger.info(f"已清理临时文件: {temp_plan_path}")
-                except Exception as rm_e:
-                    logger.error(f"清理临时文件时出错: {str(rm_e)}")
-            raise  # 重新抛出异常,确保任务失败
-            
-    except Exception as e:
-        error_msg = f"保存或验证执行计划文件时出错: {str(e)}"
-        logger.error(error_msg)
-        # 强制抛出异常,确保任务失败,阻止下游DAG执行
-        raise Exception(error_msg)
-    
-    return inserted_count
-
-def check_execution_plan_file(**kwargs):
-    """
-    检查执行计划文件是否存在且有效
-    返回False将阻止所有下游任务执行
-    """
-    logger.info("检查执行计划文件是否存在且有效")
-    plan_path = os.path.join(os.path.dirname(__file__), 'last_execution_plan.json')
-    
-    # 检查文件是否存在
-    if not os.path.exists(plan_path):
-        logger.error(f"执行计划文件不存在: {plan_path}")
-        return False
-    
-    # 检查文件是否可读且内容有效
-    try:
-        with open(plan_path, 'r') as f:
-            data = json.load(f)
-            
-            # 检查必要字段
-            if "exec_date" not in data:
-                logger.error("执行计划缺少exec_date字段")
-                return False
-                
-            if not isinstance(data.get("resource_tasks", []), list):
-                logger.error("执行计划的resource_tasks字段无效")
-                return False
-                
-            if not isinstance(data.get("model_tasks", []), list):
-                logger.error("执行计划的model_tasks字段无效")
-                return False
-            
-            # 检查是否有任务数据
-            resource_tasks = data.get("resource_tasks", [])
-            model_tasks = data.get("model_tasks", [])
-            if not resource_tasks and not model_tasks:
-                logger.warning("执行计划不包含任何任务,但文件格式有效")
-                # 注意:即使没有任务,我们仍然允许流程继续
-            
-            logger.info(f"执行计划文件验证成功: 包含 {len(resource_tasks)} 个资源任务和 {len(model_tasks)} 个模型任务")
-            return True
-            
-    except json.JSONDecodeError as je:
-        logger.error(f"执行计划文件不是有效的JSON: {str(je)}")
-        return False
-    except Exception as e:
-        logger.error(f"检查执行计划文件时出错: {str(e)}")
-        return False
-
-# 创建DAG
-with DAG(
-    "dag_dataops_unified_prepare_scheduler",
-    start_date=datetime(2024, 1, 1),
-    # 每10分钟运行一次,而不是每天
-    # schedule_interval="*/5 * * * *",  
-    # 修改调度间隔为每小时执行一次
-    schedule_interval="0 * * * *",
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # 任务开始标记
-    start_preparation = EmptyOperator(
-        task_id="start_preparation",
-        dag=dag
-    )
-    
-    # 准备调度任务
-    prepare_task = PythonOperator(
-        task_id="prepare_unified_dag_schedule",
-        python_callable=prepare_unified_dag_schedule,
-        provide_context=True,
-        dag=dag
-    )
-    
-    # 检查执行计划文件
-    check_plan_file = ShortCircuitOperator(
-        task_id="check_execution_plan_file",
-        python_callable=check_execution_plan_file,
-        dag=dag
-    )
-    
-    # 准备完成标记
-    preparation_completed = EmptyOperator(
-        task_id="preparation_completed",
-        dag=dag
-    )
-    
-    # 设置任务依赖
-    start_preparation >> prepare_task >> check_plan_file >> preparation_completed
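
Steps 10.1 through 10.3 above implement a write-temp-file / atomic-replace / ready-marker sequence so that the data DAG never reads a half-written plan. A condensed sketch of that sequence:

import json
import os
from datetime import datetime

def save_execution_plan(plan, plan_path):
    """Write plan JSON to a temp file, atomically swap it in, then drop a .ready marker."""
    temp_path = f"{plan_path}.temp"
    with open(temp_path, "w") as f:
        json.dump(plan, f, indent=2)
    os.replace(temp_path, plan_path)  # atomic within the same filesystem
    with open(f"{plan_path}.ready", "w") as f:
        f.write(datetime.now().isoformat())  # readers require this marker before loading the plan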

+ 0 - 387
dags/dag_dataops_unified_summary_scheduler.py

@@ -1,387 +0,0 @@
-# dag_dataops_unified_summary_scheduler.py
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.sensors.external_task import ExternalTaskSensor
-from datetime import datetime, timedelta
-import logging
-import json
-from decimal import Decimal
-from common import get_pg_conn, get_today_date
-from airflow.models import Variable
-
-# Create the logger
-logger = logging.getLogger(__name__)
-
-# Custom JSON encoder to handle Decimal serialization
-class DecimalEncoder(json.JSONEncoder):
-    def default(self, obj):
-        if isinstance(obj, Decimal):
-            return float(obj)
-        # Handle datetime values
-        elif isinstance(obj, datetime):
-            return obj.isoformat()
-        # 让父类处理其他类型
-        return super(DecimalEncoder, self).default(obj)
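A quick usage sketch of the encoder defined above, using only the standard library; the sample values are made up for illustration:

import json
from datetime import datetime
from decimal import Decimal

sample = {
    "exec_date": "2024-01-01",
    "avg_duration": Decimal("12.50"),            # Decimal -> float
    "generated_at": datetime(2024, 1, 1, 8, 0),  # datetime -> ISO 8601 string
}
# With cls=DecimalEncoder this serializes cleanly instead of raising TypeError:
# {"exec_date": "2024-01-01", "avg_duration": 12.5, "generated_at": "2024-01-01T08:00:00"}
print(json.dumps(sample, cls=DecimalEncoder))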
-
-def get_execution_stats(exec_date):
-    """获取当日执行统计信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # Query the total number of tasks
-        cursor.execute("""
-            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
-        """, (exec_date,))
-        result = cursor.fetchone()
-        total_tasks = result[0] if result else 0
-        
-        # Query the task count for each type
-        cursor.execute("""
-            SELECT target_table_label, COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s 
-            GROUP BY target_table_label
-        """, (exec_date,))
-        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
-        
-        # Query execution result statistics
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS TRUE
-        """, (exec_date,))
-        result = cursor.fetchone()
-        success_count = result[0] if result else 0
-        
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        result = cursor.fetchone()
-        fail_count = result[0] if result else 0
-        
-        cursor.execute("""
-            SELECT COUNT(*) 
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        result = cursor.fetchone()
-        pending_count = result[0] if result else 0
-        
-        # Compute execution time statistics
-        cursor.execute("""
-            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_duration IS NOT NULL
-        """, (exec_date,))
-        time_stats = cursor.fetchone()
-        
-        # Guard against None in the timing statistics
-        if time_stats and time_stats[0] is not None:
-            avg_duration = float(time_stats[0])
-            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
-            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
-        else:
-            avg_duration = None
-            min_duration = None
-            max_duration = None
-        
-        # Query details of failed tasks
-        cursor.execute("""
-            SELECT target_table, script_name, target_table_label, exec_duration
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS FALSE
-        """, (exec_date,))
-        failed_tasks = []
-        for row in cursor.fetchall():
-            task_dict = {
-                "target_table": row[0],
-                "script_name": row[1],
-                "target_table_label": row[2],
-            }
-            if row[3] is not None:
-                task_dict["exec_duration"] = float(row[3])
-            else:
-                task_dict["exec_duration"] = None
-            failed_tasks.append(task_dict)
-        
-        # Compute the success rate, avoiding division by zero
-        success_rate = 0
-        if total_tasks > 0:
-            success_rate = (success_count / total_tasks) * 100
-        
-        # Aggregate the statistics
-        stats = {
-            "exec_date": exec_date,
-            "total_tasks": total_tasks,
-            "type_counts": type_counts,
-            "success_count": success_count,
-            "fail_count": fail_count,
-            "pending_count": pending_count,
-            "success_rate": success_rate,
-            "avg_duration": avg_duration,
-            "min_duration": min_duration,
-            "max_duration": max_duration,
-            "failed_tasks": failed_tasks
-        }
-        
-        return stats
-    except Exception as e:
-        logger.error(f"获取执行统计信息时出错: {str(e)}")
-        return {}
-    finally:
-        cursor.close()
-        conn.close()
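The function above issues a separate query per status; the same counts and timing figures could be collected in a single round trip. A possible consolidation, assuming PostgreSQL 9.4+ (for the FILTER clause) and the same airflow_dag_schedule schema:

CONSOLIDATED_STATS_SQL = """
    SELECT COUNT(*)                                     AS total_tasks,
           COUNT(*) FILTER (WHERE exec_result IS TRUE)  AS success_count,
           COUNT(*) FILTER (WHERE exec_result IS FALSE) AS fail_count,
           COUNT(*) FILTER (WHERE exec_result IS NULL)  AS pending_count,
           AVG(exec_duration)                           AS avg_duration,
           MIN(exec_duration)                           AS min_duration,
           MAX(exec_duration)                           AS max_duration
    FROM airflow_dag_schedule
    WHERE exec_date = %s
"""
# cursor.execute(CONSOLIDATED_STATS_SQL, (exec_date,)) would then return one row
# with all seven figures instead of six separate queries.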
-
-def update_missing_results(exec_date):
-    """更新缺失的执行结果信息"""
-    conn = get_pg_conn()
-    cursor = conn.cursor()
-    try:
-        # Query all tasks that are missing an execution result
-        cursor.execute("""
-            SELECT target_table, script_name
-            FROM airflow_dag_schedule 
-            WHERE exec_date = %s AND exec_result IS NULL
-        """, (exec_date,))
-        missing_results = cursor.fetchall()
-        
-        update_count = 0
-        for row in missing_results:
-            target_table, script_name = row
-            
-            # If there is a start time but no end time, assume the execution failed
-            cursor.execute("""
-                SELECT exec_start_time
-                FROM airflow_dag_schedule
-                WHERE exec_date = %s AND target_table = %s AND script_name = %s
-            """, (exec_date, target_table, script_name))
-            
-            start_time = cursor.fetchone()
-            
-            if start_time and start_time[0]:
-                # Has a start time but no result; mark it as failed
-                now = datetime.now()
-                duration = (now - start_time[0]).total_seconds()
-                
-                cursor.execute("""
-                    UPDATE airflow_dag_schedule
-                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
-                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
-                """, (now, duration, exec_date, target_table, script_name))
-                
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 标记为失败,开始时间: {start_time[0]}")
-                update_count += 1
-            else:
-                # No start time and no result; assume it never ran
-                logger.warning(f"任务 {target_table} 的脚本 {script_name} 未执行")
-        
-        conn.commit()
-        logger.info(f"更新了 {update_count} 个缺失结果的任务")
-        return update_count
-    except Exception as e:
-        logger.error(f"更新缺失执行结果时出错: {str(e)}")
-        conn.rollback()
-        return 0
-    finally:
-        cursor.close()
-        conn.close()
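Looping over the rows and updating them one by one works, but the same backfill can also be expressed as a single set-based statement; a sketch, assuming PostgreSQL and that both timestamps live in airflow_dag_schedule:

BACKFILL_MISSING_RESULTS_SQL = """
    UPDATE airflow_dag_schedule
    SET exec_result = FALSE,
        exec_end_time = NOW(),
        exec_duration = EXTRACT(EPOCH FROM (NOW() - exec_start_time))
    WHERE exec_date = %s
      AND exec_result IS NULL
      AND exec_start_time IS NOT NULL
"""
# cursor.execute(BACKFILL_MISSING_RESULTS_SQL, (exec_date,)); cursor.rowcount then
# gives the number of tasks that were marked as failed.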
-
-def generate_unified_execution_report(exec_date, stats):
-    """生成统一执行报告"""
-    # 构建报告
-    report = []
-    report.append(f"========== 统一数据运维系统执行报告 ==========")
-    report.append(f"执行日期: {exec_date}")
-    report.append(f"总任务数: {stats['total_tasks']}")
-    
-    # Task type distribution
-    report.append("\n--- 任务类型分布 ---")
-    for label, count in stats.get('type_counts', {}).items():
-        report.append(f"{label} 任务: {count} 个")
-    
-    # Execution result statistics
-    report.append("\n--- 执行结果统计 ---")
-    report.append(f"成功任务: {stats.get('success_count', 0)} 个")
-    report.append(f"失败任务: {stats.get('fail_count', 0)} 个")
-    report.append(f"未执行任务: {stats.get('pending_count', 0)} 个")
-    report.append(f"成功率: {stats.get('success_rate', 0):.2f}%")
-    
-    # Execution time statistics
-    report.append("\n--- 执行时间统计 (秒) ---")
-    avg_duration = stats.get('avg_duration')
-    min_duration = stats.get('min_duration')
-    max_duration = stats.get('max_duration')
-    
-    report.append(f"平均执行时间: {avg_duration:.2f}" if avg_duration is not None else "平均执行时间: N/A")
-    report.append(f"最短执行时间: {min_duration:.2f}" if min_duration is not None else "最短执行时间: N/A")
-    report.append(f"最长执行时间: {max_duration:.2f}" if max_duration is not None else "最长执行时间: N/A")
-    
-    # Failed task details
-    failed_tasks = stats.get('failed_tasks', [])
-    if failed_tasks:
-        report.append("\n--- 失败任务详情 ---")
-        for i, task in enumerate(failed_tasks, 1):
-            report.append(f"{i}. 表名: {task['target_table']}")
-            report.append(f"   脚本: {task['script_name']}")
-            report.append(f"   类型: {task['target_table_label']}")
-            exec_duration = task.get('exec_duration')
-            if exec_duration is not None:
-                report.append(f"   执行时间: {exec_duration:.2f} 秒")
-            else:
-                report.append("   执行时间: N/A")
-    
-    report.append("\n========== 报告结束 ==========")
-    
-    # Convert the report to a string
-    report_str = "\n".join(report)
-    
-    # Write it to the log
-    logger.info("\n" + report_str)
-    
-    return report_str
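The three duration lines above repeat the same None-guard; a small helper would keep the pattern in one place. The name and signature below are illustrative, not part of the original file:

def fmt_duration(label, value):
    # Render "N/A" when the duration is missing, matching the report's behaviour.
    return f"{label}: {value:.2f}" if value is not None else f"{label}: N/A"

# e.g. fmt_duration("平均执行时间", stats.get("avg_duration")) -> "平均执行时间: 12.34" or "平均执行时间: N/A"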
-
-def summarize_unified_execution(**kwargs):
-    """汇总统一执行情况的主函数"""
-    try:
-        exec_date = kwargs.get('ds') or get_today_date()
-        logger.info(f"开始汇总执行日期 {exec_date} 的统一执行情况")
-        
-        # 1. Fill in missing execution results
-        try:
-            update_count = update_missing_results(exec_date)
-            logger.info(f"更新了 {update_count} 个缺失的执行结果")
-        except Exception as e:
-            logger.error(f"更新缺失执行结果时出错: {str(e)}")
-            update_count = 0
-        
-        # 2. Get execution statistics
-        try:
-            stats = get_execution_stats(exec_date)
-            if not stats:
-                logger.warning("未能获取执行统计信息,将使用默认值")
-                stats = {
-                    "exec_date": exec_date,
-                    "total_tasks": 0,
-                    "type_counts": {},
-                    "success_count": 0,
-                    "fail_count": 0,
-                    "pending_count": 0,
-                    "success_rate": 0,
-                    "avg_duration": None,
-                    "min_duration": None,
-                    "max_duration": None,
-                    "failed_tasks": []
-                }
-        except Exception as e:
-            logger.error(f"获取执行统计信息时出错: {str(e)}")
-            stats = {
-                "exec_date": exec_date,
-                "total_tasks": 0,
-                "type_counts": {},
-                "success_count": 0,
-                "fail_count": 0,
-                "pending_count": 0,
-                "success_rate": 0,
-                "avg_duration": None,
-                "min_duration": None,
-                "max_duration": None,
-                "failed_tasks": []
-            }
-        
-        # 3. Generate the execution report
-        try:
-            report = generate_unified_execution_report(exec_date, stats)
-        except Exception as e:
-            logger.error(f"生成执行报告时出错: {str(e)}")
-            report = f"生成执行报告时出错: {str(e)}\n基础统计: 总任务数: {stats.get('total_tasks', 0)}, 成功: {stats.get('success_count', 0)}, 失败: {stats.get('fail_count', 0)}"
-        
-        # Pass the report and statistics to the next task
-        try:
-            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
-            kwargs['ti'].xcom_push(key='execution_report', value=report)
-        except Exception as e:
-            logger.error(f"保存报告到XCom时出错: {str(e)}")
-        
-        return report
-    except Exception as e:
-        logger.error(f"汇总执行情况时出现未处理的错误: {str(e)}")
-        # Return a simple error report so the task itself does not fail
-        return f"执行汇总时出现错误: {str(e)}"
-
-# Create the DAG
-with DAG(
-    "dag_dataops_unified_summary_scheduler", 
-    start_date=datetime(2024, 1, 1), 
-    # Previously every 15 minutes, in step with data_scheduler
-    # schedule_interval="*/15 * * * *",
-    # Changed the schedule interval to run once daily at midnight
-    schedule_interval="0 0 * * *",
-    catchup=False,
-    default_args={
-        'owner': 'airflow',
-        'depends_on_past': False,
-        'email_on_failure': False,
-        'email_on_retry': False,
-        'retries': 1,
-        'retry_delay': timedelta(minutes=5)
-    }
-) as dag:
-    
-    # Check whether to skip waiting for the external task
-    skip_wait = Variable.get("skip_summary_wait", default_var="false").lower() == "true"
-    
-    if skip_wait:
-        # If waiting is skipped, use an empty operator instead
-        wait_for_data_processing = EmptyOperator(
-            task_id="wait_for_data_processing",
-            dag=dag
-        )
-        logger.info("跳过等待外部DAG完成,使用EmptyOperator替代")
-    else:
-        # Wait for the unified data processing DAG to finish
-        # Helper that logs the target execution date and returns it unchanged
-        def print_target_date(dt):
-            logger.info(f"===== ExternalTaskSensor等待的目标日期信息 =====")
-            logger.info(f"源DAG: dag_dataops_unified_summary_scheduler")
-            logger.info(f"目标DAG: dag_dataops_unified_data_scheduler")
-            logger.info(f"目标任务: processing_completed")
-            logger.info(f"查找的执行日期: {dt}")
-            logger.info(f"日期字符串格式: {dt.strftime('%Y-%m-%dT%H:%M:%S')}")
-            logger.info(f"日期类型: {type(dt)}")
-            logger.info(f"=======================================")
-            # Must return the original date unmodified
-            return dt
-
-        wait_for_data_processing = ExternalTaskSensor(
-            task_id="wait_for_data_processing",
-            external_dag_id="dag_dataops_unified_data_scheduler",
-            external_task_id="processing_completed",
-            mode="reschedule",  # 改为reschedule模式,不会占用worker
-            timeout=7200,  # 增加超时时间到2小时
-            poke_interval=60,  # 增加检查间隔到1分钟
-            allowed_states=["success", "skipped"],  # 允许成功或跳过的状态
-            failed_states=["failed", "upstream_failed"],  # 当检测到这些状态时立即失败
-            dag=dag,
-            # 添加自定义方法来打印和返回日期
-            execution_date_fn=print_target_date
-        )
-    
-    # Summarize the execution
-    summarize_task = PythonOperator(
-        task_id="summarize_unified_execution",
-        python_callable=summarize_unified_execution,
-        provide_context=True,
-        dag=dag
-    )
-    
-    # Marks the summary as complete
-    summary_completed = EmptyOperator(
-        task_id="summary_completed",
-        dag=dag
-    )
-    
-    # Set task dependencies
-    wait_for_data_processing >> summarize_task >> summary_completed
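summarize_unified_execution pushes the report and statistics to XCom; a downstream task could pick them up as sketched below. The notify task and its callable are hypothetical additions, not part of the original DAG:

def send_report_notification(**kwargs):
    ti = kwargs["ti"]
    report = ti.xcom_pull(task_ids="summarize_unified_execution", key="execution_report")
    stats_json = ti.xcom_pull(task_ids="summarize_unified_execution", key="execution_stats")
    # Hand the report off to mail/IM/alerting here; the sketch only logs it.
    logger.info("Received execution report:\n%s", report)
    return stats_json

# notify_task = PythonOperator(
#     task_id="send_report_notification",
#     python_callable=send_report_notification,
#     provide_context=True,
#     dag=dag,
# )
# summarize_task >> notify_task >> summary_completed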