# job.py (18 KB)
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from functools import cached_property, lru_cache
  20. from time import sleep
  21. from typing import TYPE_CHECKING, Callable, NoReturn
  22. from sqlalchemy import Column, Index, Integer, String, case, select
  23. from sqlalchemy.exc import OperationalError
  24. from sqlalchemy.orm import backref, foreign, relationship
  25. from sqlalchemy.orm.session import make_transient
  26. from airflow.api_internal.internal_api_call import internal_api_call
  27. from airflow.configuration import conf
  28. from airflow.exceptions import AirflowException
  29. from airflow.executors.executor_loader import ExecutorLoader
  30. from airflow.listeners.listener import get_listener_manager
  31. from airflow.models.base import ID_LEN, Base
  32. from airflow.serialization.pydantic.job import JobPydantic
  33. from airflow.stats import Stats
  34. from airflow.traces.tracer import Trace, span
  35. from airflow.utils import timezone
  36. from airflow.utils.helpers import convert_camel_to_snake
  37. from airflow.utils.log.logging_mixin import LoggingMixin
  38. from airflow.utils.net import get_hostname
  39. from airflow.utils.platform import getuser
  40. from airflow.utils.retries import retry_db_transaction
  41. from airflow.utils.session import NEW_SESSION, provide_session
  42. from airflow.utils.sqlalchemy import UtcDateTime
  43. from airflow.utils.state import JobState
  44. if TYPE_CHECKING:
  45. import datetime
  46. from sqlalchemy.orm.session import Session
  47. from airflow.executors.base_executor import BaseExecutor
  48. def _resolve_dagrun_model():
  49. from airflow.models.dagrun import DagRun
  50. return DagRun
  51. @lru_cache
  52. def health_check_threshold(job_type: str, heartrate: int) -> int | float:
  53. grace_multiplier = 2.1
  54. health_check_threshold_value: int | float
  55. if job_type == "SchedulerJob":
  56. health_check_threshold_value = conf.getint("scheduler", "scheduler_health_check_threshold")
  57. elif job_type == "TriggererJob":
  58. health_check_threshold_value = conf.getfloat("triggerer", "triggerer_health_check_threshold")
  59. else:
  60. health_check_threshold_value = heartrate * grace_multiplier
  61. return health_check_threshold_value
  62. class Job(Base, LoggingMixin):
  63. """
  64. The ORM class representing Job stored in the database.
  65. Jobs are processing items with state and duration that aren't task instances.
  66. For instance a BackfillJob is a collection of task instance runs,
  67. but should have its own state, start and end time.
  68. """
  69. __tablename__ = "job"
  70. id = Column(Integer, primary_key=True)
  71. dag_id = Column(
  72. String(ID_LEN),
  73. )
  74. state = Column(String(20))
  75. job_type = Column(String(30))
  76. start_date = Column(UtcDateTime())
  77. end_date = Column(UtcDateTime())
  78. latest_heartbeat = Column(UtcDateTime())
  79. executor_class = Column(String(500))
  80. hostname = Column(String(500))
  81. unixname = Column(String(1000))
  82. __table_args__ = (
  83. Index("job_type_heart", job_type, latest_heartbeat),
  84. Index("idx_job_state_heartbeat", state, latest_heartbeat),
  85. Index("idx_job_dag_id", dag_id),
  86. )
  87. task_instances_enqueued = relationship(
  88. "TaskInstance",
  89. primaryjoin="Job.id == foreign(TaskInstance.queued_by_job_id)",
  90. backref=backref("queued_by_job", uselist=False),
  91. )
  92. dag_runs = relationship(
  93. "DagRun",
  94. primaryjoin=lambda: Job.id == foreign(_resolve_dagrun_model().creating_job_id),
  95. backref="creating_job",
  96. )
  97. """
  98. TaskInstances which have been enqueued by this Job.
  99. Only makes sense for SchedulerJob and BackfillJob instances.
  100. """
  101. def __init__(self, executor: BaseExecutor | None = None, heartrate=None, **kwargs):
  102. # Save init parameters as DB fields
  103. self.heartbeat_failed = False
  104. self.hostname = get_hostname()
  105. if executor:
  106. self.executor = executor
  107. self.executors = [executor]
  108. self.start_date = timezone.utcnow()
  109. self.latest_heartbeat = timezone.utcnow()
  110. self.previous_heartbeat = None
  111. if heartrate is not None:
  112. self.heartrate = heartrate
  113. self.unixname = getuser()
  114. self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
  115. get_listener_manager().hook.on_starting(component=self)
  116. super().__init__(**kwargs)
  117. @cached_property
  118. def executor(self):
  119. return ExecutorLoader.get_default_executor()
  120. @cached_property
  121. def executors(self):
  122. return ExecutorLoader.init_executors()
  123. @cached_property
  124. def heartrate(self) -> float:
  125. return Job._heartrate(self.job_type)
  126. def is_alive(self) -> bool:
  127. """
  128. Is this job currently alive.
  129. We define alive as in a state of RUNNING, and having sent a heartbeat
  130. within a multiple of the heartrate (default of 2.1)
  131. """
  132. threshold_value = health_check_threshold(self.job_type, self.heartrate)
  133. return Job._is_alive(
  134. state=self.state,
  135. health_check_threshold_value=threshold_value,
  136. latest_heartbeat=self.latest_heartbeat,
  137. )
  138. @provide_session
  139. def kill(self, session: Session = NEW_SESSION) -> NoReturn:
  140. """Handle on_kill callback and updates state in database."""
  141. try:
  142. self.on_kill()
  143. except Exception as e:
  144. self.log.error("on_kill() method failed: %s", e)
  145. Job._kill(job_id=self.id, session=session)
  146. raise AirflowException("Job shut down externally.")
  147. def on_kill(self):
  148. """Will be called when an external kill command is received."""
  149. @provide_session
  150. def heartbeat(
  151. self, heartbeat_callback: Callable[[Session], None], session: Session = NEW_SESSION
  152. ) -> None:
  153. """
  154. Update the job's entry in the database with the latest_heartbeat timestamp.
  155. This allows for the job to be killed externally and allows the system
  156. to monitor what is actually active. For instance, an old heartbeat
  157. for SchedulerJob would mean something is wrong. This also allows for
  158. any job to be killed externally, regardless of who is running it or on
  159. which machine it is running.
  160. Note that if your heart rate is set to 60 seconds and you call this
  161. method after 10 seconds of processing since the last heartbeat, it
  162. will sleep 50 seconds to complete the 60 seconds and keep a steady
  163. heart rate. If you go over 60 seconds before calling it, it won't
  164. sleep at all.
  165. :param heartbeat_callback: Callback that will be run when the heartbeat is recorded in the Job
  166. :param session to use for saving the job
  167. """
  168. previous_heartbeat = self.latest_heartbeat
  169. with Trace.start_span(span_name="heartbeat", component="Job") as span:
  170. try:
  171. span.set_attribute("heartbeat", str(self.latest_heartbeat))
  172. # This will cause it to load from the db
  173. self._merge_from(Job._fetch_from_db(self, session))
  174. previous_heartbeat = self.latest_heartbeat
  175. if self.state == JobState.RESTARTING:
  176. self.kill()
  177. # Figure out how long to sleep for
  178. sleep_for = 0
  179. if self.latest_heartbeat:
  180. seconds_remaining = (
  181. self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
  182. )
  183. sleep_for = max(0, seconds_remaining)
  184. if span.is_recording():
  185. span.add_event(name="sleep", attributes={"sleep_for": sleep_for})
  186. sleep(sleep_for)
  187. job = Job._update_heartbeat(job=self, session=session)
  188. self._merge_from(job)
  189. time_since_last_heartbeat = (timezone.utcnow() - previous_heartbeat).total_seconds()
  190. health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
  191. if time_since_last_heartbeat > health_check_threshold_value:
  192. self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
  193. # At this point, the DB has updated.
  194. previous_heartbeat = self.latest_heartbeat
  195. heartbeat_callback(session)
  196. self.log.debug("[heartbeat]")
  197. self.heartbeat_failed = False
  198. except OperationalError:
  199. Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
  200. if not self.heartbeat_failed:
  201. self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
  202. self.heartbeat_failed = True
  203. msg = f"{self.__class__.__name__} heartbeat got an exception"
  204. if span.is_recording():
  205. span.add_event(name="error", attributes={"message": msg})
  206. if self.is_alive():
  207. self.log.error(
  208. "%s heartbeat failed with error. Scheduler may go into unhealthy state",
  209. self.__class__.__name__,
  210. )
  211. msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
  212. if span.is_recording():
  213. span.add_event(name="error", attributes={"message": msg})
  214. else:
  215. msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
  216. self.log.error(msg)
  217. if span.is_recording():
  218. span.add_event(name="error", attributes={"message": msg})
  219. # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
  220. self.latest_heartbeat = previous_heartbeat
  221. @provide_session
  222. def prepare_for_execution(self, session: Session = NEW_SESSION):
  223. """Prepare the job for execution."""
  224. Stats.incr(self.__class__.__name__.lower() + "_start", 1, 1)
  225. self.state = JobState.RUNNING
  226. self.start_date = timezone.utcnow()
  227. self._merge_from(Job._add_to_db(job=self, session=session))
  228. make_transient(self)
  229. @provide_session
  230. def complete_execution(self, session: Session = NEW_SESSION):
  231. get_listener_manager().hook.before_stopping(component=self)
  232. self.end_date = timezone.utcnow()
  233. Job._update_in_db(job=self, session=session)
  234. Stats.incr(self.__class__.__name__.lower() + "_end", 1, 1)
  235. @provide_session
  236. def most_recent_job(self, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
  237. """Return the most recent job of this type, if any, based on last heartbeat received."""
  238. return most_recent_job(self.job_type, session=session)
  239. def _merge_from(self, job: Job | JobPydantic | None):
  240. if job is None:
  241. self.log.error("Job is empty: %s", self.id)
  242. return
  243. self.id = job.id
  244. self.dag_id = job.dag_id
  245. self.state = job.state
  246. self.job_type = job.job_type
  247. self.start_date = job.start_date
  248. self.end_date = job.end_date
  249. self.latest_heartbeat = job.latest_heartbeat
  250. self.executor_class = job.executor_class
  251. self.hostname = job.hostname
  252. self.unixname = job.unixname
  253. @staticmethod
  254. def _heartrate(job_type: str) -> float:
  255. if job_type == "TriggererJob":
  256. return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
  257. elif job_type == "SchedulerJob":
  258. return conf.getfloat("scheduler", "SCHEDULER_HEARTBEAT_SEC")
  259. else:
  260. # Heartrate used to be hardcoded to scheduler, so in all other
  261. # cases continue to use that value for back compat
  262. return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")
  263. @staticmethod
  264. def _is_alive(
  265. state: JobState | str | None,
  266. health_check_threshold_value: float | int,
  267. latest_heartbeat: datetime.datetime,
  268. ) -> bool:
  269. return (
  270. state == JobState.RUNNING
  271. and (timezone.utcnow() - latest_heartbeat).total_seconds() < health_check_threshold_value
  272. )
  273. @staticmethod
  274. @internal_api_call
  275. @provide_session
  276. def _kill(job_id: str, session: Session = NEW_SESSION) -> Job | JobPydantic:
  277. job = session.scalar(select(Job).where(Job.id == job_id).limit(1))
  278. job.end_date = timezone.utcnow()
  279. session.merge(job)
  280. session.commit()
  281. return job
  282. @staticmethod
  283. @internal_api_call
  284. @provide_session
  285. @retry_db_transaction
  286. def _fetch_from_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
  287. if isinstance(job, Job):
  288. # not Internal API
  289. session.merge(job)
  290. return job
  291. # Internal API,
  292. return session.scalar(select(Job).where(Job.id == job.id).limit(1))
  293. @staticmethod
  294. @internal_api_call
  295. @provide_session
  296. def _add_to_db(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
  297. if isinstance(job, JobPydantic):
  298. orm_job = Job()
  299. orm_job._merge_from(job)
  300. else:
  301. orm_job = job
  302. session.add(orm_job)
  303. session.commit()
  304. return orm_job
  305. @staticmethod
  306. @internal_api_call
  307. @provide_session
  308. def _update_in_db(job: Job | JobPydantic, session: Session = NEW_SESSION):
  309. if isinstance(job, Job):
  310. # not Internal API
  311. session.merge(job)
  312. session.commit()
  313. # Internal API.
  314. orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
  315. if orm_job is None:
  316. return
  317. orm_job._merge_from(job)
  318. session.merge(orm_job)
  319. session.commit()
  320. @staticmethod
  321. @internal_api_call
  322. @provide_session
  323. @retry_db_transaction
  324. def _update_heartbeat(job: Job | JobPydantic, session: Session = NEW_SESSION) -> Job | JobPydantic:
  325. orm_job: Job | None = session.scalar(select(Job).where(Job.id == job.id).limit(1))
  326. if orm_job is None:
  327. return job
  328. orm_job.latest_heartbeat = timezone.utcnow()
  329. session.merge(orm_job)
  330. session.commit()
  331. return orm_job
  332. @internal_api_call
  333. @provide_session
  334. def most_recent_job(job_type: str, session: Session = NEW_SESSION) -> Job | JobPydantic | None:
  335. """
  336. Return the most recent job of this type, if any, based on last heartbeat received.
  337. Jobs in "running" state take precedence over others to make sure alive
  338. job is returned if it is available.
  339. :param job_type: job type to query for to get the most recent job for
  340. :param session: Database session
  341. """
  342. return session.scalar(
  343. select(Job)
  344. .where(Job.job_type == job_type)
  345. .order_by(
  346. # Put "running" jobs at the front.
  347. case({JobState.RUNNING: 0}, value=Job.state, else_=1),
  348. Job.latest_heartbeat.desc(),
  349. )
  350. .limit(1)
  351. )
  352. @provide_session
  353. def run_job(
  354. job: Job, execute_callable: Callable[[], int | None], session: Session = NEW_SESSION
  355. ) -> int | None:
  356. """
  357. Run the job.
  358. The Job is always an ORM object and setting the state is happening within the
  359. same DB session and the session is kept open throughout the whole execution.
  360. :meta private:
  361. """
  362. job.prepare_for_execution(session=session)
  363. try:
  364. return execute_job(job, execute_callable=execute_callable)
  365. finally:
  366. job.complete_execution(session=session)
  367. def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | None:
  368. """
  369. Execute the job.
  370. Job execution requires no session as generally executing session does not require an
  371. active database connection. The session might be temporary acquired and used if the job
  372. runs heartbeat during execution, but this connection is only acquired for the time of heartbeat
  373. and in case of AIP-44 implementation it happens over the Internal API rather than directly via
  374. the database.
  375. After the job is completed, state of the Job is updated and it should be updated in the database,
  376. which happens in the "complete_execution" step (which again can be executed locally in case of
  377. database operations or over the Internal API call.
  378. :param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
  379. not really matter, because except of running the heartbeat and state setting,
  380. the runner should not modify the job state.
  381. :param execute_callable: callable to execute when running the job.
  382. :meta private:
  383. """
  384. ret = None
  385. try:
  386. ret = execute_callable()
  387. # In case of max runs or max duration
  388. job.state = JobState.SUCCESS
  389. except SystemExit:
  390. # In case of ^C or SIGTERM
  391. job.state = JobState.SUCCESS
  392. except Exception:
  393. job.state = JobState.FAILED
  394. raise
  395. return ret
  396. @span
  397. def perform_heartbeat(
  398. job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
  399. ) -> None:
  400. """
  401. Perform heartbeat for the Job passed to it,optionally checking if it is necessary.
  402. :param job: job to perform heartbeat for
  403. :param heartbeat_callback: callback to run by the heartbeat
  404. :param only_if_necessary: only heartbeat if it is necessary (i.e. if there are things to run for
  405. triggerer for example)
  406. """
  407. seconds_remaining: float = 0.0
  408. if job.latest_heartbeat and job.heartrate:
  409. seconds_remaining = job.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()
  410. if seconds_remaining > 0 and only_if_necessary:
  411. return
  412. job.heartbeat(heartbeat_callback=heartbeat_callback)