dag_data_model_weekly.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # dag_data_model_weekly.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from airflow.sensors.external_task import ExternalTaskSensor
  6. from datetime import datetime
  7. from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph
  8. from config import NEO4J_CONFIG
  9. import pendulum
  10. import logging
  11. # 创建日志记录器
  12. logger = logging.getLogger(__name__)
  13. def is_monday():
  14. return True
  15. #return pendulum.now().day_of_week == 0
  16. with DAG("dag_data_model_weekly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  17. logger.info("初始化 dag_data_model_weekly DAG")
  18. # 等待日模型 DAG 完成
  19. wait_for_daily = ExternalTaskSensor(
  20. task_id="wait_for_daily_model",
  21. external_dag_id="dag_data_model_daily",
  22. external_task_id="daily_processing_completed", # 指定完成标记任务
  23. mode="poke",
  24. timeout=3600,
  25. poke_interval=30
  26. )
  27. logger.info("创建日模型等待任务 - wait_for_daily_model")
  28. # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
  29. weekly_completed = EmptyOperator(
  30. task_id="weekly_processing_completed",
  31. dag=dag
  32. )
  33. logger.info("创建任务完成标记 - weekly_processing_completed")
  34. # 检查今天是否是周一
  35. if is_monday():
  36. logger.info("今天是周一,开始处理周模型")
  37. # 获取启用的 weekly 模型表
  38. try:
  39. enabled_tables = get_enabled_tables("weekly")
  40. model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
  41. logger.info(f"获取到 {len(model_tables)} 个启用的 weekly 模型表")
  42. if not model_tables:
  43. # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
  44. logger.info("没有找到需要处理的周模型表,DAG将直接标记为完成")
  45. wait_for_daily >> weekly_completed
  46. else:
  47. # 获取依赖图
  48. try:
  49. table_names = [t['table_name'] for t in model_tables]
  50. dependency_graph = get_model_dependency_graph(table_names)
  51. logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
  52. except Exception as e:
  53. logger.error(f"构建依赖关系图时出错: {str(e)}")
  54. # 出错时也要确保完成标记被触发
  55. wait_for_daily >> weekly_completed
  56. raise
  57. # 构建 task 对象
  58. task_dict = {}
  59. for item in model_tables:
  60. try:
  61. task = PythonOperator(
  62. task_id=f"process_weekly_{item['table_name']}",
  63. python_callable=run_model_script,
  64. op_kwargs={"table_name": item['table_name'], "execution_mode": item['execution_mode']},
  65. )
  66. task_dict[item['table_name']] = task
  67. logger.info(f"创建模型处理任务: process_weekly_{item['table_name']}")
  68. except Exception as e:
  69. logger.error(f"创建任务 process_weekly_{item['table_name']} 时出错: {str(e)}")
  70. # 出错时也要确保完成标记被触发
  71. wait_for_daily >> weekly_completed
  72. raise
  73. # 建立任务依赖(基于 DERIVED_FROM 图)
  74. dependency_count = 0
  75. for target, upstream_list in dependency_graph.items():
  76. for upstream in upstream_list:
  77. if upstream in task_dict and target in task_dict:
  78. task_dict[upstream] >> task_dict[target]
  79. dependency_count += 1
  80. logger.debug(f"建立依赖关系: {upstream} >> {target}")
  81. else:
  82. logger.warning(f"无法建立依赖关系,缺少任务: {upstream} 或 {target}")
  83. logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
  84. # 最顶层的 task(没有任何上游)需要依赖日模型任务完成
  85. all_upstreams = set()
  86. for upstreams in dependency_graph.values():
  87. all_upstreams.update(upstreams)
  88. top_level_tasks = [t for t in table_names if t not in all_upstreams]
  89. if top_level_tasks:
  90. logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
  91. for name in top_level_tasks:
  92. wait_for_daily >> task_dict[name]
  93. else:
  94. logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
  95. # 如果没有顶层任务,直接将等待任务与完成标记相连接
  96. wait_for_daily >> weekly_completed
  97. # 连接所有末端任务(没有下游任务的)到完成标记
  98. # 找出所有没有下游任务的任务(即终端任务)
  99. terminal_tasks = []
  100. for table_name, task in task_dict.items():
  101. is_terminal = True
  102. for upstream_list in dependency_graph.values():
  103. if table_name in upstream_list:
  104. is_terminal = False
  105. break
  106. if is_terminal:
  107. terminal_tasks.append(task)
  108. logger.debug(f"发现终端任务: {table_name}")
  109. # 如果有终端任务,将它们连接到完成标记
  110. if terminal_tasks:
  111. logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
  112. for task in terminal_tasks:
  113. task >> weekly_completed
  114. else:
  115. # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
  116. logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
  117. wait_for_daily >> weekly_completed
  118. except Exception as e:
  119. logger.error(f"获取 weekly 模型表时出错: {str(e)}")
  120. # 出错时也要确保完成标记被触发
  121. wait_for_daily >> weekly_completed
  122. raise
  123. else:
  124. # 如果不是周一,直接将等待任务与完成标记相连接,跳过处理
  125. logger.info("今天不是周一,跳过周模型处理")
  126. wait_for_daily >> weekly_completed