# Source: dag_dataops_resource_scheduler.py (6.3 KB) — extraction header removed
  1. # dag_dataops_resource_scheduler.py
  2. from airflow import DAG
  3. from airflow.operators.python import PythonOperator
  4. from airflow.operators.empty import EmptyOperator
  5. from airflow.sensors.external_task import ExternalTaskSensor
  6. from datetime import datetime, timedelta
  7. import logging
  8. from common import (
  9. get_pg_conn, execute_with_monitoring, get_today_date
  10. )
  11. # 创建日志记录器
  12. logger = logging.getLogger(__name__)
  13. # 添加获取最近日期的函数
  14. def get_latest_date_with_resources():
  15. """
  16. 获取数据库中包含DataResource记录的最近日期
  17. 用于查找数据库中最近的日期,以确保能够获取到数据
  18. """
  19. conn = get_pg_conn()
  20. cursor = conn.cursor()
  21. try:
  22. cursor.execute("""
  23. SELECT DISTINCT exec_date
  24. FROM airflow_dag_schedule
  25. WHERE target_table_label = 'DataResource'
  26. ORDER BY exec_date DESC
  27. LIMIT 1
  28. """)
  29. result = cursor.fetchone()
  30. if result:
  31. latest_date = result[0]
  32. logger.info(f"找到最近的包含DataResource记录的日期: {latest_date}")
  33. return latest_date
  34. else:
  35. logger.warning("未找到包含DataResource记录的日期,将使用当前日期")
  36. return get_today_date()
  37. except Exception as e:
  38. logger.error(f"查找最近日期时出错: {str(e)}")
  39. return get_today_date()
  40. finally:
  41. cursor.close()
  42. conn.close()
  43. def get_dataresource_tasks(exec_date):
  44. """从airflow_dag_schedule表获取DataResource任务"""
  45. conn = get_pg_conn()
  46. cursor = conn.cursor()
  47. try:
  48. # 构建SQL查询
  49. sql = """
  50. SELECT source_table, target_table, script_name, script_exec_mode
  51. FROM airflow_dag_schedule
  52. WHERE exec_date = %s AND target_table_label = 'DataResource'
  53. """
  54. # 记录查询信息
  55. logger.info(f"查询资源任务,使用日期: {exec_date}")
  56. # 执行查询
  57. cursor.execute(sql, (exec_date,))
  58. results = cursor.fetchall()
  59. logger.info(f"使用日期 {exec_date} 查询到 {len(results)} 个DataResource任务")
  60. # 处理去重
  61. unique_tasks = {}
  62. for row in results:
  63. source_table, target_table, script_name, script_exec_mode = row
  64. # 使用目标表名作为键进行去重
  65. if target_table not in unique_tasks:
  66. unique_tasks[target_table] = {
  67. "source_table": source_table,
  68. "target_table": target_table,
  69. "script_name": script_name,
  70. "script_exec_mode": script_exec_mode or "append" # 默认值
  71. }
  72. logger.info(f"获取到 {len(results)} 个DataResource任务,去重后剩余 {len(unique_tasks)} 个")
  73. return list(unique_tasks.values())
  74. except Exception as e:
  75. logger.error(f"获取DataResource任务时出错: {str(e)}")
  76. return []
  77. finally:
  78. cursor.close()
  79. conn.close()
  80. def process_resource(target_table, script_name, script_exec_mode, **kwargs):
  81. """处理单个资源表的函数"""
  82. exec_date = kwargs.get('ds')
  83. logger.info(f"开始处理资源表: {target_table}, 脚本: {script_name}")
  84. try:
  85. # 调用执行函数
  86. result = execute_with_monitoring(
  87. target_table=target_table,
  88. script_name=script_name,
  89. script_exec_mode=script_exec_mode,
  90. exec_date=exec_date
  91. )
  92. logger.info(f"资源表 {target_table} 处理完成")
  93. return result
  94. except Exception as e:
  95. logger.error(f"处理资源表 {target_table} 时出错: {str(e)}")
  96. raise
  97. def generate_no_task_message(**kwargs):
  98. """当没有任务时执行的函数"""
  99. logger.info("没有资源需要处理")
  100. return "没有资源需要处理"
  101. # 创建DAG
  102. with DAG(
  103. "dag_dataops_resource_scheduler",
  104. start_date=datetime(2024, 1, 1),
  105. schedule_interval="@daily",
  106. catchup=False,
  107. default_args={
  108. 'owner': 'airflow',
  109. 'depends_on_past': False,
  110. 'email_on_failure': False,
  111. 'email_on_retry': False,
  112. 'retries': 1,
  113. 'retry_delay': timedelta(minutes=5)
  114. }
  115. ) as dag:
  116. # 等待prepare DAG完成
  117. wait_for_prepare = ExternalTaskSensor(
  118. task_id="wait_for_prepare",
  119. external_dag_id="dag_dataops_prepare_scheduler",
  120. external_task_id="preparation_completed",
  121. mode="poke",
  122. timeout=3600,
  123. poke_interval=30,
  124. dag=dag
  125. )
  126. # 处理完成标记
  127. resource_processing_completed = EmptyOperator(
  128. task_id="resource_processing_completed",
  129. dag=dag
  130. )
  131. # 在DAG运行时获取最近日期和资源任务
  132. latest_date = get_latest_date_with_resources()
  133. logger.info(f"使用最近的日期 {latest_date} 查询资源任务")
  134. # 获取资源任务
  135. resource_tasks = get_dataresource_tasks(latest_date)
  136. if resource_tasks:
  137. for i, task_info in enumerate(resource_tasks):
  138. target_table = task_info["target_table"]
  139. script_name = task_info["script_name"]
  140. script_exec_mode = task_info["script_exec_mode"]
  141. if not script_name:
  142. logger.warning(f"资源表 {target_table} 没有关联脚本,跳过")
  143. continue
  144. # 为每个资源表创建单独的处理任务
  145. task_id = f"process_resource_{target_table.replace('.', '_')}"
  146. process_task = PythonOperator(
  147. task_id=task_id,
  148. python_callable=process_resource,
  149. op_kwargs={
  150. "target_table": target_table,
  151. "script_name": script_name,
  152. "script_exec_mode": script_exec_mode
  153. },
  154. provide_context=True,
  155. dag=dag
  156. )
  157. # 设置依赖 - 直接从wait_for_prepare连接到处理任务
  158. wait_for_prepare >> process_task >> resource_processing_completed
  159. else:
  160. # 如果没有任务,添加一个空任务
  161. empty_task = PythonOperator(
  162. task_id="no_resources_to_process",
  163. python_callable=generate_no_task_message,
  164. dag=dag
  165. )
  166. wait_for_prepare >> empty_task >> resource_processing_completed