jobs_command.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from typing import TYPE_CHECKING
  19. from sqlalchemy import select
  20. from airflow.jobs.job import Job
  21. from airflow.utils.net import get_hostname
  22. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
  23. from airflow.utils.session import NEW_SESSION, provide_session
  24. from airflow.utils.state import JobState
  25. if TYPE_CHECKING:
  26. from sqlalchemy.orm import Session
  27. @providers_configuration_loaded
  28. @provide_session
  29. def check(args, session: Session = NEW_SESSION) -> None:
  30. """Check if job(s) are still alive."""
  31. if args.allow_multiple and args.limit <= 1:
  32. raise SystemExit("To use option --allow-multiple, you must set the limit to a value greater than 1.")
  33. if args.hostname and args.local:
  34. raise SystemExit("You can't use --hostname and --local at the same time")
  35. query = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
  36. if args.job_type:
  37. query = query.where(Job.job_type == args.job_type)
  38. if args.hostname:
  39. query = query.where(Job.hostname == args.hostname)
  40. if args.local:
  41. query = query.where(Job.hostname == get_hostname())
  42. if args.limit > 0:
  43. query = query.limit(args.limit)
  44. alive_jobs: list[Job] = [job for job in session.scalars(query) if job.is_alive()]
  45. count_alive_jobs = len(alive_jobs)
  46. if count_alive_jobs == 0:
  47. raise SystemExit("No alive jobs found.")
  48. if count_alive_jobs > 1 and not args.allow_multiple:
  49. raise SystemExit(f"Found {count_alive_jobs} alive jobs. Expected only one.")
  50. if count_alive_jobs == 1:
  51. print("Found one alive job.")
  52. else:
  53. print(f"Found {count_alive_jobs} alive jobs.")