dag_data_model_monthly.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from airflow.operators.empty import EmptyOperator
  4. from airflow.sensors.external_task import ExternalTaskSensor
  5. from datetime import datetime
  6. from utils import get_enabled_tables, is_data_model_table, run_model_script, get_model_dependency_graph, check_table_relationship
  7. from config import NEO4J_CONFIG
  8. import pendulum
  9. import logging
  10. import networkx as nx
  11. # 创建日志记录器
  12. logger = logging.getLogger(__name__)
  13. def generate_optimized_execution_order(table_names: list) -> list:
  14. """
  15. 生成优化的执行顺序,可处理循环依赖
  16. 参数:
  17. table_names: 表名列表
  18. 返回:
  19. list: 优化后的执行顺序列表
  20. """
  21. # 创建依赖图
  22. G = nx.DiGraph()
  23. # 添加所有节点
  24. for table_name in table_names:
  25. G.add_node(table_name)
  26. # 添加依赖边
  27. dependency_dict = get_model_dependency_graph(table_names)
  28. for target, upstreams in dependency_dict.items():
  29. for upstream in upstreams:
  30. if upstream in table_names: # 确保只考虑目标表集合中的表
  31. G.add_edge(upstream, target)
  32. # 检测循环依赖
  33. cycles = list(nx.simple_cycles(G))
  34. if cycles:
  35. logger.warning(f"检测到循环依赖,将尝试打破循环: {cycles}")
  36. # 打破循环依赖(简单策略:移除每个循环中的一条边)
  37. for cycle in cycles:
  38. # 移除循环中的最后一条边
  39. G.remove_edge(cycle[-1], cycle[0])
  40. logger.info(f"打破循环依赖: 移除 {cycle[-1]} -> {cycle[0]} 的依赖")
  41. # 生成拓扑排序
  42. try:
  43. execution_order = list(nx.topological_sort(G))
  44. return execution_order
  45. except Exception as e:
  46. logger.error(f"生成执行顺序失败: {str(e)}")
  47. # 返回原始列表作为备选
  48. return table_names
  49. def is_first_day():
  50. return True
  51. # 生产环境中应使用实际判断
  52. # return pendulum.now().day == 1
  53. with DAG("dag_data_model_monthly", start_date=datetime(2024, 1, 1), schedule_interval="@daily", catchup=False) as dag:
  54. logger.info("初始化 dag_data_model_monthly DAG")
  55. # 等待周模型 DAG 完成
  56. wait_for_weekly = ExternalTaskSensor(
  57. task_id="wait_for_weekly_model",
  58. external_dag_id="dag_data_model_weekly",
  59. external_task_id="weekly_processing_completed", # 指定完成标记任务
  60. mode="poke",
  61. timeout=3600,
  62. poke_interval=30
  63. )
  64. logger.info("创建周模型等待任务 - wait_for_weekly_model")
  65. # 创建一个完成标记任务,确保即使没有处理任务也能标记DAG完成
  66. monthly_completed = EmptyOperator(
  67. task_id="monthly_processing_completed",
  68. dag=dag
  69. )
  70. logger.info("创建任务完成标记 - monthly_processing_completed")
  71. # 检查今天是否是月初
  72. if is_first_day():
  73. logger.info("今天是月初,开始处理月模型")
  74. # 获取启用的 monthly 模型表
  75. try:
  76. enabled_tables = get_enabled_tables("monthly")
  77. model_tables = [t for t in enabled_tables if is_data_model_table(t['table_name'])]
  78. logger.info(f"获取到 {len(model_tables)} 个启用的 monthly 模型表")
  79. if not model_tables:
  80. # 如果没有模型表需要处理,直接将等待任务与完成标记相连接
  81. logger.info("没有找到需要处理的月模型表,DAG将直接标记为完成")
  82. wait_for_weekly >> monthly_completed
  83. else:
  84. # 获取表名列表
  85. table_names = [t['table_name'] for t in model_tables]
  86. # 特别检查两个表之间的关系
  87. if 'book_sale_amt_yearly' in table_names and 'book_sale_amt_monthly' in table_names:
  88. logger.info("特别检查 book_sale_amt_yearly 和 book_sale_amt_monthly 之间的关系")
  89. relationship = check_table_relationship('book_sale_amt_yearly', 'book_sale_amt_monthly')
  90. logger.info(f"关系检查结果: {relationship}")
  91. # 使用优化函数生成执行顺序,可以处理循环依赖
  92. optimized_table_order = generate_optimized_execution_order(table_names)
  93. logger.info(f"生成优化执行顺序, 共 {len(optimized_table_order)} 个表")
  94. # 获取依赖图 (仍然需要用于设置任务依赖关系)
  95. try:
  96. dependency_graph = get_model_dependency_graph(table_names)
  97. logger.info(f"构建了 {len(dependency_graph)} 个表的依赖关系图")
  98. except Exception as e:
  99. logger.error(f"构建依赖关系图时出错: {str(e)}")
  100. # 出错时也要确保完成标记被触发
  101. wait_for_weekly >> monthly_completed
  102. raise
  103. # 构建 task 对象
  104. task_dict = {}
  105. for table_name in optimized_table_order:
  106. # 获取表的配置信息
  107. table_config = next((t for t in model_tables if t['table_name'] == table_name), None)
  108. if table_config:
  109. try:
  110. task = PythonOperator(
  111. task_id=f"process_monthly_{table_name}",
  112. python_callable=run_model_script,
  113. op_kwargs={"table_name": table_name, "execution_mode": table_config['execution_mode']},
  114. )
  115. task_dict[table_name] = task
  116. logger.info(f"创建模型处理任务: process_monthly_{table_name}")
  117. except Exception as e:
  118. logger.error(f"创建任务 process_monthly_{table_name} 时出错: {str(e)}")
  119. # 出错时也要确保完成标记被触发
  120. wait_for_weekly >> monthly_completed
  121. raise
  122. # 建立任务依赖(基于 DERIVED_FROM 图)
  123. dependency_count = 0
  124. logger.info("开始建立任务依赖关系...")
  125. for target, upstream_list in dependency_graph.items():
  126. logger.info(f"处理目标表 {target} 的依赖关系")
  127. for upstream in upstream_list:
  128. if upstream in task_dict and target in task_dict:
  129. logger.info(f"建立依赖边: {upstream} >> {target}")
  130. task_dict[upstream] >> task_dict[target]
  131. dependency_count += 1
  132. logger.debug(f"建立依赖关系: {upstream} >> {target}")
  133. else:
  134. missing = []
  135. if upstream not in task_dict:
  136. missing.append(f"上游表 {upstream}")
  137. if target not in task_dict:
  138. missing.append(f"目标表 {target}")
  139. missing_str = " 和 ".join(missing)
  140. logger.warning(f"无法建立依赖关系: {upstream} >> {target},缺少任务: {missing_str}")
  141. logger.info(f"总共建立了 {dependency_count} 个任务依赖关系")
  142. logger.info(f"任务字典中的所有表: {list(task_dict.keys())}")
  143. # 最顶层的 task(没有任何上游)需要依赖周模型任务完成
  144. all_upstreams = set()
  145. for upstreams in dependency_graph.values():
  146. all_upstreams.update(upstreams)
  147. top_level_tasks = [t for t in table_names if t not in all_upstreams]
  148. logger.info(f"所有上游表集合: {all_upstreams}")
  149. logger.info(f"识别出的顶层表: {top_level_tasks}")
  150. if top_level_tasks:
  151. logger.info(f"发现 {len(top_level_tasks)} 个顶层任务: {', '.join(top_level_tasks)}")
  152. for name in top_level_tasks:
  153. if name in task_dict:
  154. wait_for_weekly >> task_dict[name]
  155. else:
  156. logger.warning("没有找到顶层任务,请检查依赖关系图是否正确")
  157. # 如果没有顶层任务,直接将等待任务与完成标记相连接
  158. wait_for_weekly >> monthly_completed
  159. # 连接所有末端任务(没有下游任务的)到完成标记
  160. # 找出所有没有下游任务的任务(即终端任务)
  161. terminal_tasks = []
  162. for table_name, task in task_dict.items():
  163. is_terminal = True
  164. for upstream_list in dependency_graph.values():
  165. if table_name in upstream_list:
  166. is_terminal = False
  167. break
  168. if is_terminal:
  169. terminal_tasks.append(task)
  170. logger.debug(f"发现终端任务: {table_name}")
  171. # 如果有终端任务,将它们连接到完成标记
  172. if terminal_tasks:
  173. logger.info(f"连接 {len(terminal_tasks)} 个终端任务到完成标记")
  174. for task in terminal_tasks:
  175. task >> monthly_completed
  176. else:
  177. # 如果没有终端任务(可能是因为存在循环依赖),直接将等待任务与完成标记相连接
  178. logger.warning("没有找到终端任务,直接将等待任务与完成标记相连接")
  179. wait_for_weekly >> monthly_completed
  180. except Exception as e:
  181. logger.error(f"获取 monthly 模型表时出错: {str(e)}")
  182. # 出错时也要确保完成标记被触发
  183. wait_for_weekly >> monthly_completed
  184. raise
  185. else:
  186. # 如果不是月初,直接将等待任务与完成标记相连接,跳过处理
  187. logger.info("今天不是月初,跳过月模型处理")
  188. wait_for_weekly >> monthly_completed