state.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from enum import Enum
  20. class JobState(str, Enum):
  21. """All possible states that a Job can be in."""
  22. RUNNING = "running"
  23. SUCCESS = "success"
  24. RESTARTING = "restarting"
  25. FAILED = "failed"
  26. def __str__(self) -> str:
  27. return self.value
  28. class TaskInstanceState(str, Enum):
  29. """
  30. All possible states that a Task Instance can be in.
  31. Note that None is also allowed, so always use this in a type hint with Optional.
  32. """
  33. # The scheduler sets a TaskInstance state to None when it's created but not
  34. # yet run, but we don't list it here since TaskInstance is a string enum.
  35. # Use None instead if need this state.
  36. # Set by the scheduler
  37. REMOVED = "removed" # Task vanished from DAG before it ran
  38. SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
  39. # Set by the task instance itself
  40. QUEUED = "queued" # Executor has enqueued the task
  41. RUNNING = "running" # Task is executing
  42. SUCCESS = "success" # Task completed
  43. RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
  44. FAILED = "failed" # Task errored out
  45. UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
  46. UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
  47. UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
  48. SKIPPED = "skipped" # Skipped by branching or some other mechanism
  49. DEFERRED = "deferred" # Deferrable operator waiting on a trigger
  50. # Not used anymore, kept for compatibility.
  51. # TODO: Remove in Airflow 3.0.
  52. SHUTDOWN = "shutdown"
  53. """The task instance is being shut down.
  54. :meta private:
  55. """
  56. def __str__(self) -> str:
  57. return self.value
  58. class DagRunState(str, Enum):
  59. """
  60. All possible states that a DagRun can be in.
  61. These are "shared" with TaskInstanceState in some parts of the code,
  62. so please ensure that their values always match the ones with the
  63. same name in TaskInstanceState.
  64. """
  65. QUEUED = "queued"
  66. RUNNING = "running"
  67. SUCCESS = "success"
  68. FAILED = "failed"
  69. def __str__(self) -> str:
  70. return self.value
  71. class State:
  72. """Static class with task instance state constants and color methods to avoid hard-coding."""
  73. # Backwards-compat constants for code that does not yet use the enum
  74. # These first three are shared by DagState and TaskState
  75. SUCCESS = TaskInstanceState.SUCCESS
  76. RUNNING = TaskInstanceState.RUNNING
  77. FAILED = TaskInstanceState.FAILED
  78. # These are TaskState only
  79. NONE = None
  80. REMOVED = TaskInstanceState.REMOVED
  81. SCHEDULED = TaskInstanceState.SCHEDULED
  82. QUEUED = TaskInstanceState.QUEUED
  83. RESTARTING = TaskInstanceState.RESTARTING
  84. UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
  85. UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
  86. UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
  87. SKIPPED = TaskInstanceState.SKIPPED
  88. DEFERRED = TaskInstanceState.DEFERRED
  89. # Not used anymore, kept for compatibility.
  90. # TODO: Remove in Airflow 3.0.
  91. SHUTDOWN = TaskInstanceState.SHUTDOWN
  92. """The task instance is being shut down.
  93. :meta private:
  94. """
  95. finished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.SUCCESS, DagRunState.FAILED])
  96. unfinished_dr_states: frozenset[DagRunState] = frozenset([DagRunState.QUEUED, DagRunState.RUNNING])
  97. task_states: tuple[TaskInstanceState | None, ...] = (None, *TaskInstanceState)
  98. dag_states: tuple[DagRunState, ...] = (
  99. DagRunState.QUEUED,
  100. DagRunState.SUCCESS,
  101. DagRunState.RUNNING,
  102. DagRunState.FAILED,
  103. )
  104. state_color: dict[TaskInstanceState | None, str] = {
  105. None: "lightblue",
  106. TaskInstanceState.QUEUED: "gray",
  107. TaskInstanceState.RUNNING: "lime",
  108. TaskInstanceState.SUCCESS: "green",
  109. TaskInstanceState.RESTARTING: "violet",
  110. TaskInstanceState.FAILED: "red",
  111. TaskInstanceState.UP_FOR_RETRY: "gold",
  112. TaskInstanceState.UP_FOR_RESCHEDULE: "turquoise",
  113. TaskInstanceState.UPSTREAM_FAILED: "orange",
  114. TaskInstanceState.SKIPPED: "hotpink",
  115. TaskInstanceState.REMOVED: "lightgrey",
  116. TaskInstanceState.SCHEDULED: "tan",
  117. TaskInstanceState.DEFERRED: "mediumpurple",
  118. }
  119. @classmethod
  120. def color(cls, state):
  121. """Return color for a state."""
  122. return cls.state_color.get(state, "white")
  123. @classmethod
  124. def color_fg(cls, state):
  125. """Black&white colors for a state."""
  126. color = cls.color(state)
  127. if color in ["green", "red"]:
  128. return "white"
  129. return "black"
  130. finished: frozenset[TaskInstanceState] = frozenset(
  131. [
  132. TaskInstanceState.SUCCESS,
  133. TaskInstanceState.FAILED,
  134. TaskInstanceState.SKIPPED,
  135. TaskInstanceState.UPSTREAM_FAILED,
  136. TaskInstanceState.REMOVED,
  137. ]
  138. )
  139. """
  140. A list of states indicating a task has reached a terminal state (i.e. it has "finished") and needs no
  141. further action.
  142. Note that the attempt could have resulted in failure or have been
  143. interrupted; or perhaps never run at all (skip, or upstream_failed) in any
  144. case, it is no longer running.
  145. """
  146. unfinished: frozenset[TaskInstanceState | None] = frozenset(
  147. [
  148. None,
  149. TaskInstanceState.SCHEDULED,
  150. TaskInstanceState.QUEUED,
  151. TaskInstanceState.RUNNING,
  152. TaskInstanceState.RESTARTING,
  153. TaskInstanceState.UP_FOR_RETRY,
  154. TaskInstanceState.UP_FOR_RESCHEDULE,
  155. TaskInstanceState.DEFERRED,
  156. ]
  157. )
  158. """
  159. A list of states indicating that a task either has not completed
  160. a run or has not even started.
  161. """
  162. failed_states: frozenset[TaskInstanceState] = frozenset(
  163. [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
  164. )
  165. """
  166. A list of states indicating that a task or dag is a failed state.
  167. """
  168. success_states: frozenset[TaskInstanceState] = frozenset(
  169. [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
  170. )
  171. """
  172. A list of states indicating that a task or dag is a success state.
  173. """
  174. # Kept for compatibility. DO NOT USE.
  175. # TODO: Remove in Airflow 3.0.
  176. terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
  177. """
  178. A list of states indicating that a task has been terminated.
  179. :meta private:
  180. """
  181. adoptable_states = frozenset(
  182. [TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING]
  183. )
  184. """
  185. A list of states indicating that a task can be adopted or reset by a scheduler job
  186. if it was queued by another scheduler job that is not running anymore.
  187. """