# dag_data_model_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import pendulum
import logging
import networkx as nx
from utils import (
    get_enabled_tables,
    is_data_model_table,
    run_model_script,
    get_model_dependency_graph,
    check_script_exists,
    get_script_name_from_neo4j
)
from config import TASK_RETRY_CONFIG

# Create the module logger
logger = logging.getLogger(__name__)


def get_all_enabled_tables_for_today():
    """
    Collect every table that needs to be processed for the current date.

    Returns:
        list: table configuration dicts to process
    """
    today = pendulum.today()
    # Original logic (commented out for now; note that pendulum numbers Monday
    # as pendulum.MONDAY, not 0):
    # is_monday = today.day_of_week == pendulum.MONDAY
    # is_first_day_of_month = today.day == 1
    # is_first_day_of_year = today.month == 1 and today.day == 1
    # For testing: force all conditions to True
    is_monday = True
    is_first_day_of_month = True
    is_first_day_of_year = True
    logger.info(f"Today's date: {today.to_date_string()}")
    logger.info(
        f"Date flags: is_monday={is_monday}, "
        f"is_first_day_of_month={is_first_day_of_month}, "
        f"is_first_day_of_year={is_first_day_of_year}"
    )
    all_tables = []
    # daily tables are processed every day
    daily_tables = get_enabled_tables("daily")
    all_tables.extend(daily_tables)
    logger.info(f"Added daily tables: {len(daily_tables)}")
    # weekly tables are processed on Mondays
    if is_monday:
        weekly_tables = get_enabled_tables("weekly")
        all_tables.extend(weekly_tables)
        logger.info(f"Today is Monday; added weekly tables: {len(weekly_tables)}")
    # monthly tables are processed on the first day of the month
    if is_first_day_of_month:
        monthly_tables = get_enabled_tables("monthly")
        all_tables.extend(monthly_tables)
        logger.info(f"First day of the month; added monthly tables: {len(monthly_tables)}")
    # yearly tables are processed on the first day of the year
    if is_first_day_of_year:
        yearly_tables = get_enabled_tables("yearly")
        all_tables.extend(yearly_tables)
        logger.info(f"First day of the year; added yearly tables: {len(yearly_tables)}")
    # Deduplicate by table name
    unique_tables = {}
    for item in all_tables:
        table_name = item["table_name"]
        if table_name not in unique_tables:
            unique_tables[table_name] = item
        else:
            # On duplicates, prefer the config whose execution_mode is full_refresh
            if item["execution_mode"] == "full_refresh":
                unique_tables[table_name] = item
    result_tables = list(unique_tables.values())
    logger.info(f"After deduplication, {len(result_tables)} tables need processing")
    # Log every table that will be processed
    for idx, item in enumerate(result_tables, 1):
        logger.info(f"Table [{idx}]: {item['table_name']}, execution mode: {item['execution_mode']}")
    return result_tables
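

# A minimal sketch of the deduplication rule above; the table dicts are
# hypothetical examples, not real config entries. Given two entries for the
# same table_name, the full_refresh config wins:
#
#     tables = [
#         {"table_name": "dm_sales", "execution_mode": "append"},
#         {"table_name": "dm_sales", "execution_mode": "full_refresh"},
#     ]
#     # -> the deduplicated result keeps only the full_refresh entry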


def optimize_execution_plan(tables):
    """
    Optimize the execution plan for a set of tables.

    Args:
        tables (list): table configuration dicts

    Returns:
        tuple: (optimized table execution order, dependency graph dict)
    """
    logger.info("Optimizing the execution plan...")
    # Keep only DataModel tables
    model_tables = []
    for table in tables:
        table_name = table["table_name"]
        if is_data_model_table(table_name):
            model_tables.append(table)
    logger.info(f"Selected {len(model_tables)} DataModel tables")
    if not model_tables:
        logger.warning("No DataModel tables found; nothing to optimize")
        return [], {}
    # Table name list
    table_names = [t["table_name"] for t in model_tables]
    # Build a directed graph
    G = nx.DiGraph()
    # Add all nodes
    for table_name in table_names:
        G.add_node(table_name)
    # Fetch the dependency relationships
    dependency_dict = get_model_dependency_graph(table_names)
    logger.info(f"Fetched dependencies for {len(dependency_dict)} tables")
    # Add dependency edges
    edge_count = 0
    for target, upstreams in dependency_dict.items():
        for upstream in upstreams:
            if upstream in table_names:  # only consider tables in the current batch
                G.add_edge(upstream, target)  # edge points from upstream to downstream
                edge_count += 1
    logger.info(f"Added {edge_count} edges to the dependency graph")
    # Detect cyclic dependencies
    cycles = list(nx.simple_cycles(G))
    if cycles:
        logger.warning(f"Detected {len(cycles)} dependency cycles; attempting to break them")
        # Break cycles (simple strategy: remove the closing edge of each cycle);
        # the has_edge guard avoids removing an edge twice when overlapping
        # cycles share it
        for cycle in cycles:
            if G.has_edge(cycle[-1], cycle[0]):
                G.remove_edge(cycle[-1], cycle[0])
                logger.info(f"Broke dependency cycle: removed edge {cycle[-1]} -> {cycle[0]}")
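    # Illustration with hypothetical table names: a mutual dependency between
    # model_x and model_y surfaces from nx.simple_cycles as a node list such as
    # ['model_x', 'model_y'], in which case the closing edge
    # model_y -> model_x is the one removed above.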
    # Produce a topological ordering
    try:
        # A topological sort starts from nodes with no incoming edges (the
        # upstream nodes), so the result runs from upstream to downstream.
        execution_order = list(nx.topological_sort(G))
        logger.info(f"Generated execution order, {len(execution_order)} tables from upstream to downstream")
        # Build the result dependency dict, including tables with no dependencies
        result_dependency_dict = {name: [] for name in table_names}
        # Fill in the actual dependencies
        for target, upstreams in dependency_dict.items():
            if target in table_names:  # only consider tables in the current batch
                result_dependency_dict[target] = [u for u in upstreams if u in table_names]
        return execution_order, result_dependency_dict
    except Exception as e:
        logger.error(f"Failed to generate the execution order: {str(e)}")
        # If the topological sort fails, fall back to the raw table name list and an empty dependency graph
        return table_names, {name: [] for name in table_names}
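

# A quick sketch of what the graph logic above produces, using toy table names
# (assumed data, not real models). With one unique zero-in-degree node at each
# step, the topological order is deterministic:
#
#     deps = {"model_b": ["model_a"], "model_c": ["model_a", "model_b"]}
#     G = nx.DiGraph()
#     G.add_nodes_from(["model_a", "model_b", "model_c"])
#     G.add_edges_from((u, t) for t, ups in deps.items() for u in ups)
#     list(nx.topological_sort(G))  # -> ['model_a', 'model_b', 'model_c']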


with DAG(
    "dag_data_model_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    logger.info("Initializing the dag_data_model_scheduler DAG")
    # Wait for the resource-table DAG to finish
    wait_for_resource = ExternalTaskSensor(
        task_id="wait_for_resource_loading",
        external_dag_id="dag_data_resource_scheduler",
        external_task_id="resource_loading_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30
    )
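    # Note: with no execution_delta or execution_date_fn, ExternalTaskSensor
    # matches the external task at the same logical date, so
    # dag_data_resource_scheduler is assumed to run on the same @daily schedule.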
    logger.info("Created the resource-table wait task - wait_for_resource_loading")
    # Completion marker task
    model_processing_completed = EmptyOperator(
        task_id="model_processing_completed",
        dag=dag
    )
    logger.info("Created the model-processing completion marker - model_processing_completed")
    try:
        # Fetch all tables that need processing today
        all_enabled_tables = get_all_enabled_tables_for_today()
        if not all_enabled_tables:
            logger.info("No tables to process today; wiring the start task directly to the end task")
            wait_for_resource >> model_processing_completed
        else:
            # Optimize the execution plan
            execution_order, dependency_dict = optimize_execution_plan(all_enabled_tables)
            if not execution_order:
                logger.info("Execution plan is empty; wiring the start task directly to the end task")
                wait_for_resource >> model_processing_completed
            else:
                # Task registry
                task_dict = {}
                # Create a processing task for every table
                for table_name in execution_order:
                    # Look up the table's config
                    table_config = next((t for t in all_enabled_tables if t["table_name"] == table_name), None)
                    if table_config:
                        logger.info(f"Creating task for table {table_name}, execution mode: {table_config['execution_mode']}")
                        # Create the task
                        task = PythonOperator(
                            task_id=f"process_{table_name}",
                            python_callable=run_model_script,
                            op_kwargs={
                                "table_name": table_name,
                                "execution_mode": table_config["execution_mode"]
                            },
                            retries=TASK_RETRY_CONFIG["retries"],
                            retry_delay=timedelta(minutes=TASK_RETRY_CONFIG["retry_delay_minutes"]),
                            dag=dag
                        )
                        # Register the task
                        task_dict[table_name] = task
                # Wire up inter-task dependencies
                for table_name, task in task_dict.items():
                    # Upstream dependencies
                    upstream_tables = dependency_dict.get(table_name, [])
                    if not upstream_tables:
                        # No upstream dependencies: connect directly to the resource wait task
                        logger.info(f"Table {table_name} has no upstream dependencies; connecting it to the resource wait task")
                        wait_for_resource >> task
                    else:
                        # Wire the dependencies on upstream tables
                        for upstream_table in upstream_tables:
                            if upstream_table in task_dict:
                                logger.info(f"Setting dependency: {upstream_table} >> {table_name}")
                                task_dict[upstream_table] >> task
                    # Check whether this is a terminal node (no downstream nodes)
                    is_terminal = True
                    for target, upstreams in dependency_dict.items():
                        if table_name in upstreams:
                            is_terminal = False
                            break
                    # Terminal nodes connect to the completion marker
                    if is_terminal:
                        logger.info(f"Table {table_name} is a terminal node; connecting it to the completion marker")
                        task >> model_processing_completed
                # Edge case: check whether any task connects to the completion marker
                has_connection_to_completed = False
                for task in task_dict.values():
                    for downstream in task.downstream_list:
                        if downstream.task_id == model_processing_completed.task_id:
                            has_connection_to_completed = True
                            break
                    if has_connection_to_completed:  # break out of the outer loop too
                        break
                # If nothing connects to the completion marker, connect every task to it
                if not has_connection_to_completed and task_dict:
                    logger.info("No task connects to the completion marker; connecting all tasks to it")
                    for task in task_dict.values():
                        task >> model_processing_completed
        # Edge case: if the resource wait task has no downstream tasks, connect it to the completion marker
        if not wait_for_resource.downstream_list:
            logger.info("The resource wait task has no downstream tasks; connecting it to the completion marker")
            wait_for_resource >> model_processing_completed
    except Exception as e:
        logger.error(f"Error while building the DAG: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Ensure the DAG still has a complete execution flow on error
        wait_for_resource >> model_processing_completed
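

# Local debugging sketch (assumption: Airflow 2.5+, where dag.test() runs the
# DAG in-process without a scheduler; an initialized metadata DB is required).
if __name__ == "__main__":
    dag.test()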