base.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Sequence
  19. from warnings import warn
  20. from airflow.datasets import BaseDataset
  21. from airflow.typing_compat import Protocol, runtime_checkable
  22. if TYPE_CHECKING:
  23. from pendulum import DateTime
  24. from airflow.datasets import Dataset
  25. from airflow.serialization.dag_dependency import DagDependency
  26. from airflow.utils.types import DagRunType
  27. class _NullDataset(BaseDataset):
  28. """
  29. Sentinel type that represents "no datasets".
  30. This is only implemented to make typing easier in timetables, and not
  31. expected to be used anywhere else.
  32. :meta private:
  33. """
  34. def __bool__(self) -> bool:
  35. return False
  36. def __or__(self, other: BaseDataset) -> BaseDataset:
  37. return NotImplemented
  38. def __and__(self, other: BaseDataset) -> BaseDataset:
  39. return NotImplemented
  40. def as_expression(self) -> Any:
  41. return None
  42. def evaluate(self, statuses: dict[str, bool]) -> bool:
  43. return False
  44. def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
  45. return iter(())
  46. def iter_dag_dependencies(self, source, target) -> Iterator[DagDependency]:
  47. return iter(())
  48. class DataInterval(NamedTuple):
  49. """
  50. A data interval for a DagRun to operate over.
  51. Both ``start`` and ``end`` **MUST** be "aware", i.e. contain timezone
  52. information.
  53. """
  54. start: DateTime
  55. end: DateTime
  56. @classmethod
  57. def exact(cls, at: DateTime) -> DataInterval:
  58. """Represent an "interval" containing only an exact time."""
  59. return cls(start=at, end=at)
  60. class TimeRestriction(NamedTuple):
  61. """
  62. Restriction on when a DAG can be scheduled for a run.
  63. Specifically, the run must not be earlier than ``earliest``, nor later than
  64. ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
  65. the current time, i.e. "missed" schedules are not backfilled.
  66. These values are generally set on the DAG or task's ``start_date``,
  67. ``end_date``, and ``catchup`` arguments.
  68. Both ``earliest`` and ``latest``, if not *None*, are inclusive; a DAG run
  69. can happen exactly at either point of time. They are guaranteed to be aware
  70. (i.e. contain timezone information) for ``TimeRestriction`` instances
  71. created by Airflow.
  72. """
  73. earliest: DateTime | None
  74. latest: DateTime | None
  75. catchup: bool
  76. class DagRunInfo(NamedTuple):
  77. """
  78. Information to schedule a DagRun.
  79. Instances of this will be returned by timetables when they are asked to
  80. schedule a DagRun creation.
  81. """
  82. run_after: DateTime
  83. """The earliest time this DagRun is created and its tasks scheduled.
  84. This **MUST** be "aware", i.e. contain timezone information.
  85. """
  86. data_interval: DataInterval
  87. """The data interval this DagRun to operate over."""
  88. @classmethod
  89. def exact(cls, at: DateTime) -> DagRunInfo:
  90. """Represent a run on an exact time."""
  91. return cls(run_after=at, data_interval=DataInterval.exact(at))
  92. @classmethod
  93. def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
  94. """
  95. Represent a run on a continuous schedule.
  96. In such a schedule, each data interval starts right after the previous
  97. one ends, and each run is scheduled right after the interval ends. This
  98. applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
  99. """
  100. return cls(run_after=end, data_interval=DataInterval(start, end))
  101. @property
  102. def logical_date(self: DagRunInfo) -> DateTime:
  103. """
  104. Infer the logical date to represent a DagRun.
  105. This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
  106. essentially the same, just a different name.
  107. """
  108. return self.data_interval.start
  109. @runtime_checkable
  110. class Timetable(Protocol):
  111. """Protocol that all Timetable classes are expected to implement."""
  112. description: str = ""
  113. """Human-readable description of the timetable.
  114. For example, this can produce something like ``'At 21:30, only on Friday'``
  115. from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
  116. """
  117. periodic: bool = True
  118. """Whether this timetable runs periodically.
  119. This defaults to and should generally be *True*, but some special setups
  120. like ``schedule=None`` and ``"@once"`` set it to *False*.
  121. """
  122. _can_be_scheduled: bool = True
  123. @property
  124. def can_be_scheduled(self):
  125. """
  126. Whether this timetable can actually schedule runs in an automated manner.
  127. This defaults to and should generally be *True* (including non periodic
  128. execution types like *@once* and data triggered tables), but
  129. ``NullTimetable`` sets this to *False*.
  130. """
  131. if hasattr(self, "can_run"):
  132. warn(
  133. 'can_run class variable is deprecated. Use "can_be_scheduled" instead.',
  134. DeprecationWarning,
  135. stacklevel=2,
  136. )
  137. return self.can_run
  138. return self._can_be_scheduled
  139. run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
  140. """How runs triggered from this timetable should be ordered in UI.
  141. This should be a list of field names on the DAG run object.
  142. """
  143. active_runs_limit: int | None = None
  144. """Maximum active runs that can be active at one time for a DAG.
  145. This is called during DAG initialization, and the return value is used as
  146. the DAG's default ``max_active_runs``. This should generally return *None*,
  147. but there are good reasons to limit DAG run parallelism in some cases, such
  148. as for :class:`~airflow.timetable.simple.ContinuousTimetable`.
  149. """
  150. dataset_condition: BaseDataset = _NullDataset()
  151. """The dataset condition that triggers a DAG using this timetable.
  152. If this is not *None*, this should be a dataset, or a combination of, that
  153. controls the DAG's dataset triggers.
  154. """
  155. @classmethod
  156. def deserialize(cls, data: dict[str, Any]) -> Timetable:
  157. """
  158. Deserialize a timetable from data.
  159. This is called when a serialized DAG is deserialized. ``data`` will be
  160. whatever was returned by ``serialize`` during DAG serialization. The
  161. default implementation constructs the timetable without any arguments.
  162. """
  163. return cls()
  164. def serialize(self) -> dict[str, Any]:
  165. """
  166. Serialize the timetable for JSON encoding.
  167. This is called during DAG serialization to store timetable information
  168. in the database. This should return a JSON-serializable dict that will
  169. be fed into ``deserialize`` when the DAG is deserialized. The default
  170. implementation returns an empty dict.
  171. """
  172. return {}
  173. def validate(self) -> None:
  174. """
  175. Validate the timetable is correctly specified.
  176. Override this method to provide run-time validation raised when a DAG
  177. is put into a dagbag. The default implementation does nothing.
  178. :raises: AirflowTimetableInvalid on validation failure.
  179. """
  180. return
  181. @property
  182. def summary(self) -> str:
  183. """
  184. A short summary for the timetable.
  185. This is used to display the timetable in the web UI. A cron expression
  186. timetable, for example, can use this to display the expression. The
  187. default implementation returns the timetable's type name.
  188. """
  189. return type(self).__name__
  190. def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
  191. """
  192. When a DAG run is manually triggered, infer a data interval for it.
  193. This is used for e.g. manually-triggered runs, where ``run_after`` would
  194. be when the user triggers the run. The default implementation raises
  195. ``NotImplementedError``.
  196. """
  197. raise NotImplementedError()
  198. def next_dagrun_info(
  199. self,
  200. *,
  201. last_automated_data_interval: DataInterval | None,
  202. restriction: TimeRestriction,
  203. ) -> DagRunInfo | None:
  204. """
  205. Provide information to schedule the next DagRun.
  206. The default implementation raises ``NotImplementedError``.
  207. :param last_automated_data_interval: The data interval of the associated
  208. DAG's last scheduled or backfilled run (manual runs not considered).
  209. :param restriction: Restriction to apply when scheduling the DAG run.
  210. See documentation of :class:`TimeRestriction` for details.
  211. :return: Information on when the next DagRun can be scheduled. None
  212. means a DagRun will not happen. This does not mean no more runs
  213. will be scheduled even again for this DAG; the timetable can return
  214. a DagRunInfo object when asked at another time.
  215. """
  216. raise NotImplementedError()
  217. def generate_run_id(
  218. self,
  219. *,
  220. run_type: DagRunType,
  221. logical_date: DateTime,
  222. data_interval: DataInterval | None,
  223. **extra,
  224. ) -> str:
  225. return run_type.generate_run_id(logical_date)