# dag_dataops_unified_summary_scheduler.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
import json
from decimal import Decimal
from common import get_pg_conn, get_today_date
from airflow.models import Variable

# Create the logger
logger = logging.getLogger(__name__)


# Custom JSON encoder to handle Decimal (and datetime) serialization
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle datetime values
        elif isinstance(obj, datetime):
            return obj.isoformat()
        # Let the parent class handle everything else
        return super(DecimalEncoder, self).default(obj)
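
# Illustrative example (not executed as part of the DAG): the encoder converts Decimal
# and datetime values that json.dumps cannot serialize on its own, e.g.
#   json.dumps({"avg": Decimal("1.5"), "ts": datetime(2024, 1, 1)}, cls=DecimalEncoder)
#   -> '{"avg": 1.5, "ts": "2024-01-01T00:00:00"}'
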
def get_execution_stats(exec_date):
    """Collect execution statistics for the given execution date."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Total number of tasks
        cursor.execute("""
            SELECT COUNT(*) FROM airflow_dag_schedule WHERE exec_date = %s
        """, (exec_date,))
        result = cursor.fetchone()
        total_tasks = result[0] if result else 0

        # Task count per type
        cursor.execute("""
            SELECT target_table_label, COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s
            GROUP BY target_table_label
        """, (exec_date,))
        type_counts = {row[0]: row[1] for row in cursor.fetchall()}

        # Execution result counts
        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS TRUE
        """, (exec_date,))
        result = cursor.fetchone()
        success_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        result = cursor.fetchone()
        fail_count = result[0] if result else 0

        cursor.execute("""
            SELECT COUNT(*)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        result = cursor.fetchone()
        pending_count = result[0] if result else 0

        # Execution duration statistics
        cursor.execute("""
            SELECT AVG(exec_duration), MIN(exec_duration), MAX(exec_duration)
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_duration IS NOT NULL
        """, (exec_date,))
        time_stats = cursor.fetchone()

        # Guard against NULL duration statistics
        if time_stats and time_stats[0] is not None:
            avg_duration = float(time_stats[0])
            min_duration = float(time_stats[1]) if time_stats[1] is not None else None
            max_duration = float(time_stats[2]) if time_stats[2] is not None else None
        else:
            avg_duration = None
            min_duration = None
            max_duration = None

        # Details of failed tasks
        cursor.execute("""
            SELECT target_table, script_name, target_table_label, exec_duration
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS FALSE
        """, (exec_date,))
        failed_tasks = []
        for row in cursor.fetchall():
            task_dict = {
                "target_table": row[0],
                "script_name": row[1],
                "target_table_label": row[2],
            }
            if row[3] is not None:
                task_dict["exec_duration"] = float(row[3])
            else:
                task_dict["exec_duration"] = None
            failed_tasks.append(task_dict)

        # Success rate, guarding against division by zero
        success_rate = 0
        if total_tasks > 0:
            success_rate = (success_count / total_tasks) * 100

        # Aggregate the statistics
        stats = {
            "exec_date": exec_date,
            "total_tasks": total_tasks,
            "type_counts": type_counts,
            "success_count": success_count,
            "fail_count": fail_count,
            "pending_count": pending_count,
            "success_rate": success_rate,
            "avg_duration": avg_duration,
            "min_duration": min_duration,
            "max_duration": max_duration,
            "failed_tasks": failed_tasks
        }
        return stats
    except Exception as e:
        logger.error(f"Error while collecting execution statistics: {str(e)}")
        return {}
    finally:
        cursor.close()
        conn.close()
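
# Sketch of the airflow_dag_schedule table this DAG reads and updates, inferred from the
# columns referenced in the queries above (not an authoritative DDL):
#   exec_date, target_table, script_name, target_table_label,
#   exec_result (BOOLEAN), exec_start_time / exec_end_time (TIMESTAMP),
#   exec_duration (numeric, seconds)
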
def update_missing_results(exec_date):
    """Fill in missing execution results."""
    conn = get_pg_conn()
    cursor = conn.cursor()
    try:
        # Find all tasks whose execution result is still missing
        cursor.execute("""
            SELECT target_table, script_name
            FROM airflow_dag_schedule
            WHERE exec_date = %s AND exec_result IS NULL
        """, (exec_date,))
        missing_results = cursor.fetchall()
        update_count = 0
        for row in missing_results:
            target_table, script_name = row
            # If the task has a start time but no end time, assume it failed
            cursor.execute("""
                SELECT exec_start_time
                FROM airflow_dag_schedule
                WHERE exec_date = %s AND target_table = %s AND script_name = %s
            """, (exec_date, target_table, script_name))
            start_time = cursor.fetchone()
            if start_time and start_time[0]:
                # Started but never finished: mark it as failed
                now = datetime.now()
                duration = (now - start_time[0]).total_seconds()
                cursor.execute("""
                    UPDATE airflow_dag_schedule
                    SET exec_result = FALSE, exec_end_time = %s, exec_duration = %s
                    WHERE exec_date = %s AND target_table = %s AND script_name = %s
                """, (now, duration, exec_date, target_table, script_name))
                logger.warning(f"Script {script_name} for task {target_table} marked as failed, start time: {start_time[0]}")
                update_count += 1
            else:
                # No start time and no result: assume it never ran
                logger.warning(f"Script {script_name} for task {target_table} was never executed")
        conn.commit()
        logger.info(f"Updated {update_count} tasks with missing results")
        return update_count
    except Exception as e:
        logger.error(f"Error while updating missing execution results: {str(e)}")
        conn.rollback()
        return 0
    finally:
        cursor.close()
        conn.close()
def generate_unified_execution_report(exec_date, stats):
    """Generate the unified execution report."""
    # Build the report
    report = []
    report.append("========== Unified DataOps Execution Report ==========")
    report.append(f"Execution date: {exec_date}")
    report.append(f"Total tasks: {stats['total_tasks']}")

    # Task type distribution
    report.append("\n--- Task Type Distribution ---")
    for label, count in stats.get('type_counts', {}).items():
        report.append(f"{label} tasks: {count}")

    # Execution result statistics
    report.append("\n--- Execution Results ---")
    report.append(f"Successful tasks: {stats.get('success_count', 0)}")
    report.append(f"Failed tasks: {stats.get('fail_count', 0)}")
    report.append(f"Pending tasks: {stats.get('pending_count', 0)}")
    report.append(f"Success rate: {stats.get('success_rate', 0):.2f}%")

    # Execution duration statistics
    report.append("\n--- Execution Durations (seconds) ---")
    avg_duration = stats.get('avg_duration')
    min_duration = stats.get('min_duration')
    max_duration = stats.get('max_duration')
    report.append(f"Average duration: {avg_duration:.2f}" if avg_duration is not None else "Average duration: N/A")
    report.append(f"Shortest duration: {min_duration:.2f}" if min_duration is not None else "Shortest duration: N/A")
    report.append(f"Longest duration: {max_duration:.2f}" if max_duration is not None else "Longest duration: N/A")

    # Failed task details
    failed_tasks = stats.get('failed_tasks', [])
    if failed_tasks:
        report.append("\n--- Failed Task Details ---")
        for i, task in enumerate(failed_tasks, 1):
            report.append(f"{i}. Table: {task['target_table']}")
            report.append(f"   Script: {task['script_name']}")
            report.append(f"   Type: {task['target_table_label']}")
            exec_duration = task.get('exec_duration')
            if exec_duration is not None:
                report.append(f"   Duration: {exec_duration:.2f} seconds")
            else:
                report.append("   Duration: N/A")

    report.append("\n========== End of Report ==========")

    # Join the report into a single string
    report_str = "\n".join(report)
    # Write it to the log
    logger.info("\n" + report_str)
    return report_str
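
# The report built above looks roughly like this (illustrative values only):
#   ========== Unified DataOps Execution Report ==========
#   Execution date: 2024-01-01
#   Total tasks: 12
#   ...
#   Success rate: 91.67%
#   ========== End of Report ==========
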
def summarize_unified_execution(**kwargs):
    """Main entry point that summarizes the unified execution run."""
    try:
        exec_date = kwargs.get('ds') or get_today_date()
        logger.info(f"Summarizing unified execution for execution date {exec_date}")

        # 1. Fill in missing execution results
        try:
            update_count = update_missing_results(exec_date)
            logger.info(f"Updated {update_count} missing execution results")
        except Exception as e:
            logger.error(f"Error while updating missing execution results: {str(e)}")
            update_count = 0

        # 2. Collect execution statistics
        try:
            stats = get_execution_stats(exec_date)
            if not stats:
                logger.warning("Could not collect execution statistics, falling back to defaults")
                stats = {
                    "exec_date": exec_date,
                    "total_tasks": 0,
                    "type_counts": {},
                    "success_count": 0,
                    "fail_count": 0,
                    "pending_count": 0,
                    "success_rate": 0,
                    "avg_duration": None,
                    "min_duration": None,
                    "max_duration": None,
                    "failed_tasks": []
                }
        except Exception as e:
            logger.error(f"Error while collecting execution statistics: {str(e)}")
            stats = {
                "exec_date": exec_date,
                "total_tasks": 0,
                "type_counts": {},
                "success_count": 0,
                "fail_count": 0,
                "pending_count": 0,
                "success_rate": 0,
                "avg_duration": None,
                "min_duration": None,
                "max_duration": None,
                "failed_tasks": []
            }

        # 3. Generate the execution report
        try:
            report = generate_unified_execution_report(exec_date, stats)
        except Exception as e:
            logger.error(f"Error while generating the execution report: {str(e)}")
            report = (f"Error while generating the execution report: {str(e)}\n"
                      f"Basic statistics: total tasks: {stats.get('total_tasks', 0)}, "
                      f"successful: {stats.get('success_count', 0)}, "
                      f"failed: {stats.get('fail_count', 0)}")

        # Pass the report and statistics on to downstream tasks
        try:
            kwargs['ti'].xcom_push(key='execution_stats', value=json.dumps(stats, cls=DecimalEncoder))
            kwargs['ti'].xcom_push(key='execution_report', value=report)
        except Exception as e:
            logger.error(f"Error while saving the report to XCom: {str(e)}")
        return report
    except Exception as e:
        logger.error(f"Unhandled error while summarizing execution: {str(e)}")
        # Return a simple error report so the task itself does not fail
        return f"Error while summarizing execution: {str(e)}"
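
# Downstream tasks can read the values pushed above via xcom_pull; for example, from inside
# another PythonOperator callable (illustrative, assuming the same DAG run):
#   report = kwargs["ti"].xcom_pull(task_ids="summarize_unified_execution", key="execution_report")
#   stats = json.loads(kwargs["ti"].xcom_pull(task_ids="summarize_unified_execution", key="execution_stats"))
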
# Create the DAG
with DAG(
    "dag_dataops_unified_summary_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="*/10 * * * *",  # run every 10 minutes, in step with the data scheduler
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Check whether waiting for the external task should be skipped
    skip_wait = Variable.get("skip_summary_wait", default_var="false").lower() == "true"

    if skip_wait:
        # When waiting is skipped, substitute an empty operator
        wait_for_data_processing = EmptyOperator(
            task_id="wait_for_data_processing",
            dag=dag
        )
        logger.info("Skipping the wait for the external DAG; using an EmptyOperator instead")
    else:
        # Wait for the unified data processing DAG to finish.
        # Helper that logs and returns the execution date the sensor will look for.
        def print_target_date(dt):
            logger.info("===== ExternalTaskSensor target date =====")
            logger.info("Source DAG: dag_dataops_unified_summary_scheduler")
            logger.info("Target DAG: dag_dataops_unified_data_scheduler")
            logger.info("Target task: processing_completed")
            logger.info(f"Execution date being looked up: {dt}")
            logger.info(f"Date string format: {dt.strftime('%Y-%m-%dT%H:%M:%S')}")
            logger.info(f"Date type: {type(dt)}")
            logger.info("==========================================")
            # Must return the original date unchanged
            return dt

        wait_for_data_processing = ExternalTaskSensor(
            task_id="wait_for_data_processing",
            external_dag_id="dag_dataops_unified_data_scheduler",
            external_task_id="processing_completed",
            mode="reschedule",  # reschedule mode frees the worker slot between pokes
            timeout=7200,  # time out after 2 hours
            poke_interval=60,  # check once per minute
            allowed_states=["success", "skipped"],  # treat success or skipped as done
            failed_states=["failed", "upstream_failed"],  # fail immediately on these states
            dag=dag,
            # Custom callable that logs and returns the target execution date
            execution_date_fn=print_target_date
        )

    # Summarize the execution
    # (Airflow 2 passes the task context to the callable automatically, so provide_context is not needed)
    summarize_task = PythonOperator(
        task_id="summarize_unified_execution",
        python_callable=summarize_unified_execution,
        dag=dag
    )

    # Marker task for summary completion
    summary_completed = EmptyOperator(
        task_id="summary_completed",
        dag=dag
    )

    # Task dependencies
    wait_for_data_processing >> summarize_task >> summary_completed
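
# Note: to bypass the ExternalTaskSensor (for example during an ad-hoc backfill), set the
# Airflow Variable "skip_summary_wait" to "true", e.g. via the CLI:
#   airflow variables set skip_summary_wait true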