# scheduler_health.py

  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import logging
  19. from http.server import BaseHTTPRequestHandler, HTTPServer
  20. from sqlalchemy import select
  21. from airflow.configuration import conf
  22. from airflow.jobs.job import Job
  23. from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
  24. from airflow.utils.net import get_hostname
  25. from airflow.utils.session import create_session
  26. log = logging.getLogger(__name__)
  27. class HealthServer(BaseHTTPRequestHandler):
  28. """Small webserver to serve scheduler health check."""
  29. def do_GET(self):
  30. if self.path == "/health":
  31. try:
  32. with create_session() as session:
  33. scheduler_job = session.scalar(
  34. select(Job)
  35. .filter_by(job_type=SchedulerJobRunner.job_type)
  36. .filter_by(hostname=get_hostname())
  37. .order_by(Job.latest_heartbeat.desc())
  38. .limit(1)
  39. )
  40. if scheduler_job and scheduler_job.is_alive():
  41. self.send_response(200)
  42. self.end_headers()
  43. else:
  44. self.send_error(503)
  45. except Exception:
  46. log.exception("Exception when executing Health check")
  47. self.send_error(503)
  48. else:
  49. self.send_error(404)
  50. def serve_health_check():
  51. """Start a http server to serve scheduler health check."""
  52. health_check_host = conf.get("scheduler", "SCHEDULER_HEALTH_CHECK_SERVER_HOST")
  53. health_check_port = conf.getint("scheduler", "SCHEDULER_HEALTH_CHECK_SERVER_PORT")
  54. httpd = HTTPServer((health_check_host, health_check_port), HealthServer)
  55. httpd.serve_forever()
  56. if __name__ == "__main__":
  57. serve_health_check()