# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Iterator, NamedTuple, Sequence
from warnings import warn

from airflow.datasets import BaseDataset
from airflow.typing_compat import Protocol, runtime_checkable

if TYPE_CHECKING:
    from pendulum import DateTime

    from airflow.datasets import Dataset
    from airflow.serialization.dag_dependency import DagDependency
    from airflow.utils.types import DagRunType


class _NullDataset(BaseDataset):
    """
    Sentinel type that represents "no datasets".

    This is only implemented to make typing easier in timetables, and not
    expected to be used anywhere else.

    :meta private:
    """

    def __bool__(self) -> bool:
        return False

    def __or__(self, other: BaseDataset) -> BaseDataset:
        return NotImplemented

    def __and__(self, other: BaseDataset) -> BaseDataset:
        return NotImplemented

    def as_expression(self) -> Any:
        return None

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        return False

    def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
        return iter(())

    def iter_dag_dependencies(self, source, target) -> Iterator[DagDependency]:
        return iter(())


class DataInterval(NamedTuple):
    """
    A data interval for a DagRun to operate over.

    Both ``start`` and ``end`` **MUST** be "aware", i.e. contain timezone
    information.
    """

    start: DateTime
    end: DateTime

    @classmethod
    def exact(cls, at: DateTime) -> DataInterval:
        """Represent an "interval" containing only an exact time."""
        return cls(start=at, end=at)
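
# Example (illustrative sketch, not part of the upstream module): constructing
# data intervals. Both endpoints must be timezone-aware; ``pendulum`` (an
# Airflow dependency) produces aware datetimes by default.
import pendulum

_daily = DataInterval(
    start=pendulum.datetime(2024, 1, 1, tz="UTC"),
    end=pendulum.datetime(2024, 1, 2, tz="UTC"),
)
_point = DataInterval.exact(pendulum.datetime(2024, 1, 1, tz="UTC"))
assert _point.start == _point.end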


class TimeRestriction(NamedTuple):
    """
    Restriction on when a DAG can be scheduled for a run.

    Specifically, the run must not be earlier than ``earliest``, nor later than
    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
    the current time, i.e. "missed" schedules are not backfilled.

    These values are generally derived from the DAG or task's ``start_date``,
    ``end_date``, and ``catchup`` arguments.

    Both ``earliest`` and ``latest``, if not *None*, are inclusive; a DAG run
    can happen exactly at either point of time. They are guaranteed to be aware
    (i.e. contain timezone information) for ``TimeRestriction`` instances
    created by Airflow.
    """

    earliest: DateTime | None
    latest: DateTime | None
    catchup: bool
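
# Example (illustrative sketch): roughly the restriction Airflow would derive
# for a DAG declared with ``start_date=pendulum.datetime(2024, 1, 1, tz="UTC")``,
# no ``end_date``, and ``catchup=False``. Both bounds, when set, are inclusive.
import pendulum

_restriction = TimeRestriction(
    earliest=pendulum.datetime(2024, 1, 1, tz="UTC"),
    latest=None,
    catchup=False,
)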


class DagRunInfo(NamedTuple):
    """
    Information to schedule a DagRun.

    Instances of this will be returned by timetables when they are asked to
    schedule a DagRun creation.
    """

    run_after: DateTime
    """The earliest time at which this DagRun can be created and its tasks scheduled.

    This **MUST** be "aware", i.e. contain timezone information.
    """

    data_interval: DataInterval
    """The data interval for this DagRun to operate over."""

    @classmethod
    def exact(cls, at: DateTime) -> DagRunInfo:
        """Represent a run on an exact time."""
        return cls(run_after=at, data_interval=DataInterval.exact(at))

    @classmethod
    def interval(cls, start: DateTime, end: DateTime) -> DagRunInfo:
        """
        Represent a run on a continuous schedule.

        In such a schedule, each data interval starts right after the previous
        one ends, and each run is scheduled right after the interval ends. This
        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
        """
        return cls(run_after=end, data_interval=DataInterval(start, end))

    @property
    def logical_date(self: DagRunInfo) -> DateTime:
        """
        Infer the logical date to represent a DagRun.

        This replaces ``execution_date`` in Airflow 2.1 and prior. The idea is
        essentially the same, just a different name.
        """
        return self.data_interval.start
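
# Example (illustrative sketch): a run covering the 2024-01-01 daily window.
# On a continuous schedule the run fires when its data interval ends, and the
# logical date is the interval *start*, not the moment the run is created.
import pendulum

_info = DagRunInfo.interval(
    start=pendulum.datetime(2024, 1, 1, tz="UTC"),
    end=pendulum.datetime(2024, 1, 2, tz="UTC"),
)
assert _info.run_after == _info.data_interval.end
assert _info.logical_date == _info.data_interval.start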


@runtime_checkable
class Timetable(Protocol):
    """Protocol that all Timetable classes are expected to implement."""

    description: str = ""
    """Human-readable description of the timetable.

    For example, this can produce something like ``'At 21:30, only on Friday'``
    from the cron expression ``'30 21 * * 5'``. This is used in the webserver UI.
    """

    periodic: bool = True
    """Whether this timetable runs periodically.

    This defaults to and should generally be *True*, but some special setups
    like ``schedule=None`` and ``"@once"`` set it to *False*.
    """

    _can_be_scheduled: bool = True

    @property
    def can_be_scheduled(self) -> bool:
        """
        Whether this timetable can actually schedule runs in an automated manner.

        This defaults to and should generally be *True* (including non-periodic
        schedules like ``@once`` and dataset-triggered timetables), but
        ``NullTimetable`` sets this to *False*.
        """
        if hasattr(self, "can_run"):
            warn(
                'can_run class variable is deprecated. Use "can_be_scheduled" instead.',
                DeprecationWarning,
                stacklevel=2,
            )
            return self.can_run
        return self._can_be_scheduled

    run_ordering: Sequence[str] = ("data_interval_end", "execution_date")
    """How runs triggered from this timetable should be ordered in the UI.

    This should be a list of field names on the DAG run object.
    """

    active_runs_limit: int | None = None
    """Maximum number of runs that can be active at one time for a DAG.

    This is read during DAG initialization, and the value is used as the DAG's
    default ``max_active_runs``. This should generally be *None*, but there are
    good reasons to limit DAG run parallelism in some cases, such as for
    :class:`~airflow.timetables.simple.ContinuousTimetable`.
    """

    dataset_condition: BaseDataset = _NullDataset()
    """The dataset condition that triggers a DAG using this timetable.

    If set, this should be a dataset, or a combination of datasets, that
    controls the DAG's dataset triggers. The ``_NullDataset`` default means the
    DAG is not dataset-triggered.
    """

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        """
        Deserialize a timetable from data.

        This is called when a serialized DAG is deserialized. ``data`` will be
        whatever was returned by ``serialize`` during DAG serialization. The
        default implementation constructs the timetable without any arguments.
        """
        return cls()

    def serialize(self) -> dict[str, Any]:
        """
        Serialize the timetable for JSON encoding.

        This is called during DAG serialization to store timetable information
        in the database. This should return a JSON-serializable dict that will
        be fed into ``deserialize`` when the DAG is deserialized. The default
        implementation returns an empty dict. A round-trip example is sketched
        after this class.
        """
        return {}

    def validate(self) -> None:
        """
        Validate the timetable is correctly specified.

        Override this method to provide run-time validation, performed when a
        DAG is loaded into a DagBag. The default implementation does nothing.

        :raises: AirflowTimetableInvalid on validation failure.
        """
        return

    @property
    def summary(self) -> str:
        """
        A short summary for the timetable.

        This is used to display the timetable in the web UI. A cron expression
        timetable, for example, can use this to display the expression. The
        default implementation returns the timetable's type name.
        """
        return type(self).__name__

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        """
        When a DAG run is manually triggered, infer a data interval for it.

        For such runs, ``run_after`` would be when the user triggers the run.
        The default implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        """
        Provide information to schedule the next DagRun.

        The default implementation raises ``NotImplementedError``; a minimal
        implementation is sketched after this class.

        :param last_automated_data_interval: The data interval of the associated
            DAG's last scheduled or backfilled run (manual runs not considered).
        :param restriction: Restriction to apply when scheduling the DAG run.
            See documentation of :class:`TimeRestriction` for details.
        :return: Information on when the next DagRun can be scheduled. *None*
            means a DagRun will not happen. This does not mean no more runs
            will ever be scheduled for this DAG; the timetable can return a
            DagRunInfo object when asked again at another time.
        """
        raise NotImplementedError()

    def generate_run_id(
        self,
        *,
        run_type: DagRunType,
        logical_date: DateTime,
        data_interval: DataInterval | None,
        **extra,
    ) -> str:
        """Generate a unique run ID for a DagRun; by default delegate to ``run_type.generate_run_id``."""
        return run_type.generate_run_id(logical_date)
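

# ---------------------------------------------------------------------------
# Example (illustrative sketch, not part of the upstream module): a minimal
# custom timetable exercising the protocol above. ``_EveryNDaysTimetable`` and
# its midnight-alignment assumption are invented for illustration; real DAGs
# would normally use Airflow's built-in cron or delta timetables instead.
# ---------------------------------------------------------------------------
from datetime import timedelta

from pendulum import UTC, Date, DateTime, Time

from airflow.exceptions import AirflowTimetableInvalid


class _EveryNDaysTimetable(Timetable):
    """Schedule a run for every ``days``-day window, aligned to midnight UTC."""

    def __init__(self, days: int = 1) -> None:
        self.days = days

    def serialize(self) -> dict[str, Any]:
        # Persist constructor arguments so they survive the DAG serialization
        # round-trip; ``deserialize`` receives exactly this dict back.
        return {"days": self.days}

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> Timetable:
        return cls(days=data["days"])

    def validate(self) -> None:
        if self.days < 1:
            raise AirflowTimetableInvalid("days must be at least 1")

    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
        # A manual run covers the window ending at the trigger day's midnight.
        end = DateTime.combine(run_after.date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=end - timedelta(days=self.days), end=end)

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:
            # Each interval starts right where the previous one ended.
            next_start = last_automated_data_interval.end
        elif restriction.earliest is None:
            # No start_date; this DAG is never scheduled automatically.
            return None
        elif restriction.catchup:
            # Backfill from the DAG's start_date (assumed midnight-aligned).
            next_start = restriction.earliest
        else:
            # catchup=False: skip missed schedules; start no earlier than
            # today's midnight.
            today = DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)
            next_start = max(restriction.earliest, today)
        if restriction.latest is not None and next_start > restriction.latest:
            # Past the DAG's end_date; stop scheduling runs.
            return None
        return DagRunInfo.interval(start=next_start, end=next_start + timedelta(days=self.days))


# The default ``generate_run_id`` above delegates to
# ``DagRunType.generate_run_id``, which in Airflow 2.x joins the run type and
# the logical date, producing e.g. ``scheduled__2024-01-01T00:00:00+00:00``.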