trigger.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import datetime
  19. from typing import TYPE_CHECKING, Any
  20. from airflow.timetables._cron import CronMixin
  21. from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
  22. from airflow.utils import timezone
  23. if TYPE_CHECKING:
  24. from dateutil.relativedelta import relativedelta
  25. from pendulum import DateTime
  26. from pendulum.tz.timezone import FixedTimezone, Timezone
  27. from airflow.timetables.base import TimeRestriction
  28. class CronTriggerTimetable(CronMixin, Timetable):
  29. """
  30. Timetable that triggers DAG runs according to a cron expression.
  31. This is different from ``CronDataIntervalTimetable``, where the cron
  32. expression specifies the *data interval* of a DAG run. With this timetable,
  33. the data intervals are specified independently from the cron expression.
  34. Also for the same reason, this timetable kicks off a DAG run immediately at
  35. the start of the period (similar to POSIX cron), instead of needing to wait
  36. for one data interval to pass.
  37. Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
  38. """
  39. def __init__(
  40. self,
  41. cron: str,
  42. *,
  43. timezone: str | Timezone | FixedTimezone,
  44. interval: datetime.timedelta | relativedelta = datetime.timedelta(),
  45. ) -> None:
  46. super().__init__(cron, timezone)
  47. self._interval = interval
  48. @classmethod
  49. def deserialize(cls, data: dict[str, Any]) -> Timetable:
  50. from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone
  51. interval: datetime.timedelta | relativedelta
  52. if isinstance(data["interval"], dict):
  53. interval = decode_relativedelta(data["interval"])
  54. else:
  55. interval = datetime.timedelta(seconds=data["interval"])
  56. return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)
  57. def serialize(self) -> dict[str, Any]:
  58. from airflow.serialization.serialized_objects import encode_relativedelta, encode_timezone
  59. interval: float | dict[str, Any]
  60. if isinstance(self._interval, datetime.timedelta):
  61. interval = self._interval.total_seconds()
  62. else:
  63. interval = encode_relativedelta(self._interval)
  64. timezone = encode_timezone(self._timezone)
  65. return {"expression": self._expression, "timezone": timezone, "interval": interval}
  66. def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
  67. return DataInterval(
  68. # pendulum.Datetime ± timedelta should return pendulum.Datetime
  69. # however mypy decide that output would be datetime.datetime
  70. run_after - self._interval, # type: ignore[arg-type]
  71. run_after,
  72. )
  73. def next_dagrun_info(
  74. self,
  75. *,
  76. last_automated_data_interval: DataInterval | None,
  77. restriction: TimeRestriction,
  78. ) -> DagRunInfo | None:
  79. if restriction.catchup:
  80. if last_automated_data_interval is not None:
  81. next_start_time = self._get_next(last_automated_data_interval.end)
  82. elif restriction.earliest is None:
  83. return None # Don't know where to catch up from, give up.
  84. else:
  85. next_start_time = self._align_to_next(restriction.earliest)
  86. else:
  87. start_time_candidates = [self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
  88. if last_automated_data_interval is not None:
  89. start_time_candidates.append(self._get_next(last_automated_data_interval.end))
  90. if restriction.earliest is not None:
  91. start_time_candidates.append(self._align_to_next(restriction.earliest))
  92. next_start_time = max(start_time_candidates)
  93. if restriction.latest is not None and restriction.latest < next_start_time:
  94. return None
  95. return DagRunInfo.interval(
  96. # pendulum.Datetime ± timedelta should return pendulum.Datetime
  97. # however mypy decide that output would be datetime.datetime
  98. next_start_time - self._interval, # type: ignore[arg-type]
  99. next_start_time,
  100. )