
Finished writing the four dag_dataops_*_scheduler.py scripts. Next, Airflow's system execution time needs to be investigated: the system execution time is currently inconsistent across the different DAGs.
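A possible starting point for that investigation (a minimal sketch, not part of this commit): the inconsistency usually comes from mixing Airflow's per-run logical date (ds / data_interval_start) with datetime.now(), which the DAGs below call both at parse time and inside tasks. Assumes Airflow 2.2+; the DAG and task names here are illustrative only.

# Sketch: contrast Airflow's logical date with wall-clock time.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def show_dates(**context):
    # ds / data_interval_start are fixed per DAG run and identical for every
    # task in that run, so they are stable keys for airflow_dag_schedule lookups.
    logical_date = context["ds"]                     # e.g. "2024-01-01"
    interval_start = context["data_interval_start"]  # pendulum.DateTime
    # datetime.now() differs between tasks, DAGs and retries - the likely
    # source of the inconsistency noted in the commit message.
    print(logical_date, interval_start, datetime.now())

with DAG("demo_logical_date", start_date=datetime(2024, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    PythonOperator(task_id="show_dates", python_callable=show_dates)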

wangxq 1 month ago
parent
commit
6f291ae161

+ 220 - 0
dags/common.py

@@ -0,0 +1,220 @@
+# common.py
+import psycopg2
+from neo4j import GraphDatabase
+import logging
+import importlib.util
+from pathlib import Path
+import networkx as nx
+import os
+from datetime import datetime, timedelta
+from config import PG_CONFIG, NEO4J_CONFIG, SCRIPTS_BASE_PATH
+
+# 创建统一的日志记录器
+logger = logging.getLogger("airflow.task")
+
+def get_pg_conn():
+    """获取PostgreSQL连接"""
+    return psycopg2.connect(**PG_CONFIG)
+
+def get_neo4j_driver():
+    """获取Neo4j连接驱动"""
+    uri = NEO4J_CONFIG['uri']
+    auth = (NEO4J_CONFIG['user'], NEO4J_CONFIG['password'])
+    return GraphDatabase.driver(uri, auth=auth)
+
+def update_task_start_time(exec_date, target_table, script_name, start_time):
+    """更新任务开始时间"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            UPDATE airflow_dag_schedule 
+            SET exec_start_time = %s
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (start_time, exec_date, target_table, script_name))
+        conn.commit()
+    except Exception as e:
+        logger.error(f"更新任务开始时间失败: {str(e)}")
+        conn.rollback()
+    finally:
+        cursor.close()
+        conn.close()
+
+def update_task_completion(exec_date, target_table, script_name, success, end_time, duration):
+    """更新任务完成信息"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            UPDATE airflow_dag_schedule 
+            SET exec_result = %s, exec_end_time = %s, exec_duration = %s
+            WHERE exec_date = %s AND target_table = %s AND script_name = %s
+        """, (success, end_time, duration, exec_date, target_table, script_name))
+        conn.commit()
+    except Exception as e:
+        logger.error(f"更新任务完成信息失败: {str(e)}")
+        conn.rollback()
+    finally:
+        cursor.close()
+        conn.close()
+
+def execute_with_monitoring(target_table, script_name, script_exec_mode, exec_date, **kwargs):
+    """执行脚本并监控执行情况"""
+
+    # 检查script_name是否为空
+    if not script_name:
+        logger.error(f"表 {target_table} 的script_name为空,无法执行")
+        # 记录执行失败
+        now = datetime.now()
+        update_task_completion(exec_date, target_table, script_name or "", False, now, 0)
+        return False
+    # 记录执行开始时间
+    start_time = datetime.now()
+    update_task_start_time(exec_date, target_table, script_name, start_time)
+    
+    try:
+        # 执行实际脚本
+        success = execute_script(script_name, target_table, script_exec_mode)
+        
+        # 记录结束时间和结果
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        update_task_completion(exec_date, target_table, script_name, success, end_time, duration)
+        
+        return success
+    except Exception as e:
+        # 处理异常
+        logger.error(f"执行任务出错: {str(e)}")
+        end_time = datetime.now()
+        duration = (end_time - start_time).total_seconds()
+        update_task_completion(exec_date, target_table, script_name, False, end_time, duration)
+        raise e
+
+def execute_script(script_name, table_name, execution_mode):
+    """执行脚本并返回结果"""
+    if not script_name:
+        logger.error("未提供脚本名称,无法执行")
+        return False
+    
+    try:
+        # 直接使用配置的部署路径
+        script_path = Path(SCRIPTS_BASE_PATH) / script_name
+        logger.info(f"使用配置的Airflow部署路径: {script_path}")
+        
+        # 动态导入模块
+        spec = importlib.util.spec_from_file_location("dynamic_module", script_path)
+        module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(module)
+        
+        # 使用标准入口函数run
+        if hasattr(module, "run"):
+            logger.info(f"执行脚本 {script_name} 的标准入口函数 run()")
+            result = module.run(table_name=table_name, execution_mode=execution_mode)
+            logger.info(f"脚本 {script_name} 执行结果: {result}")
+            return result
+        else:
+            logger.warning(f"脚本 {script_name} 未定义标准入口函数 run(),无法执行")
+            return False
+    except Exception as e:
+        logger.error(f"执行脚本 {script_name} 时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        return False
+
+def generate_optimized_execution_order(table_names, dependency_dict):
+    """
+    生成优化的执行顺序,处理循环依赖
+    
+    参数:
+        table_names: 表名列表
+        dependency_dict: 依赖关系字典 {表名: [依赖表1, 依赖表2, ...]}
+    
+    返回:
+        list: 优化后的执行顺序列表
+    """
+    # 创建有向图
+    G = nx.DiGraph()
+    
+    # 添加所有节点
+    for table_name in table_names:
+        G.add_node(table_name)
+    
+    # 添加依赖边
+    for target, sources in dependency_dict.items():
+        for source in sources:
+            if source in table_names:  # 确保只考虑目标表集合中的表
+                # 从依赖指向目标,表示依赖需要先执行
+                G.add_edge(source, target)
+    
+    # 检测循环依赖
+    cycles = list(nx.simple_cycles(G))
+    if cycles:
+        logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
+        # 打破循环依赖(简单策略:移除每个循环中的一条边)
+        for cycle in cycles:
+            # 移除循环中的最后一条边(若该边已在处理前一个循环时被移除则跳过,避免重复删除报错)
+            if G.has_edge(cycle[-1], cycle[0]):
+                G.remove_edge(cycle[-1], cycle[0])
+                logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
+    
+    # 生成拓扑排序
+    try:
+        execution_order = list(nx.topological_sort(G))
+        return execution_order
+    except Exception as e:
+        logger.error(f"生成执行顺序失败: {str(e)}")
+        # 返回原始列表作为备选
+        return table_names
+
+def get_datamodel_dependency_from_neo4j(table_names):
+    """
+    从Neo4j获取DataModel表间的依赖关系
+    
+    参数:
+        table_names: 表名列表
+    
+    返回:
+        dict: 依赖关系字典 {目标表: [依赖表1, 依赖表2, ...]}
+    """
+    logger.info(f"开始获取 {len(table_names)} 个表的依赖关系")
+    
+    # 创建Neo4j连接
+    driver = get_neo4j_driver()
+    dependency_dict = {name: [] for name in table_names}
+    
+    try:
+        with driver.session() as session:
+            # 使用一次性查询获取所有表之间的依赖关系
+            query = """
+                MATCH (source:DataModel)-[:DERIVED_FROM]->(target:DataModel)
+                WHERE source.en_name IN $table_names AND target.en_name IN $table_names
+                RETURN source.en_name AS source, target.en_name AS target
+            """
+            result = session.run(query, table_names=table_names)
+            
+            # 处理结果
+            for record in result:
+                source = record.get("source")
+                target = record.get("target")
+                
+                if source and target:
+                    # source(派生表)依赖于 target(被依赖的源表)
+                    if source in dependency_dict:
+                        dependency_dict[source].append(target)
+                        logger.debug(f"依赖关系: {source} 依赖于 {target}")
+    except Exception as e:
+        logger.error(f"从Neo4j获取依赖关系时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    # 记录依赖关系
+    for table, deps in dependency_dict.items():
+        if deps:
+            logger.info(f"表 {table} 依赖于: {deps}")
+        else:
+            logger.info(f"表 {table} 没有依赖")
+    
+    return dependency_dict
+
+def get_today_date():
+    """获取今天的日期,返回YYYY-MM-DD格式字符串"""
+    return datetime.now().strftime("%Y-%m-%d")
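For reference, execute_script() above expects every target script under SCRIPTS_BASE_PATH to expose a run(table_name, execution_mode) entry point and to return a truthy value on success. A minimal sketch of such a script (the file name and body are illustrative, not part of this commit):

# load_example_table.py - hypothetical script loaded by common.execute_script()
import logging

logger = logging.getLogger("airflow.task")

def run(table_name: str, execution_mode: str) -> bool:
    """Standard entry point called by common.execute_script()."""
    logger.info(f"processing {table_name} in {execution_mode} mode")
    # ... actual load / transform logic would go here ...
    return True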

+ 226 - 0
dags/dag_dataops_model_scheduler.py

@@ -0,0 +1,226 @@
+# dag_dataops_model_scheduler.py
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from datetime import datetime, timedelta
+import logging
+from common import (
+    get_pg_conn, execute_with_monitoring, get_datamodel_dependency_from_neo4j,
+    generate_optimized_execution_order, get_today_date
+)
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+def get_latest_date_with_models():
+    """
+    获取数据库中包含DataModel记录的最近日期
+    
+    用于查找数据库中最近的日期,以确保能够获取到数据
+    """
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT DISTINCT exec_date
+            FROM airflow_dag_schedule 
+            WHERE target_table_label = 'DataModel'
+            ORDER BY exec_date DESC
+            LIMIT 1
+        """)
+        result = cursor.fetchone()
+        if result:
+            latest_date = result[0]
+            logger.info(f"找到最近的包含DataModel记录的日期: {latest_date}")
+            return latest_date
+        else:
+            logger.warning("未找到包含DataModel记录的日期,将使用当前日期")
+            return get_today_date()
+    except Exception as e:
+        logger.error(f"查找最近日期时出错: {str(e)}")
+        return get_today_date()
+    finally:
+        cursor.close()
+        conn.close()
+
+def get_datamodel_tasks(exec_date):
+    """从airflow_dag_schedule表获取DataModel任务"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT target_table, script_name, script_exec_mode
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND target_table_label = 'DataModel'
+        """, (exec_date,))
+        
+        results = cursor.fetchall()
+        
+        tasks = []
+        for row in results:
+            target_table, script_name, script_exec_mode = row
+            tasks.append({
+                "target_table": target_table,
+                "script_name": script_name,
+                "script_exec_mode": script_exec_mode or "append"  # 默认为append
+            })
+        
+        logger.info(f"使用日期 {exec_date} 获取到 {len(tasks)} 个DataModel任务")
+        return tasks
+    except Exception as e:
+        logger.error(f"获取DataModel任务时出错: {str(e)}")
+        return []
+    finally:
+        cursor.close()
+        conn.close()
+
+# 创建DAG
+with DAG(
+    "dag_dataops_model_scheduler", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily", 
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # 等待resource DAG完成
+    wait_for_resource = ExternalTaskSensor(
+        task_id="wait_for_resource",
+        external_dag_id="dag_dataops_resource_scheduler",
+        external_task_id="resource_processing_completed",
+        mode="poke",
+        timeout=3600,
+        poke_interval=30,
+        dag=dag
+    )
+    
+    # 处理完成标记
+    model_processing_completed = EmptyOperator(
+        task_id="model_processing_completed",
+        dag=dag
+    )
+    
+    try:
+        # 获取最近的日期
+        latest_date = get_latest_date_with_models()
+        logger.info(f"使用最近的日期 {latest_date} 查询模型任务")
+        
+        # 获取所有DataModel任务
+        model_tasks = get_datamodel_tasks(latest_date)
+        
+        if model_tasks:
+            # 获取表名列表
+            table_names = [task["target_table"] for task in model_tasks]
+            
+            # 获取依赖关系
+            dependency_dict = get_datamodel_dependency_from_neo4j(table_names)
+            
+            # 生成优化的执行顺序
+            execution_order = generate_optimized_execution_order(table_names, dependency_dict)
+            logger.info(f"生成的优化执行顺序: {execution_order}")
+            
+            # 创建任务字典
+            task_dict = {}
+            
+            # 为每个表创建处理任务
+            for table_name in execution_order:
+                # 查找表任务信息
+                task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
+                
+                if task_info and task_info.get("script_name"):
+                    process_task = PythonOperator(
+                        task_id=f"process_model_{table_name.replace('.', '_')}",
+                        python_callable=execute_with_monitoring,
+                        op_kwargs={
+                            "target_table": table_name,
+                            "script_name": task_info["script_name"],
+                            "script_exec_mode": task_info.get("script_exec_mode", "append"), 
+                            "exec_date": latest_date  # 使用从数据库获取的最近日期
+                        },
+                        dag=dag
+                    )
+                    task_dict[table_name] = process_task
+                    logger.info(f"创建处理任务: {table_name}")
+                else:
+                    logger.warning(f"表 {table_name} 没有script_name,跳过任务创建")
+            
+            # 设置任务间的依赖关系
+            for target_table, task in task_dict.items():
+                # 获取上游依赖
+                upstream_tables = dependency_dict.get(target_table, [])
+                upstream_tables = [t for t in upstream_tables if t in task_dict]
+                
+                if not upstream_tables:
+                    # 如果没有上游依赖,直接连接到wait_for_resource
+                    logger.info(f"表 {target_table} 没有上游依赖,连接到wait_for_resource")
+                    wait_for_resource >> task
+                else:
+                    # 设置与上游表的依赖关系
+                    for upstream_table in upstream_tables:
+                        logger.info(f"设置依赖: {upstream_table} >> {target_table}")
+                        task_dict[upstream_table] >> task
+                
+                # 检查是否是末端节点(没有下游任务)
+                is_terminal = True
+                for downstream, upstreams in dependency_dict.items():
+                    if target_table in upstreams and downstream in task_dict:
+                        is_terminal = False
+                        break
+                
+                # 如果是末端节点,连接到model_processing_completed
+                if is_terminal:
+                    logger.info(f"表 {target_table} 是末端节点,连接到model_processing_completed")
+                    task >> model_processing_completed
+            
+            # 处理特殊情况
+            # 检查是否有任务连接到model_processing_completed
+            has_connection_to_completed = False
+            for task in task_dict.values():
+                for downstream in task.downstream_list:
+                    if downstream.task_id == model_processing_completed.task_id:
+                        has_connection_to_completed = True
+                        break
+                
+                if has_connection_to_completed:
+                    break
+            
+            # 如果没有任务连接到model_processing_completed,连接所有任务到完成标记
+            if not has_connection_to_completed and task_dict:
+                logger.info("没有任务连接到model_processing_completed,连接所有任务到完成标记")
+                for task in task_dict.values():
+                    task >> model_processing_completed
+            
+            # 检查是否有任务连接到wait_for_resource
+            has_connection_from_wait = False
+            for task in task_dict.values():
+                for upstream in task.upstream_list:
+                    if upstream.task_id == wait_for_resource.task_id:
+                        has_connection_from_wait = True
+                        break
+                
+                if has_connection_from_wait:
+                    break
+            
+            # 如果没有任务连接到wait_for_resource,连接wait_for_resource到所有任务
+            if not has_connection_from_wait and task_dict:
+                logger.info("没有任务连接到wait_for_resource,连接wait_for_resource到所有任务")
+                for task in task_dict.values():
+                    wait_for_resource >> task
+        else:
+            # 如果没有任务,直接将等待节点连接到完成
+            wait_for_resource >> model_processing_completed
+            logger.warning("没有找到DataModel任务,直接将等待节点连接到完成")
+    except Exception as e:
+        logger.error(f"创建模型处理DAG时出错: {str(e)}")
+        import traceback
+        logger.error(traceback.format_exc())
+        # 确保在出错时也有完整的执行流程
+        wait_for_resource >> model_processing_completed
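The wait_for_resource sensor above matches the upstream DAG run by logical date, which works here only because both DAGs share the @daily schedule. If the schedules ever diverge, the alignment must be made explicit; a hedged sketch (the DAG name and 30-minute offset are illustrative, not part of this commit):

# Sketch: aligning ExternalTaskSensor when the upstream DAG runs on a shifted schedule.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG("demo_sensor_alignment", start_date=datetime(2024, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    wait_for_resource = ExternalTaskSensor(
        task_id="wait_for_resource",
        external_dag_id="dag_dataops_resource_scheduler",
        external_task_id="resource_processing_completed",
        execution_delta=timedelta(minutes=30),  # upstream logical date = this DAG's - 30 min
        mode="poke",
        timeout=3600,
        poke_interval=30,
    )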

+ 355 - 0
dags/dag_dataops_prepare_scheduler.py

@@ -0,0 +1,355 @@
+# dag_dataops_prepare_scheduler.py
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from datetime import datetime, timedelta
+import pendulum
+import logging
+from common import get_pg_conn, get_neo4j_driver, get_today_date
+from config import PG_CONFIG, NEO4J_CONFIG
+import networkx as nx
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+def get_enabled_tables():
+    """获取所有启用的表"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT owner_id, table_name 
+            FROM schedule_status 
+            WHERE schedule_is_enabled = TRUE
+        """)
+        result = cursor.fetchall()
+        return [row[1] for row in result]  # 只返回表名
+    except Exception as e:
+        logger.error(f"获取启用表失败: {str(e)}")
+        return []
+    finally:
+        cursor.close()
+        conn.close()
+
+def check_table_directly_subscribed(table_name):
+    """检查表是否在schedule_status表中直接订阅"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT schedule_is_enabled
+            FROM schedule_status 
+            WHERE table_name = %s
+        """, (table_name,))
+        result = cursor.fetchone()
+        return bool(result and result[0])
+    except Exception as e:
+        logger.error(f"检查表订阅状态失败: {str(e)}")
+        return False
+    finally:
+        cursor.close()
+        conn.close()
+
+def get_table_info_from_neo4j(table_name):
+    """从Neo4j获取表的详细信息"""
+    driver = get_neo4j_driver()
+    # 检查表是否直接订阅
+    is_directly_schedule = check_table_directly_subscribed(table_name)
+
+    table_info = {
+        'target_table': table_name,
+        'is_directly_schedule': is_directly_schedule,  # 由schedule_status表查询得到的直接调度标志
+    }
+    
+    try:
+        with driver.session() as session:
+            # 查询表标签和状态
+            query_table = """
+                MATCH (t {en_name: $table_name})
+                RETURN labels(t) AS labels, t.status AS status, t.frequency AS frequency
+            """
+            result = session.run(query_table, table_name=table_name)
+            record = result.single()
+            
+            if record:
+                labels = record.get("labels", [])
+                table_info['target_table_label'] = next((label for label in labels if label in ["DataResource", "DataModel", "DataSource"]), None)
+                table_info['target_table_status'] = record.get("status", True)  # 默认为True
+                table_info['default_update_frequency'] = record.get("frequency")
+                
+                # 根据标签类型查询关系和脚本信息
+                if "DataResource" in labels:
+                    query_rel = """
+                        MATCH (target {en_name: $table_name})-[rel:ORIGINATES_FROM]->(source)
+                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
+                    """
+                elif "DataModel" in labels:
+                    query_rel = """
+                        MATCH (target {en_name: $table_name})-[rel:DERIVED_FROM]->(source)
+                        RETURN source.en_name AS source_table, rel.script_name AS script_name,
+                               rel.script_type AS script_type, rel.script_exec_mode AS script_exec_mode
+                    """
+                else:
+                    logger.warning(f"表 {table_name} 不是DataResource或DataModel类型")
+                    return table_info
+                
+                result = session.run(query_rel, table_name=table_name)
+                record = result.single()
+                
+                if record:
+                    table_info['source_table'] = record.get("source_table")     
+
+                    # 检查script_name是否为空
+                    script_name = record.get("script_name")
+                    if not script_name:
+                        logger.warning(f"表 {table_name} 的关系中没有script_name属性,可能导致后续处理出错")
+                    table_info['script_name'] = script_name
+                    
+                    # 设置默认值,确保即使属性为空也有默认值
+                    table_info['script_type'] = record.get("script_type", "python")  # 默认为python
+                    table_info['script_exec_mode'] = record.get("script_exec_mode", "append")  # 默认为append
+                else:
+                    logger.warning(f"未找到表 {table_name} 的关系信息")
+            else:
+                logger.warning(f"在Neo4j中找不到表 {table_name} 的信息")
+    except Exception as e:
+        logger.error(f"获取表 {table_name} 的信息时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    return table_info
+
+def process_dependencies(tables_info):
+    """处理表间依赖关系,添加被动调度的表"""
+    # 存储所有表信息的字典
+    all_tables = {t['target_table']: t for t in tables_info}
+    driver = get_neo4j_driver()
+    
+    try:
+        with driver.session() as session:
+            for table_name, table_info in list(all_tables.items()):
+                if table_info.get('target_table_label') == 'DataModel':
+                    # 查询其依赖表
+                    query = """
+                        MATCH (dm {en_name: $table_name})-[:DERIVED_FROM]->(dep)
+                        RETURN dep.en_name AS dep_name, labels(dep) AS dep_labels, 
+                               dep.status AS dep_status, dep.frequency AS dep_frequency
+                    """
+                    result = session.run(query, table_name=table_name)
+                    
+                    for record in result:
+                        dep_name = record.get("dep_name")
+                        dep_labels = record.get("dep_labels", [])
+                        dep_status = record.get("dep_status", True)
+                        dep_frequency = record.get("dep_frequency")
+                        
+                        # 处理未被直接调度的依赖表
+                        if dep_name and dep_name not in all_tables:
+                            logger.info(f"发现被动依赖表: {dep_name}, 标签: {dep_labels}")
+                            
+                            # 获取依赖表详细信息
+                            dep_info = get_table_info_from_neo4j(dep_name)
+                            dep_info['is_directly_schedule'] = False
+                            
+                            # 处理调度频率继承
+                            if not dep_info.get('default_update_frequency'):
+                                dep_info['default_update_frequency'] = table_info.get('default_update_frequency')
+                            
+                            all_tables[dep_name] = dep_info
+    except Exception as e:
+        logger.error(f"处理依赖关系时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    return list(all_tables.values())
+
+def filter_invalid_tables(tables_info):
+    """过滤无效表及其依赖,使用NetworkX构建依赖图"""
+    # 构建表名到索引的映射
+    table_dict = {t['target_table']: i for i, t in enumerate(tables_info)}
+    
+    # 找出无效表
+    invalid_tables = set()
+    for table in tables_info:
+        if table.get('target_table_status') is False:
+            invalid_tables.add(table['target_table'])
+            logger.info(f"表 {table['target_table']} 的状态为无效")
+    
+    # 构建依赖图
+    G = nx.DiGraph()
+    
+    # 添加所有节点
+    for table in tables_info:
+        G.add_node(table['target_table'])
+    
+    # 查询并添加依赖边
+    driver = get_neo4j_driver()
+    try:
+        with driver.session() as session:
+            for table in tables_info:
+                if table.get('target_table_label') == 'DataModel':
+                    query = """
+                        MATCH (source {en_name: $table_name})-[:DERIVED_FROM]->(target)
+                        RETURN target.en_name AS target_name
+                    """
+                    result = session.run(query, table_name=table['target_table'])
+                    
+                    for record in result:
+                        target_name = record.get("target_name")
+                        if target_name and target_name in table_dict:
+                            # 边从被依赖表指向派生表,这样 nx.descendants(无效表) 得到的才是其下游表
+                            G.add_edge(target_name, table['target_table'])
+                            logger.debug(f"添加依赖边: {target_name} -> {table['target_table']}")
+    except Exception as e:
+        logger.error(f"构建依赖图时出错: {str(e)}")
+    finally:
+        driver.close()
+    
+    # 找出依赖于无效表的所有表
+    downstream_invalid = set()
+    for invalid_table in invalid_tables:
+        # 获取可从无效表到达的所有节点
+        try:
+            descendants = nx.descendants(G, invalid_table)
+            downstream_invalid.update(descendants)
+            logger.info(f"表 {invalid_table} 的下游无效表: {descendants}")
+        except Exception as e:
+            logger.error(f"处理表 {invalid_table} 的下游依赖时出错: {str(e)}")
+    
+    # 合并所有无效表
+    all_invalid = invalid_tables.union(downstream_invalid)
+    logger.info(f"总共 {len(all_invalid)} 个表被标记为无效: {all_invalid}")
+    
+    # 过滤出有效表
+    valid_tables = [t for t in tables_info if t['target_table'] not in all_invalid]
+    logger.info(f"过滤后保留 {len(valid_tables)} 个有效表")
+    
+    return valid_tables
+
+def write_to_airflow_dag_schedule(exec_date, tables_info):
+    """将表信息写入airflow_dag_schedule表"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    
+    try:
+        # 清理当日数据,避免重复
+        cursor.execute("""
+            DELETE FROM airflow_dag_schedule WHERE exec_date = %s
+        """, (exec_date,))
+        logger.info(f"已清理执行日期 {exec_date} 的现有数据")
+        
+        # 批量插入新数据
+        inserted_count = 0
+        for table in tables_info:
+            cursor.execute("""
+                INSERT INTO airflow_dag_schedule (
+                    exec_date, source_table, target_table, target_table_label,
+                    target_table_status, is_directly_schedule, default_update_frequency,
+                    script_name, script_type, script_exec_mode
+                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+            """, (
+                exec_date,
+                table.get('source_table'),
+                table['target_table'],
+                table.get('target_table_label'),
+                table.get('target_table_status', True),
+                table.get('is_directly_schedule', False),
+                table.get('default_update_frequency'),
+                table.get('script_name'),
+                table.get('script_type', 'python'),
+                table.get('script_exec_mode', 'append')
+            ))
+            inserted_count += 1
+        
+        conn.commit()
+        logger.info(f"成功插入 {inserted_count} 条记录到 airflow_dag_schedule 表")
+        return inserted_count
+    except Exception as e:
+        logger.error(f"写入 airflow_dag_schedule 表时出错: {str(e)}")
+        conn.rollback()
+        # 不要返回0,而是重新抛出异常,确保错误被正确传播
+        raise
+    finally:
+        cursor.close()
+        conn.close()
+
+def prepare_dag_schedule(**kwargs):
+    """准备DAG调度任务的主函数"""
+    exec_date = kwargs.get('ds') or get_today_date()
+    logger.info(f"开始准备执行日期 {exec_date} 的调度任务")
+    
+    # 1. 获取启用的表
+    enabled_tables = get_enabled_tables()
+    logger.info(f"从schedule_status表获取到 {len(enabled_tables)} 个启用的表")
+    
+    if not enabled_tables:
+        logger.warning("没有找到启用的表,准备工作结束")
+        return 0
+    
+    # 2. 获取表的详细信息
+    tables_info = []
+    for table_name in enabled_tables:
+        table_info = get_table_info_from_neo4j(table_name)
+        if table_info:
+            tables_info.append(table_info)
+    
+    logger.info(f"成功获取 {len(tables_info)} 个表的详细信息")
+    
+    # 3. 处理依赖关系,添加被动调度的表
+    enriched_tables = process_dependencies(tables_info)
+    logger.info(f"处理依赖后,总共有 {len(enriched_tables)} 个表")
+    
+    # 4. 过滤无效表及其依赖
+    valid_tables = filter_invalid_tables(enriched_tables)
+    logger.info(f"过滤无效表后,最终有 {len(valid_tables)} 个有效表")
+    
+    # 5. 写入airflow_dag_schedule表
+    inserted_count = write_to_airflow_dag_schedule(exec_date, valid_tables)
+    
+    # 6. 检查插入操作是否成功,如果失败则抛出异常
+    if inserted_count == 0 and valid_tables:
+        error_msg = f"插入操作失败,无记录被插入到airflow_dag_schedule表,但有{len(valid_tables)}个有效表需要处理"
+        logger.error(error_msg)
+        raise Exception(error_msg)
+    
+    return inserted_count
+
+# 创建DAG
+with DAG(
+    "dag_dataops_prepare_scheduler",
+    start_date=datetime(2024, 1, 1),
+    schedule_interval="@daily",
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # 任务开始标记
+    start_preparation = EmptyOperator(
+        task_id="start_preparation",
+        dag=dag
+    )
+    
+    # 准备调度任务
+    prepare_task = PythonOperator(
+        task_id="prepare_dag_schedule",
+        python_callable=prepare_dag_schedule,
+        provide_context=True,
+        dag=dag
+    )
+    
+    # 准备完成标记
+    preparation_completed = EmptyOperator(
+        task_id="preparation_completed",
+        dag=dag
+    )
+    
+    # 设置任务依赖
+    start_preparation >> prepare_task >> preparation_completed
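The invalid-table propagation in filter_invalid_tables() above relies on dependency edges pointing from a source table to the tables derived from it, so that nx.descendants() of an invalid table yields exactly its downstream tables. A minimal worked example (the table names are made up):

# Sketch: with edges dependency -> derived table, descendants of an invalid
# table are the tables that transitively depend on it.
import networkx as nx

G = nx.DiGraph()
G.add_edge("raw_orders", "dm_orders")      # dm_orders is derived from raw_orders
G.add_edge("dm_orders", "dm_order_stats")  # dm_order_stats is derived from dm_orders

print(nx.descendants(G, "raw_orders"))     # {'dm_orders', 'dm_order_stats'}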

+ 185 - 0
dags/dag_dataops_resource_scheduler.py

@@ -0,0 +1,185 @@
+# dag_dataops_resource_scheduler.py
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from datetime import datetime, timedelta
+import logging
+from common import (
+    get_pg_conn, execute_with_monitoring, get_today_date
+)
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+# 添加获取最近日期的函数
+def get_latest_date_with_resources():
+    """
+    获取数据库中包含DataResource记录的最近日期
+    
+    用于查找数据库中最近的日期,以确保能够获取到数据
+    """
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        cursor.execute("""
+            SELECT DISTINCT exec_date
+            FROM airflow_dag_schedule 
+            WHERE target_table_label = 'DataResource'
+            ORDER BY exec_date DESC
+            LIMIT 1
+        """)
+        result = cursor.fetchone()
+        if result:
+            latest_date = result[0]
+            logger.info(f"找到最近的包含DataResource记录的日期: {latest_date}")
+            return latest_date
+        else:
+            logger.warning("未找到包含DataResource记录的日期,将使用当前日期")
+            return get_today_date()
+    except Exception as e:
+        logger.error(f"查找最近日期时出错: {str(e)}")
+        return get_today_date()
+    finally:
+        cursor.close()
+        conn.close()
+
+def get_dataresource_tasks(exec_date):
+    """从airflow_dag_schedule表获取DataResource任务"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        # 构建SQL查询
+        sql = """
+            SELECT source_table, target_table, script_name, script_exec_mode
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND target_table_label = 'DataResource'
+        """
+        # 记录查询信息
+        logger.info(f"查询资源任务,使用日期: {exec_date}")
+        
+        # 执行查询
+        cursor.execute(sql, (exec_date,))
+        results = cursor.fetchall()
+        logger.info(f"使用日期 {exec_date} 查询到 {len(results)} 个DataResource任务")
+        
+        # 处理去重
+        unique_tasks = {}
+        for row in results:
+            source_table, target_table, script_name, script_exec_mode = row
+            # 使用目标表名作为键进行去重
+            if target_table not in unique_tasks:
+                unique_tasks[target_table] = {
+                    "source_table": source_table,
+                    "target_table": target_table,
+                    "script_name": script_name,
+                    "script_exec_mode": script_exec_mode or "append"  # 默认值
+                }
+        
+        logger.info(f"获取到 {len(results)} 个DataResource任务,去重后剩余 {len(unique_tasks)} 个")
+        return list(unique_tasks.values())
+    except Exception as e:
+        logger.error(f"获取DataResource任务时出错: {str(e)}")
+        return []
+    finally:
+        cursor.close()
+        conn.close()
+
+def process_resource(target_table, script_name, script_exec_mode, **kwargs):
+    """处理单个资源表的函数"""
+    exec_date = kwargs.get('ds')
+    logger.info(f"开始处理资源表: {target_table}, 脚本: {script_name}")
+    
+    try:
+        # 调用执行函数
+        result = execute_with_monitoring(
+            target_table=target_table,
+            script_name=script_name,
+            script_exec_mode=script_exec_mode,
+            exec_date=exec_date
+        )
+        logger.info(f"资源表 {target_table} 处理完成")
+        return result
+    except Exception as e:
+        logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
+        raise
+
+def generate_no_task_message(**kwargs):
+    """当没有任务时执行的函数"""
+    logger.info("没有资源需要处理")
+    return "没有资源需要处理"
+
+# 创建DAG
+with DAG(
+    "dag_dataops_resource_scheduler", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily", 
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # 等待prepare DAG完成
+    wait_for_prepare = ExternalTaskSensor(
+        task_id="wait_for_prepare",
+        external_dag_id="dag_dataops_prepare_scheduler",
+        external_task_id="preparation_completed",
+        mode="poke",
+        timeout=3600,
+        poke_interval=30,
+        dag=dag
+    )
+    
+    # 处理完成标记
+    resource_processing_completed = EmptyOperator(
+        task_id="resource_processing_completed",
+        dag=dag
+    )
+    
+    # 在DAG运行时获取最近日期和资源任务
+    latest_date = get_latest_date_with_resources()
+    logger.info(f"使用最近的日期 {latest_date} 查询资源任务")
+    
+    # 获取资源任务
+    resource_tasks = get_dataresource_tasks(latest_date)
+    
+    if resource_tasks:
+        for i, task_info in enumerate(resource_tasks):
+            target_table = task_info["target_table"]
+            script_name = task_info["script_name"]
+            script_exec_mode = task_info["script_exec_mode"]
+            
+            if not script_name:
+                logger.warning(f"资源表 {target_table} 没有关联脚本,跳过")
+                continue
+            
+            # 为每个资源表创建单独的处理任务
+            task_id = f"process_resource_{target_table.replace('.', '_')}"
+            process_task = PythonOperator(
+                task_id=task_id,
+                python_callable=process_resource,
+                op_kwargs={
+                    "target_table": target_table,
+                    "script_name": script_name,
+                    "script_exec_mode": script_exec_mode
+                },
+                provide_context=True,
+                dag=dag
+            )
+            
+            # 设置依赖 - 直接从wait_for_prepare连接到处理任务
+            wait_for_prepare >> process_task >> resource_processing_completed
+    else:
+        # 如果没有任务,添加一个空任务
+        empty_task = PythonOperator(
+            task_id="no_resources_to_process",
+            python_callable=generate_no_task_message,
+            dag=dag
+        )
+        wait_for_prepare >> empty_task >> resource_processing_completed
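Unlike dag_dataops_model_scheduler, the parse-time database access in this file is not wrapped in try/except, so a transient PostgreSQL outage would surface as a DAG import error. A hedged sketch of the same guard pattern the model DAG uses (fetch_resource_tasks() is only a stand-in for the real query functions above):

# Sketch: guard parse-time task generation so a database error degrades to an
# empty run instead of breaking DAG import.
import logging
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

logger = logging.getLogger(__name__)

def fetch_resource_tasks():
    return []  # stand-in for get_dataresource_tasks(get_latest_date_with_resources())

with DAG("demo_guarded_resource_dag", start_date=datetime(2024, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    wait = EmptyOperator(task_id="wait_for_prepare_stub")
    completed = EmptyOperator(task_id="resource_processing_completed")
    try:
        tasks = fetch_resource_tasks()
        # ... build one PythonOperator per task and wire wait >> task >> completed ...
        if not tasks:
            wait >> completed
    except Exception:
        logger.exception("parse-time task generation failed")
        wait >> completed  # keep a complete execution path even on error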

+ 287 - 0
dags/dag_dataops_summary_scheduler.py

@@ -0,0 +1,287 @@
+# dag_dataops_summary_scheduler.py
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.external_task import ExternalTaskSensor
+from datetime import datetime, timedelta
+import logging
+import json
+from decimal import Decimal
+from common import get_pg_conn, get_today_date
+
+# 创建日志记录器
+logger = logging.getLogger(__name__)
+
+# 添加自定义JSON编码器解决Decimal序列化问题
+class DecimalEncoder(json.JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, Decimal):
+            return float(obj)
+        # 处理日期类型
+        elif isinstance(obj, datetime):
+            return obj.isoformat()
+        # 让父类处理其他类型
+        return super(DecimalEncoder, self).default(obj)
+
+def get_execution_stats(exec_date):
+    """获取当日执行统计信息"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        # 查询总任务数
+        cursor.execute("""
+            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
+        """, (exec_date,))
+        total_tasks = cursor.fetchone()[0]
+        
+        # 查询每种类型的任务数
+        cursor.execute("""
+            SELECT target_table_label, COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s 
+            GROUP BY target_table_label
+        """, (exec_date,))
+        type_counts = {row[0]: row[1] for row in cursor.fetchall()}
+        
+        # 查询执行结果统计
+        cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_result IS TRUE
+        """, (exec_date,))
+        success_count = cursor.fetchone()[0]
+        
+        cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_result IS FALSE
+        """, (exec_date,))
+        fail_count = cursor.fetchone()[0]
+        
+        cursor.execute("""
+            SELECT COUNT(*) 
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_result IS NULL
+        """, (exec_date,))
+        pending_count = cursor.fetchone()[0]
+        
+        # 计算执行时间统计
+        cursor.execute("""
+            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_duration IS NOT NULL
+        """, (exec_date,))
+        time_stats = cursor.fetchone()
+        avg_duration, min_duration, max_duration = time_stats if time_stats else (None, None, None)
+        
+        # 将Decimal转换为float
+        if avg_duration is not None:
+            avg_duration = float(avg_duration)
+        if min_duration is not None:
+            min_duration = float(min_duration)
+        if max_duration is not None:
+            max_duration = float(max_duration)
+        
+        # 查询失败任务详情
+        cursor.execute("""
+            SELECT target_table, script_name, target_table_label, exec_duration
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_result IS FALSE
+        """, (exec_date,))
+        failed_tasks = [
+            {
+                "target_table": row[0],
+                "script_name": row[1],
+                "target_table_label": row[2],
+                "exec_duration": float(row[3]) if row[3] is not None else None
+            }
+            for row in cursor.fetchall()
+        ]
+        
+        # 汇总统计信息
+        stats = {
+            "exec_date": exec_date,
+            "total_tasks": total_tasks,
+            "type_counts": type_counts,
+            "success_count": success_count,
+            "fail_count": fail_count,
+            "pending_count": pending_count,
+            "success_rate": (success_count / total_tasks * 100) if total_tasks > 0 else 0,
+            "avg_duration": avg_duration,
+            "min_duration": min_duration,
+            "max_duration": max_duration,
+            "failed_tasks": failed_tasks
+        }
+        
+        return stats
+    except Exception as e:
+        logger.error(f"获取执行统计信息时出错: {str(e)}")
+        return {}
+    finally:
+        cursor.close()
+        conn.close()
+
+def update_missing_results(exec_date):
+    """更新缺失的执行结果信息"""
+    conn = get_pg_conn()
+    cursor = conn.cursor()
+    try:
+        # 查询所有缺失执行结果的任务
+        cursor.execute("""
+            SELECT target_table, script_name
+            FROM airflow_dag_schedule 
+            WHERE exec_date = %s AND exec_result IS NULL
+        """, (exec_date,))
+        missing_results = cursor.fetchall()
+        
+        update_count = 0
+        for row in missing_results:
+            target_table, script_name = row
+            
+            # 如果有开始时间但没有结束时间,假设执行失败
+            cursor.execute("""
+                SELECT exec_start_time
+                FROM airflow_dag_schedule
+                WHERE exec_date = %s AND target_table = %s AND script_name = %s
+            """, (exec_date, target_table, script_name))
+            
+            start_time = cursor.fetchone()
+            
+            if start_time and start_time[0]:
+                # 有开始时间但无结果,标记为失败
+                now = datetime.now()
+                duration = (now - start_time[0]).total_seconds()
+                
+                cursor.execute("""
+                    UPDATE airflow_dag_schedule
+                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
+                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
+                """, (now, duration, exec_date, target_table, script_name))
+                
+                logger.warning(f"任务 {target_table} 的脚本 {script_name} 标记为失败,开始时间: {start_time[0]}")
+                update_count += 1
+            else:
+                # 没有开始时间且无结果,假设未执行
+                logger.warning(f"任务 {target_table} 的脚本 {script_name} 未执行")
+        
+        conn.commit()
+        logger.info(f"更新了 {update_count} 个缺失结果的任务")
+        return update_count
+    except Exception as e:
+        logger.error(f"更新缺失执行结果时出错: {str(e)}")
+        conn.rollback()
+        return 0
+    finally:
+        cursor.close()
+        conn.close()
+
+def generate_execution_report(exec_date, stats):
+    """生成执行报告"""
+    # 构建报告
+    report = []
+    report.append(f"========== 数据运维系统执行报告 ==========")
+    report.append(f"执行日期: {exec_date}")
+    report.append(f"总任务数: {stats['total_tasks']}")
+    
+    # 任务类型分布
+    report.append("\n--- 任务类型分布 ---")
+    for label, count in stats.get('type_counts', {}).items():
+        report.append(f"{label} 任务: {count} 个")
+    
+    # 执行结果统计
+    report.append("\n--- 执行结果统计 ---")
+    report.append(f"成功任务: {stats.get('success_count', 0)} 个")
+    report.append(f"失败任务: {stats.get('fail_count', 0)} 个")
+    report.append(f"未执行任务: {stats.get('pending_count', 0)} 个")
+    report.append(f"成功率: {stats.get('success_rate', 0):.2f}%")
+    
+    # 执行时间统计
+    report.append("\n--- 执行时间统计 (秒) ---")
+    report.append(f"平均执行时间: {stats.get('avg_duration', 0):.2f}")
+    report.append(f"最短执行时间: {stats.get('min_duration', 0):.2f}")
+    report.append(f"最长执行时间: {stats.get('max_duration', 0):.2f}")
+    
+    # 失败任务详情
+    failed_tasks = stats.get('failed_tasks', [])
+    if failed_tasks:
+        report.append("\n--- 失败任务详情 ---")
+        for i, task in enumerate(failed_tasks, 1):
+            report.append(f"{i}. 表名: {task['target_table']}")
+            report.append(f"   脚本: {task['script_name']}")
+            report.append(f"   类型: {task['target_table_label']}")
+            report.append(f"   执行时间: {task.get('exec_duration', 'N/A'):.2f} 秒")
+    
+    report.append("\n========== 报告结束 ==========")
+    
+    # 将报告转换为字符串
+    report_str = "\n".join(report)
+    
+    # 记录到日志
+    logger.info("\n" + report_str)
+    
+    return report_str
+
+def summarize_execution(**kwargs):
+    """汇总执行情况的主函数"""
+    exec_date = kwargs.get('ds') or get_today_date()
+    logger.info(f"开始汇总执行日期 {exec_date} 的执行情况")
+    
+    # 1. 更新缺失的执行结果
+    update_count = update_missing_results(exec_date)
+    logger.info(f"更新了 {update_count} 个缺失的执行结果")
+    
+    # 2. 获取执行统计信息
+    stats = get_execution_stats(exec_date)
+    
+    # 3. 生成执行报告
+    report = generate_execution_report(exec_date, stats)
+    
+    # 将报告和统计信息传递给下一个任务
+    kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
+    kwargs['ti'].xcom_push(key='execution_report', value=report)
+    
+    return report
+
+# 创建DAG
+with DAG(
+    "dag_dataops_summary_scheduler", 
+    start_date=datetime(2024, 1, 1), 
+    schedule_interval="@daily", 
+    catchup=False,
+    default_args={
+        'owner': 'airflow',
+        'depends_on_past': False,
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+) as dag:
+    
+    # 等待model DAG完成
+    wait_for_model = ExternalTaskSensor(
+        task_id="wait_for_model",
+        external_dag_id="dag_dataops_model_scheduler",
+        external_task_id="model_processing_completed",
+        mode="poke",
+        timeout=3600,
+        poke_interval=30,
+        dag=dag
+    )
+    
+    # 汇总执行情况
+    summarize_task = PythonOperator(
+        task_id="summarize_execution",
+        python_callable=summarize_execution,
+        provide_context=True,
+        dag=dag
+    )
+    
+    # 总结完成标记
+    summary_completed = EmptyOperator(
+        task_id="summary_completed",
+        dag=dag
+    )
+    
+    # 设置任务依赖
+    wait_for_model >> summarize_task >> summary_completed
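The report pushed to XCom above can be consumed by a follow-up task in the same DAG; a minimal sketch of such a consumer, written as if appended to this file (the notify task is hypothetical, not part of this commit):

# Sketch: pull the report pushed by summarize_execution and forward it.
def send_report(**kwargs):
    ti = kwargs["ti"]
    report = ti.xcom_pull(task_ids="summarize_execution", key="execution_report")
    # replace with an email / IM notification as needed
    logger.info(report)

notify_task = PythonOperator(
    task_id="send_execution_report",
    python_callable=send_report,
    provide_context=True,
    dag=dag,
)

wait_for_model >> summarize_task >> notify_task >> summary_completed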