dag_data_model_monthly.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from airflow.operators.empty import EmptyOperator
  4. from airflow.sensors.external_task import ExternalTaskSensor
  5. from datetime import datetime, timedelta
  6. from utils import (
  7. get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph,
  8. check_table_relationship, process_model_tables
  9. )
  10. from config import NEO4J_CONFIG, TASK_RETRY_CONFIG
  11. import pendulum
  12. import logging
  13. import networkx as nx
  14. # 创建日志记录器
  15. logger = logging.getLogger(__name__)
  16. def generate_optimized_execution_order(table_names: list) -> list:
  17. """
  18. 生成优化的执行顺序,可处理循环依赖
  19. 参数:
  20. table_names: 表名列表
  21. 返回:
  22. list: 优化后的执行顺序列表
  23. """
  24. # 创建依赖图
  25. G = nx.DiGraph()
  26. # 添加所有节点
  27. for table_name in table_names:
  28. G.add_node(table_name)
  29. # 添加依赖边
  30. dependency_dict = get_model_dependency_graph(table_names)
  31. for target, upstreams in dependency_dict.items():
  32. for upstream in upstreams:
  33. if upstream in table_names: # 确保只考虑目标表集合中的表
  34. G.add_edge(upstream, target)
  35. # 检测循环依赖
  36. cycles = list(nx.simple_cycles(G))
  37. if cycles:
  38. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  39. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  40. for cycle in cycles:
  41. # 移除循环中的最后一条边
  42. G.remove_edge(cycle[-1], cycle[0])
  43. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  44. # 生成拓扑排序
  45. try:
  46. execution_order = list(nx.topological_sort(G))
  47. return execution_order
  48. except Exception as e:
  49. logger.error(f"生成执行顺序失败: {str(e)}")
  50. # 返回原始列表作为备选
  51. return table_names
  52. def is_first_day():
  53. return True
  54. # 生产环境中应使用实际判断
  55. # return pendulum.now().day == 1
  56. with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  57. logger.info("初始化 dag_data_model_monthly DAG")
  58. # 修改依赖关系:直接依赖于daily.py而不是weekly.py
  59. wait_for_daily = ExternalTaskSensor(
  60. task_id="wait_for_daily_model",
  61. external_dag_id="dag_data_model_daily",
  62. external_task_id="daily_processing_completed", # 指定完成标记任务
  63. mode="poke",
  64. timeout=3600,
  65. poke_interval=30
  66. )
  67. logger.info("创建日模型等待任务 - wait_for_daily_model")
  68. # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
  69. monthly_completed = EmptyOperator(
  70. task_id="monthly_processing_completed",
  71. dag=dag
  72. )
  73. logger.info("创建任务完成标记 - monthly_processing_completed")
  74. # 检查今天是否是月初
  75. if is_first_day():
  76. logger.info("今天是月初,开始处理月模型")
  77. # 获取启用的 monthly 模型表
  78. try:
  79. enabled_tables = get_enabled_tables("monthly")
  80. # 特别检查两个表之间的关系(这是monthly.py特有的)
  81. table_names = [t['table_name'] for t in enabled_tables if is_data_model_table(t['table_name'])]
  82. if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
  83. logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
  84. relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
  85. logger.info(f"关系检查结果: {relationship}")
  86. # 定义monthly特有的任务选项
  87. task_options = {
  88. 'default': {
  89. 'retries': TASK_RETRY_CONFIG["retries"],
  90. 'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
  91. },
  92. 'book_sale_amt_monthly': {
  93. 'trigger_rule': "none_failed",
  94. 'retries': TASK_RETRY_CONFIG["retries"],
  95. 'retry_delay': timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"])
  96. }
  97. }
  98. # 使用公共函数处理模型表 - 修改依赖任务名称
  99. process_model_tables(enabled_tables, "monthly", wait_for_daily, monthly_completed, dag, **task_options)
  100. except Exception as e:
  101. logger.error(f"获取 monthly 模型表时出错: {str(e)}")
  102. # 出错时也要确保完成标记被触发 - 修改依赖任务名称
  103. wait_for_daily >> monthly_completed
  104. raise
  105. else:
  106. # 如果不是月初,直接将等待任务与完成标记相连接,跳过处理 - 修改依赖任务名称
  107. logger.info("今天不是月初,跳过月模型处理")
  108. wait_for_daily >> monthly_completed