# simple.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Collection, Sequence

from airflow.datasets import DatasetAlias, _DatasetAliasCondition
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone

if TYPE_CHECKING:
    from pendulum import DateTime
    from sqlalchemy import Session

    from airflow.datasets import BaseDataset
    from airflow.models.dataset import DatasetEvent
    from airflow.timetables.base import TimeRestriction
    from airflow.utils.types import DagRunType
  29. class _TrivialTimetable(Timetable):
  30. """Some code reuse for "trivial" timetables that has nothing complex."""
  31. periodic = False
  32. run_ordering: Sequence[str] = ("execution_date",)
  33. @classmethod
  34. def deserialize(cls, data: dict[str, Any]) -> Timetable:
  35. return cls()
  36. def __eq__(self, other: Any) -> bool:
  37. """
  38. As long as *other* is of the same type.
  39. This is only for testing purposes and should not be relied on otherwise.
  40. """
  41. if not isinstance(other, type(self)):
  42. return NotImplemented
  43. return True
  44. def serialize(self) -> dict[str, Any]:
  45. return {}
  46. def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
  47. return DataInterval.exact(run_after)
  48. class NullTimetable(_TrivialTimetable):
  49. """
  50. Timetable that never schedules anything.
  51. This corresponds to ``schedule=None``.
  52. """
  53. can_be_scheduled = False
  54. description: str = "Never, external triggers only"
  55. @property
  56. def summary(self) -> str:
  57. return "None"
  58. def next_dagrun_info(
  59. self,
  60. *,
  61. last_automated_data_interval: DataInterval | None,
  62. restriction: TimeRestriction,
  63. ) -> DagRunInfo | None:
  64. return None
  65. class OnceTimetable(_TrivialTimetable):
  66. """
  67. Timetable that schedules the execution once as soon as possible.
  68. This corresponds to ``schedule="@once"``.
  69. """
  70. description: str = "Once, as soon as possible"
  71. @property
  72. def summary(self) -> str:
  73. return "@once"
  74. def next_dagrun_info(
  75. self,
  76. *,
  77. last_automated_data_interval: DataInterval | None,
  78. restriction: TimeRestriction,
  79. ) -> DagRunInfo | None:
  80. if last_automated_data_interval is not None:
  81. return None # Already run, no more scheduling.
  82. if restriction.earliest is None: # No start date, won't run.
  83. return None
  84. # "@once" always schedule to the start_date determined by the DAG and
  85. # tasks, regardless of catchup or not. This has been the case since 1.10
  86. # and we're inheriting it.
  87. run_after = restriction.earliest
  88. if restriction.latest is not None and run_after > restriction.latest:
  89. return None
  90. return DagRunInfo.exact(run_after)
  91. class ContinuousTimetable(_TrivialTimetable):
  92. """
  93. Timetable that schedules continually, while still respecting start_date and end_date.
  94. This corresponds to ``schedule="@continuous"``.
  95. """
  96. description: str = "As frequently as possible, but only one run at a time."
  97. active_runs_limit = 1 # Continuous DAGRuns should be constrained to one run at a time
  98. @property
  99. def summary(self) -> str:
  100. return "@continuous"
  101. def next_dagrun_info(
  102. self,
  103. *,
  104. last_automated_data_interval: DataInterval | None,
  105. restriction: TimeRestriction,
  106. ) -> DagRunInfo | None:
  107. if restriction.earliest is None: # No start date, won't run.
  108. return None
  109. if last_automated_data_interval is not None: # has already run once
  110. start = last_automated_data_interval.end
  111. end = timezone.coerce_datetime(timezone.utcnow())
  112. else: # first run
  113. start = restriction.earliest
  114. end = max(
  115. restriction.earliest, timezone.coerce_datetime(timezone.utcnow())
  116. ) # won't run any earlier than start_date
  117. if restriction.latest is not None and end > restriction.latest:
  118. return None
  119. return DagRunInfo.interval(start, end)
  120. class DatasetTriggeredTimetable(_TrivialTimetable):
  121. """
  122. Timetable that never schedules anything.
  123. This should not be directly used anywhere, but only set if a DAG is triggered by datasets.
  124. :meta private:
  125. """
  126. description: str = "Triggered by datasets"
  127. def __init__(self, datasets: BaseDataset) -> None:
  128. super().__init__()
  129. self.dataset_condition = datasets
  130. if isinstance(self.dataset_condition, DatasetAlias):
  131. self.dataset_condition = _DatasetAliasCondition(self.dataset_condition.name)
  132. if not next(self.dataset_condition.iter_datasets(), False):
  133. self._summary = "Unresolved DatasetAlias"
  134. else:
  135. self._summary = "Dataset"
  136. @classmethod
  137. def deserialize(cls, data: dict[str, Any]) -> Timetable:
  138. from airflow.serialization.serialized_objects import decode_dataset_condition
  139. return cls(decode_dataset_condition(data["dataset_condition"]))
  140. @property
  141. def summary(self) -> str:
  142. return self._summary
  143. def serialize(self) -> dict[str, Any]:
  144. from airflow.serialization.serialized_objects import encode_dataset_condition
  145. return {"dataset_condition": encode_dataset_condition(self.dataset_condition)}
  146. def generate_run_id(
  147. self,
  148. *,
  149. run_type: DagRunType,
  150. logical_date: DateTime,
  151. data_interval: DataInterval | None,
  152. session: Session | None = None,
  153. events: Collection[DatasetEvent] | None = None,
  154. **extra,
  155. ) -> str:
  156. from airflow.models.dagrun import DagRun
  157. return DagRun.generate_run_id(run_type, logical_date)
  158. def data_interval_for_events(
  159. self,
  160. logical_date: DateTime,
  161. events: Collection[DatasetEvent],
  162. ) -> DataInterval:
  163. if not events:
  164. return DataInterval(logical_date, logical_date)
  165. start_dates, end_dates = [], []
  166. for event in events:
  167. if event.source_dag_run is not None:
  168. start_dates.append(event.source_dag_run.data_interval_start)
  169. end_dates.append(event.source_dag_run.data_interval_end)
  170. else:
  171. start_dates.append(event.timestamp)
  172. end_dates.append(event.timestamp)
  173. start = min(start_dates)
  174. end = max(end_dates)
  175. return DataInterval(start, end)
  176. def next_dagrun_info(
  177. self,
  178. *,
  179. last_automated_data_interval: DataInterval | None,
  180. restriction: TimeRestriction,
  181. ) -> DagRunInfo | None:
  182. return None