# dag_dataops_model_scheduler.py
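#
# Purpose (summary of the code below): wait for the upstream resource DAG,
# then run every DataModel script recorded in airflow_dag_schedule for the
# most recent available date, wired together according to the dependency
# graph read from Neo4j, ending at a single completion marker.
#
# Note: the task set and its dependencies are computed at parse time; each
# scheduler parse of this file queries PostgreSQL (and, through
# get_datamodel_dependency_from_neo4j, Neo4j) to rebuild the DAG shape.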
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
from common import (
    get_pg_conn, execute_with_monitoring, get_datamodel_dependency_from_neo4j,
    generate_optimized_execution_order, get_today_date
)

# Create the module-level logger
logger = logging.getLogger(__name__)

def get_latest_date_with_models():
    """
    Get the most recent date in the database that has DataModel records.

    Looking up the latest date actually present (rather than assuming today)
    ensures the DAG can still find data to schedule.
    """
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT DISTINCT exec_date
            FROM airflow_dag_schedule
            WHERE target_table_label = 'DataModel'
            ORDER BY exec_date DESC
            LIMIT 1
        """)
        result = cursor.fetchone()
        if result:
            latest_date = result[0]
            logger.info(f"Found the most recent date with DataModel records: {latest_date}")
            return latest_date
        else:
            logger.warning("No date with DataModel records found; falling back to today's date")
            return get_today_date()
    except Exception as e:
        logger.error(f"Error while looking up the most recent date: {str(e)}")
        return get_today_date()
    finally:
        cursor.close()
        conn.close()
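
# Note (inferred, not verified): the database branch returns whatever type the
# driver maps exec_date to (typically datetime.date), while the fallback
# returns the value of get_today_date(), which may be a string. Both are only
# passed back into "WHERE exec_date = %s" queries here, so the mismatch is
# likely harmless, but callers comparing the return value directly should be
# aware of it.
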
def get_datamodel_tasks(exec_date):
    """Fetch DataModel tasks from the airflow_dag_schedule table for a given date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        cursor.execute("""
            SELECT target_table, script_name, script_exec_mode
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND target_table_label = 'DataModel'
        """, (exec_date,))
        results = cursor.fetchall()

        tasks = []
        for row in results:
            target_table, script_name, script_exec_mode = row
            tasks.append({
                "target_table": target_table,
                "script_name": script_name,
                "script_exec_mode": script_exec_mode or "append"  # default to append
            })
        logger.info(f"Fetched {len(tasks)} DataModel tasks for date {exec_date}")
        return tasks
    except Exception as e:
        logger.error(f"Error while fetching DataModel tasks: {str(e)}")
        return []
    finally:
        cursor.close()
        conn.close()
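
# Shape of one returned entry, inferred from the query above (the table and
# script names here are hypothetical examples, not values from a real schedule):
#   {"target_table": "dm.sales_summary",
#    "script_name": "load_sales_summary.py",
#    "script_exec_mode": "append"}
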
# Create the DAG
with DAG(
    "dag_dataops_model_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Wait for the resource DAG to finish
    wait_for_resource = ExternalTaskSensor(
        task_id="wait_for_resource",
        external_dag_id="dag_dataops_resource_scheduler",
        external_task_id="resource_processing_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30
    )
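
    # Timing note (assumption about the upstream DAG): ExternalTaskSensor pokes
    # for the external task at the same logical date by default, so this wiring
    # only works if dag_dataops_resource_scheduler also runs @daily with an
    # aligned start_date. If its schedule ever diverges, pass execution_delta
    # or execution_date_fn to the sensor to map this DAG's logical date onto
    # the upstream one.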

    # Completion marker for model processing
    model_processing_completed = EmptyOperator(
        task_id="model_processing_completed"
    )

    try:
        # Get the most recent date that has model tasks
        latest_date = get_latest_date_with_models()
        logger.info(f"Querying model tasks with the most recent date {latest_date}")

        # Fetch all DataModel tasks for that date
        model_tasks = get_datamodel_tasks(latest_date)

        if model_tasks:
            # Collect the list of target table names
            table_names = [task["target_table"] for task in model_tasks]

            # Fetch the dependency graph
            dependency_dict = get_datamodel_dependency_from_neo4j(table_names)

            # Generate an optimized execution order
            execution_order = generate_optimized_execution_order(table_names, dependency_dict)
            logger.info(f"Generated optimized execution order: {execution_order}")
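
            # Shape assumptions, inferred from how these values are used below:
            # dependency_dict maps each table to the list of its *upstream*
            # tables ({table: [upstream, ...]}), and execution_order is
            # presumably a dependency-respecting (topological) ordering of
            # table_names.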

            # Task registry, keyed by table name
            task_dict = {}

            # Create a processing task for each table
            for table_name in execution_order:
                # Look up this table's task info
                task_info = next((t for t in model_tasks if t["target_table"] == table_name), None)
                if task_info and task_info.get("script_name"):
                    process_task = PythonOperator(
                        task_id=f"process_model_{table_name.replace('.', '_')}",
                        python_callable=execute_with_monitoring,
                        op_kwargs={
                            "target_table": table_name,
                            "script_name": task_info["script_name"],
                            "script_exec_mode": task_info.get("script_exec_mode", "append"),
                            "exec_date": latest_date  # use the most recent date fetched from the database
                        }
                    )
                    task_dict[table_name] = process_task
                    logger.info(f"Created processing task: {table_name}")
                else:
                    logger.warning(f"Table {table_name} has no script_name; skipping task creation")

            # Wire up dependencies between tasks
            for target_table, task in task_dict.items():
                # Get upstream dependencies that actually have tasks in this DAG
                upstream_tables = dependency_dict.get(target_table, [])
                upstream_tables = [t for t in upstream_tables if t in task_dict]

                if not upstream_tables:
                    # No upstream dependencies: connect directly to wait_for_resource
                    logger.info(f"Table {target_table} has no upstream dependencies; connecting to wait_for_resource")
                    wait_for_resource >> task
                else:
                    # Depend on each upstream table's task
                    for upstream_table in upstream_tables:
                        logger.info(f"Setting dependency: {upstream_table} >> {target_table}")
                        task_dict[upstream_table] >> task

                # Check whether this is a terminal node (no downstream tasks)
                is_terminal = True
                for downstream, upstreams in dependency_dict.items():
                    if target_table in upstreams and downstream in task_dict:
                        is_terminal = False
                        break

                # Terminal nodes connect to model_processing_completed
                if is_terminal:
                    logger.info(f"Table {target_table} is a terminal node; connecting to model_processing_completed")
                    task >> model_processing_completed

            # Handle edge cases
            # Check whether any task is connected to model_processing_completed
            has_connection_to_completed = False
            for task in task_dict.values():
                for downstream in task.downstream_list:
                    if downstream.task_id == model_processing_completed.task_id:
                        has_connection_to_completed = True
                        break
                if has_connection_to_completed:
                    break

            # If nothing reaches model_processing_completed, connect every task to the completion marker
            if not has_connection_to_completed and task_dict:
                logger.info("No task connected to model_processing_completed; connecting all tasks to the completion marker")
                for task in task_dict.values():
                    task >> model_processing_completed

            # Check whether any task is connected to wait_for_resource
            has_connection_from_wait = False
            for task in task_dict.values():
                for upstream in task.upstream_list:
                    if upstream.task_id == wait_for_resource.task_id:
                        has_connection_from_wait = True
                        break
                if has_connection_from_wait:
                    break

            # If nothing hangs off wait_for_resource, connect wait_for_resource to every task
            if not has_connection_from_wait and task_dict:
                logger.info("No task connected to wait_for_resource; connecting wait_for_resource to all tasks")
                for task in task_dict.values():
                    wait_for_resource >> task
        else:
            # No tasks at all: connect the wait node directly to completion
            wait_for_resource >> model_processing_completed
            logger.warning("No DataModel tasks found; connected the wait node directly to completion")
    except Exception as e:
        logger.error(f"Error while building the model processing DAG: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Ensure a complete execution path even when DAG construction fails
        wait_for_resource >> model_processing_completed
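
# Optional local smoke test: a minimal sketch assuming Airflow 2.5+, where
# DAG.test() runs the DAG in-process without a scheduler. The guard keeps it
# from executing during a normal scheduler parse. Note that wait_for_resource
# will block unless dag_dataops_resource_scheduler has a matching successful run.
if __name__ == "__main__":
    dag.test()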