# dag_dataops_pipeline_summary_scheduler.py
"""
Summary DAG for data pipeline execution statistics.

Responsibilities:
1. Wait for the main data-processing DAG (dag_dataops_pipeline_data_scheduler) to finish
2. Collect execution statistics for that DAG run
3. Generate an execution report
4. Run regardless of whether the main DAG succeeded or failed
"""
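# Task flow (wired together at the bottom of this file):
#   wait_for_pipeline_completion >> collect_pipeline_stats
#   >> generate_execution_report >> summary_completed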
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import logging
import json
import pendulum
import pytz
from airflow.models import DagRun, TaskInstance
from airflow.utils.state import State
from sqlalchemy import desc
from airflow import settings
from common import get_today_date

# Create the module logger
logger = logging.getLogger(__name__)

# Toggle for verbose debug logging
ENABLE_DEBUG_LOGGING = True


def log_debug(message):
    """Log a debug message, but only when debug mode is enabled."""
    if ENABLE_DEBUG_LOGGING:
        logger.info(f"[DEBUG] {message}")

def print_target_date(dt):
    """
    Log and return the execution date that the ExternalTaskSensor waits on.
    """
    # Convert to China Standard Time for readability
    local_dt = pendulum.instance(dt).in_timezone('Asia/Shanghai')
    logger.info("===== Target date info for ExternalTaskSensor =====")
    logger.info("Source DAG: dag_dataops_pipeline_summary_scheduler")
    logger.info("Target DAG: dag_dataops_pipeline_data_scheduler")
    logger.info("Target task: data_processing_phase.processing_completed")
    logger.info(f"Execution date to match (UTC): {dt}")
    logger.info(f"Execution date to match (Beijing time): {local_dt}")
    logger.info(f"Date string (UTC): {dt.strftime('%Y-%m-%dT%H:%M:%S')}")
    logger.info(f"Date string (Beijing time): {local_dt.strftime('%Y-%m-%dT%H:%M:%S')}")
    logger.info(f"Date tzinfo: {dt.tzinfo}")
    logger.info(f"Date type: {type(dt)}")
    logger.info("=======================================")
    # Must return the original date unchanged
    return dt

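# ExternalTaskSensor calls execution_date_fn with its own logical date and uses the
# returned value as the execution date to look for in the external DAG, which is why
# print_target_date must hand back dt unchanged.
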
def collect_pipeline_stats(**kwargs):
    """
    Collect execution statistics of the main DAG from the Airflow metadata database.
    """
    # Date and time information of the current run
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
    exec_date = local_logical_date.strftime('%Y-%m-%d')

    # Log the key time parameters
    logger.info(f"[time parameters] collect_pipeline_stats: exec_date={exec_date}, logical_date={logical_date}, local_logical_date={local_logical_date}")
    logger.info(f"Collecting pipeline execution statistics for execution date {exec_date}")

    # ID of the main DAG
    target_dag_id = "dag_dataops_pipeline_data_scheduler"

    try:
        # Open a metadata database session
        session = settings.Session()

        # Look up the DAG run that matches the current execution date
        dag_runs = session.query(DagRun).filter(
            DagRun.dag_id == target_dag_id,
            DagRun.execution_date == local_logical_date  # only the run for the current execution date
        ).order_by(desc(DagRun.execution_date)).limit(1).all()

        if not dag_runs:
            logger.warning(f"No run found for DAG {target_dag_id}")
            session.close()
            return {
                "exec_date": exec_date,
                "dag_id": target_dag_id,
                "status": "NOT_FOUND",
                "total_tasks": 0,
                "success_count": 0,
                "fail_count": 0,
                "skipped_count": 0,
                "upstream_failed_count": 0,
                "duration": None,
                "start_time": None,
                "end_time": None
            }
        # The matching DAG run
        dag_run = dag_runs[0]
        dag_run_id = dag_run.run_id
        dag_start_time = dag_run.start_date
        dag_end_time = dag_run.end_date
        dag_state = dag_run.state
        dag_execution_date = dag_run.execution_date

        # Compute total DAG run duration
        dag_duration = None
        if dag_start_time and dag_end_time:
            dag_duration = (dag_end_time - dag_start_time).total_seconds()

        # Timezone conversion for display
        if dag_start_time:
            dag_start_time_local = pendulum.instance(dag_start_time).in_timezone('Asia/Shanghai')
            dag_start_time_str = dag_start_time_local.strftime('%Y-%m-%d %H:%M:%S')
        else:
            dag_start_time_str = 'N/A'
        if dag_end_time:
            dag_end_time_local = pendulum.instance(dag_end_time).in_timezone('Asia/Shanghai')
            dag_end_time_str = dag_end_time_local.strftime('%Y-%m-%d %H:%M:%S')
        else:
            dag_end_time_str = 'N/A'

        # All task instances belonging to that run
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == target_dag_id,
            TaskInstance.run_id == dag_run_id
        ).all()

        # Close the session
        session.close()

        # Task state statistics
        total_tasks = len(task_instances)
        success_count = sum(1 for ti in task_instances if ti.state == State.SUCCESS)
        fail_count = sum(1 for ti in task_instances if ti.state == State.FAILED)
        skipped_count = sum(1 for ti in task_instances if ti.state == State.SKIPPED)
        upstream_failed_count = sum(1 for ti in task_instances if ti.state == State.UPSTREAM_FAILED)

        # Counts per task type
        resource_task_count = sum(1 for ti in task_instances if "resource_" in ti.task_id)
        model_task_count = sum(1 for ti in task_instances if "model_" in ti.task_id)

        # Collect per-task durations to find the longest-running tasks
        task_durations = []
        for ti in task_instances:
            if ti.start_date and ti.end_date:
                duration = (ti.end_date - ti.start_date).total_seconds()
                task_durations.append({
                    "task_id": ti.task_id,
                    "duration": duration,
                    "state": ti.state
                })

        # Sort by duration, descending
        task_durations.sort(key=lambda x: x["duration"] if x["duration"] is not None else 0, reverse=True)
        top_tasks_by_duration = task_durations[:5]  # keep the top 5

        # Failed tasks
        failed_tasks = []
        for ti in task_instances:
            if ti.state in [State.FAILED, State.UPSTREAM_FAILED]:
                failed_task = {
                    "task_id": ti.task_id,
                    "state": ti.state,
                    "try_number": ti.try_number,
                }
                if ti.start_date and ti.end_date:
                    failed_task["duration"] = (ti.end_date - ti.start_date).total_seconds()
                failed_tasks.append(failed_task)
        # Build the statistics result
        stats = {
            "exec_date": exec_date,
            "dag_id": target_dag_id,
            "dag_execution_date": dag_execution_date.isoformat() if dag_execution_date else None,
            "dag_run_id": dag_run_id,
            "status": dag_state,
            "total_tasks": total_tasks,
            "success_count": success_count,
            "fail_count": fail_count,
            "skipped_count": skipped_count,
            "upstream_failed_count": upstream_failed_count,
            "resource_task_count": resource_task_count,
            "model_task_count": model_task_count,
            "duration": dag_duration,
            "start_time": dag_start_time_str,
            "end_time": dag_end_time_str,
            "top_tasks_by_duration": top_tasks_by_duration,
            "failed_tasks": failed_tasks
        }

        # Save the statistics to XCom
        kwargs['ti'].xcom_push(key='pipeline_stats', value=stats)
        logger.info(f"Pipeline statistics collected: total tasks={total_tasks}, succeeded={success_count}, failed={fail_count}")
        return stats
    except Exception as e:
        logger.error(f"Error while collecting pipeline statistics: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return a minimal error payload
        error_stats = {
            "exec_date": exec_date,
            "dag_id": target_dag_id,
            "status": "ERROR",
            "error": str(e),
            "total_tasks": 0,
            "success_count": 0,
            "fail_count": 0
        }
        kwargs['ti'].xcom_push(key='pipeline_stats', value=error_stats)
        return error_stats

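# Note: besides the explicit 'pipeline_stats' XCom key pushed above, the dict
# returned by collect_pipeline_stats is also stored by Airflow as the default
# 'return_value' XCom; that implicit XCom is what generate_execution_report pulls below.
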
def generate_execution_report(**kwargs):
    """
    Generate an execution report from the collected statistics.
    """
    try:
        # Pull the statistics from XCom
        ti = kwargs['ti']
        stats = ti.xcom_pull(task_ids='collect_pipeline_stats')

        if not stats:
            logger.warning("No pipeline statistics found; cannot generate a report")
            report = "No pipeline statistics found; cannot generate a report."
            ti.xcom_push(key='execution_report', value=report)
            return report

        # Build the report
        report = []
        report.append("\n========== Data pipeline execution report ==========")
        report.append(f"Execution date: {stats['exec_date']}")
        report.append(f"DAG ID: {stats['dag_id']}")
        report.append(f"Run ID: {stats.get('dag_run_id', 'N/A')}")
        report.append(f"Status: {stats['status']}")
        report.append(f"Total tasks: {stats['total_tasks']}")
        # Task state statistics
        report.append("\n--- Task state statistics ---")
        report.append(f"Succeeded tasks: {stats['success_count']}")
        report.append(f"Failed tasks: {stats['fail_count']}")
        report.append(f"Skipped tasks: {stats.get('skipped_count', 0)}")
        report.append(f"Upstream-failed tasks: {stats.get('upstream_failed_count', 0)}")

        # Task type statistics
        report.append("\n--- Task type statistics ---")
        report.append(f"Resource tasks: {stats.get('resource_task_count', 0)}")
        report.append(f"Model tasks: {stats.get('model_task_count', 0)}")

        # Execution time statistics
        report.append("\n--- Execution time statistics ---")
        if stats.get('duration') is not None:
            hours, remainder = divmod(stats['duration'], 3600)
            minutes, seconds = divmod(remainder, 60)
            report.append(f"Total runtime: {int(hours)}h {int(minutes)}m {int(seconds)}s")
        else:
            report.append("Total runtime: N/A")
        report.append(f"Start time (Beijing time): {stats.get('start_time', 'N/A')}")
        report.append(f"End time (Beijing time): {stats.get('end_time', 'N/A')}")

        # Longest-running tasks
        top_tasks = stats.get('top_tasks_by_duration', [])
        if top_tasks:
            report.append("\n--- Longest-running tasks ---")
            for i, task in enumerate(top_tasks, 1):
                duration_secs = task.get('duration', 0)
                minutes, seconds = divmod(duration_secs, 60)
                report.append(f"{i}. {task['task_id']}: {int(minutes)}m {int(seconds)}s ({task['state']})")

        # Failed task details
        failed_tasks = stats.get('failed_tasks', [])
        if failed_tasks:
            report.append("\n--- Failed task details ---")
            for i, task in enumerate(failed_tasks, 1):
                report.append(f"{i}. Task ID: {task['task_id']}")
                report.append(f"   State: {task['state']}")
                report.append(f"   Tries: {task.get('try_number', 'N/A')}")
                if 'duration' in task:
                    minutes, seconds = divmod(task['duration'], 60)
                    report.append(f"   Runtime: {int(minutes)}m {int(seconds)}s")
                else:
                    report.append("   Runtime: N/A")
        # Summary
        success_rate = 0
        if stats['total_tasks'] > 0:
            success_rate = (stats['success_count'] / stats['total_tasks']) * 100
        report.append("\n--- Summary ---")
        report.append(f"Task success rate: {success_rate:.2f}%")
        if stats['status'] == 'success':
            report.append("Pipeline completed successfully!")
        elif stats['status'] == 'failed':
            report.append(f"Pipeline execution failed: {stats['fail_count']} task(s) failed.")
        else:
            report.append(f"Current pipeline state: {stats['status']}")
        report.append("\n========== End of report ==========")

        # Join into a single string
        report_str = "\n".join(report)

        # Also write the report to the task log
        logger.info("\n" + report_str)

        # Save to XCom
        ti.xcom_push(key='execution_report', value=report_str)
        return report_str
    except Exception as e:
        logger.error(f"Error while generating execution report: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Return a minimal error report
        error_report = f"Error while generating execution report: {str(e)}"
        kwargs['ti'].xcom_push(key='execution_report', value=error_report)
        return error_report

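# The rendered report is stored under the 'execution_report' XCom key, so a
# downstream consumer (for example a notification task added after generate_execution_report)
# could retrieve it with ti.xcom_pull(task_ids='generate_execution_report', key='execution_report').
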
# Create the DAG
with DAG(
    "dag_dataops_pipeline_summary_scheduler",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Log some context at DAG parse time
    now = datetime.now()
    now_with_tz = datetime.now(pytz.timezone('Asia/Shanghai'))  # timezone-aware "now" in Beijing time
    default_exec_date = get_today_date()
    logger.info(f"[DAG init] current time: {now} / {now_with_tz}, default execution date: {default_exec_date}")

    #############################################
    # Wait phase: wait for the main DAG to finish
    #############################################
    wait_for_pipeline_completion = ExternalTaskSensor(
        task_id="wait_for_pipeline_completion",
        external_dag_id="dag_dataops_pipeline_data_scheduler",
        external_task_id="data_processing_phase.processing_completed",
        mode="reschedule",  # reschedule mode frees the worker slot between pokes
        timeout=7200,  # give up after 2 hours
        poke_interval=30,  # check every 30 seconds
        allowed_states=["success", "failed", "skipped"],  # accept success, failure, and skipped
        failed_states=None,  # no failed states, so the summary runs regardless of the main DAG's outcome
        execution_date_fn=print_target_date,  # date-logging helper for debugging
        dag=dag
    )

    #############################################
    # Statistics phase: collect statistics and generate the report
    #############################################
    collect_stats = PythonOperator(
        task_id="collect_pipeline_stats",
        python_callable=collect_pipeline_stats,
        provide_context=True,  # kept for backward compatibility; Airflow 2 passes the context automatically
        dag=dag
    )

    generate_report = PythonOperator(
        task_id="generate_execution_report",
        python_callable=generate_execution_report,
        provide_context=True,
        dag=dag
    )

    #############################################
    # Completion phase: mark the summary as complete
    #############################################
    summary_completed = EmptyOperator(
        task_id="summary_completed",
        dag=dag
    )

    # Task dependencies
    wait_for_pipeline_completion >> collect_stats >> generate_report >> summary_completed
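
# Minimal local-debug entry point: a sketch assuming Airflow 2.5+ (where DAG.test()
# is available) and a reachable metadata database; it runs the DAG in-process, which
# is convenient for exercising the statistics and report logic outside the scheduler.
if __name__ == "__main__":
    dag.test()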