dep_context.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
  18. from __future__ import annotations
  19. from typing import TYPE_CHECKING
  20. import attr
  21. from airflow.exceptions import TaskNotFound
  22. from airflow.utils.state import State
  23. if TYPE_CHECKING:
  24. from sqlalchemy.orm.session import Session
  25. from airflow.models.dagrun import DagRun
  26. from airflow.models.taskinstance import TaskInstance
  @attr.define
  class DepContext:
      """
      A base class for dependency contexts.

      Specifies which dependencies should be evaluated in the context for a task
      instance to satisfy the requirements of the context. Also stores state
      related to the context that can be used by dependency classes.

      For example there could be a SomeRunContext that subclasses this class which has
      dependencies for:

      - Making sure there are slots available on the infrastructure to run the task instance
      - A task-instance's task-specific dependencies are met (e.g. the previous task
        instance completed successfully)
      - ...

      :param deps: The context-specific dependencies that need to be evaluated for a
          task instance to run in this execution context.
      :param flag_upstream_failed: This is a hack to set the upstream_failed state
          while checking whether the task instance is runnable. It was the shortest
          path to add the feature. This is bad since this class should be pure (no
          side effects).
      :param ignore_all_deps: Whether or not the context should ignore all ignorable
          dependencies. Overrides the other ignore_* parameters.
      :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs (e.g. for
          Backfills)
      :param wait_for_past_depends_before_skipping: Wait for past depends before marking
          the ti as skipped
      :param ignore_in_retry_period: Ignore the retry period for task instances
      :param ignore_in_reschedule_period: Ignore the reschedule period for task instances
      :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past and
          trigger rule
      :param ignore_ti_state: Ignore the task instance's previous failure/success
      :param ignore_unmapped_tasks: Ignore errors about mapped tasks not yet being expanded
      :param finished_tis: A list of all the finished task instances of this run. When
          left as ``None`` it is loaded lazily (and cached) by ``ensure_finished_tis``.
      :param description: Optional description of this context; not read by any code
          in this class.
      :param have_changed_ti_states: Whether any TI's state has been changed as a result
          of evaluating dependencies.
      """

      # NOTE: attrs builds ``__init__`` from these fields in declaration order, so the
      # order below is part of the public constructor signature -- do not reorder.
      deps: set = attr.ib(factory=set)
      flag_upstream_failed: bool = False
      ignore_all_deps: bool = False
      ignore_depends_on_past: bool = False
      wait_for_past_depends_before_skipping: bool = False
      ignore_in_retry_period: bool = False
      ignore_in_reschedule_period: bool = False
      ignore_task_deps: bool = False
      ignore_ti_state: bool = False
      ignore_unmapped_tasks: bool = False
      finished_tis: list[TaskInstance] | None = None
      description: str | None = None
      # Have any of the TIs' states been changed as a result of evaluating dependencies.
      have_changed_ti_states: bool = False

      def ensure_finished_tis(self, dag_run: DagRun, session: Session) -> list[TaskInstance]:
          """
          Ensure finished_tis is populated if it's currently None, which allows running tasks without dag_run.

          On the first call the finished task instances are queried from ``dag_run`` and
          cached on ``self.finished_tis``; later calls return the cached list as-is
          (including an empty list) without querying again. Queried TIs whose ``task``
          attribute is missing or ``None`` get it resolved from ``dag_run.dag`` when that
          DAG is available.

          :param dag_run: The DagRun for which to find finished tasks
          :param session: The SQLAlchemy session used to query the task instances
          :return: A list of all the finished task instances of this DAG run
          """
          if self.finished_tis is not None:
              return self.finished_tis

          finished_tis = dag_run.get_task_instances(state=State.finished, session=session)
          for ti in finished_tis:
              # Attach the task object only when the TI lacks one (attribute unset or
              # None) and the run's DAG is available to look it up.
              if getattr(ti, "task", None) is None and dag_run.dag:
                  try:
                      ti.task = dag_run.dag.get_task(ti.task_id)
                  except TaskNotFound:
                      # Best effort: if the DAG has no task with this task_id, leave the
                      # TI without a task rather than failing the whole lookup.
                      pass

          self.finished_tis = finished_tis
          return finished_tis