backfill_job_runner.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from typing import TYPE_CHECKING, Any, Iterable, Iterator, Mapping, Sequence

import attr
import pendulum
from sqlalchemy import case, or_, select, tuple_, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from tabulate import tabulate

from airflow import models
from airflow.exceptions import (
    AirflowException,
    BackfillUnfinished,
    DagConcurrencyLimitReached,
    NoAvailablePoolSlot,
    PoolNotFound,
    TaskConcurrencyLimitReached,
    UnknownExecutorException,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import Job, perform_heartbeat
from airflow.models import DAG, DagPickle
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import BACKFILL_QUEUED_DEPS
from airflow.timetables.base import DagRunInfo
from airflow.utils import helpers, timezone
from airflow.utils.configuration import tmp_configuration_copy
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType

if TYPE_CHECKING:
    import datetime

    from sqlalchemy.orm.session import Session

    from airflow.executors.base_executor import BaseExecutor
    from airflow.models.abstractoperator import AbstractOperator
    from airflow.models.taskinstance import TaskInstanceKey


class BackfillJobRunner(BaseJobRunner, LoggingMixin):
    """
    A backfill job runner consists of a dag or subdag for a specific time range.

    It triggers a set of task instance runs, in the right order and lasts for
    as long as it takes for the set of task instance to be completed.
    """

    job_type = "BackfillJob"

    STATES_COUNT_AS_RUNNING = (TaskInstanceState.RUNNING, TaskInstanceState.QUEUED)

    @attr.define
    class _DagRunTaskStatus:
        """
        Internal status of the backfill job.

        This class is intended to be instantiated only within a BackfillJobRunner
        instance and will track the execution of tasks, e.g. running, skipped,
        succeeded, failed, etc. Information about the dag runs related to the
        backfill job are also being tracked in this structure, e.g. finished runs, etc.
        Any other status related information related to the execution of dag runs / tasks
        can be included in this structure since it makes it easier to pass it around.

        :param to_run: Tasks to run in the backfill
        :param running: Maps running task instance key to task instance object
        :param skipped: Tasks that have been skipped
        :param succeeded: Tasks that have succeeded so far
        :param failed: Tasks that have failed
        :param not_ready: Tasks not ready for execution
        :param deadlocked: Deadlocked tasks
        :param active_runs: Active dag runs at a certain point in time
        :param executed_dag_run_dates: Datetime objects for the executed dag runs
        :param finished_runs: Number of finished runs so far
        :param total_runs: Number of total dag runs able to run
        """

        to_run: dict[TaskInstanceKey, TaskInstance] = attr.ib(factory=dict)
        running: dict[TaskInstanceKey, TaskInstance] = attr.ib(factory=dict)
        skipped: set[TaskInstanceKey] = attr.ib(factory=set)
        succeeded: set[TaskInstanceKey] = attr.ib(factory=set)
        failed: set[TaskInstanceKey] = attr.ib(factory=set)
        not_ready: set[TaskInstanceKey] = attr.ib(factory=set)
        deadlocked: set[TaskInstance] = attr.ib(factory=set)
        active_runs: set[DagRun] = attr.ib(factory=set)
        executed_dag_run_dates: set[pendulum.DateTime] = attr.ib(factory=set)
        finished_runs: int = 0
        total_runs: int = 0

    def __init__(
        self,
        job: Job,
        dag: DAG,
        start_date=None,
        end_date=None,
        mark_success=False,
        donot_pickle=False,
        ignore_first_depends_on_past=False,
        ignore_task_deps=False,
        pool=None,
        delay_on_limit_secs=1.0,
        verbose=False,
        conf=None,
        rerun_failed_tasks=False,
        run_backwards=False,
        run_at_least_once=False,
        continue_on_failures=False,
        disable_retry=False,
    ) -> None:
        """
        Create a BackfillJobRunner.

        :param dag: DAG object.
        :param start_date: start date for the backfill date range.
        :param end_date: end date for the backfill date range.
        :param mark_success: flag whether to mark the task instances as succeeded without running them.
        :param donot_pickle: whether to skip pickling the DAG for the executors.
        :param ignore_first_depends_on_past: whether to ignore depends_on_past for the first dag run.
        :param ignore_task_deps: whether to ignore task dependencies.
        :param pool: pool in which to run the backfill tasks.
        :param delay_on_limit_secs: time in seconds to wait before the next attempt when the
            max_active_runs limit has been reached.
        :param verbose: whether to emit verbose logging from the dependency checks.
        :param conf: a dictionary of key-value pairs to pass to the created dag runs.
        :param rerun_failed_tasks: flag whether to automatically rerun failed tasks in the backfill.
        :param run_backwards: whether to process the dates from most to least recent.
        :param run_at_least_once: If true, always run the DAG at least once even
            if no logical run exists within the time range.
        :param continue_on_failures: whether to keep processing remaining dag runs after task failures.
        :param disable_retry: if true, running task instances that end up "up for retry" are
            marked as failed instead of being retried.
        """
        super().__init__(job)
        self.dag = dag
        self.dag_id = dag.dag_id
        self.bf_start_date = start_date
        self.bf_end_date = end_date
        self.mark_success = mark_success
        self.donot_pickle = donot_pickle
        self.ignore_first_depends_on_past = ignore_first_depends_on_past
        self.ignore_task_deps = ignore_task_deps
        self.pool = pool
        self.delay_on_limit_secs = delay_on_limit_secs
        self.verbose = verbose
        self.conf = conf
        self.rerun_failed_tasks = rerun_failed_tasks
        self.run_backwards = run_backwards
        self.run_at_least_once = run_at_least_once
        self.continue_on_failures = continue_on_failures
        self.disable_retry = disable_retry
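
    # A minimal usage sketch (added note, not part of the original module), assuming the
    # Job/run_job wiring that Airflow uses elsewhere to drive this runner:
    #
    #     from airflow.jobs.job import Job, run_job
    #
    #     job = Job(executor=ExecutorLoader.get_default_executor())
    #     runner = BackfillJobRunner(job=job, dag=dag, start_date=start, end_date=end)
    #     run_job(job=job, execute_callable=runner._execute)
    #
    # Here ``dag``, ``start`` and ``end`` are placeholders supplied by the caller.
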
    def _update_counters(self, ti_status: _DagRunTaskStatus, session: Session) -> None:
        """
        Update the counters per state of the tasks that were running.

        Tasks that need another attempt (e.g. up for retry or reschedule) are re-added
        to the ``to_run`` queue when required.

        :param ti_status: the internal status of the backfill job tasks
        """
        tis_to_be_scheduled = []
        refreshed_tis = []
        TI = TaskInstance

        ti_primary_key_to_ti_key = {ti_key.primary: ti_key for ti_key in ti_status.running.keys()}

        filter_for_tis = TI.filter_for_tis(list(ti_status.running.values()))
        if filter_for_tis is not None:
            refreshed_tis = session.scalars(select(TI).where(filter_for_tis)).all()

        for ti in refreshed_tis:
            # Use primary key to match in memory information
            ti_key = ti_primary_key_to_ti_key[ti.key.primary]
            if ti.state == TaskInstanceState.SUCCESS:
                ti_status.succeeded.add(ti_key)
                self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                ti_status.running.pop(ti_key)
                continue
            if ti.state == TaskInstanceState.SKIPPED:
                ti_status.skipped.add(ti_key)
                self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                ti_status.running.pop(ti_key)
                continue
            if ti.state == TaskInstanceState.FAILED:
                self.log.error("Task instance %s failed", ti)
                ti_status.failed.add(ti_key)
                ti_status.running.pop(ti_key)
                continue
            # special case: if the task needs to run again put it back
            if ti.state == TaskInstanceState.UP_FOR_RETRY:
                self.log.warning("Task instance %s is up for retry", ti)
                ti_status.running.pop(ti_key)
                ti_status.to_run[ti.key] = ti
            # special case: if the task needs to be rescheduled put it back
            elif ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:
                self.log.warning("Task instance %s is up for reschedule", ti)
                ti_status.running.pop(ti_key)
                ti_status.to_run[ti.key] = ti
            # special case: The state of the task can be set to NONE by the task itself
            # when it reaches concurrency limits. It could also happen when the state
            # is changed externally, e.g. by clearing tasks from the ui. We need to cover
            # for that as otherwise those tasks would fall outside the scope of
            # the backfill suddenly.
            elif ti.state is None:
                self.log.warning(
                    "FIXME: task instance %s state was set to none externally or "
                    "reaching concurrency limits. Re-adding task to queue.",
                    ti,
                )
                tis_to_be_scheduled.append(ti)
                ti_status.running.pop(ti_key)
                ti_status.to_run[ti.key] = ti
            # special case: Deferrable task can go from DEFERRED to SCHEDULED;
            # when that happens, we need to put it back as in UP_FOR_RESCHEDULE
            elif ti.state == TaskInstanceState.SCHEDULED:
                self.log.debug("Task instance %s is resumed from deferred state", ti)
                ti_status.running.pop(ti_key)
                ti_status.to_run[ti.key] = ti

        # Batch schedule of task instances
        if tis_to_be_scheduled:
            filter_for_tis = TI.filter_for_tis(tis_to_be_scheduled)
            session.execute(
                update(TI)
                .where(filter_for_tis)
                .values(
                    state=TaskInstanceState.SCHEDULED,
                    try_number=case(
                        (
                            or_(TI.state.is_(None), TI.state != TaskInstanceState.UP_FOR_RESCHEDULE),
                            TI.try_number + 1,
                        ),
                        else_=TI.try_number,
                    ),
                )
                .execution_options(synchronize_session=False)
            )
            session.flush()

    def _manage_executor_state(
        self,
        running: Mapping[TaskInstanceKey, TaskInstance],
        executor: BaseExecutor,
        session: Session,
    ) -> Iterator[tuple[AbstractOperator, str, Sequence[TaskInstance], int]]:
        """
        Compare task instances' states with that of the executor.

        Expands downstream mapped tasks when necessary.

        :param running: dict of key, task to verify
        :return: An iterable of expanded TaskInstance per MappedTask
        """
        # list of tuples (dag_id, task_id, execution_date, map_index) of running tasks in executor
        buffered_events = list(executor.get_event_buffer().items())
        running_tis_ids = [
            (key.dag_id, key.task_id, key.run_id, key.map_index)
            for key, _ in buffered_events
            if key in running
        ]
        # list of TaskInstance of running tasks in executor (refreshed from db in batch)
        refreshed_running_tis = session.scalars(
            select(TaskInstance).where(
                tuple_(
                    TaskInstance.dag_id,
                    TaskInstance.task_id,
                    TaskInstance.run_id,
                    TaskInstance.map_index,
                ).in_(running_tis_ids)
            )
        ).all()
        # dict of refreshed TaskInstance by key to easily find them
        running_dict = {(ti.dag_id, ti.task_id, ti.run_id, ti.map_index): ti for ti in refreshed_running_tis}
        need_refresh = False

        for key, value in buffered_events:
            state, info = value
            ti_key = (key.dag_id, key.task_id, key.run_id, key.map_index)
            if ti_key not in running_dict:
                self.log.warning("%s state %s not in running=%s", key, state, running.values())
                continue

            ti = running_dict[ti_key]
            if need_refresh:
                ti.refresh_from_db(session=session)

            self.log.debug("Executor state: %s task %s", state, ti)

            if (
                state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
                and ti.state in self.STATES_COUNT_AS_RUNNING
            ):
                self.log.debug(
                    "In-memory TaskInstance state %s does not agree with executor state %s. "
                    "Attempting to resolve by refreshing in-memory task instance from DB.",
                    ti,
                    state,
                )
                ti.refresh_from_db(session=session)

            if (
                state in (TaskInstanceState.FAILED, TaskInstanceState.SUCCESS)
                and ti.state in self.STATES_COUNT_AS_RUNNING
            ):
                msg = (
                    f"The executor reported that the task instance {ti} finished with state {state}, "
                    f"but the task instance's state attribute is {ti.state}. "
                    "Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally"
                )
                if info is not None:
                    msg += f" Extra info: {info}"
                self.log.error(msg)
                ti.handle_failure(error=msg)
                continue

            def _iter_task_needing_expansion() -> Iterator[AbstractOperator]:
                from airflow.models.mappedoperator import AbstractOperator

                for node in self.dag.get_task(ti.task_id, include_subdags=True).iter_mapped_dependants():
                    if isinstance(node, AbstractOperator):
                        yield node
                    else:  # A (mapped) task group. All its children need expansion.
                        yield from node.iter_tasks()

            if ti.state not in self.STATES_COUNT_AS_RUNNING:
                # Don't use ti.task; if this task is mapped, that attribute
                # would hold the unmapped task. We need the original task here.
                for node in _iter_task_needing_expansion():
                    new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
                    yield node, ti.run_id, new_tis, num_mapped_tis

    @provide_session
    def _get_dag_run(
        self,
        dagrun_info: DagRunInfo,
        dag: DAG,
        session: Session = NEW_SESSION,
    ) -> DagRun | None:
        """
        Return an existing dag run for the given run date or create one.

        If the max_active_runs limit is reached, this function will return None.

        :param dagrun_info: Schedule information for the dag run
        :param dag: DAG
        :param session: the database session object
        :return: a DagRun in state RUNNING or None
        """
        run_date = dagrun_info.logical_date

        # consider max_active_runs but ignore when running subdags
        respect_dag_max_active_limit = bool(dag.timetable.can_be_scheduled and not dag.is_subdag)

        current_active_dag_count = dag.get_num_active_runs(external_trigger=False)

        # check if we are scheduling on top of an already existing DAG run
        # we could find a "scheduled" run instead of a "backfill"
        runs = DagRun.find(dag_id=dag.dag_id, execution_date=run_date, session=session)
        run: DagRun | None
        if runs:
            run = runs[0]
            if run.state == DagRunState.RUNNING:
                respect_dag_max_active_limit = False
            # Fixes --conf overwrite for backfills with already existing DagRuns
            run.conf = self.conf or {}
            # start_date is cleared for existing DagRuns
            run.start_date = timezone.utcnow()
        else:
            run = None

        # enforce max_active_runs limit for dag, special cases already
        # handled by respect_dag_max_active_limit
        if respect_dag_max_active_limit and current_active_dag_count >= dag.max_active_runs:
            return None

        run = run or dag.create_dagrun(
            execution_date=run_date,
            data_interval=dagrun_info.data_interval,
            start_date=timezone.utcnow(),
            state=DagRunState.RUNNING,
            external_trigger=False,
            session=session,
            conf=self.conf,
            run_type=DagRunType.BACKFILL_JOB,
            creating_job_id=self.job.id,
        )

        # set required transient field
        run.dag = dag

        # explicitly mark as backfill and running
        run.state = DagRunState.RUNNING
        run.run_type = DagRunType.BACKFILL_JOB
        run.verify_integrity(session=session)

        run.notify_dagrun_state_changed(msg="started")
        return run

    @provide_session
    def _task_instances_for_dag_run(
        self,
        dag: DAG,
        dag_run: DagRun,
        session: Session = NEW_SESSION,
    ) -> dict[TaskInstanceKey, TaskInstance]:
        """
        Return a map of task instance keys to task instance objects for the given dag run.

        :param dag_run: the dag run to get the tasks from
        :param session: the database session object
        """
        tasks_to_run = {}

        if dag_run is None:
            return tasks_to_run

        # check if we have orphaned tasks
        self.reset_state_for_orphaned_tasks(filter_by_dag_run=dag_run, session=session)

        # for some reason, if we don't refresh, the reference to the run is lost
        dag_run.refresh_from_db(session=session)
        make_transient(dag_run)
        dag_run.dag = dag

        info = dag_run.task_instance_scheduling_decisions(session=session)
        schedulable_tis = info.schedulable_tis

        try:
            for ti in dag_run.get_task_instances(session=session):
                if ti in schedulable_tis:
                    if ti.state != TaskInstanceState.UP_FOR_RESCHEDULE:
                        ti.try_number += 1
                    ti.set_state(TaskInstanceState.SCHEDULED)
                if ti.state != TaskInstanceState.REMOVED:
                    tasks_to_run[ti.key] = ti
            session.commit()
        except Exception:
            session.rollback()
            raise
        return tasks_to_run

    def _log_progress(self, ti_status: _DagRunTaskStatus) -> None:
        self.log.info(
            "[backfill progress] | finished run %s of %s | tasks waiting: %s | succeeded: %s | "
            "running: %s | failed: %s | skipped: %s | deadlocked: %s | not ready: %s",
            ti_status.finished_runs,
            ti_status.total_runs,
            len(ti_status.to_run),
            len(ti_status.succeeded),
            len(ti_status.running),
            len(ti_status.failed),
            len(ti_status.skipped),
            len(ti_status.deadlocked),
            len(ti_status.not_ready),
        )
        self.log.debug("Finished dag run loop iteration. Remaining tasks %s", ti_status.to_run.values())

    def _process_backfill_task_instances(
        self,
        ti_status: _DagRunTaskStatus,
        pickle_id: int | None,
        start_date: datetime.datetime | None = None,
        *,
        session: Session,
    ) -> list:
        """
        Process a set of task instances from a set of DAG runs.

        Special handling is done to account for different task instance states
        that could be present when running them in a backfill process.

        :param ti_status: the internal status of the job
        :param pickle_id: the pickle_id if dag is pickled, None otherwise
        :param start_date: the start date of the backfill job
        :param session: the current session object
        :return: the list of execution_dates for the finished dag runs
        """
        executed_run_dates = []
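
        # Descriptive note (added for readability, not in the original source): each pass of
        # the loop below sends ready tasks to the executor via _per_task_process in topological
        # order, heartbeats the job and its executors, checks for deadlock, expands any newly
        # mapped task instances, then updates counters and dag run states before sleeping.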
        while (ti_status.to_run or ti_status.running) and not ti_status.deadlocked:
            self.log.debug("Clearing out not_ready list")
            ti_status.not_ready.clear()

            # we need to execute the tasks bottom to top
            # or leaf to root, as otherwise tasks might be
            # determined deadlocked while they are actually
            # waiting for their upstream to finish
            def _per_task_process(key, ti: TaskInstance, session):
                ti.refresh_from_db(lock_for_update=True, session=session)

                task = self.dag.get_task(ti.task_id, include_subdags=True)
                ti.task = task

                self.log.debug("Task instance to run %s state %s", ti, ti.state)

                # The task was already marked successful or skipped by a
                # different Job. Don't rerun it.
                if ti.state == TaskInstanceState.SUCCESS:
                    ti_status.succeeded.add(key)
                    self.log.debug("Task instance %s succeeded. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return
                elif ti.state == TaskInstanceState.SKIPPED:
                    ti_status.skipped.add(key)
                    self.log.debug("Task instance %s skipped. Don't rerun.", ti)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return

                if self.rerun_failed_tasks:
                    # Rerun failed tasks or upstreamed failed tasks
                    if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                        self.log.error("Task instance %s with state %s", ti, ti.state)
                        if key in ti_status.running:
                            ti_status.running.pop(key)
                        # Reset the failed task in backfill to scheduled state
                        ti.try_number += 1
                        ti.set_state(TaskInstanceState.SCHEDULED, session=session)
                        if ti.dag_run not in ti_status.active_runs:
                            ti_status.active_runs.add(ti.dag_run)
                else:
                    # Default behaviour which works for subdag.
                    if ti.state in (TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED):
                        self.log.error("Task instance %s with state %s", ti, ti.state)
                        ti_status.failed.add(key)
                        ti_status.to_run.pop(key)
                        if key in ti_status.running:
                            ti_status.running.pop(key)
                        return

                if self.ignore_first_depends_on_past:
                    dagrun = ti.get_dagrun(session=session)
                    ignore_depends_on_past = dagrun.execution_date == (start_date or ti.start_date)
                else:
                    ignore_depends_on_past = False

                backfill_context = DepContext(
                    deps=BACKFILL_QUEUED_DEPS,
                    ignore_depends_on_past=ignore_depends_on_past,
                    ignore_task_deps=self.ignore_task_deps,
                    wait_for_past_depends_before_skipping=False,
                    flag_upstream_failed=True,
                )

                executor = ExecutorLoader.load_executor(str(ti.executor) if ti.executor else None)

                # Is the task runnable? -- then run it
                # the dependency checker can change states of tis
                if ti.are_dependencies_met(
                    dep_context=backfill_context, session=session, verbose=self.verbose
                ):
                    if executor.has_task(ti):
                        self.log.debug("Task Instance %s already in executor waiting for queue to clear", ti)
                    else:
                        self.log.debug("Sending %s to executor", ti)
                        # Skip scheduled state, we are executing immediately
                        if ti.state in (TaskInstanceState.UP_FOR_RETRY, None):
                            # i am not sure why this is necessary.
                            # seemingly a quirk of backfill runner.
                            # it should be handled elsewhere i think.
                            # seems the leaf tasks are set SCHEDULED but others not.
                            # but i am not going to look too closely since we need
                            # to nuke the current backfill approach anyway.
                            ti.try_number += 1
                        ti.state = TaskInstanceState.QUEUED
                        ti.queued_by_job_id = self.job.id
                        ti.queued_dttm = timezone.utcnow()
                        session.merge(ti)
                        try:
                            session.commit()
                        except OperationalError:
                            self.log.exception("Failed to commit task state change due to operational error")
                            session.rollback()
                            # early exit so the outer loop can retry
                            return

                        cfg_path = None
                        if executor.is_local:
                            cfg_path = tmp_configuration_copy()

                        executor.queue_task_instance(
                            ti,
                            mark_success=self.mark_success,
                            pickle_id=pickle_id,
                            ignore_task_deps=self.ignore_task_deps,
                            ignore_depends_on_past=ignore_depends_on_past,
                            wait_for_past_depends_before_skipping=False,
                            pool=self.pool,
                            cfg_path=cfg_path,
                        )
                        ti_status.running[key] = ti
                        ti_status.to_run.pop(key)
                    return

                if ti.state == TaskInstanceState.UPSTREAM_FAILED:
                    self.log.error("Task instance %s upstream failed", ti)
                    ti_status.failed.add(key)
                    ti_status.to_run.pop(key)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    return

                # special case
                if ti.state == TaskInstanceState.UP_FOR_RETRY:
                    self.log.debug("Task instance %s retry period not expired yet", ti)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    ti_status.to_run[key] = ti
                    return

                # special case
                if ti.state == TaskInstanceState.UP_FOR_RESCHEDULE:
                    self.log.debug("Task instance %s reschedule period not expired yet", ti)
                    if key in ti_status.running:
                        ti_status.running.pop(key)
                    ti_status.to_run[key] = ti
                    return

                # all remaining tasks
                self.log.debug("Adding %s to not_ready", ti)
                ti_status.not_ready.add(key)

            try:
                for task in self.dag.topological_sort(include_subdag_tasks=True):
                    for key, ti in list(ti_status.to_run.items()):
                        # Attempt to workaround deadlock on backfill by attempting to commit the transaction
                        # state update few times before giving up
                        max_attempts = 5
                        for i in range(max_attempts):
                            if task.task_id != ti.task_id:
                                continue

                            pool = session.scalar(
                                select(models.Pool).where(models.Pool.pool == task.pool).limit(1)
                            )
                            if not pool:
                                raise PoolNotFound(f"Unknown pool: {task.pool}")

                            open_slots = pool.open_slots(session=session)
                            if open_slots <= 0:
                                raise NoAvailablePoolSlot(
                                    f"Not scheduling since there are {open_slots} "
                                    f"open slots in pool {task.pool}"
                                )

                            num_running_task_instances_in_dag = DAG.get_num_task_instances(
                                self.dag_id,
                                states=self.STATES_COUNT_AS_RUNNING,
                                session=session,
                            )

                            if num_running_task_instances_in_dag >= self.dag.max_active_tasks:
                                raise DagConcurrencyLimitReached(
                                    "Not scheduling since DAG max_active_tasks limit is reached."
                                )

                            if task.max_active_tis_per_dag is not None:
                                num_running_task_instances_in_task = DAG.get_num_task_instances(
                                    dag_id=self.dag_id,
                                    task_ids=[task.task_id],
                                    states=self.STATES_COUNT_AS_RUNNING,
                                    session=session,
                                )

                                if num_running_task_instances_in_task >= task.max_active_tis_per_dag:
                                    raise TaskConcurrencyLimitReached(
                                        "Not scheduling since Task concurrency limit is reached."
                                    )

                            if task.max_active_tis_per_dagrun is not None:
                                num_running_task_instances_in_task_dagrun = DAG.get_num_task_instances(
                                    dag_id=self.dag_id,
                                    run_id=ti.run_id,
                                    task_ids=[task.task_id],
                                    states=self.STATES_COUNT_AS_RUNNING,
                                    session=session,
                                )

                                if (
                                    num_running_task_instances_in_task_dagrun
                                    >= task.max_active_tis_per_dagrun
                                ):
                                    raise TaskConcurrencyLimitReached(
                                        "Not scheduling since Task concurrency per DAG run limit is reached."
                                    )

                            _per_task_process(key, ti, session)
                            try:
                                session.commit()
                            except OperationalError:
                                self.log.exception(
                                    "Failed to commit task state due to operational error. "
                                    "The job will retry this operation so if your backfill succeeds, "
                                    "you can safely ignore this message.",
                                )
                                session.rollback()
                                if i == max_attempts - 1:
                                    raise
                                # retry the loop
                            else:
                                # break the retry loop
                                break
            except (NoAvailablePoolSlot, DagConcurrencyLimitReached, TaskConcurrencyLimitReached) as e:
                self.log.debug(e)

            perform_heartbeat(
                job=self.job,
                heartbeat_callback=self.heartbeat_callback,
                only_if_necessary=True,
            )

            # execute the tasks in the queue
            for executor in self.job.executors:
                executor.heartbeat()

            # If the set of tasks that aren't ready ever equals the set of
            # tasks to run and there are no running tasks then the backfill
            # is deadlocked
            if ti_status.not_ready and ti_status.not_ready == set(ti_status.to_run) and not ti_status.running:
                self.log.warning("Deadlock discovered for ti_status.to_run=%s", ti_status.to_run.values())
                ti_status.deadlocked.update(ti_status.to_run.values())
                ti_status.to_run.clear()

            for executor in self.job.executors:
                # check executor state -- and expand any mapped TIs
                for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
                    ti_status.running, executor, session
                ):

                    def to_keep(key: TaskInstanceKey) -> bool:
                        if key.dag_id != node.dag_id or key.task_id != node.task_id or key.run_id != run_id:
                            # For another Dag/Task/Run -- don't remove
                            return True
                        return 0 <= key.map_index <= max_map_index

                    # remove the old unmapped TIs for node -- they have been replaced with the mapped TIs
                    ti_status.to_run = {key: ti for (key, ti) in ti_status.to_run.items() if to_keep(key)}

                    ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis})

                    for new_ti in new_mapped_tis:
                        new_ti.try_number += 1
                        new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)

            # Set state to failed for running TIs that are set up for retry if disable-retry flag is set
            for ti in ti_status.running.values():
                if self.disable_retry and ti.state == TaskInstanceState.UP_FOR_RETRY:
                    ti.set_state(TaskInstanceState.FAILED, session=session)

            # update the task counters
            self._update_counters(ti_status=ti_status, session=session)
            session.commit()

            # update dag run state
            _dag_runs = ti_status.active_runs.copy()
            for run in _dag_runs:
                run.update_state(session=session)
                if run.state in State.finished_dr_states:
                    ti_status.finished_runs += 1
                    ti_status.active_runs.remove(run)
                    executed_run_dates.append(run.execution_date)

            self._log_progress(ti_status)
            session.commit()
            time.sleep(1)

        # return updated status
        return executed_run_dates

    @provide_session
    def _collect_errors(self, ti_status: _DagRunTaskStatus, session: Session = NEW_SESSION) -> Iterator[str]:
        def tabulate_ti_keys_set(ti_keys: Iterable[TaskInstanceKey]) -> str:
            # Sorting by execution date first
            sorted_ti_keys: Any = sorted(
                ti_keys,
                key=lambda ti_key: (
                    ti_key.run_id,
                    ti_key.dag_id,
                    ti_key.task_id,
                    ti_key.map_index,
                    ti_key.try_number,
                ),
            )

            if all(key.map_index == -1 for key in ti_keys):
                headers = ["DAG ID", "Task ID", "Run ID", "Try number"]
                sorted_ti_keys = (k[0:4] for k in sorted_ti_keys)
            else:
                headers = ["DAG ID", "Task ID", "Run ID", "Map Index", "Try number"]

            return tabulate(sorted_ti_keys, headers=headers)
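
        # Illustrative sketch only (added note, not captured from real output): for unmapped
        # tasks the helper above renders a table roughly like
        #
        #   DAG ID       Task ID    Run ID                         Try number
        #   -----------  ---------  -----------------------------  ------------
        #   example_dag  my_task    backfill__2024-01-01T00:00:00  1
        #
        # where "example_dag", "my_task" and the run id are hypothetical values.
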
        if ti_status.failed:
            yield "Some task instances failed:\n"
            yield tabulate_ti_keys_set(ti_status.failed)

        if ti_status.deadlocked:
            yield "BackfillJob is deadlocked."
            deadlocked_depends_on_past = any(
                t.are_dependencies_met(
                    dep_context=DepContext(ignore_depends_on_past=False),
                    session=session,
                    verbose=self.verbose,
                )
                != t.are_dependencies_met(
                    dep_context=DepContext(ignore_depends_on_past=True), session=session, verbose=self.verbose
                )
                for t in ti_status.deadlocked
            )
            if deadlocked_depends_on_past:
                yield (
                    "Some of the deadlocked tasks were unable to run because "
                    'of "depends_on_past" relationships. Try running the '
                    "backfill with the option "
                    '"ignore_first_depends_on_past=True" or passing "-I" at '
                    "the command line."
                )
            yield "\nThese tasks have succeeded:\n"
            yield tabulate_ti_keys_set(ti_status.succeeded)
            yield "\n\nThese tasks are running:\n"
            yield tabulate_ti_keys_set(ti_status.running)
            yield "\n\nThese tasks have failed:\n"
            yield tabulate_ti_keys_set(ti_status.failed)
            yield "\n\nThese tasks are skipped:\n"
            yield tabulate_ti_keys_set(ti_status.skipped)
            yield "\n\nThese tasks are deadlocked:\n"
            yield tabulate_ti_keys_set([ti.key for ti in ti_status.deadlocked])

    def _get_dag_with_subdags(self) -> list[DAG]:
        return [self.dag, *self.dag.subdags]

    @provide_session
    def _execute_dagruns(
        self,
        dagrun_infos: Iterable[DagRunInfo],
        ti_status: _DagRunTaskStatus,
        pickle_id: int | None,
        start_date: datetime.datetime | None,
        session: Session = NEW_SESSION,
    ) -> None:
        """
        Compute and execute dag runs and their respective task instances for the given dates.

        The execution dates of the dag runs that were executed are added to
        ``ti_status.executed_dag_run_dates``.

        :param dagrun_infos: Schedule information for dag runs
        :param ti_status: internal BackfillJobRunner status structure that tracks task instance progress
        :param pickle_id: numeric id of the pickled dag, None if not pickled
        :param start_date: backfill start date
        :param session: the current session object
        """
        for dagrun_info in dagrun_infos:
            for dag in self._get_dag_with_subdags():
                dag_run = self._get_dag_run(dagrun_info, dag, session=session)
                if dag_run is not None:
                    tis_map = self._task_instances_for_dag_run(dag, dag_run, session=session)
                    ti_status.active_runs.add(dag_run)
                    ti_status.to_run.update(tis_map or {})

        tis_missing_executor = []
        for ti in ti_status.to_run.values():
            if ti.executor:
                try:
                    ExecutorLoader.lookup_executor_name_by_str(ti.executor)
                except UnknownExecutorException:
                    tis_missing_executor.append(ti)

        if tis_missing_executor:
            raise UnknownExecutorException(
                "The following task instances are configured to use an executor that is not present. "
                "Review the core.executors Airflow configuration to add it or clear the task instance to "
                "clear the executor configuration for this task.\n"
                + "\n".join(
                    [f" {ti.task_id}: {ti.run_id} (executor: {ti.executor})" for ti in tis_missing_executor]
                )
            )

        processed_dag_run_dates = self._process_backfill_task_instances(
            ti_status=ti_status,
            pickle_id=pickle_id,
            start_date=start_date,
            session=session,
        )

        ti_status.executed_dag_run_dates.update(processed_dag_run_dates)

    @provide_session
    def _set_unfinished_dag_runs_to_failed(
        self,
        dag_runs: Iterable[DagRun],
        session: Session = NEW_SESSION,
    ) -> None:
        """
        Update the state of each dagrun based on the task_instance state and set unfinished runs to failed.

        :param dag_runs: DAG runs
        :param session: session
        :return: None
        """
        for dag_run in dag_runs:
            dag_run.update_state()
            if dag_run.state not in State.finished_dr_states:
                dag_run.set_state(DagRunState.FAILED)
            session.merge(dag_run)

    @provide_session
    def _execute(self, session: Session = NEW_SESSION) -> None:
        """
        Initialize all required components of a dag for a specified date range and execute the tasks.

        :meta private:
        """
        ti_status = BackfillJobRunner._DagRunTaskStatus()

        start_date = self.bf_start_date

        # Get DagRun schedule between the start/end dates, which will turn into dag runs.
        dagrun_start_date = timezone.coerce_datetime(start_date)
        if self.bf_end_date is None:
            dagrun_end_date = pendulum.now(timezone.utc)
        else:
            dagrun_end_date = pendulum.instance(self.bf_end_date)
        dagrun_infos = list(self.dag.iter_dagrun_infos_between(dagrun_start_date, dagrun_end_date))
        if self.run_backwards:
            tasks_that_depend_on_past = [t.task_id for t in self.dag.task_dict.values() if t.depends_on_past]
            if tasks_that_depend_on_past:
                raise AirflowException(
                    f"You cannot backfill backwards because one or more "
                    f'tasks depend_on_past: {",".join(tasks_that_depend_on_past)}'
                )
            dagrun_infos = dagrun_infos[::-1]

        if not dagrun_infos:
            if not self.run_at_least_once:
                self.log.info("No run dates were found for the given dates and dag interval.")
                return
            dagrun_infos = [DagRunInfo.interval(dagrun_start_date, dagrun_end_date)]

        dag_with_subdags_ids = [d.dag_id for d in self._get_dag_with_subdags()]
        running_dagruns = DagRun.find(
            dag_id=dag_with_subdags_ids,
            execution_start_date=self.bf_start_date,
            execution_end_date=self.bf_end_date,
            no_backfills=True,
            state=DagRunState.RUNNING,
        )

        if running_dagruns:
            for run in running_dagruns:
                self.log.error(
                    "Backfill cannot be created for DagRun %s in %s, as there's already %s in a RUNNING "
                    "state.",
                    run.run_id,
                    run.execution_date.strftime("%Y-%m-%dT%H:%M:%S"),
                    run.run_type,
                )
            self.log.error(
                "Changing DagRun into BACKFILL would cause scheduler to lose track of executing "
                "tasks. Not changing DagRun type into BACKFILL, and trying insert another DagRun into "
                "database would cause database constraint violation for dag_id + execution_date "
                "combination. Please adjust backfill dates or wait for this DagRun to finish.",
            )
            return

        pickle_id = None

        _support_pickling = []
        for executor in self.job.executors:
            _support_pickling.append(executor.supports_pickling)

            executor.job_id = self.job.id
            executor.start()

        if not self.donot_pickle and all(_support_pickling):
            pickle = DagPickle(self.dag)
            session.add(pickle)
            session.commit()
            pickle_id = pickle.id

        ti_status.total_runs = len(dagrun_infos)  # total dag runs in backfill

        try:
            remaining_dates = ti_status.total_runs
            while remaining_dates > 0:
                dagrun_infos_to_process = [
                    dagrun_info
                    for dagrun_info in dagrun_infos
                    if dagrun_info.logical_date not in ti_status.executed_dag_run_dates
                ]
                self._execute_dagruns(
                    dagrun_infos=dagrun_infos_to_process,
                    ti_status=ti_status,
                    pickle_id=pickle_id,
                    start_date=start_date,
                    session=session,
                )

                remaining_dates = ti_status.total_runs - len(ti_status.executed_dag_run_dates)
                err = "".join(self._collect_errors(ti_status=ti_status, session=session))
                if err:
                    if not self.continue_on_failures or ti_status.deadlocked:
                        raise BackfillUnfinished(err, ti_status)

                if remaining_dates > 0:
                    self.log.info(
                        "max_active_runs limit for dag %s has been reached "
                        " - waiting for other dag runs to finish",
                        self.dag_id,
                    )
                    time.sleep(self.delay_on_limit_secs)
        except (KeyboardInterrupt, SystemExit):
            self.log.warning("Backfill terminated by user.")

            # TODO: we will need to terminate running task instances and set the
            # state to failed.
            self._set_unfinished_dag_runs_to_failed(ti_status.active_runs)
        except OperationalError:
            self.log.exception(
                "Backfill job dead-locked. The job will retry the job so it is likely "
                "to heal itself. If your backfill succeeds you can ignore this exception.",
            )
            raise
        finally:
            session.commit()
            for executor in self.job.executors:
                executor.end()

        self.log.info("Backfill done for DAG %s. Exiting.", self.dag)

    @provide_session
    def reset_state_for_orphaned_tasks(
        self,
        filter_by_dag_run: DagRun | None = None,
        session: Session = NEW_SESSION,
    ) -> int | None:
        """
        Reset state of orphaned tasks.

        This function checks if there are any tasks in the dagrun (or all) that
        have a scheduled or queued state but are not known by the executor. If
        it finds those it will reset the state to None so they will get picked
        up again. The batch option is for performance reasons as the queries
        are made in sequence.

        :param filter_by_dag_run: the dag_run we want to process, None if all
        :return: the number of TIs reset
        """
        queued_tis = []
        running_tis = []
        for executor in self.job.executors:
            queued_tis.append(executor.queued_tasks)
            # also consider running as the state might not have changed in the db yet
            running_tis.append(executor.running)

        # Can't use an update here since it doesn't support joins.
        resettable_states = [TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED]
        if filter_by_dag_run is None:
            resettable_tis = (
                session.scalars(
                    select(TaskInstance)
                    .join(TaskInstance.dag_run)
                    .where(
                        DagRun.state == DagRunState.RUNNING,
                        DagRun.run_type != DagRunType.BACKFILL_JOB,
                        TaskInstance.state.in_(resettable_states),
                    )
                )
            ).all()
        else:
            resettable_tis = filter_by_dag_run.get_task_instances(state=resettable_states, session=session)

        tis_to_reset = [ti for ti in resettable_tis if ti.key not in queued_tis and ti.key not in running_tis]
        if not tis_to_reset:
            return 0

        def query(result, items):
            if not items:
                return result

            filter_for_tis = TaskInstance.filter_for_tis(items)
            reset_tis = session.scalars(
                select(TaskInstance)
                .where(filter_for_tis, TaskInstance.state.in_(resettable_states))
                .with_for_update()
            ).all()

            for ti in reset_tis:
                ti.state = None
                session.merge(ti)

            return result + reset_tis

        reset_tis = helpers.reduce_in_chunks(query, tis_to_reset, [], self.job.max_tis_per_query)

        task_instance_str = "\n".join(f"\t{x!r}" for x in reset_tis)
        session.flush()

        self.log.info("Reset the following %s TaskInstances:\n%s", len(reset_tis), task_instance_str)
        return len(reset_tis)
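

# Usage note (added, not part of the original module): in practice this runner backs the
# `airflow dags backfill` CLI command, e.g. a date-range backfill such as
#
#     airflow dags backfill --start-date 2024-01-01 --end-date 2024-01-07 example_dag
#
# which builds a Job, wraps it in a BackfillJobRunner for the requested window, and runs
# it to completion ("example_dag" is a placeholder DAG id).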