  1. """
  2. 统一数据产品线完成器 DAG
  3. 功能:
  4. 1. 由dataops_productline_execute_dag触发,不自行调度
  5. 2. 收集执行DAG的执行统计信息
  6. 3. 生成执行报告
  7. 4. 无论execute DAG执行成功与否都会运行
  8. """
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta, date
import logging
import json
import pendulum
import pytz
from airflow.models import DagRun, TaskInstance
from airflow.utils.state import State
from sqlalchemy import desc
from airflow import settings
from common import get_today_date
from decimal import Decimal

# Create the module logger
logger = logging.getLogger(__name__)

# Toggle for verbose debug logging
ENABLE_DEBUG_LOGGING = True


def log_debug(message):
    """Log a debug message, but only when debug mode is enabled."""
    if ENABLE_DEBUG_LOGGING:
        logger.info(f"[DEBUG] {message}")


# Emit diagnostic information when the DAG module is loaded
log_debug("======== Diagnostics ========")
log_debug("DAG dataops_productline_finalize_dag initializing")
log_debug("======== End diagnostics ========")

#############################################
# Shared utility functions
#############################################

def json_serial(obj):
    """JSON serializer that renders date objects as ISO-format strings."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} is not JSON serializable")
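# A usage sketch for json_serial (the payload below is illustrative, not taken
# from the surrounding code): pass it as the `default` hook so datetime/date
# values survive serialization.
#
#     payload = {"run_date": datetime(2024, 1, 1, 8, 30)}
#     json.dumps(payload, default=json_serial)
#     # -> '{"run_date": "2024-01-01T08:30:00"}'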
# Custom JSON encoder that handles Decimal values (and dates) cleanly
class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return float(obj)
        # Handle date/datetime types
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        # Defer everything else to the parent class
        return super(DecimalEncoder, self).default(obj)
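# A usage sketch for DecimalEncoder (illustrative value): pass it via `cls`
# when a payload mixes Decimal and datetime objects.
#
#     json.dumps({"duration": Decimal("12.5")}, cls=DecimalEncoder)
#     # -> '{"duration": 12.5}'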
#############################################
# Statistics and report generation
#############################################

def collect_execution_stats(**kwargs):
    """
    Collect execution statistics for the execute DAG from the Airflow metadata DB.
    """
    # Resolve the date/time context of the current run
    dag_run = kwargs.get('dag_run')
    logical_date = dag_run.logical_date
    local_logical_date = pendulum.instance(logical_date).in_timezone('Asia/Shanghai')
    exec_date = local_logical_date.strftime('%Y-%m-%d')

    # Read the conf passed by the triggering DAG, if any
    conf = dag_run.conf or {}
    parent_exec_date = conf.get('execution_date', exec_date)
    parent_run_id = conf.get('parent_run_id')

    # Log the full conf received from the upstream DAG
    logger.info(f"[Upstream conf] complete conf: {conf}")

    # Log the key time parameters
    logger.info(f"[Time parameters] collect_execution_stats: exec_date={exec_date}, logical_date={logical_date}, parent_exec_date={parent_exec_date}")
    logger.info(f"[Upstream DAG info] parent_run_id={parent_run_id}")
    logger.info(f"Collecting script execution statistics for execution date {parent_exec_date}")

    # ID of the execute DAG whose run we are summarizing
    target_dag_id = "dataops_productline_execute_dag"

    session = None
    try:
        # Open a metadata-database session
        session = settings.Session()

        # First, query by run_id (when one was provided)
        dag_runs = []
        if parent_run_id:
            logger.info(f"Querying DAG runs with parent_run_id={parent_run_id}")
            dag_runs = session.query(DagRun).filter(
                DagRun.dag_id == target_dag_id,
                DagRun.run_id == parent_run_id
            ).all()
            if dag_runs:
                logger.info("Found a matching DAG run via run_id")
            else:
                logger.warning("No DAG run matched the run_id; falling back to the execution date")

        # If the run_id lookup found nothing, fall back to the execution date
        if not dag_runs and parent_exec_date:
            # Parse the parent execution date into a datetime object
            if isinstance(parent_exec_date, str):
                try:
                    parent_exec_date_dt = pendulum.parse(parent_exec_date)
                except Exception:
                    parent_exec_date_dt = None
            else:
                parent_exec_date_dt = parent_exec_date

            # Log the parse result
            logger.info(f"Parsed parent execution date: {parent_exec_date_dt}")

            # If parsing succeeded, query with the parsed date
            if parent_exec_date_dt:
                logger.info(f"Querying DAG runs with parent execution date {parent_exec_date_dt}")
                dag_runs = session.query(DagRun).filter(
                    DagRun.dag_id == target_dag_id,
                    DagRun.execution_date == parent_exec_date_dt
                ).order_by(desc(DagRun.execution_date)).limit(1).all()
                if dag_runs:
                    logger.info("Found a matching DAG run via execution date")
                else:
                    logger.error("No DAG run matched the execution date")
            else:
                logger.error(f"Could not parse parent execution date {parent_exec_date}")

        # If neither lookup found a record, fail
        if not dag_runs:
            error_msg = f"No run of DAG {target_dag_id} could be found. Provided run_id: {parent_run_id}, execution date: {parent_exec_date}"
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Unpack the matched DAG run
        dag_run = dag_runs[0]
        dag_run_id = dag_run.run_id
        dag_start_time = dag_run.start_date
        dag_end_time = dag_run.end_date
        dag_state = dag_run.state
        dag_execution_date = dag_run.execution_date

        # Record which lookup strategy matched
        if parent_run_id and dag_run_id == parent_run_id:
            match_method = "exact run_id match"
        else:
            match_method = "execution-date match"
        logger.info(f"[Match method] Found the DAG run via {match_method}")
        logger.info(f"[Match result] run_id={dag_run_id}, execution date={dag_execution_date}, state={dag_state}")

        # Compute the DAG run duration
        dag_duration = None
        if dag_start_time and dag_end_time:
            dag_duration = (dag_end_time - dag_start_time).total_seconds()

        # Convert timestamps to the local timezone
        if dag_start_time:
            dag_start_time_local = pendulum.instance(dag_start_time).in_timezone('Asia/Shanghai')
            dag_start_time_str = dag_start_time_local.strftime('%Y-%m-%d %H:%M:%S')
        else:
            dag_start_time_str = 'N/A'
        if dag_end_time:
            dag_end_time_local = pendulum.instance(dag_end_time).in_timezone('Asia/Shanghai')
            dag_end_time_str = dag_end_time_local.strftime('%Y-%m-%d %H:%M:%S')
        else:
            dag_end_time_str = 'N/A'

        # Fetch all task instances belonging to this run
        task_instances = session.query(TaskInstance).filter(
            TaskInstance.dag_id == target_dag_id,
            TaskInstance.run_id == dag_run_id
        ).all()

        # Tally task states
        total_tasks = len(task_instances)
        success_count = sum(1 for ti in task_instances if ti.state == State.SUCCESS)
        fail_count = sum(1 for ti in task_instances if ti.state == State.FAILED)
        skipped_count = sum(1 for ti in task_instances if ti.state == State.SKIPPED)
        upstream_failed_count = sum(1 for ti in task_instances if ti.state == State.UPSTREAM_FAILED)

        # Count script-type tasks (their task_ids contain "-TO-")
        script_task_count = sum(1 for ti in task_instances if "-TO-" in ti.task_id)

        # Collect per-task durations for the longest-running tasks
        task_durations = []
        for ti in task_instances:
            if ti.start_date and ti.end_date:
                duration = (ti.end_date - ti.start_date).total_seconds()
                task_durations.append({
                    "task_id": ti.task_id,
                    "duration": duration,
                    "state": ti.state
                })

        # Sort by duration, longest first, and keep the top 5
        task_durations.sort(key=lambda x: x["duration"] if x["duration"] is not None else 0, reverse=True)
        top_tasks_by_duration = task_durations[:5]

        # Collect the failed tasks
        failed_tasks = []
        for ti in task_instances:
            if ti.state in [State.FAILED, State.UPSTREAM_FAILED]:
                failed_task = {
                    "task_id": ti.task_id,
                    "state": ti.state,
                    "try_number": ti.try_number,
                }
                if ti.start_date and ti.end_date:
                    failed_task["duration"] = (ti.end_date - ti.start_date).total_seconds()
                failed_tasks.append(failed_task)

        # Assemble the statistics payload
        stats = {
            "exec_date": exec_date,
            "dag_id": target_dag_id,
            "dag_execution_date": dag_execution_date.isoformat() if dag_execution_date else None,
            "dag_run_id": dag_run_id,
            "status": dag_state,
            "total_tasks": total_tasks,
            "success_count": success_count,
            "fail_count": fail_count,
            "skipped_count": skipped_count,
            "upstream_failed_count": upstream_failed_count,
            "script_task_count": script_task_count,
            "duration": dag_duration,
            "start_time": dag_start_time_str,
            "end_time": dag_end_time_str,
            "top_tasks_by_duration": top_tasks_by_duration,
            "failed_tasks": failed_tasks
        }

        # Publish the statistics via XCom
        kwargs['ti'].xcom_push(key='execution_stats', value=stats)
        logger.info(f"Collected script execution statistics: total={total_tasks}, succeeded={success_count}, failed={fail_count}")
        return stats
    except Exception as e:
        logger.error(f"Error while collecting script execution statistics: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Publish a minimal error payload instead
        error_stats = {
            "exec_date": exec_date,
            "dag_id": target_dag_id,
            "status": "ERROR",
            "error": str(e),
            "total_tasks": 0,
            "success_count": 0,
            "fail_count": 0
        }
        kwargs['ti'].xcom_push(key='execution_stats', value=error_stats)
        return error_stats
    finally:
        # Always release the metadata-database session
        if session is not None:
            session.close()
def generate_execution_report(**kwargs):
    """
    Generate an execution report from the collected statistics.
    """
    try:
        # Pull the statistics from XCom
        ti = kwargs['ti']
        stats = ti.xcom_pull(task_ids='collect_execution_stats')
        if not stats:
            logger.warning("No script execution statistics found; cannot generate a report")
            report = "No script execution statistics found; cannot generate a report."
            ti.xcom_push(key='execution_report', value=report)
            return report

        # Build the report line by line
        report = []
        report.append("\n========== Script Execution Report ==========")
        report.append(f"Execution date: {stats['exec_date']}")
        report.append(f"DAG ID: {stats['dag_id']}")
        report.append(f"Run ID: {stats.get('dag_run_id', 'N/A')}")
        report.append(f"Status: {stats['status']}")
        report.append(f"Total tasks: {stats['total_tasks']}")

        # Task state summary
        report.append("\n--- Task State Summary ---")
        report.append(f"Succeeded: {stats['success_count']}")
        report.append(f"Failed: {stats['fail_count']}")
        report.append(f"Skipped: {stats.get('skipped_count', 0)}")
        report.append(f"Upstream failed: {stats.get('upstream_failed_count', 0)}")

        # Task type summary
        report.append("\n--- Task Type Summary ---")
        report.append(f"Script execution tasks: {stats.get('script_task_count', 0)}")

        # Execution time summary
        report.append("\n--- Execution Time Summary ---")
        if stats.get('duration') is not None:
            hours, remainder = divmod(stats['duration'], 3600)
            minutes, seconds = divmod(remainder, 60)
            report.append(f"Total execution time: {int(hours)}h {int(minutes)}m {int(seconds)}s")
        else:
            report.append("Total execution time: N/A")
        report.append(f"Start time (Beijing time): {stats.get('start_time', 'N/A')}")
        report.append(f"End time (Beijing time): {stats.get('end_time', 'N/A')}")

        # Longest-running tasks
        top_tasks = stats.get('top_tasks_by_duration', [])
        if top_tasks:
            report.append("\n--- Longest-Running Tasks ---")
            for i, task in enumerate(top_tasks, 1):
                duration_secs = task.get('duration', 0)
                minutes, seconds = divmod(duration_secs, 60)
                report.append(f"{i}. {task['task_id']}: {int(minutes)}m {int(seconds)}s ({task['state']})")

        # Failed task details
        failed_tasks = stats.get('failed_tasks', [])
        if failed_tasks:
            report.append("\n--- Failed Task Details ---")
            for i, task in enumerate(failed_tasks, 1):
                report.append(f"{i}. Task ID: {task['task_id']}")
                report.append(f"   State: {task['state']}")
                report.append(f"   Tries: {task.get('try_number', 'N/A')}")
                if 'duration' in task:
                    minutes, seconds = divmod(task['duration'], 60)
                    report.append(f"   Execution time: {int(minutes)}m {int(seconds)}s")
                else:
                    report.append("   Execution time: N/A")

        # Summary
        success_rate = 0
        if stats['total_tasks'] > 0:
            success_rate = (stats['success_count'] / stats['total_tasks']) * 100
        report.append("\n--- Summary ---")
        report.append(f"Task success rate: {success_rate:.2f}%")
        if stats['status'] == 'success':
            report.append("All scripts completed successfully!")
        elif stats['status'] == 'failed':
            report.append(f"Failures occurred during script execution: {stats['fail_count']} task(s) failed.")
        else:
            report.append(f"Current status: {stats['status']}")
        report.append("\n========== End of Report ==========")

        # Join the report into a single string
        report_str = "\n".join(report)

        # Write it to the log
        logger.info("\n" + report_str)

        # Publish it via XCom
        ti.xcom_push(key='execution_report', value=report_str)
        return report_str
    except Exception as e:
        logger.error(f"Error while generating the execution report: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Publish a minimal error report instead
        error_report = f"Error while generating the execution report: {str(e)}"
        kwargs['ti'].xcom_push(key='execution_report', value=error_report)
        return error_report
#############################################
# DAG definition
#############################################

with DAG(
    "dataops_productline_finalize_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # not scheduled itself; triggered by dataops_productline_execute_dag
    catchup=False,
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }
) as dag:
    # Log context at DAG instantiation time
    now = datetime.now()
    now_with_tz = pytz.timezone('Asia/Shanghai').localize(now)
    default_exec_date = get_today_date()
    logger.info(f"[DAG init] current time: {now} / {now_with_tz}, default execution date: {default_exec_date}")

    #############################################
    # Collect statistics
    #############################################
    collect_stats = PythonOperator(
        task_id="collect_execution_stats",
        python_callable=collect_execution_stats
    )

    #############################################
    # Generate the execution report
    #############################################
    generate_report = PythonOperator(
        task_id="generate_execution_report",
        python_callable=generate_execution_report
    )

    #############################################
    # Completion marker
    #############################################
    finalize_completed = EmptyOperator(
        task_id="finalize_completed"
    )

    # Wire up task dependencies
    collect_stats >> generate_report >> finalize_completed
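
# A minimal local smoke test, assuming Airflow 2.5+ (where DAG.test() is
# available) and a reachable metadata database. Running this module directly
# executes the finalizer once without a scheduler; the conf payload below is
# hypothetical, mirroring the keys this DAG reads from the upstream trigger:
if __name__ == "__main__":
    dag.test(
        run_conf={
            "execution_date": "2024-01-01",  # hypothetical upstream execution date
            "parent_run_id": "manual__2024-01-01T00:00:00+00:00",  # hypothetical upstream run_id
        }
    )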