dependencies_deps.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from airflow.ti_deps.dependencies_states import (
  19. BACKFILL_QUEUEABLE_STATES,
  20. QUEUEABLE_STATES,
  21. RUNNABLE_STATES,
  22. )
  23. from airflow.ti_deps.deps.dag_ti_slots_available_dep import DagTISlotsAvailableDep
  24. from airflow.ti_deps.deps.dag_unpaused_dep import DagUnpausedDep
  25. from airflow.ti_deps.deps.dagrun_backfill_dep import DagRunNotBackfillDep
  26. from airflow.ti_deps.deps.dagrun_exists_dep import DagrunRunningDep
  27. from airflow.ti_deps.deps.exec_date_after_start_date_dep import ExecDateAfterStartDateDep
  28. from airflow.ti_deps.deps.pool_slots_available_dep import PoolSlotsAvailableDep
  29. from airflow.ti_deps.deps.runnable_exec_date_dep import RunnableExecDateDep
  30. from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
  31. from airflow.ti_deps.deps.task_not_running_dep import TaskNotRunningDep
  32. from airflow.ti_deps.deps.valid_state_dep import ValidStateDep
  # Dependencies that, when met, mean the task instance should be re-queued.
  REQUEUEABLE_DEPS = {
      DagTISlotsAvailableDep(),
      TaskConcurrencyDep(),
      PoolSlotsAvailableDep(),
  }
  # Dependencies that need to be met for a given task instance to be set to the 'RUNNING' state.
  RUNNING_DEPS = {
      RunnableExecDateDep(),
      ValidStateDep(RUNNABLE_STATES),
      DagTISlotsAvailableDep(),
      TaskConcurrencyDep(),
      PoolSlotsAvailableDep(),
      TaskNotRunningDep(),
  }
  # Dependency set built around BACKFILL_QUEUEABLE_STATES.
  # NOTE(review): presumably the deps checked before a backfill task instance is queued;
  # inferred from the name and the states constant -- confirm against the callers.
  BACKFILL_QUEUED_DEPS = {
      RunnableExecDateDep(),
      ValidStateDep(BACKFILL_QUEUEABLE_STATES),
      DagrunRunningDep(),
      TaskNotRunningDep(),
  }
  # TODO(aoen): SCHEDULER_QUEUED_DEPS is not coupled to actual scheduling/execution
  # in any way and could easily be modified or removed from the scheduler, causing
  # this dependency to become outdated and incorrect. This coupling should be created
  # (e.g. via a dag_deps analog of ti_deps that will be used in the scheduler code,
  # or by allowing batch deps checks) to ensure that the logic here is equivalent to
  # the logic in the scheduler.
  # Right now there is one discrepancy between this context and how the scheduler
  # schedules tasks: the scheduler checks whether the executor has the task instance,
  # and it is not possible to check the executor outside the scheduler's main process.
  #
  # Dependencies that need to be met for a given task instance to be set to the 'queued'
  # state by the scheduler.
  # This context has more deps than RUNNING_DEPS, as tasks can be triggered by
  # components other than the scheduler, e.g. the webserver.
  SCHEDULER_QUEUED_DEPS = {
      RunnableExecDateDep(),
      ValidStateDep(QUEUEABLE_STATES),
      DagTISlotsAvailableDep(),
      TaskConcurrencyDep(),
      PoolSlotsAvailableDep(),
      DagrunRunningDep(),
      DagRunNotBackfillDep(),
      DagUnpausedDep(),
      ExecDateAfterStartDateDep(),
      TaskNotRunningDep(),
  }