# dag_dataops_unified_summary_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
import json
from decimal import Decimal

from common import get_pg_conn, get_today_date
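# Note: common is a project-local module. get_pg_conn() is assumed here to
# return a DB-API 2.0 (psycopg2-style) PostgreSQL connection, and
# get_today_date() a date string matching the exec_date column format.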

# Create the module-level logger
logger = logging.getLogger(__name__)


# Custom JSON encoder to work around Decimal serialization errors
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle datetime values
        elif isinstance(obj, datetime):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super().default(obj)
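
# A minimal usage sketch with illustrative values: DecimalEncoder lets
# json.dumps handle the Decimal and datetime values that psycopg2 returns.
#
#   json.dumps({"avg_duration": Decimal("1.25"), "ts": datetime(2024, 1, 1)},
#              cls=DecimalEncoder)
#   # -> '{"avg_duration": 1.25, "ts": "2024-01-01T00:00:00"}'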

def get_execution_stats(exec_date):
    """Collect execution statistics for the given execution date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0

        # Task count per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Execution result counts
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0

        # Execution duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()

        # Guard against missing or NULL duration values
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None

        # Details of failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)

        # Success rate, guarding against division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100

        # Aggregate the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"Error while collecting execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
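
# The queries above assume an airflow_dag_schedule table shaped roughly like
# the sketch below (column list inferred from the queries; actual names of
# types may differ):
#
#   CREATE TABLE airflow_dag_schedule (
#       exec_date          DATE,
#       target_table       TEXT,
#       target_table_label TEXT,
#       script_name        TEXT,
#       exec_result        BOOLEAN,    -- TRUE = success, FALSE = failure, NULL = pending
#       exec_start_time    TIMESTAMP,
#       exec_end_time      TIMESTAMP,
#       exec_duration      NUMERIC     -- seconds
#   );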

def update_missing_results(exec_date):
    """Fill in missing execution results for the given execution date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find every task that has no execution result yet
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()
        update_count = 0
        for row in missing_results:
            target_table, script_name = row

            # A task with a start time but no end time is assumed to have failed
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()

            if start_time and start_time[0]:
                # Start time present but no result: mark as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()
                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"Marked script {script_name} for table {target_table} as failed; start time: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume the task never ran
                logger.warning(f"Script {script_name} for table {target_table} never ran")
        conn.commit()
        logger.info(f"Updated {update_count} task(s) with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error while updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()

def generate_unified_execution_report(exec_date, stats):
    """Render the unified execution report as a string."""
    # Build the report line by line
    report = []
    report.append("========== Unified Data Ops Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")

    # Task type distribution
    report.append("\n--- Task Type Distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Execution result summary
    report.append("\n--- Execution Results ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Pending tasks: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Execution duration summary
    report.append("\n--- Execution Durations (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')
    report.append(f"Average duration: {avg_duration:.2f}" if avg_duration is not None else "Average duration: N/A")
    report.append(f"Shortest duration: {min_duration:.2f}" if min_duration is not None else "Shortest duration: N/A")
    report.append(f"Longest duration: {max_duration:.2f}" if max_duration is not None else "Longest duration: N/A")

    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed Task Details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Duration: {exec_duration:.2f} s")
            else:
                report.append("   Duration: N/A")

    report.append("\n========== End of Report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)

    # Log the full report
    logger.info("\n" + report_str)
    return report_str
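
# With illustrative stats, the rendered report looks roughly like:
#
#   ========== Unified Data Ops Execution Report ==========
#   Execution date: 2024-01-01
#   Total tasks: 12
#
#   --- Execution Results ---
#   Successful tasks: 10
#   Failed tasks: 1
#   Pending tasks: 1
#   Success rate: 83.33%
#   ...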

def summarize_unified_execution(**kwargs):
    """Main entry point: summarize the unified execution run."""
    try:
        exec_date = kwargs.get('ds') or get_today_date()
        logger.info(f"Summarizing unified execution for date {exec_date}")

        # Fallback statistics used whenever the real ones cannot be fetched
        default_stats = {
            "exec_date": exec_date,
            "total_tasks": 0,
            "type_counts": {},
            "success_count": 0,
            "fail_count": 0,
            "pending_count": 0,
            "success_rate": 0,
            "avg_duration": None,
            "min_duration": None,
            "max_duration": None,
            "failed_tasks": []
        }

        # 1. Fill in missing execution results
        try:
            update_count = update_missing_results(exec_date)
            logger.info(f"Updated {update_count} missing execution result(s)")
        except Exception as e:
            logger.error(f"Error while updating missing execution results: {str(e)}")
            update_count = 0

        # 2. Collect execution statistics
        try:
            stats = get_execution_stats(exec_date)
            if not stats:
                logger.warning("Could not collect execution statistics; falling back to defaults")
                stats = default_stats
        except Exception as e:
            logger.error(f"Error while collecting execution statistics: {str(e)}")
            stats = default_stats

        # 3. Generate the execution report
        try:
            report = generate_unified_execution_report(exec_date, stats)
        except Exception as e:
            logger.error(f"Error while generating the execution report: {str(e)}")
            report = (f"Error while generating the execution report: {str(e)}\n"
                      f"Basic stats: total: {stats.get('total_tasks', 0)}, "
                      f"succeeded: {stats.get('success_count', 0)}, "
                      f"failed: {stats.get('fail_count', 0)}")

        # Hand the report and statistics to downstream tasks via XCom
        try:
            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
            kwargs['ti'].xcom_push(key='execution_report', value=report)
        except Exception as e:
            logger.error(f"Error while pushing the report to XCom: {str(e)}")
        return report
    except Exception as e:
        logger.error(f"Unhandled error while summarizing execution: {str(e)}")
        # Return a minimal error report so the task itself does not fail
        return f"Error while summarizing execution: {str(e)}"
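
# A minimal sketch of how a downstream task could consume what the function
# above pushes to XCom. notify_on_failures is hypothetical and is not
# registered as a task in this DAG:
def notify_on_failures(**kwargs):
    ti = kwargs['ti']
    stats = json.loads(ti.xcom_pull(task_ids='summarize_unified_execution',
                                    key='execution_stats'))
    if stats.get('fail_count', 0) > 0:
        logger.warning(f"Detected {stats['fail_count']} failed task(s) on {stats['exec_date']}")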

# Define the DAG
with DAG(
    "dag_dataops_unified_summary_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Wait for the unified data processing DAG to finish
    # (operators created inside the `with DAG` block are attached to it
    # automatically, so no explicit dag=dag argument is needed)
    wait_for_data_processing = ExternalTaskSensor(
        task_id="wait_for_data_processing",
        external_dag_id="dag_dataops_unified_data_scheduler",
        external_task_id="processing_completed",
        mode="poke",
        timeout=3600,
        poke_interval=30
    )

    # Summarize the execution run
    # (provide_context was removed in Airflow 2.x; the context is always
    # passed to python_callable as keyword arguments)
    summarize_task = PythonOperator(
        task_id="summarize_unified_execution",
        python_callable=summarize_unified_execution
    )

    # Marker task: summary finished
    summary_completed = EmptyOperator(
        task_id="summary_completed"
    )

    # Wire up the task dependencies
    wait_for_data_processing >> summarize_task >> summary_completed
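
# To exercise this DAG end-to-end without the scheduler, a single run can be
# triggered from the Airflow 2.x CLI, e.g.:
#
#   airflow dags test dag_dataops_unified_summary_scheduler 2024-01-01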