scheduler_job_runner.py

  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import itertools
  20. import multiprocessing
  21. import os
  22. import signal
  23. import sys
  24. import time
  25. import warnings
  26. from collections import Counter, defaultdict, deque
  27. from contextlib import suppress
  28. from dataclasses import dataclass
  29. from datetime import timedelta
  30. from functools import lru_cache, partial
  31. from pathlib import Path
  32. from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
  33. from deprecated import deprecated
  34. from sqlalchemy import and_, delete, func, not_, or_, select, text, update
  35. from sqlalchemy.exc import OperationalError
  36. from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
  37. from sqlalchemy.sql import expression
  38. from airflow import settings
  39. from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
  40. from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
  41. from airflow.configuration import conf
  42. from airflow.exceptions import RemovedInAirflow3Warning, UnknownExecutorException
  43. from airflow.executors.executor_loader import ExecutorLoader
  44. from airflow.jobs.base_job_runner import BaseJobRunner
  45. from airflow.jobs.job import Job, perform_heartbeat
  46. from airflow.models import Log
  47. from airflow.models.dag import DAG, DagModel
  48. from airflow.models.dagbag import DagBag
  49. from airflow.models.dagrun import DagRun
  50. from airflow.models.dataset import (
  51. DagScheduleDatasetReference,
  52. DatasetDagRunQueue,
  53. DatasetEvent,
  54. DatasetModel,
  55. TaskOutletDatasetReference,
  56. )
  57. from airflow.models.serialized_dag import SerializedDagModel
  58. from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
  59. from airflow.stats import Stats
  60. from airflow.ti_deps.dependencies_states import EXECUTION_STATES
  61. from airflow.timetables.simple import DatasetTriggeredTimetable
  62. from airflow.traces import utils as trace_utils
  63. from airflow.traces.tracer import Trace, span
  64. from airflow.utils import timezone
  65. from airflow.utils.dates import datetime_to_nano
  66. from airflow.utils.event_scheduler import EventScheduler
  67. from airflow.utils.log.logging_mixin import LoggingMixin
  68. from airflow.utils.retries import MAX_DB_RETRIES, retry_db_transaction, run_with_db_retries
  69. from airflow.utils.session import NEW_SESSION, create_session, provide_session
  70. from airflow.utils.sqlalchemy import (
  71. is_lock_not_available_error,
  72. prohibit_commit,
  73. tuple_in_condition,
  74. with_row_locks,
  75. )
  76. from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
  77. from airflow.utils.types import DagRunType
  78. if TYPE_CHECKING:
  79. import logging
  80. from datetime import datetime
  81. from types import FrameType
  82. from sqlalchemy.engine import Result
  83. from sqlalchemy.orm import Query, Session
  84. from airflow.dag_processing.manager import DagFileProcessorAgent
  85. from airflow.executors.base_executor import BaseExecutor
  86. from airflow.executors.executor_utils import ExecutorName
  87. from airflow.models.taskinstance import TaskInstanceKey
  88. from airflow.utils.sqlalchemy import (
  89. CommitProhibitorGuard,
  90. )
  91. TI = TaskInstance
  92. DR = DagRun
  93. DM = DagModel
  94. TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
  95. """:meta private:"""
  96. @dataclass
  97. class ConcurrencyMap:
  98. """
  99. Dataclass to represent concurrency maps.
  100. It contains a map from dag_id to the number of active task instances, a map from (dag_id, task_id)
  101. to the number of task instances in the given state list, and a map from (dag_id, run_id, task_id)
  102. to the number of task instances in the given state list in each DAG run.
  103. """
  104. dag_active_tasks_map: dict[str, int]
  105. task_concurrency_map: dict[tuple[str, str], int]
  106. task_dagrun_concurrency_map: dict[tuple[str, str, str], int]
  107. @classmethod
  108. def from_concurrency_map(cls, mapping: dict[tuple[str, str, str], int]) -> ConcurrencyMap:
  109. instance = cls(Counter(), Counter(), Counter(mapping))
  110. for (d, _, t), c in mapping.items():
  111. instance.dag_active_tasks_map[d] += c
  112. instance.task_concurrency_map[(d, t)] += c
  113. return instance
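       # Illustrative example (added for clarity, not part of the original source):
       # from_concurrency_map({("dag_a", "run_1", "task_x"): 2, ("dag_a", "run_2", "task_x"): 1})
       # yields dag_active_tasks_map == {"dag_a": 3}, task_concurrency_map == {("dag_a", "task_x"): 3},
       # and keeps the per-(dag_id, run_id, task_id) counts in task_dagrun_concurrency_map.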
  114. def _is_parent_process() -> bool:
  115. """
  116. Whether this is a parent process.
  117. Return True if the current process is the parent process.
  118. False if the current process is a child process started by multiprocessing.
  119. """
  120. return multiprocessing.current_process().name == "MainProcess"
  121. class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
  122. """
  123. SchedulerJobRunner runs for a specific time interval and schedules jobs that are ready to run.
  124. It figures out the latest runs for each task and sees if the dependencies
  125. for the next schedules are met.
  126. If so, it creates appropriate TaskInstances and sends run commands to the
  127. executor. It does this for each task in each DAG and repeats.
  128. :param subdir: directory containing Python files with Airflow DAG
  129. definitions, or a specific path to a file
  130. :param num_runs: The number of times to run the scheduling loop. If you
  131. have a large number of DAG files this could complete before each file
  132. has been parsed. -1 for unlimited times.
  133. :param num_times_parse_dags: The number of times to try to parse each DAG file.
  134. -1 for unlimited times.
  135. :param scheduler_idle_sleep_time: The number of seconds to wait between
  136. polls of running processors
  137. :param do_pickle: once a DAG object is obtained by executing the Python
  138. file, whether to serialize the DAG object to the DB
  139. :param log: override the default Logger
  140. """
  141. job_type = "SchedulerJob"
  142. def __init__(
  143. self,
  144. job: Job,
  145. subdir: str = settings.DAGS_FOLDER,
  146. num_runs: int = conf.getint("scheduler", "num_runs"),
  147. num_times_parse_dags: int = -1,
  148. scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"),
  149. do_pickle: bool = False,
  150. log: logging.Logger | None = None,
  151. processor_poll_interval: float | None = None,
  152. ):
  153. super().__init__(job)
  154. self.subdir = subdir
  155. self.num_runs = num_runs
  156. # In specific tests, we want to stop the parse loop after the _files_ have been parsed a certain
  157. # number of times. This is only to support testing, and isn't something a user is likely to want to
  158. # configure -- they'll want num_runs
  159. self.num_times_parse_dags = num_times_parse_dags
  160. if processor_poll_interval:
  161. # TODO: Remove in Airflow 3.0
  162. warnings.warn(
  163. "The 'processor_poll_interval' parameter is deprecated. "
  164. "Please use 'scheduler_idle_sleep_time'.",
  165. RemovedInAirflow3Warning,
  166. stacklevel=2,
  167. )
  168. scheduler_idle_sleep_time = processor_poll_interval
  169. self._scheduler_idle_sleep_time = scheduler_idle_sleep_time
  170. # How many seconds do we wait for tasks to heartbeat before marking them as zombies.
  171. self._zombie_threshold_secs = conf.getint("scheduler", "scheduler_zombie_task_threshold")
  172. self._standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
  173. self._dag_stale_not_seen_duration = conf.getint("scheduler", "dag_stale_not_seen_duration")
  174. # Since the functionality for stalled_task_timeout, task_adoption_timeout, and
  175. # worker_pods_pending_timeout is now handled by a single config (task_queued_timeout),
  176. # we can't deprecate them as we normally would. So, we'll read each config and take
  177. # the max value in order to ensure we're not undercutting a legitimate
  178. # use of any of these configs.
  179. stalled_task_timeout = conf.getfloat("celery", "stalled_task_timeout", fallback=0)
  180. if stalled_task_timeout:
  181. # TODO: Remove in Airflow 3.0
  182. warnings.warn(
  183. "The '[celery] stalled_task_timeout' config option is deprecated. "
  184. "Please update your config to use '[scheduler] task_queued_timeout' instead.",
  185. DeprecationWarning,
  186. stacklevel=2,
  187. )
  188. task_adoption_timeout = conf.getfloat("celery", "task_adoption_timeout", fallback=0)
  189. if task_adoption_timeout:
  190. # TODO: Remove in Airflow 3.0
  191. warnings.warn(
  192. "The '[celery] task_adoption_timeout' config option is deprecated. "
  193. "Please update your config to use '[scheduler] task_queued_timeout' instead.",
  194. DeprecationWarning,
  195. stacklevel=2,
  196. )
  197. worker_pods_pending_timeout = conf.getfloat(
  198. "kubernetes_executor", "worker_pods_pending_timeout", fallback=0
  199. )
  200. if worker_pods_pending_timeout:
  201. # TODO: Remove in Airflow 3.0
  202. warnings.warn(
  203. "The '[kubernetes_executor] worker_pods_pending_timeout' config option is deprecated. "
  204. "Please update your config to use '[scheduler] task_queued_timeout' instead.",
  205. DeprecationWarning,
  206. stacklevel=2,
  207. )
  208. task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")
  209. self._task_queued_timeout = max(
  210. stalled_task_timeout, task_adoption_timeout, worker_pods_pending_timeout, task_queued_timeout
  211. )
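       # Illustrative example (added for clarity, not part of the original source): with the deprecated
       # [celery] stalled_task_timeout set to 600 and [scheduler] task_queued_timeout set to 300, the
       # effective value is max(600, 0, 0, 300) == 600, so no legacy setting is silently shortened.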
  212. # this param is intentionally undocumented
  213. self._num_stuck_queued_retries = conf.getint(
  214. section="scheduler",
  215. key="num_stuck_in_queued_retries",
  216. fallback=2,
  217. )
  218. self.do_pickle = do_pickle
  219. if log:
  220. self._log = log
  221. # Check what SQL backend we use
  222. sql_conn: str = conf.get_mandatory_value("database", "sql_alchemy_conn").lower()
  223. self.using_sqlite = sql_conn.startswith("sqlite")
  224. # Dag Processor agent - not used in Dag Processor standalone mode.
  225. self.processor_agent: DagFileProcessorAgent | None = None
  226. self.dagbag = DagBag(dag_folder=self.subdir, read_dags_from_db=True, load_op_links=False)
  227. @provide_session
  228. def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
  229. Stats.incr("scheduler_heartbeat", 1, 1)
  230. def register_signals(self) -> None:
  231. """Register signals that stop child processes."""
  232. signal.signal(signal.SIGINT, self._exit_gracefully)
  233. signal.signal(signal.SIGTERM, self._exit_gracefully)
  234. signal.signal(signal.SIGUSR2, self._debug_dump)
  235. def _exit_gracefully(self, signum: int, frame: FrameType | None) -> None:
  236. """Clean up processor_agent to avoid leaving orphan processes."""
  237. if not _is_parent_process():
  238. # Only the parent process should perform the cleanup.
  239. return
  240. self.log.info("Exiting gracefully upon receiving signal %s", signum)
  241. if self.processor_agent:
  242. self.processor_agent.end()
  243. sys.exit(os.EX_OK)
  244. def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
  245. if not _is_parent_process():
  246. # Only the parent process should perform the debug dump.
  247. return
  248. try:
  249. sig_name = signal.Signals(signum).name
  250. except Exception:
  251. sig_name = str(signum)
  252. self.log.info("%s\n%s received, printing debug\n%s", "-" * 80, sig_name, "-" * 80)
  253. for executor in self.job.executors:
  254. self.log.info("Debug dump for the executor %s", executor)
  255. executor.debug_dump()
  256. self.log.info("-" * 80)
  257. def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], session: Session) -> ConcurrencyMap:
  258. """
  259. Get the concurrency maps.
  260. :param states: List of states to query for
  261. :return: Concurrency map
  262. """
  263. ti_concurrency_query: Result = session.execute(
  264. select(TI.task_id, TI.run_id, TI.dag_id, func.count("*"))
  265. .where(TI.state.in_(states))
  266. .group_by(TI.task_id, TI.run_id, TI.dag_id)
  267. )
  268. return ConcurrencyMap.from_concurrency_map(
  269. {(dag_id, run_id, task_id): count for task_id, run_id, dag_id, count in ti_concurrency_query}
  270. )
  271. def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -> list[TI]:
  272. """
  273. Find TIs that are ready for execution based on conditions.
  274. Conditions include:
  275. - pool limits
  276. - DAG max_active_tasks
  277. - executor state
  278. - priority
  279. - max active tis per DAG
  280. - max active tis per DAG run
  281. :param max_tis: Maximum number of TIs to queue in this loop.
  282. :return: list[airflow.models.TaskInstance]
  283. """
  284. from airflow.models.pool import Pool
  285. from airflow.utils.db import DBLocks
  286. executable_tis: list[TI] = []
  287. if session.get_bind().dialect.name == "postgresql":
  288. # Optimization: to avoid littering the DB errors of "ERROR: canceling statement due to lock
  289. # timeout", try to take out a transactional advisory lock (unlocks automatically on
  290. # COMMIT/ROLLBACK)
  291. lock_acquired = session.execute(
  292. text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
  293. id=DBLocks.SCHEDULER_CRITICAL_SECTION.value
  294. )
  295. ).scalar()
  296. if not lock_acquired:
  297. # Throw an error like the one that would happen with NOWAIT
  298. raise OperationalError(
  299. "Failed to acquire advisory lock", params=None, orig=RuntimeError("55P03")
  300. )
  301. # Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
  302. # Throws an exception if lock cannot be obtained, rather than blocking
  303. pools = Pool.slots_stats(lock_rows=True, session=session)
  304. # If the pools are full, there is no point doing anything!
  305. # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
  306. pool_slots_free = sum(max(0, pool["open"]) for pool in pools.values())
  307. if pool_slots_free == 0:
  308. self.log.debug("All pools are full!")
  309. return []
  310. max_tis = min(max_tis, pool_slots_free)
  311. starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
  312. # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
  313. concurrency_map = self.__get_concurrency_maps(states=EXECUTION_STATES, session=session)
  314. # Number of tasks that cannot be scheduled because of no open slot in pool
  315. num_starving_tasks_total = 0
  316. # dag and task ids that can't be queued because of concurrency limits
  317. starved_dags: set[str] = set()
  318. starved_tasks: set[tuple[str, str]] = set()
  319. starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
  320. pool_num_starving_tasks: dict[str, int] = Counter()
  321. for loop_count in itertools.count(start=1):
  322. num_starved_pools = len(starved_pools)
  323. num_starved_dags = len(starved_dags)
  324. num_starved_tasks = len(starved_tasks)
  325. num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)
  326. # Get task instances associated with scheduled
  327. # DagRuns which are not backfilled, in the given states,
  328. # and the dag is not paused
  329. query = (
  330. select(TI)
  331. .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
  332. .join(TI.dag_run)
  333. .where(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
  334. .join(TI.dag_model)
  335. .where(not_(DM.is_paused))
  336. .where(TI.state == TaskInstanceState.SCHEDULED)
  337. .options(selectinload(TI.dag_model))
  338. .order_by(-TI.priority_weight, DR.execution_date, TI.map_index)
  339. )
  340. if starved_pools:
  341. query = query.where(not_(TI.pool.in_(starved_pools)))
  342. if starved_dags:
  343. query = query.where(not_(TI.dag_id.in_(starved_dags)))
  344. if starved_tasks:
  345. task_filter = tuple_in_condition((TI.dag_id, TI.task_id), starved_tasks)
  346. query = query.where(not_(task_filter))
  347. if starved_tasks_task_dagrun_concurrency:
  348. task_filter = tuple_in_condition(
  349. (TI.dag_id, TI.run_id, TI.task_id),
  350. starved_tasks_task_dagrun_concurrency,
  351. )
  352. query = query.where(not_(task_filter))
  353. query = query.limit(max_tis)
  354. timer = Stats.timer("scheduler.critical_section_query_duration")
  355. timer.start()
  356. try:
  357. query = with_row_locks(query, of=TI, session=session, skip_locked=True)
  358. task_instances_to_examine: list[TI] = session.scalars(query).all()
  359. timer.stop(send=True)
  360. except OperationalError as e:
  361. timer.stop(send=False)
  362. raise e
  363. # TODO[HA]: This was wrong before anyway, as it only looked at a sub-set of dags, not everything.
  364. # Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine))
  365. if not task_instances_to_examine:
  366. self.log.debug("No tasks to consider for execution.")
  367. break
  368. # Put one task instance on each line
  369. task_instance_str = "\n".join(f"\t{x!r}" for x in task_instances_to_examine)
  370. self.log.info("%s tasks up for execution:\n%s", len(task_instances_to_examine), task_instance_str)
  371. executor_slots_available: dict[ExecutorName, int] = {}
  372. # First get a mapping of executor names to slots they have available
  373. for executor in self.job.executors:
  374. if TYPE_CHECKING:
  375. # All executors should have a name if they are initted from the executor_loader.
  376. # But we need to check for None to make mypy happy.
  377. assert executor.name
  378. executor_slots_available[executor.name] = executor.slots_available
  379. for task_instance in task_instances_to_examine:
  380. pool_name = task_instance.pool
  381. pool_stats = pools.get(pool_name)
  382. if not pool_stats:
  383. self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool_name)
  384. starved_pools.add(pool_name)
  385. continue
  386. # Make sure to emit metrics if pool has no starving tasks
  387. pool_num_starving_tasks.setdefault(pool_name, 0)
  388. pool_total = pool_stats["total"]
  389. open_slots = pool_stats["open"]
  390. if open_slots <= 0:
  391. self.log.info(
  392. "Not scheduling since there are %s open slots in pool %s", open_slots, pool_name
  393. )
  394. # Can't schedule any more since there are no more open slots.
  395. pool_num_starving_tasks[pool_name] += 1
  396. num_starving_tasks_total += 1
  397. starved_pools.add(pool_name)
  398. continue
  399. if task_instance.pool_slots > pool_total:
  400. self.log.warning(
  401. "Not executing %s. Requested pool slots (%s) are greater than "
  402. "total pool slots: '%s' for pool: %s.",
  403. task_instance,
  404. task_instance.pool_slots,
  405. pool_total,
  406. pool_name,
  407. )
  408. pool_num_starving_tasks[pool_name] += 1
  409. num_starving_tasks_total += 1
  410. starved_tasks.add((task_instance.dag_id, task_instance.task_id))
  411. continue
  412. if task_instance.pool_slots > open_slots:
  413. self.log.info(
  414. "Not executing %s since it requires %s slots "
  415. "but there are %s open slots in the pool %s.",
  416. task_instance,
  417. task_instance.pool_slots,
  418. open_slots,
  419. pool_name,
  420. )
  421. pool_num_starving_tasks[pool_name] += 1
  422. num_starving_tasks_total += 1
  423. starved_tasks.add((task_instance.dag_id, task_instance.task_id))
  424. # Though we can execute tasks with lower priority if there's enough room
  425. continue
  426. # Check to make sure that the task max_active_tasks of the DAG hasn't been
  427. # reached.
  428. dag_id = task_instance.dag_id
  429. current_active_tasks_per_dag = concurrency_map.dag_active_tasks_map[dag_id]
  430. max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks
  431. self.log.info(
  432. "DAG %s has %s/%s running and queued tasks",
  433. dag_id,
  434. current_active_tasks_per_dag,
  435. max_active_tasks_per_dag_limit,
  436. )
  437. if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit:
  438. self.log.info(
  439. "Not executing %s since the number of tasks running or queued "
  440. "from DAG %s is >= to the DAG's max_active_tasks limit of %s",
  441. task_instance,
  442. dag_id,
  443. max_active_tasks_per_dag_limit,
  444. )
  445. starved_dags.add(dag_id)
  446. continue
  447. if task_instance.dag_model.has_task_concurrency_limits:
  448. # Many dags don't have a task_concurrency, so wherever we can avoid loading the full
  449. # serialized DAG, the better.
  450. serialized_dag = self.dagbag.get_dag(dag_id, session=session)
  451. # If the dag is missing, fail its scheduled task instances and continue to the next task.
  452. if not serialized_dag:
  453. self.log.error(
  454. "DAG '%s' for task instance %s not found in serialized_dag table",
  455. dag_id,
  456. task_instance,
  457. )
  458. session.execute(
  459. update(TI)
  460. .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
  461. .values(state=TaskInstanceState.FAILED)
  462. .execution_options(synchronize_session="fetch")
  463. )
  464. continue
  465. task_concurrency_limit: int | None = None
  466. if serialized_dag.has_task(task_instance.task_id):
  467. task_concurrency_limit = serialized_dag.get_task(
  468. task_instance.task_id
  469. ).max_active_tis_per_dag
  470. if task_concurrency_limit is not None:
  471. current_task_concurrency = concurrency_map.task_concurrency_map[
  472. (task_instance.dag_id, task_instance.task_id)
  473. ]
  474. if current_task_concurrency >= task_concurrency_limit:
  475. self.log.info(
  476. "Not executing %s since the task concurrency for"
  477. " this task has been reached.",
  478. task_instance,
  479. )
  480. starved_tasks.add((task_instance.dag_id, task_instance.task_id))
  481. continue
  482. task_dagrun_concurrency_limit: int | None = None
  483. if serialized_dag.has_task(task_instance.task_id):
  484. task_dagrun_concurrency_limit = serialized_dag.get_task(
  485. task_instance.task_id
  486. ).max_active_tis_per_dagrun
  487. if task_dagrun_concurrency_limit is not None:
  488. current_task_dagrun_concurrency = concurrency_map.task_dagrun_concurrency_map[
  489. (task_instance.dag_id, task_instance.run_id, task_instance.task_id)
  490. ]
  491. if current_task_dagrun_concurrency >= task_dagrun_concurrency_limit:
  492. self.log.info(
  493. "Not executing %s since the task concurrency per DAG run for"
  494. " this task has been reached.",
  495. task_instance,
  496. )
  497. starved_tasks_task_dagrun_concurrency.add(
  498. (task_instance.dag_id, task_instance.run_id, task_instance.task_id)
  499. )
  500. continue
  501. if executor_obj := self._try_to_load_executor(task_instance.executor):
  502. if TYPE_CHECKING:
  503. # All executors should have a name if they are initted from the executor_loader.
  504. # But we need to check for None to make mypy happy.
  505. assert executor_obj.name
  506. if executor_slots_available[executor_obj.name] <= 0:
  507. self.log.debug(
  508. "Not scheduling %s since its executor %s does not currently have any more "
  509. "available slots"
  510. )
  511. starved_tasks.add((task_instance.dag_id, task_instance.task_id))
  512. continue
  513. else:
  514. executor_slots_available[executor_obj.name] -= 1
  515. else:
  516. # This is a defensive guard for if we happen to have a task whose executor cannot be
  517. # found. The check in the dag parser should make this not realistically possible but the
  518. # loader can fail if some direct DB modification has happened or another as yet unknown
  519. # edge case. _try_to_load_executor will log an error message explaining the executor
  520. # cannot be found.
  521. starved_tasks.add((task_instance.dag_id, task_instance.task_id))
  522. continue
  523. executable_tis.append(task_instance)
  524. open_slots -= task_instance.pool_slots
  525. concurrency_map.dag_active_tasks_map[dag_id] += 1
  526. concurrency_map.task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1
  527. concurrency_map.task_dagrun_concurrency_map[
  528. (task_instance.dag_id, task_instance.run_id, task_instance.task_id)
  529. ] += 1
  530. pool_stats["open"] = open_slots
  531. is_done = executable_tis or len(task_instances_to_examine) < max_tis
  532. # Check this to avoid accidental infinite loops
  533. found_new_filters = (
  534. len(starved_pools) > num_starved_pools
  535. or len(starved_dags) > num_starved_dags
  536. or len(starved_tasks) > num_starved_tasks
  537. or len(starved_tasks_task_dagrun_concurrency) > num_starved_tasks_task_dagrun_concurrency
  538. )
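       # Note (added for clarity): is_done is true when at least one executable TI was found, or when
       # the query returned fewer rows than max_tis (every candidate has been examined); found_new_filters
       # guards against re-running an identical query, which could otherwise loop forever.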
  539. if is_done or not found_new_filters:
  540. break
  541. self.log.info(
  542. "Found no task instances to queue on query iteration %s "
  543. "but there could be more candidate task instances to check.",
  544. loop_count,
  545. )
  546. for pool_name, num_starving_tasks in pool_num_starving_tasks.items():
  547. Stats.gauge(f"pool.starving_tasks.{pool_name}", num_starving_tasks)
  548. # Same metric with tagging
  549. Stats.gauge("pool.starving_tasks", num_starving_tasks, tags={"pool_name": pool_name})
  550. Stats.gauge("scheduler.tasks.starving", num_starving_tasks_total)
  551. Stats.gauge("scheduler.tasks.executable", len(executable_tis))
  552. if executable_tis:
  553. task_instance_str = "\n".join(f"\t{x!r}" for x in executable_tis)
  554. self.log.info("Setting the following tasks to queued state:\n%s", task_instance_str)
  555. # set TIs to queued state
  556. filter_for_tis = TI.filter_for_tis(executable_tis)
  557. session.execute(
  558. update(TI)
  559. .where(filter_for_tis)
  560. .values(
  561. # TODO[ha]: should we use func.now()? How does that work with DB timezone
  562. # on mysql when it's not UTC?
  563. state=TaskInstanceState.QUEUED,
  564. queued_dttm=timezone.utcnow(),
  565. queued_by_job_id=self.job.id,
  566. )
  567. .execution_options(synchronize_session=False)
  568. )
  569. for ti in executable_tis:
  570. ti.emit_state_change_metric(TaskInstanceState.QUEUED)
  571. for ti in executable_tis:
  572. make_transient(ti)
  573. return executable_tis
  574. def _enqueue_task_instances_with_queued_state(
  575. self, task_instances: list[TI], executor: BaseExecutor, session: Session
  576. ) -> None:
  577. """
  578. Enqueue task_instances which should have been set to queued with the executor.
  579. :param task_instances: TaskInstances to enqueue
  580. :param executor: The executor to enqueue tasks for
  581. :param session: The session object
  582. """
  583. # actually enqueue them
  584. for ti in task_instances:
  585. if ti.dag_run.state in State.finished_dr_states:
  586. ti.set_state(None, session=session)
  587. continue
  588. command = ti.command_as_list(
  589. local=True,
  590. pickle_id=ti.dag_model.pickle_id,
  591. )
  592. priority = ti.priority_weight
  593. queue = ti.queue
  594. self.log.info(
  595. "Sending %s to %s with priority %s and queue %s", ti.key, executor.name, priority, queue
  596. )
  597. executor.queue_command(
  598. ti,
  599. command,
  600. priority=priority,
  601. queue=queue,
  602. )
  603. def _critical_section_enqueue_task_instances(self, session: Session) -> int:
  604. """
  605. Enqueues TaskInstances for execution.
  606. There are three steps:
  607. 1. Pick TIs by priority with the constraint that they are in the expected states
  608. and that we do not exceed max_active_runs or pool limits.
  609. 2. Change the state for the TIs above atomically.
  610. 3. Enqueue the TIs in the executor.
  611. HA note: This function is a "critical section" meaning that only a single scheduler process can
  612. execute this function at the same time. This is achieved by doing
  613. ``SELECT ... from pool FOR UPDATE``. For DBs that support NOWAIT, a "blocked" scheduler will skip
  614. this and continue on with other tasks (creating new DAG runs, progressing TIs from None to SCHEDULED
  615. etc.); for DBs that don't support this (such as MariaDB or MySQL 5.x), the other schedulers will wait for
  616. the lock before continuing.
  617. :param session: DB session to use.
  618. :return: Number of task instances with state changed.
  619. """
  620. # The user can either request a certain number of tis to schedule per main scheduler loop (default
  621. # is non-zero). If that value has been set to zero, that means use the value of core.parallelism (or
  622. # however many free slots are left). core.parallelism represents the max number of running TIs per
  623. # scheduler. Historically this value was stored in the executor, whose job it was to control/enforce
  624. # it. However, with multiple executors, any of which can run up to core.parallelism TIs individually,
  625. # we need to make sure in the scheduler now that we don't schedule more than core.parallelism in total
  626. # across all executors.
  627. num_occupied_slots = sum([executor.slots_occupied for executor in self.job.executors])
  628. parallelism = conf.getint("core", "parallelism")
  629. # Parallelism configured to 0 means no limit on currently running tasks
  630. if parallelism == 0:
  631. parallelism = sys.maxsize
  632. if self.job.max_tis_per_query == 0:
  633. max_tis = parallelism - num_occupied_slots
  634. else:
  635. max_tis = min(self.job.max_tis_per_query, parallelism - num_occupied_slots)
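       # Illustrative example (added for clarity, not part of the original source): with
       # core.parallelism == 32, 20 slots already occupied across executors, and
       # max_tis_per_query == 16, max_tis == min(16, 32 - 20) == 12 for this loop.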
  636. if max_tis <= 0:
  637. self.log.debug("max_tis query size is less than or equal to zero. No query will be performed!")
  638. return 0
  639. queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
  640. # Sort queued TIs into their respective executors
  641. executor_to_queued_tis = self._executor_to_tis(queued_tis)
  642. for executor, queued_tis_per_executor in executor_to_queued_tis.items():
  643. self.log.info(
  644. "Trying to enqueue tasks: %s for executor: %s",
  645. queued_tis_per_executor,
  646. executor,
  647. )
  648. self._enqueue_task_instances_with_queued_state(queued_tis_per_executor, executor, session=session)
  649. return len(queued_tis)
  650. @staticmethod
  651. def _process_task_event_logs(log_records: deque[Log], session: Session):
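       # Note (added for clarity): the generator below takes len(log_records) once, so only records
       # already in the deque are drained; entries appended while the bulk save runs are left for
       # the next scheduler loop.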
  652. objects = (log_records.popleft() for _ in range(len(log_records)))
  653. session.bulk_save_objects(objects=objects, preserve_order=False)
  654. def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int:
  655. """Respond to executor events."""
  656. if not self._standalone_dag_processor and not self.processor_agent:
  657. raise ValueError("Processor agent is not started.")
  658. ti_primary_key_to_try_number_map: dict[tuple[str, str, str, int], int] = {}
  659. event_buffer = executor.get_event_buffer()
  660. tis_with_right_state: list[TaskInstanceKey] = []
  661. # Report execution
  662. for ti_key, (state, _) in event_buffer.items():
  663. # We create a map (dag_id, task_id, run_id, map_index) -> in-memory try_number
  664. ti_primary_key_to_try_number_map[ti_key.primary] = ti_key.try_number
  665. self.log.info("Received executor event with state %s for task instance %s", state, ti_key)
  666. if state in (
  667. TaskInstanceState.FAILED,
  668. TaskInstanceState.SUCCESS,
  669. TaskInstanceState.QUEUED,
  670. TaskInstanceState.RUNNING,
  671. ):
  672. tis_with_right_state.append(ti_key)
  673. # Return if no finished tasks
  674. if not tis_with_right_state:
  675. return len(event_buffer)
  676. # Check state of finished tasks
  677. filter_for_tis = TI.filter_for_tis(tis_with_right_state)
  678. query = select(TI).where(filter_for_tis).options(selectinload(TI.dag_model))
  679. # row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
  680. # multi-schedulers
  681. tis_query: Query = with_row_locks(query, of=TI, session=session, skip_locked=True)
  682. tis: Iterator[TI] = session.scalars(tis_query)
  683. for ti in tis:
  684. try_number = ti_primary_key_to_try_number_map[ti.key.primary]
  685. buffer_key = ti.key.with_try_number(try_number)
  686. state, info = event_buffer.pop(buffer_key)
  687. if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):
  688. ti.external_executor_id = info
  689. self.log.info("Setting external_id for %s to %s", ti, info)
  690. continue
  691. msg = (
  692. "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, map_index=%s, "
  693. "run_start_date=%s, run_end_date=%s, "
  694. "run_duration=%s, state=%s, executor=%s, executor_state=%s, try_number=%s, max_tries=%s, "
  695. "job_id=%s, pool=%s, queue=%s, priority_weight=%d, operator=%s, queued_dttm=%s, "
  696. "queued_by_job_id=%s, pid=%s"
  697. )
  698. self.log.info(
  699. msg,
  700. ti.dag_id,
  701. ti.task_id,
  702. ti.run_id,
  703. ti.map_index,
  704. ti.start_date,
  705. ti.end_date,
  706. ti.duration,
  707. ti.state,
  708. executor,
  709. state,
  710. try_number,
  711. ti.max_tries,
  712. ti.job_id,
  713. ti.pool,
  714. ti.queue,
  715. ti.priority_weight,
  716. ti.operator,
  717. ti.queued_dttm,
  718. ti.queued_by_job_id,
  719. ti.pid,
  720. )
  721. with Trace.start_span_from_taskinstance(ti=ti) as span:
  722. span.set_attribute("category", "scheduler")
  723. span.set_attribute("task_id", ti.task_id)
  724. span.set_attribute("dag_id", ti.dag_id)
  725. span.set_attribute("state", ti.state)
  726. if ti.state == TaskInstanceState.FAILED:
  727. span.set_attribute("error", True)
  728. span.set_attribute("start_date", str(ti.start_date))
  729. span.set_attribute("end_date", str(ti.end_date))
  730. span.set_attribute("duration", ti.duration)
  731. span.set_attribute("executor_config", str(ti.executor_config))
  732. span.set_attribute("execution_date", str(ti.execution_date))
  733. span.set_attribute("hostname", ti.hostname)
  734. span.set_attribute("log_url", ti.log_url)
  735. span.set_attribute("operator", str(ti.operator))
  736. span.set_attribute("try_number", ti.try_number)
  737. span.set_attribute("executor_state", state)
  738. span.set_attribute("job_id", ti.job_id)
  739. span.set_attribute("pool", ti.pool)
  740. span.set_attribute("queue", ti.queue)
  741. span.set_attribute("priority_weight", ti.priority_weight)
  742. span.set_attribute("queued_dttm", str(ti.queued_dttm))
  743. span.set_attribute("ququed_by_job_id", ti.queued_by_job_id)
  744. span.set_attribute("pid", ti.pid)
  745. if span.is_recording():
  746. if ti.queued_dttm:
  747. span.add_event(name="queued", timestamp=datetime_to_nano(ti.queued_dttm))
  748. if ti.start_date:
  749. span.add_event(name="started", timestamp=datetime_to_nano(ti.start_date))
  750. if ti.end_date:
  751. span.add_event(name="ended", timestamp=datetime_to_nano(ti.end_date))
  752. if conf.has_option("traces", "otel_task_log_event") and conf.getboolean(
  753. "traces", "otel_task_log_event"
  754. ):
  755. from airflow.utils.log.log_reader import TaskLogReader
  756. task_log_reader = TaskLogReader()
  757. if task_log_reader.supports_read:
  758. metadata: dict[str, Any] = {}
  759. logs, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, metadata)
  760. if ti.hostname in dict(logs[0]):
  761. message = str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
  762. while metadata["end_of_log"] is False:
  763. logs, metadata = task_log_reader.read_log_chunks(
  764. ti, ti.try_number - 1, metadata
  765. )
  766. if ti.hostname in dict(logs[0]):
  767. message = message + str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
  768. if span.is_recording():
  769. span.add_event(
  770. name="task_log",
  771. attributes={
  772. "message": message,
  773. "metadata": str(metadata),
  774. },
  775. )
  776. # There are two scenarios why the same TI with the same try_number is queued
  777. # after executor is finished with it:
  778. # 1) the TI was killed externally and it had no time to mark itself failed
  779. # - in this case we should mark it as failed here.
  780. # 2) the TI has been requeued after getting deferred - in this case either our executor has it
  781. # or the TI is queued by another job. Either way, we should not fail it.
  782. # All of this could also happen if the state is "running",
  783. # but that is handled by the zombie detection.
  784. ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
  785. ti_requeued = (
  786. ti.queued_by_job_id != self.job.id # Another scheduler has queued this task again
  787. or executor.has_task(ti) # This scheduler has this task already
  788. )
  789. if ti_queued and not ti_requeued:
  790. Stats.incr(
  791. "scheduler.tasks.killed_externally",
  792. tags={"dag_id": ti.dag_id, "task_id": ti.task_id},
  793. )
  794. msg = (
  795. "Executor %s reported that the task instance %s finished with state %s, but the task instance's state attribute is %s. " # noqa: RUF100, UP031, flynt
  796. "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"
  797. % (executor, ti, state, ti.state)
  798. )
  799. if info is not None:
  800. msg += " Extra info: %s" % info # noqa: RUF100, UP031, flynt
  801. self.log.error(msg)
  802. session.add(Log(event="state mismatch", extra=msg, task_instance=ti.key))
  803. # Get task from the Serialized DAG
  804. try:
  805. dag = self.dagbag.get_dag(ti.dag_id)
  806. task = dag.get_task(ti.task_id)
  807. except Exception:
  808. self.log.exception("Marking task instance %s as %s", ti, state)
  809. ti.set_state(state)
  810. continue
  811. ti.task = task
  812. if task.on_retry_callback or task.on_failure_callback:
  813. request = TaskCallbackRequest(
  814. full_filepath=ti.dag_model.fileloc,
  815. simple_task_instance=SimpleTaskInstance.from_ti(ti),
  816. msg=msg,
  817. processor_subdir=ti.dag_model.processor_subdir,
  818. )
  819. executor.send_callback(request)
  820. else:
  821. ti.handle_failure(error=msg, session=session)
  822. return len(event_buffer)
  823. def _execute(self) -> int | None:
  824. from airflow.dag_processing.manager import DagFileProcessorAgent
  825. self.log.info("Starting the scheduler")
  826. executor_class, _ = ExecutorLoader.import_default_executor_cls()
  827. # DAGs can be pickled for easier remote execution by some executors
  828. pickle_dags = self.do_pickle and executor_class.supports_pickling
  829. self.log.info("Processing each file at most %s times", self.num_times_parse_dags)
  830. # When using sqlite, we do not use async_mode
  831. # so the scheduler job and DAG parser don't access the DB at the same time.
  832. async_mode = not self.using_sqlite
  833. processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
  834. processor_timeout = timedelta(seconds=processor_timeout_seconds)
  835. if not self._standalone_dag_processor and not self.processor_agent:
  836. self.processor_agent = DagFileProcessorAgent(
  837. dag_directory=Path(self.subdir),
  838. max_runs=self.num_times_parse_dags,
  839. processor_timeout=processor_timeout,
  840. dag_ids=[],
  841. pickle_dags=pickle_dags,
  842. async_mode=async_mode,
  843. )
  844. try:
  845. callback_sink: PipeCallbackSink | DatabaseCallbackSink
  846. if self.processor_agent:
  847. self.log.debug("Using PipeCallbackSink as callback sink.")
  848. callback_sink = PipeCallbackSink(get_sink_pipe=self.processor_agent.get_callbacks_pipe)
  849. else:
  850. from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
  851. self.log.debug("Using DatabaseCallbackSink as callback sink.")
  852. callback_sink = DatabaseCallbackSink()
  853. for executor in self.job.executors:
  854. executor.job_id = self.job.id
  855. executor.callback_sink = callback_sink
  856. executor.start()
  857. self.register_signals()
  858. if self.processor_agent:
  859. self.processor_agent.start()
  860. execute_start_time = timezone.utcnow()
  861. self._run_scheduler_loop()
  862. if self.processor_agent:
  863. # Stop any processors
  864. self.processor_agent.terminate()
  865. # Verify that all files were processed, and if so, deactivate DAGs that
  866. # haven't been touched by the scheduler as they likely have been
  867. # deleted.
  868. if self.processor_agent.all_files_processed:
  869. self.log.info(
  870. "Deactivating DAGs that haven't been touched since %s", execute_start_time.isoformat()
  871. )
  872. DAG.deactivate_stale_dags(execute_start_time)
  873. settings.Session.remove() # type: ignore
  874. except Exception:
  875. self.log.exception("Exception when executing SchedulerJob._run_scheduler_loop")
  876. raise
  877. finally:
  878. for executor in self.job.executors:
  879. try:
  880. executor.end()
  881. except Exception:
  882. self.log.exception("Exception when executing Executor.end on %s", executor)
  883. if self.processor_agent:
  884. try:
  885. self.processor_agent.end()
  886. except Exception:
  887. self.log.exception("Exception when executing DagFileProcessorAgent.end")
  888. self.log.info("Exited execute loop")
  889. return None
  890. @provide_session
  891. def _update_dag_run_state_for_paused_dags(self, session: Session = NEW_SESSION) -> None:
  892. try:
  893. paused_runs = session.scalars(
  894. select(DagRun)
  895. .join(DagRun.dag_model)
  896. .join(TI)
  897. .where(
  898. DagModel.is_paused == expression.true(),
  899. DagRun.state == DagRunState.RUNNING,
  900. DagRun.run_type != DagRunType.BACKFILL_JOB,
  901. )
  902. .having(DagRun.last_scheduling_decision <= func.max(TI.updated_at))
  903. .group_by(DagRun)
  904. )
  905. for dag_run in paused_runs:
  906. dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
  907. if dag is not None:
  908. dag_run.dag = dag
  909. _, callback_to_run = dag_run.update_state(execute_callbacks=False, session=session)
  910. if callback_to_run:
  911. self._send_dag_callbacks_to_processor(dag, callback_to_run)
  912. except Exception as e: # should not fail the scheduler
  913. self.log.exception("Failed to update dag run state for paused dags due to %s", e)
  914. def _run_scheduler_loop(self) -> None:
  915. """
  916. Harvest DAG parsing results, queue tasks, and perform executor heartbeat; the actual scheduler loop.
  917. The main steps in the loop are:
  918. #. Harvest DAG parsing results through DagFileProcessorAgent
  919. #. Find and queue executable tasks
  920. #. Change task instance state in DB
  921. #. Queue tasks in executor
  922. #. Heartbeat executor
  923. #. Execute queued tasks in executor asynchronously
  924. #. Sync on the states of running tasks
  925. Following is a graphic representation of these steps.
  926. .. image:: ../docs/apache-airflow/img/scheduler_loop.jpg
  927. """
  928. if not self.processor_agent and not self._standalone_dag_processor:
  929. raise ValueError("Processor agent is not started.")
  930. is_unit_test: bool = conf.getboolean("core", "unit_test_mode")
  931. timers = EventScheduler()
  932. # Check on start up, then every configured interval
  933. self.adopt_or_reset_orphaned_tasks()
  934. timers.call_regular_interval(
  935. conf.getfloat("scheduler", "orphaned_tasks_check_interval", fallback=300.0),
  936. self.adopt_or_reset_orphaned_tasks,
  937. )
  938. timers.call_regular_interval(
  939. conf.getfloat("scheduler", "trigger_timeout_check_interval", fallback=15.0),
  940. self.check_trigger_timeouts,
  941. )
  942. timers.call_regular_interval(
  943. conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0),
  944. self._emit_pool_metrics,
  945. )
  946. timers.call_regular_interval(
  947. conf.getfloat("scheduler", "zombie_detection_interval", fallback=10.0),
  948. self._find_and_purge_zombies,
  949. )
  950. timers.call_regular_interval(60.0, self._update_dag_run_state_for_paused_dags)
  951. timers.call_regular_interval(
  952. conf.getfloat("scheduler", "task_queued_timeout_check_interval"),
  953. self._handle_tasks_stuck_in_queued,
  954. )
  955. timers.call_regular_interval(
  956. conf.getfloat("scheduler", "parsing_cleanup_interval"),
  957. self._orphan_unreferenced_datasets,
  958. )
  959. if self._standalone_dag_processor:
  960. timers.call_regular_interval(
  961. conf.getfloat("scheduler", "parsing_cleanup_interval"),
  962. self._cleanup_stale_dags,
  963. )
  964. for loop_count in itertools.count(start=1):
  965. with Trace.start_span(
  966. span_name="scheduler_job_loop", component="SchedulerJobRunner"
  967. ) as span, Stats.timer("scheduler.scheduler_loop_duration") as timer:
  968. span.set_attribute("category", "scheduler")
  969. span.set_attribute("loop_count", loop_count)
  970. if self.using_sqlite and self.processor_agent:
  971. self.processor_agent.run_single_parsing_loop()
  972. # For the sqlite case w/ 1 thread, wait until the processor
  973. # is finished to avoid concurrent access to the DB.
  974. self.log.debug("Waiting for processors to finish since we're using sqlite")
  975. self.processor_agent.wait_until_finished()
  976. with create_session() as session:
  977. # This will schedule for as many executors as possible.
  978. num_queued_tis = self._do_scheduling(session)
  979. # Heartbeat all executors, even if they're not receiving new tasks this loop. It will be
  980. # either a no-op, or they will check-in on currently running tasks and send out new
  981. # events to be processed below.
  982. for executor in self.job.executors:
  983. executor.heartbeat()
  984. session.expunge_all()
  985. num_finished_events = 0
  986. for executor in self.job.executors:
  987. num_finished_events += self._process_executor_events(
  988. executor=executor, session=session
  989. )
  990. for executor in self.job.executors:
  991. try:
  992. # this is a backcompat check in case the executor does not inherit from BaseExecutor
  993. # todo: remove in airflow 3.0
  994. if not hasattr(executor, "_task_event_logs"):
  995. continue
  996. with create_session() as session:
  997. self._process_task_event_logs(executor._task_event_logs, session)
  998. except Exception:
  999. self.log.exception("Something went wrong when trying to save task event logs.")
  1000. if self.processor_agent:
  1001. self.processor_agent.heartbeat()
  1002. # Heartbeat the scheduler periodically
  1003. perform_heartbeat(
  1004. job=self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
  1005. )
  1006. # Run any pending timed events
  1007. next_event = timers.run(blocking=False)
  1008. self.log.debug("Next timed event is in %f", next_event)
  1009. self.log.debug("Ran scheduling loop in %.2f seconds", timer.duration)
  1010. if span.is_recording():
  1011. span.add_event(
  1012. name="Ran scheduling loop",
  1013. attributes={
  1014. "duration in seconds": timer.duration,
  1015. },
  1016. )
  1017. if not is_unit_test and not num_queued_tis and not num_finished_events:
  1018. # If the scheduler is doing things, don't sleep. This means when there is work to do, the
  1019. # scheduler will run "as quick as possible", but when it's stopped, it can sleep, dropping CPU
  1020. # usage when "idle"
  1021. time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))
  1022. if loop_count >= self.num_runs > 0:
  1023. self.log.info(
  1024. "Exiting scheduler loop as requested number of runs (%d - got to %d) has been reached",
  1025. self.num_runs,
  1026. loop_count,
  1027. )
  1028. if span.is_recording():
  1029. span.add_event("Exiting scheduler loop as requested number of runs has been reached")
  1030. break
  1031. if self.processor_agent and self.processor_agent.done:
  1032. self.log.info(
  1033. "Exiting scheduler loop as requested DAG parse count (%d) has been reached after %d"
  1034. " scheduler loops",
  1035. self.num_times_parse_dags,
  1036. loop_count,
  1037. )
  1038. if span.is_recording():
  1039. span.add_event("Exiting scheduler loop as requested DAG parse count has been reached")
  1040. break
  1041. def _do_scheduling(self, session: Session) -> int:
  1042. """
  1043. Make the main scheduling decisions.
  1044. It:
  1045. - Creates any necessary DAG runs by examining the next_dagrun_create_after column of DagModel
  1046. Since creating Dag Runs is a relatively time consuming process, we select only 10 dags by default
  1047. (configurable via ``scheduler.max_dagruns_to_create_per_loop`` setting) - putting this higher will
  1048. mean one scheduler could spend a chunk of time creating dag runs, and not ever get around to
  1049. scheduling tasks.
  1050. - Finds the "next n oldest" running DAG Runs to examine for scheduling (n=20 by default, configurable
  1051. via ``scheduler.max_dagruns_per_loop_to_schedule`` config setting) and tries to progress state (TIs
  1052. to SCHEDULED, or DagRuns to SUCCESS/FAILURE etc)
  1053. By "next oldest", we mean hasn't been examined/scheduled in the most time.
  1054. We don't select all dagruns at once, because the rows are selected with row locks, meaning
  1055. that only one scheduler can "process them", even it is waiting behind other dags. Increasing this
  1056. limit will allow more throughput for smaller DAGs but will likely slow down throughput for larger
  1057. (>500 tasks.) DAGs
  1058. - Then, via a Critical Section (locking the rows of the Pool model) we queue tasks, and then send them
  1059. to the executor.
  1060. See docs of _critical_section_enqueue_task_instances for more.
  1061. :return: Number of TIs enqueued in this iteration
  1062. """
  1063. # Put a check in place to make sure we don't commit unexpectedly
  1064. with prohibit_commit(session) as guard:
  1065. if settings.USE_JOB_SCHEDULE:
  1066. self._create_dagruns_for_dags(guard, session)
  1067. self._start_queued_dagruns(session)
  1068. guard.commit()
  1069. dag_runs = self._get_next_dagruns_to_examine(DagRunState.RUNNING, session)
  1070. # Bulk fetch the currently active dag runs for the dags we are
  1071. # examining, rather than making one query per DagRun
  1072. callback_tuples = self._schedule_all_dag_runs(guard, dag_runs, session)
  1073. # Send the callbacks after we commit to ensure the context is up to date when it gets run
  1074. # cache saves time during scheduling of many dag_runs for same dag
  1075. cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
  1076. partial(self.dagbag.get_dag, session=session)
  1077. )
  1078. for dag_run, callback_to_run in callback_tuples:
  1079. dag = cached_get_dag(dag_run.dag_id)
  1080. if dag:
1081. # Sending callbacks here because, with the standalone dag processor, they are added to the
1082. # database, so this must be done outside of prohibit_commit.
  1083. self._send_dag_callbacks_to_processor(dag, callback_to_run)
  1084. else:
  1085. self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
  1086. with prohibit_commit(session) as guard:
  1087. # Without this, the session has an invalid view of the DB
  1088. session.expunge_all()
  1089. # END: schedule TIs
  1090. # Attempt to schedule even if some executors are full but not all.
  1091. total_free_executor_slots = sum([executor.slots_available for executor in self.job.executors])
  1092. if total_free_executor_slots <= 0:
  1093. # We know we can't do anything here, so don't even try!
  1094. self.log.debug("All executors are full, skipping critical section")
  1095. num_queued_tis = 0
  1096. else:
  1097. try:
  1098. timer = Stats.timer("scheduler.critical_section_duration")
  1099. timer.start()
  1100. # Find any TIs in state SCHEDULED, try to QUEUE them (send it to the executors)
  1101. num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
1102. # Make sure we only send this metric if we obtained the lock, otherwise we'd skew the
1103. # metric way down
  1104. timer.stop(send=True)
  1105. except OperationalError as e:
  1106. timer.stop(send=False)
  1107. if is_lock_not_available_error(error=e):
  1108. self.log.debug("Critical section lock held by another Scheduler")
  1109. Stats.incr("scheduler.critical_section_busy")
  1110. session.rollback()
  1111. return 0
  1112. raise
  1113. guard.commit()
  1114. return num_queued_tis
  1115. @retry_db_transaction
  1116. def _get_next_dagruns_to_examine(self, state: DagRunState, session: Session) -> Query:
  1117. """Get Next DagRuns to Examine with retries."""
  1118. return DagRun.next_dagruns_to_examine(state, session)
  1119. @retry_db_transaction
  1120. def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Session) -> None:
  1121. """Find Dag Models needing DagRuns and Create Dag Runs with retries in case of OperationalError."""
  1122. query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
  1123. all_dags_needing_dag_runs = set(query.all())
  1124. dataset_triggered_dags = [
  1125. dag for dag in all_dags_needing_dag_runs if dag.dag_id in dataset_triggered_dag_info
  1126. ]
  1127. non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)
  1128. self._create_dag_runs(non_dataset_dags, session)
  1129. if dataset_triggered_dags:
  1130. self._create_dag_runs_dataset_triggered(
  1131. dataset_triggered_dags, dataset_triggered_dag_info, session
  1132. )
  1133. # commit the session - Release the write lock on DagModel table.
  1134. guard.commit()
  1135. # END: create dagruns
  1136. @span
  1137. def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
  1138. """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
  1139. # Bulk Fetch DagRuns with dag_id and execution_date same
  1140. # as DagModel.dag_id and DagModel.next_dagrun
1141. # This list is used to verify whether the DagRun already exists so that we don't attempt to
1142. # create duplicate dag runs
  1143. existing_dagruns = (
  1144. session.execute(
  1145. select(DagRun.dag_id, DagRun.execution_date).where(
  1146. tuple_in_condition(
  1147. (DagRun.dag_id, DagRun.execution_date),
  1148. ((dm.dag_id, dm.next_dagrun) for dm in dag_models),
  1149. ),
  1150. )
  1151. )
  1152. .unique()
  1153. .all()
  1154. )
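# Count the currently active runs per dag up front so max_active_runs can be enforced below
# without issuing a separate query for each dag.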
  1155. active_runs_of_dags = Counter(
  1156. DagRun.active_runs_of_dags(dag_ids=(dm.dag_id for dm in dag_models), session=session),
  1157. )
  1158. for dag_model in dag_models:
  1159. dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  1160. if not dag:
  1161. self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
  1162. continue
  1163. dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
  1164. data_interval = dag.get_next_data_interval(dag_model)
  1165. # Explicitly check if the DagRun already exists. This is an edge case
  1166. # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
  1167. # are not updated.
1168. # We opted to check DagRun existence instead
1169. # of catching an IntegrityError and rolling back the session, i.e.
1170. # we need to set dag.next_dagrun_info whether the Dag Run already exists or we
1171. # create a new one. This is so that in the next scheduling loop we try to create new runs
1172. # instead of falling into a loop of IntegrityErrors.
  1173. if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
  1174. try:
  1175. dag.create_dagrun(
  1176. run_type=DagRunType.SCHEDULED,
  1177. execution_date=dag_model.next_dagrun,
  1178. state=DagRunState.QUEUED,
  1179. data_interval=data_interval,
  1180. external_trigger=False,
  1181. session=session,
  1182. dag_hash=dag_hash,
  1183. creating_job_id=self.job.id,
  1184. )
  1185. active_runs_of_dags[dag.dag_id] += 1
  1186. # Exceptions like ValueError, ParamValidationError, etc. are raised by
  1187. # dag.create_dagrun() when dag is misconfigured. The scheduler should not
  1188. # crash due to misconfigured dags. We should log any exception encountered
  1189. # and continue to the next dag.
  1190. except Exception:
  1191. self.log.exception("Failed creating DagRun for %s", dag.dag_id)
  1192. continue
  1193. if self._should_update_dag_next_dagruns(
  1194. dag,
  1195. dag_model,
  1196. last_dag_run=None,
  1197. total_active_runs=active_runs_of_dags[dag.dag_id],
  1198. session=session,
  1199. ):
  1200. dag_model.calculate_dagrun_date_fields(dag, data_interval)
  1201. # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
  1202. # memory for larger dags? or expunge_all()
  1203. def _create_dag_runs_dataset_triggered(
  1204. self,
  1205. dag_models: Collection[DagModel],
  1206. dataset_triggered_dag_info: dict[str, tuple[datetime, datetime]],
  1207. session: Session,
  1208. ) -> None:
  1209. """For DAGs that are triggered by datasets, create dag runs."""
1210. # Bulk fetch DagRuns whose dag_id and execution_date match the dataset-triggered dags and
1211. # the timestamps of their latest dataset events
1212. # This list is used to verify whether the DagRun already exists so that we don't attempt to
1213. # create duplicate dag runs
  1214. exec_dates = {
  1215. dag_id: timezone.coerce_datetime(last_time)
  1216. for dag_id, (_, last_time) in dataset_triggered_dag_info.items()
  1217. }
  1218. existing_dagruns: set[tuple[str, timezone.DateTime]] = set(
  1219. session.execute(
  1220. select(DagRun.dag_id, DagRun.execution_date).where(
  1221. tuple_in_condition((DagRun.dag_id, DagRun.execution_date), exec_dates.items())
  1222. )
  1223. )
  1224. )
  1225. for dag_model in dag_models:
  1226. dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
  1227. if not dag:
  1228. self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
  1229. continue
  1230. if not isinstance(dag.timetable, DatasetTriggeredTimetable):
  1231. self.log.error(
  1232. "DAG '%s' was dataset-scheduled, but didn't have a DatasetTriggeredTimetable!",
  1233. dag_model.dag_id,
  1234. )
  1235. continue
  1236. dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
  1237. # Explicitly check if the DagRun already exists. This is an edge case
  1238. # where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
  1239. # are not updated.
1240. # We opted to check DagRun existence instead
1241. # of catching an IntegrityError and rolling back the session, i.e.
1242. # we need to set dag.next_dagrun_info whether the Dag Run already exists or we
1243. # create a new one. This is so that in the next scheduling loop we try to create new runs
1244. # instead of falling into a loop of IntegrityErrors.
  1245. exec_date = exec_dates[dag.dag_id]
  1246. if (dag.dag_id, exec_date) not in existing_dagruns:
  1247. previous_dag_run = session.scalar(
  1248. select(DagRun)
  1249. .where(
  1250. DagRun.dag_id == dag.dag_id,
  1251. DagRun.execution_date < exec_date,
  1252. DagRun.run_type == DagRunType.DATASET_TRIGGERED,
  1253. )
  1254. .order_by(DagRun.execution_date.desc())
  1255. .limit(1)
  1256. )
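# Gather the dataset events that occurred up to the trigger time and, if there was a previous
# dataset-triggered run, only those after that run's execution date; these events define the
# data interval and are attached to the new run below.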
  1257. dataset_event_filters = [
  1258. DagScheduleDatasetReference.dag_id == dag.dag_id,
  1259. DatasetEvent.timestamp <= exec_date,
  1260. ]
  1261. if previous_dag_run:
  1262. dataset_event_filters.append(DatasetEvent.timestamp > previous_dag_run.execution_date)
  1263. dataset_events = session.scalars(
  1264. select(DatasetEvent)
  1265. .join(
  1266. DagScheduleDatasetReference,
  1267. DatasetEvent.dataset_id == DagScheduleDatasetReference.dataset_id,
  1268. )
  1269. .where(*dataset_event_filters)
  1270. ).all()
  1271. data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events)
  1272. run_id = dag.timetable.generate_run_id(
  1273. run_type=DagRunType.DATASET_TRIGGERED,
  1274. logical_date=exec_date,
  1275. data_interval=data_interval,
  1276. session=session,
  1277. events=dataset_events,
  1278. )
  1279. dag_run = dag.create_dagrun(
  1280. run_id=run_id,
  1281. run_type=DagRunType.DATASET_TRIGGERED,
  1282. execution_date=exec_date,
  1283. data_interval=data_interval,
  1284. state=DagRunState.QUEUED,
  1285. external_trigger=False,
  1286. session=session,
  1287. dag_hash=dag_hash,
  1288. creating_job_id=self.job.id,
  1289. )
  1290. Stats.incr("dataset.triggered_dagruns")
  1291. dag_run.consumed_dataset_events.extend(dataset_events)
  1292. session.execute(
  1293. delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
  1294. )
  1295. def _should_update_dag_next_dagruns(
  1296. self,
  1297. dag: DAG,
  1298. dag_model: DagModel,
  1299. *,
  1300. last_dag_run: DagRun | None = None,
  1301. total_active_runs: int | None = None,
  1302. session: Session,
  1303. ) -> bool:
  1304. """Check if the dag's next_dagruns_create_after should be updated."""
  1305. # If last_dag_run is defined, the update was triggered by a scheduling decision in this DAG run.
  1306. # In such case, schedule next only if last_dag_run is finished and was an automated run.
  1307. if last_dag_run and not (
  1308. last_dag_run.state in State.finished_dr_states
  1309. and last_dag_run.run_type in [DagRunType.SCHEDULED, DagRunType.BACKFILL_JOB]
  1310. ):
  1311. return False
1312. # If the DAG never schedules, skip this check to save runtime
  1313. if not dag.timetable.can_be_scheduled:
  1314. return False
  1315. # get active dag runs from DB if not available
  1316. if not total_active_runs:
  1317. total_active_runs = dag.get_num_active_runs(only_running=False, session=session)
  1318. if total_active_runs and total_active_runs >= dag.max_active_runs:
  1319. self.log.info(
  1320. "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
  1321. dag_model.dag_id,
  1322. total_active_runs,
  1323. dag.max_active_runs,
  1324. )
  1325. dag_model.next_dagrun_create_after = None
  1326. return False
  1327. return True
  1328. @span
  1329. def _start_queued_dagruns(self, session: Session) -> None:
  1330. """Find DagRuns in queued state and decide moving them to running state."""
  1331. # added all() to save runtime, otherwise query is executed more than once
  1332. dag_runs: Collection[DagRun] = self._get_next_dagruns_to_examine(DagRunState.QUEUED, session).all()
  1333. active_runs_of_dags = Counter(
  1334. DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
  1335. )
  1336. @span
  1337. def _update_state(dag: DAG, dag_run: DagRun):
  1338. __span = Trace.get_current_span()
  1339. __span.set_attribute("state", str(DagRunState.RUNNING))
  1340. __span.set_attribute("run_id", dag_run.run_id)
  1341. __span.set_attribute("type", dag_run.run_type)
  1342. __span.set_attribute("dag_id", dag_run.dag_id)
  1343. dag_run.state = DagRunState.RUNNING
  1344. dag_run.start_date = timezone.utcnow()
  1345. if dag.timetable.periodic and not dag_run.external_trigger and dag_run.clear_number < 1:
  1346. # TODO: Logically, this should be DagRunInfo.run_after, but the
  1347. # information is not stored on a DagRun, only before the actual
  1348. # execution on DagModel.next_dagrun_create_after. We should add
  1349. # a field on DagRun for this instead of relying on the run
  1350. # always happening immediately after the data interval.
  1351. # We only publish these metrics for scheduled dag runs and only
  1352. # when ``external_trigger`` is *False* and ``clear_number`` is 0.
  1353. expected_start_date = dag.get_run_data_interval(dag_run).end
  1354. schedule_delay = dag_run.start_date - expected_start_date
  1355. # Publish metrics twice with backward compatible name, and then with tags
  1356. Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)
  1357. Stats.timing(
  1358. "dagrun.schedule_delay",
  1359. schedule_delay,
  1360. tags={"dag_id": dag.dag_id},
  1361. )
  1362. if __span.is_recording():
  1363. __span.add_event(
  1364. name="schedule_delay",
  1365. attributes={"dag_id": dag.dag_id, "schedule_delay": str(schedule_delay)},
  1366. )
  1367. # cache saves time during scheduling of many dag_runs for same dag
  1368. cached_get_dag: Callable[[str], DAG | None] = lru_cache()(
  1369. partial(self.dagbag.get_dag, session=session)
  1370. )
  1371. _span = Trace.get_current_span()
  1372. for dag_run in dag_runs:
  1373. dag = dag_run.dag = cached_get_dag(dag_run.dag_id)
  1374. if not dag:
  1375. self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
  1376. continue
  1377. active_runs = active_runs_of_dags[dag_run.dag_id]
  1378. if dag.max_active_runs and active_runs >= dag.max_active_runs:
  1379. self.log.debug(
  1380. "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
  1381. dag.dag_id,
  1382. active_runs,
  1383. dag_run.execution_date,
  1384. )
  1385. else:
  1386. if _span.is_recording():
  1387. _span.add_event(
  1388. name="dag_run",
  1389. attributes={
  1390. "run_id": dag_run.run_id,
  1391. "dag_id": dag_run.dag_id,
  1392. "conf": str(dag_run.conf),
  1393. },
  1394. )
  1395. active_runs_of_dags[dag_run.dag_id] += 1
  1396. _update_state(dag, dag_run)
  1397. dag_run.notify_dagrun_state_changed()
  1398. @retry_db_transaction
  1399. def _schedule_all_dag_runs(
  1400. self,
  1401. guard: CommitProhibitorGuard,
  1402. dag_runs: Iterable[DagRun],
  1403. session: Session,
  1404. ) -> list[tuple[DagRun, DagCallbackRequest | None]]:
  1405. """Make scheduling decisions for all `dag_runs`."""
  1406. callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
  1407. guard.commit()
  1408. return callback_tuples
  1409. def _schedule_dag_run(
  1410. self,
  1411. dag_run: DagRun,
  1412. session: Session,
  1413. ) -> DagCallbackRequest | None:
  1414. """
  1415. Make scheduling decisions about an individual dag run.
  1416. :param dag_run: The DagRun to schedule
  1417. :return: Callback that needs to be executed
  1418. """
  1419. trace_id = int(trace_utils.gen_trace_id(dag_run=dag_run, as_int=True))
  1420. span_id = int(trace_utils.gen_dag_span_id(dag_run=dag_run, as_int=True))
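# Link this scheduling span to the trace/span ids derived from the dag run so the two can be
# correlated in the tracing backend.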
  1421. links = [{"trace_id": trace_id, "span_id": span_id}]
  1422. with Trace.start_span(
  1423. span_name="_schedule_dag_run", component="SchedulerJobRunner", links=links
  1424. ) as span:
  1425. span.set_attribute("dag_id", dag_run.dag_id)
  1426. span.set_attribute("run_id", dag_run.run_id)
  1427. span.set_attribute("run_type", dag_run.run_type)
  1428. callback: DagCallbackRequest | None = None
  1429. dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
  1430. dag_model = DM.get_dagmodel(dag_run.dag_id, session)
  1431. if not dag or not dag_model:
  1432. self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id)
  1433. return callback
  1434. if (
  1435. dag_run.start_date
  1436. and dag.dagrun_timeout
  1437. and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
  1438. ):
  1439. dag_run.set_state(DagRunState.FAILED)
  1440. unfinished_task_instances = session.scalars(
  1441. select(TI)
  1442. .where(TI.dag_id == dag_run.dag_id)
  1443. .where(TI.run_id == dag_run.run_id)
  1444. .where(TI.state.in_(State.unfinished))
  1445. )
  1446. for task_instance in unfinished_task_instances:
  1447. task_instance.state = TaskInstanceState.SKIPPED
  1448. session.merge(task_instance)
  1449. session.flush()
  1450. self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
  1451. if self._should_update_dag_next_dagruns(
  1452. dag, dag_model, last_dag_run=dag_run, session=session
  1453. ):
  1454. dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
  1455. callback_to_execute = DagCallbackRequest(
  1456. full_filepath=dag.fileloc,
  1457. dag_id=dag.dag_id,
  1458. run_id=dag_run.run_id,
  1459. is_failure_callback=True,
  1460. processor_subdir=dag_model.processor_subdir,
  1461. msg="timed_out",
  1462. )
  1463. dag_run.notify_dagrun_state_changed()
  1464. duration = dag_run.end_date - dag_run.start_date
  1465. Stats.timing(f"dagrun.duration.failed.{dag_run.dag_id}", duration)
  1466. Stats.timing("dagrun.duration.failed", duration, tags={"dag_id": dag_run.dag_id})
  1467. span.set_attribute("error", True)
  1468. if span.is_recording():
  1469. span.add_event(
  1470. name="error",
  1471. attributes={
  1472. "message": f"Run {dag_run.run_id} of {dag_run.dag_id} has timed-out",
  1473. "duration": str(duration),
  1474. },
  1475. )
  1476. return callback_to_execute
  1477. if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
  1478. self.log.error("Execution date is in future: %s", dag_run.execution_date)
  1479. return callback
  1480. if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
  1481. self.log.warning(
  1482. "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id
  1483. )
  1484. return callback
  1485. # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
  1486. schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  1487. if self._should_update_dag_next_dagruns(dag, dag_model, last_dag_run=dag_run, session=session):
  1488. dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
  1489. # This will do one query per dag run. We "could" build up a complex
  1490. # query to update all the TIs across all the execution dates and dag
  1491. # IDs in a single query, but it turns out that can be _very very slow_
  1492. # see #11147/commit ee90807ac for more details
  1493. if span.is_recording():
  1494. span.add_event(
  1495. name="schedule_tis",
  1496. attributes={
  1497. "message": "dag_run scheduling its tis",
  1498. "schedulable_tis": [_ti.task_id for _ti in schedulable_tis],
  1499. },
  1500. )
  1501. dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query)
  1502. return callback_to_run
  1503. def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
  1504. """
1505. Only run DagRun.verify_integrity if the serialized DAG has changed, since it is slow.
1506. Return True if we determine that the DAG still exists.
  1507. """
  1508. latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
  1509. if dag_run.dag_hash == latest_version:
  1510. self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
  1511. return True
  1512. dag_run.dag_hash = latest_version
  1513. # Refresh the DAG
  1514. dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
  1515. if not dag_run.dag:
  1516. return False
  1517. # Verify integrity also takes care of session.flush
  1518. dag_run.verify_integrity(session=session)
  1519. return True
  1520. def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None:
  1521. self._send_sla_callbacks_to_processor(dag)
  1522. if callback:
  1523. self.job.executor.send_callback(callback)
  1524. else:
  1525. self.log.debug("callback is empty")
  1526. def _send_sla_callbacks_to_processor(self, dag: DAG) -> None:
  1527. """Send SLA Callbacks to DagFileProcessor if tasks have SLAs set and check_slas=True."""
  1528. if not settings.CHECK_SLAS:
  1529. return
  1530. if not any(isinstance(task.sla, timedelta) for task in dag.tasks):
  1531. self.log.debug("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
  1532. return
  1533. if not dag.timetable.periodic:
  1534. self.log.debug("Skipping SLA check for %s because DAG is not scheduled", dag)
  1535. return
  1536. dag_model = DagModel.get_dagmodel(dag.dag_id)
  1537. if not dag_model:
  1538. self.log.error("Couldn't find DAG %s in database!", dag.dag_id)
  1539. return
  1540. request = SlaCallbackRequest(
  1541. full_filepath=dag.fileloc,
  1542. dag_id=dag.dag_id,
  1543. processor_subdir=dag_model.processor_subdir,
  1544. )
  1545. self.job.executor.send_callback(request)
  1546. @provide_session
  1547. def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
  1548. """
  1549. Handle the scenario where a task is queued for longer than `task_queued_timeout`.
  1550. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses
  1551. track of a task, a cluster can't further scale up its workers, etc.), but tasks
  1552. should not be stuck in queued for a long time.
1553. We will attempt to requeue the task (by revoking it from the executor and setting it back
1554. to scheduled) up to 2 times before failing the task.
  1555. """
  1556. tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
  1557. for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
  1558. try:
  1559. for ti in stuck_tis:
  1560. executor.revoke_task(ti=ti)
  1561. self._maybe_requeue_stuck_ti(
  1562. ti=ti,
  1563. session=session,
  1564. )
  1565. except NotImplementedError:
1566. # this block is only entered if the executor has not implemented `revoke_task`,
1567. # in which case we try the fallback logic
  1568. # todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0.
  1569. # after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should
  1570. # just continue immediately.
  1571. self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
  1572. continue
  1573. def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
  1574. """Query db for TIs that are stuck in queued."""
  1575. return session.scalars(
  1576. select(TI).where(
  1577. TI.state == TaskInstanceState.QUEUED,
  1578. TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)),
  1579. TI.queued_by_job_id == self.job.id,
  1580. )
  1581. )
  1582. def _maybe_requeue_stuck_ti(self, *, ti, session):
  1583. """
  1584. Requeue task if it has not been attempted too many times.
  1585. Otherwise, fail it.
  1586. """
  1587. num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
  1588. if num_times_stuck < self._num_stuck_queued_retries:
  1589. self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id)
  1590. session.add(
  1591. Log(
  1592. event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
  1593. task_instance=ti.key,
  1594. extra=(
  1595. f"Task was in queued state for longer than {self._task_queued_timeout} "
  1596. "seconds; task state will be set back to scheduled."
  1597. ),
  1598. )
  1599. )
  1600. self._reschedule_stuck_task(ti)
  1601. else:
  1602. self.log.info(
  1603. "Task requeue attempts exceeded max; marking failed. task_instance=%s",
  1604. ti,
  1605. )
  1606. session.add(
  1607. Log(
  1608. event="stuck in queued tries exceeded",
  1609. task_instance=ti.key,
  1610. extra=f"Task was requeued more than {self._num_stuck_queued_retries} times and will be failed.",
  1611. )
  1612. )
  1613. ti.set_state(TaskInstanceState.FAILED, session=session)
  1614. @deprecated(
  1615. reason="This is backcompat layer for older executor interface. Should be removed in 3.0",
  1616. category=RemovedInAirflow3Warning,
  1617. action="ignore",
  1618. )
  1619. def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
  1620. """
  1621. Try to invoke stuck in queued cleanup for older executor interface.
  1622. TODO: remove in airflow 3.0
1623. Here we handle the case where the executor pre-dates the interface change that
  1624. introduced `cleanup_tasks_stuck_in_queued` and deprecated `cleanup_stuck_queued_tasks`.
  1625. """
  1626. with suppress(NotImplementedError):
  1627. for ti_repr in executor.cleanup_stuck_queued_tasks(tis=stuck_tis):
  1628. self.log.warning(
  1629. "Task instance %s stuck in queued. Will be set to failed.",
  1630. ti_repr,
  1631. )
  1632. @provide_session
  1633. def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
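# Reset the stuck task to SCHEDULED and clear queued_dttm so the critical section can queue it again.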
  1634. session.execute(
  1635. update(TI)
  1636. .where(TI.filter_for_tis([ti]))
  1637. .values(
  1638. state=TaskInstanceState.SCHEDULED,
  1639. queued_dttm=None,
  1640. )
  1641. .execution_options(synchronize_session=False)
  1642. )
  1643. @provide_session
  1644. def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
  1645. """
  1646. Check the Log table to see how many times a taskinstance has been stuck in queued.
  1647. We can then use this information to determine whether to reschedule a task or fail it.
  1648. """
  1649. return (
  1650. session.query(Log)
  1651. .where(
  1652. Log.task_id == ti.task_id,
  1653. Log.dag_id == ti.dag_id,
  1654. Log.run_id == ti.run_id,
  1655. Log.map_index == ti.map_index,
  1656. Log.try_number == ti.try_number,
  1657. Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
  1658. )
  1659. .count()
  1660. )
  1661. @provide_session
  1662. def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
  1663. from airflow.models.pool import Pool
  1664. with Trace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span:
  1665. pools = Pool.slots_stats(session=session)
  1666. for pool_name, slot_stats in pools.items():
  1667. Stats.gauge(f"pool.open_slots.{pool_name}", slot_stats["open"])
  1668. Stats.gauge(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
  1669. Stats.gauge(f"pool.running_slots.{pool_name}", slot_stats["running"])
  1670. Stats.gauge(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
  1671. Stats.gauge(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
  1672. # Same metrics with tagging
  1673. Stats.gauge("pool.open_slots", slot_stats["open"], tags={"pool_name": pool_name})
  1674. Stats.gauge("pool.queued_slots", slot_stats["queued"], tags={"pool_name": pool_name})
  1675. Stats.gauge("pool.running_slots", slot_stats["running"], tags={"pool_name": pool_name})
  1676. Stats.gauge("pool.deferred_slots", slot_stats["deferred"], tags={"pool_name": pool_name})
  1677. Stats.gauge("pool.scheduled_slots", slot_stats["scheduled"], tags={"pool_name": pool_name})
  1678. span.set_attribute("category", "scheduler")
  1679. span.set_attribute(f"pool.open_slots.{pool_name}", slot_stats["open"])
  1680. span.set_attribute(f"pool.queued_slots.{pool_name}", slot_stats["queued"])
  1681. span.set_attribute(f"pool.running_slots.{pool_name}", slot_stats["running"])
  1682. span.set_attribute(f"pool.deferred_slots.{pool_name}", slot_stats["deferred"])
  1683. span.set_attribute(f"pool.scheduled_slots.{pool_name}", slot_stats["scheduled"])
  1684. @provide_session
  1685. def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int:
  1686. """
  1687. Adopt or reset any TaskInstance in resettable state if its SchedulerJob is no longer running.
  1688. :return: the number of TIs reset
  1689. """
  1690. self.log.info("Adopting or resetting orphaned tasks for active dag runs")
  1691. timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
  1692. for attempt in run_with_db_retries(logger=self.log):
  1693. with attempt:
  1694. self.log.debug(
  1695. "Running SchedulerJob.adopt_or_reset_orphaned_tasks with retries. Try %d of %d",
  1696. attempt.retry_state.attempt_number,
  1697. MAX_DB_RETRIES,
  1698. )
  1699. self.log.debug("Calling SchedulerJob.adopt_or_reset_orphaned_tasks method")
  1700. try:
  1701. num_failed = session.execute(
  1702. update(Job)
  1703. .where(
  1704. Job.job_type == "SchedulerJob",
  1705. Job.state == JobState.RUNNING,
  1706. Job.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
  1707. )
  1708. .values(state=JobState.FAILED)
  1709. ).rowcount
  1710. if num_failed:
  1711. self.log.info("Marked %d SchedulerJob instances as failed", num_failed)
  1712. Stats.incr(self.__class__.__name__.lower() + "_end", num_failed)
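# Select adoptable TIs that were queued by a scheduler job which is no longer running, limited
# to running, non-backfill dag runs.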
  1713. query = (
  1714. select(TI)
  1715. .options(lazyload(TI.dag_run)) # avoids double join to dag_run
  1716. .where(TI.state.in_(State.adoptable_states))
  1717. .join(TI.queued_by_job)
  1718. .where(Job.state.is_distinct_from(JobState.RUNNING))
  1719. .join(TI.dag_run)
  1720. .where(
  1721. DagRun.run_type != DagRunType.BACKFILL_JOB,
  1722. DagRun.state == DagRunState.RUNNING,
  1723. )
  1724. .options(load_only(TI.dag_id, TI.task_id, TI.run_id))
  1725. )
  1726. # Lock these rows, so that another scheduler can't try and adopt these too
  1727. tis_to_adopt_or_reset = with_row_locks(query, of=TI, session=session, skip_locked=True)
  1728. tis_to_adopt_or_reset = session.scalars(tis_to_adopt_or_reset).all()
  1729. to_reset: list[TaskInstance] = []
  1730. exec_to_tis = self._executor_to_tis(tis_to_adopt_or_reset)
  1731. for executor, tis in exec_to_tis.items():
  1732. to_reset.extend(executor.try_adopt_task_instances(tis))
  1733. reset_tis_message = []
  1734. for ti in to_reset:
  1735. reset_tis_message.append(repr(ti))
  1736. ti.state = None
  1737. ti.queued_by_job_id = None
  1738. for ti in set(tis_to_adopt_or_reset) - set(to_reset):
  1739. ti.queued_by_job_id = self.job.id
  1740. Stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset))
  1741. Stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset))
  1742. if to_reset:
  1743. task_instance_str = "\n\t".join(reset_tis_message)
  1744. self.log.info(
  1745. "Reset the following %s orphaned TaskInstances:\n\t%s",
  1746. len(to_reset),
  1747. task_instance_str,
  1748. )
  1749. # Issue SQL/finish "Unit of Work", but let @provide_session
  1750. # commit (or if passed a session, let caller decide when to commit
  1751. session.flush()
  1752. except OperationalError:
  1753. session.rollback()
  1754. raise
  1755. return len(to_reset)
  1756. @provide_session
  1757. def check_trigger_timeouts(
  1758. self, max_retries: int = MAX_DB_RETRIES, session: Session = NEW_SESSION
  1759. ) -> None:
  1760. """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
  1761. for attempt in run_with_db_retries(max_retries, logger=self.log):
  1762. with attempt:
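# Deferred TIs whose trigger_timeout has passed are moved back to SCHEDULED with
# next_method "__fail__", so they fail with a timeout error when picked up again.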
  1763. num_timed_out_tasks = session.execute(
  1764. update(TI)
  1765. .where(
  1766. TI.state == TaskInstanceState.DEFERRED,
  1767. TI.trigger_timeout < timezone.utcnow(),
  1768. )
  1769. .values(
  1770. state=TaskInstanceState.SCHEDULED,
  1771. next_method="__fail__",
  1772. next_kwargs={"error": "Trigger/execution timeout"},
  1773. trigger_id=None,
  1774. )
  1775. ).rowcount
  1776. if num_timed_out_tasks:
  1777. self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
  1778. # [START find_and_purge_zombies]
  1779. def _find_and_purge_zombies(self) -> None:
  1780. """
  1781. Find and purge zombie task instances.
  1782. Zombie instances are tasks that failed to heartbeat for too long, or
  1783. have a no-longer-running LocalTaskJob.
  1784. A TaskCallbackRequest is also created for the killed zombie to be
  1785. handled by the DAG processor, and the executor is informed to no longer
  1786. count the zombie as running when it calculates parallelism.
  1787. """
  1788. with create_session() as session:
  1789. if zombies := self._find_zombies(session=session):
  1790. self._purge_zombies(zombies, session=session)
  1791. def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
  1792. from airflow.jobs.job import Job
  1793. self.log.debug("Finding 'running' jobs without a recent heartbeat")
  1794. limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
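# A zombie is a RUNNING TI queued by this scheduler whose LocalTaskJob is either no longer
# running or has not heartbeated since limit_dttm.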
  1795. zombies = (
  1796. session.execute(
  1797. select(TI, DM.fileloc, DM.processor_subdir)
  1798. .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
  1799. .join(Job, TI.job_id == Job.id)
  1800. .join(DM, TI.dag_id == DM.dag_id)
  1801. .where(TI.state == TaskInstanceState.RUNNING)
  1802. .where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
  1803. .where(Job.job_type == "LocalTaskJob")
  1804. .where(TI.queued_by_job_id == self.job.id)
  1805. )
  1806. .unique()
  1807. .all()
  1808. )
  1809. if zombies:
  1810. self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
  1811. return zombies
  1812. def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
  1813. for ti, file_loc, processor_subdir in zombies:
  1814. zombie_message_details = self._generate_zombie_message_details(ti)
  1815. request = TaskCallbackRequest(
  1816. full_filepath=file_loc,
  1817. processor_subdir=processor_subdir,
  1818. simple_task_instance=SimpleTaskInstance.from_ti(ti),
  1819. msg=str(zombie_message_details),
  1820. )
  1821. session.add(
  1822. Log(
  1823. event="heartbeat timeout",
  1824. task_instance=ti.key,
  1825. extra=(
  1826. f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
  1827. "seconds) and will be terminated. "
  1828. "See https://airflow.apache.org/docs/apache-airflow/"
  1829. "stable/core-concepts/tasks.html#zombie-undead-tasks"
  1830. ),
  1831. )
  1832. )
  1833. self.log.error(
  1834. "Detected zombie job: %s "
  1835. "(See https://airflow.apache.org/docs/apache-airflow/"
  1836. "stable/core-concepts/tasks.html#zombie-undead-tasks)",
  1837. request,
  1838. )
  1839. self.job.executor.send_callback(request)
  1840. if (executor := self._try_to_load_executor(ti.executor)) is None:
  1841. self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
  1842. continue
  1843. executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
  1844. Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
  1845. # [END find_and_purge_zombies]
  1846. @staticmethod
  1847. def _generate_zombie_message_details(ti: TI) -> dict[str, Any]:
  1848. zombie_message_details = {
  1849. "DAG Id": ti.dag_id,
  1850. "Task Id": ti.task_id,
  1851. "Run Id": ti.run_id,
  1852. }
  1853. if ti.map_index != -1:
  1854. zombie_message_details["Map Index"] = ti.map_index
  1855. if ti.hostname:
  1856. zombie_message_details["Hostname"] = ti.hostname
  1857. if ti.external_executor_id:
  1858. zombie_message_details["External Executor Id"] = ti.external_executor_id
  1859. return zombie_message_details
  1860. @provide_session
  1861. def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
  1862. """
  1863. Find all dags that were not updated by Dag Processor recently and mark them as inactive.
1864. If one of the DagProcessors is stopped (when there are multiple of them,
1865. each for a different dag folder), its dags would otherwise never be marked as inactive.
  1866. Also remove dags from SerializedDag table.
  1867. Executed on schedule only if [scheduler]standalone_dag_processor is True.
  1868. """
  1869. self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
  1870. limit_lpt = timezone.utcnow() - timedelta(seconds=self._dag_stale_not_seen_duration)
  1871. stale_dags = session.scalars(
  1872. select(DagModel).where(DagModel.is_active, DagModel.last_parsed_time < limit_lpt)
  1873. ).all()
  1874. if not stale_dags:
  1875. self.log.debug("Not stale dags found.")
  1876. return
  1877. self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
  1878. for dag in stale_dags:
  1879. dag.is_active = False
  1880. SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
  1881. session.flush()
  1882. def _set_orphaned(self, dataset: DatasetModel) -> int:
  1883. self.log.info("Orphaning unreferenced dataset '%s'", dataset.uri)
  1884. dataset.is_orphaned = expression.true()
  1885. return 1
  1886. @provide_session
  1887. def _orphan_unreferenced_datasets(self, session: Session = NEW_SESSION) -> None:
  1888. """
  1889. Detect orphaned datasets and set is_orphaned flag to True.
  1890. An orphaned dataset is no longer referenced in any DAG schedule parameters or task outlets.
  1891. """
  1892. orphaned_dataset_query = session.scalars(
  1893. select(DatasetModel)
  1894. .join(
  1895. DagScheduleDatasetReference,
  1896. isouter=True,
  1897. )
  1898. .join(
  1899. TaskOutletDatasetReference,
  1900. isouter=True,
  1901. )
  1902. .group_by(DatasetModel.id)
  1903. .where(~DatasetModel.is_orphaned)
  1904. .having(
  1905. and_(
  1906. func.count(DagScheduleDatasetReference.dag_id) == 0,
  1907. func.count(TaskOutletDatasetReference.dag_id) == 0,
  1908. )
  1909. )
  1910. )
  1911. updated_count = sum(self._set_orphaned(dataset) for dataset in orphaned_dataset_query)
  1912. Stats.gauge("dataset.orphaned", updated_count)
  1913. def _executor_to_tis(self, tis: Iterable[TaskInstance]) -> dict[BaseExecutor, list[TaskInstance]]:
  1914. """Organize TIs into lists per their respective executor."""
  1915. _executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = defaultdict(list)
  1916. for ti in tis:
  1917. if executor_obj := self._try_to_load_executor(ti.executor):
  1918. _executor_to_tis[executor_obj].append(ti)
  1919. return _executor_to_tis
  1920. def _try_to_load_executor(self, executor_name: str | None) -> BaseExecutor | None:
  1921. """
  1922. Try to load the given executor.
  1923. In this context, we don't want to fail if the executor does not exist. Catch the exception and
  1924. log to the user.
  1925. """
  1926. try:
  1927. return ExecutorLoader.load_executor(executor_name)
  1928. except UnknownExecutorException:
1929. # This case should not happen unless some (as of now unknown) edge case occurs or the DB is
1930. # modified directly, since the DAG parser validates the tasks in the DAG and ensures the
1931. # executor they request is available, disallowing the DAG from being scheduled otherwise.
1932. # We keep this exception handling because it is a critical issue if we do somehow find
1933. # ourselves here, and the user should get some feedback about it.
1934. self.log.warning("Executor %s was not found, but a Task was configured to use it", executor_name)
  1935. return None