dag_data_model_daily.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # dag_data_model_daily.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from airflow.sensors.external_task import ExternalTaskSensor
  6. from datetime import datetime
  7. from utils import (
  8. get_enabled_tables, is_data_model_table, run_model_script,
  9. get_model_dependency_graph, process_model_tables
  10. )
  11. from config import NEO4J_CONFIG
  12. import pendulum
  13. import logging
  14. import networkx as nx
# Module-level logger for this DAG file; used both by the helper below and
# during DAG construction.
logger = logging.getLogger(__name__)
  17. def generate_optimized_execution_order(table_names: list) -> list:
  18. """
  19. 生成优化的执行顺序,可处理循环依赖
  20. 参数:
  21. table_names: 表名列表
  22. 返回:
  23. list: 优化后的执行顺序列表
  24. """
  25. # 创建依赖图
  26. G = nx.DiGraph()
  27. # 添加所有节点
  28. for table_name in table_names:
  29. G.add_node(table_name)
  30. # 添加依赖边
  31. dependency_dict = get_model_dependency_graph(table_names)
  32. for target, upstreams in dependency_dict.items():
  33. for upstream in upstreams:
  34. if upstream in table_names: # 确保只考虑目标表集合中的表
  35. G.add_edge(upstream, target)
  36. # 检测循环依赖
  37. cycles = list(nx.simple_cycles(G))
  38. if cycles:
  39. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  40. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  41. for cycle in cycles:
  42. # 移除循环中的最后一条边
  43. G.remove_edge(cycle[-1], cycle[0])
  44. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  45. # 生成拓扑排序
  46. try:
  47. execution_order = list(nx.topological_sort(G))
  48. return execution_order
  49. except Exception as e:
  50. logger.error(f"生成执行顺序失败: {str(e)}")
  51. # 返回原始列表作为备选
  52. return table_names
  53. with DAG("dag_data_model_daily", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  54. logger.info("初始化 dag_data_model_daily DAG")
  55. # 等待资源表 DAG 完成
  56. wait_for_resource = ExternalTaskSensor(
  57. task_id="wait_for_data_resource",
  58. external_dag_id="dag_data_resource",
  59. external_task_id=None,
  60. mode="poke",
  61. timeout=3600,
  62. poke_interval=30
  63. )
  64. logger.info("创建资源表等待任务 - wait_for_data_resource")
  65. # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
  66. daily_completed = EmptyOperator(
  67. task_id="daily_processing_completed",
  68. dag=dag
  69. )
  70. logger.info("创建任务完成标记 - daily_processing_completed")
  71. # 获取启用的 daily 模型表
  72. try:
  73. enabled_tables = get_enabled_tables("daily")
  74. # 使用公共函数处理模型表
  75. process_model_tables(enabled_tables, "daily", wait_for_resource, daily_completed, dag)
  76. except Exception as e:
  77. logger.error(f"获取 daily 模型表时出错: {str(e)}")
  78. # 出错时也要确保完成标记被触发
  79. wait_for_resource >> daily_completed
  80. raise