# dag_dataops_summary_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
import json
from decimal import Decimal

from common import get_pg_conn, get_today_date

# Create module-level logger
logger = logging.getLogger(__name__)


# Custom JSON encoder to work around the fact that Decimal (and datetime)
# values are not JSON-serializable by default
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle datetime values
        elif isinstance(obj, datetime):
            return obj.isoformat()
        # Let the parent class handle all other types
        return super().default(obj)
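
# Usage sketch (illustrative values): json.dumps falls back to
# DecimalEncoder.default for types it cannot serialize natively, e.g.
#   json.dumps({"avg": Decimal("1.25"), "ts": datetime(2024, 1, 1)}, cls=DecimalEncoder)
#   -> '{"avg": 1.25, "ts": "2024-01-01T00:00:00"}'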


def get_execution_stats(exec_date):
    """Collect execution statistics for the given execution date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        total_tasks = cursor.fetchone()[0]

        # Task count per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Execution result counts
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        success_count = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        fail_count = cursor.fetchone()[0]

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        pending_count = cursor.fetchone()[0]

        # Execution duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()
        avg_duration, min_duration, max_duration = time_stats if time_stats else (None, None, None)

        # Convert Decimal values to float
        if avg_duration is not None:
            avg_duration = float(avg_duration)
        if min_duration is not None:
            min_duration = float(min_duration)
        if max_duration is not None:
            max_duration = float(max_duration)

        # Details of failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = [
            {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
                "exec_duration": float(row[3]) if row[3] is not None else None
            }
            for row in cursor.fetchall()
        ]

        # Assemble the summary
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": (success_count / total_tasks * 100) if total_tasks > 0 else 0,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"Error while collecting execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
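
# Shape of the returned dict (illustrative values; labels depend on the
# contents of airflow_dag_schedule):
#   {"exec_date": "2024-01-01", "total_tasks": 42,
#    "type_counts": {"resource": 12, "model": 30},
#    "success_count": 40, "fail_count": 1, "pending_count": 1,
#    "success_rate": 95.24, "avg_duration": 12.3, "min_duration": 0.5,
#    "max_duration": 98.7, "failed_tasks": [...]}
# On a database error the function returns {}, so callers must tolerate
# missing keys (generate_execution_report below does this via .get()).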


def update_missing_results(exec_date):
    """Backfill missing execution results for the given execution date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks still missing an execution result, including their
        # start time so we can decide how to handle each one
        cursor.execute("""
            SELECT target_table, script_name, exec_start_time
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()
        update_count = 0
        for target_table, script_name, start_time in missing_results:
            if start_time:
                # The task started but never reported a result: assume it failed.
                # Note: the subtraction below assumes exec_start_time and
                # datetime.now() are both naive (or both aware) datetimes.
                now = datetime.now()
                duration = (now - start_time).total_seconds()
                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"Script {script_name} for task {target_table} marked as failed; start time: {start_time}")
                update_count += 1
            else:
                # No start time and no result: assume the task never ran
                logger.warning(f"Script {script_name} for task {target_table} never ran")
        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error while updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()
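
# Effect sketch (illustrative row): a task that started but never reported
# a result transitions from
#   exec_start_time=2024-01-01 08:00, exec_end_time=NULL, exec_result=NULL
# to
#   exec_start_time=2024-01-01 08:00, exec_end_time=<now>,
#   exec_duration=<seconds since start>, exec_result=FALSE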


def generate_execution_report(exec_date, stats):
    """Render the execution report as a human-readable string."""
    # stats may be an empty dict if get_execution_stats failed, so every
    # lookup below uses a safe default
    report = []
    report.append("========== DataOps Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats.get('total_tasks', 0)}")

    # Task type distribution
    report.append("\n--- Task type distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Execution result counts
    report.append("\n--- Execution results ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Pending tasks: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Duration statistics ("or 0" also covers keys present with a None value,
    # which would otherwise crash the :.2f formatting)
    report.append("\n--- Duration statistics (seconds) ---")
    report.append(f"Average duration: {stats.get('avg_duration') or 0:.2f}")
    report.append(f"Minimum duration: {stats.get('min_duration') or 0:.2f}")
    report.append(f"Maximum duration: {stats.get('max_duration') or 0:.2f}")

    # Details of failed tasks
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed task details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            # exec_duration may be None, which cannot be formatted with :.2f
            duration = task.get('exec_duration')
            duration_str = f"{duration:.2f} s" if duration is not None else "N/A"
            report.append(f"   Duration: {duration_str}")

    report.append("\n========== End of report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)
    # Write it to the log
    logger.info("\n" + report_str)
    return report_str
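
# Sample of the rendered report (illustrative numbers):
#   ========== DataOps Execution Report ==========
#   Execution date: 2024-01-01
#   Total tasks: 42
#
#   --- Execution results ---
#   Successful tasks: 40
#   Failed tasks: 1
#   Pending tasks: 1
#   Success rate: 95.24%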


def summarize_execution(**kwargs):
    """Main entry point: summarize the day's execution."""
    exec_date = kwargs.get('ds') or get_today_date()
    logger.info(f"Summarizing executions for {exec_date}")

    # 1. Backfill missing execution results
    update_count = update_missing_results(exec_date)
    logger.info(f"Updated {update_count} missing execution results")

    # 2. Collect execution statistics
    stats = get_execution_stats(exec_date)

    # 3. Generate the execution report
    report = generate_execution_report(exec_date, stats)

    # Hand the report and statistics to downstream tasks via XCom
    kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
    kwargs['ti'].xcom_push(key='execution_report', value=report)
    return report
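
# A downstream task can read these values back from XCom, e.g. inside another
# PythonOperator callable (sketch; "ti" is the task instance from the context):
#   stats = json.loads(ti.xcom_pull(task_ids="summarize_execution", key="execution_stats"))
#   report = ti.xcom_pull(task_ids="summarize_execution", key="execution_report")
# The return value of summarize_execution is also available under the
# default "return_value" XCom key.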


# Define the DAG
with DAG(
    "dag_dataops_summary_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Wait for the model DAG to finish; both DAGs run on the same @daily
    # schedule, so the sensor matches on the same logical date
    wait_for_model = ExternalTaskSensor(
        task_id="wait_for_model",
        external_dag_id="dag_dataops_model_scheduler",
        external_task_id="model_processing_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30
    )

    # Summarize the day's execution (in Airflow 2 the context is passed to
    # the callable automatically, so the deprecated provide_context=True
    # flag is no longer needed)
    summarize_task = PythonOperator(
        task_id="summarize_execution",
        python_callable=summarize_execution
    )

    # Marker task signaling that the summary is complete
    summary_completed = EmptyOperator(task_id="summary_completed")

    # Wire up task dependencies
    wait_for_model >> summarize_task >> summary_completed
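
# To exercise this DAG locally, the Airflow CLI can be used, e.g.:
#   airflow dags test dag_dataops_summary_scheduler 2024-01-01
# On Airflow 2.5+, the same is possible from Python (a sketch, assuming a
# configured metadata database is reachable):
if __name__ == "__main__":
    dag.test()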