airflow_health.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from typing import Any
  19. from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
  20. from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
  21. from airflow.jobs.triggerer_job_runner import TriggererJobRunner
  # Status strings reported for each component in the health payload.
  HEALTHY = "healthy"
  UNHEALTHY = "unhealthy"


  def get_airflow_health() -> dict[str, Any]:
      """Report the health of the metadatabase, scheduler, triggerer and DAG processor.

      Each job-backed component is probed through its runner's
      ``most_recent_job()``. A component is ``HEALTHY`` only when a job is
      returned and its ``is_alive()`` is truthy; the job's ``latest_heartbeat``
      is reported in ISO format whenever a job is returned.

      Any exception raised while probing a component flips the metadatabase
      status to ``UNHEALTHY``; the remaining components are still probed.

      :return: mapping with the keys ``metadatabase``, ``scheduler``,
          ``triggerer`` and ``dag_processor``; each value holds a ``status``
          and, for the job-backed components, a ``latest_<name>_heartbeat``.
      """
      # (payload key, job runner, heartbeat key, status reported when no job is found)
      # NOTE(review): a missing triggerer / DAG processor job is reported as
      # ``None`` rather than unhealthy -- presumably because those components
      # are optional; confirm against the upstream file, since the extracted
      # source lost the indentation that pairs each ``else`` with its ``if``.
      probes = (
          ("scheduler", SchedulerJobRunner, "latest_scheduler_heartbeat", UNHEALTHY),
          ("triggerer", TriggererJobRunner, "latest_triggerer_heartbeat", None),
          ("dag_processor", DagProcessorJobRunner, "latest_dag_processor_heartbeat", None),
      )

      report: dict[str, Any] = {"metadatabase": {"status": HEALTHY}}
      for component, runner, heartbeat_key, status_without_job in probes:
          # Pessimistic defaults: they survive if the probe raises part-way.
          status: str | None = UNHEALTHY
          heartbeat = None
          try:
              job = runner.most_recent_job()
              if not job:
                  status = status_without_job
              else:
                  heartbeat = job.latest_heartbeat.isoformat()
                  if job.is_alive():
                      status = HEALTHY
          except Exception:
              # A failed probe is attributed to the metadatabase, not the component.
              report["metadatabase"]["status"] = UNHEALTHY
          report[component] = {"status": status, heartbeat_key: heartbeat}
      return report