dates.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import warnings
  20. from datetime import datetime, timedelta
  21. from typing import Collection
  22. from croniter import croniter
  23. from dateutil.relativedelta import relativedelta # for doctest
  24. from airflow.exceptions import RemovedInAirflow3Warning
  25. from airflow.typing_compat import Literal
  26. from airflow.utils import timezone
  27. cron_presets: dict[str, str] = {
  28. "@hourly": "0 * * * *",
  29. "@daily": "0 0 * * *",
  30. "@weekly": "0 0 * * 0",
  31. "@monthly": "0 0 1 * *",
  32. "@quarterly": "0 0 1 */3 *",
  33. "@yearly": "0 0 1 1 *",
  34. }
def date_range(
    start_date: datetime,
    end_date: datetime | None = None,
    num: int | None = None,
    delta: str | timedelta | relativedelta | None = None,
) -> list[datetime]:
    """
    Get a list of dates in the specified range, separated by delta.

    Deprecated: every call emits ``RemovedInAirflow3Warning``; use ``airflow.timetables``.

    .. code-block:: pycon

        >>> from airflow.utils.dates import date_range
        >>> from datetime import datetime, timedelta
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))]
        >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
        [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')),
        datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))]

    :param start_date: anchor date to start the series from; it is always the
        first entry generated
    :param end_date: inclusive right boundary for the date range. Mutually
        exclusive with ``num``. If neither ``end_date`` nor ``num`` is given
        (both falsy), it defaults to ``timezone.utcnow()``.
    :param num: alternatively to end_date, you can specify the number of
        entries you want in the range. This number can be negative (the series
        then walks backwards from ``start_date``); output will always be sorted
        regardless. Note that ``num=0`` is falsy and is therefore treated as
        "not given".
    :param delta: step length. It can be a ``datetime.timedelta``, a
        ``dateutil.relativedelta.relativedelta``, or a cron expression as
        string (a key of ``cron_presets`` such as ``"@daily"`` is also
        accepted). The sign of a timedelta/relativedelta is ignored; the
        direction comes from ``num``. A falsy delta (``None``, a zero timedelta)
        yields an empty list.
    :return: the generated dates, sorted ascending; naive values are made aware
        with ``start_date``'s original tzinfo via ``timezone.make_aware``
    :raises ValueError: if ``start_date`` is after ``end_date``, or if both
        ``end_date`` and ``num`` are given
    :raises TypeError: if ``delta`` is not a str, timedelta or relativedelta
    """
    warnings.warn(
        "`airflow.utils.dates.date_range()` is deprecated. Please use `airflow.timetables`.",
        category=RemovedInAirflow3Warning,
        stacklevel=2,
    )
    # A falsy step (None, timedelta(0), empty relativedelta) would never advance.
    if not delta:
        return []
    if end_date:
        if start_date > end_date:
            raise ValueError("Wait. start_date needs to be before end_date")
        if num:
            raise ValueError("Wait. Either specify end_date OR num")
    if not end_date and not num:
        end_date = timezone.utcnow()

    delta_iscron = False
    # Remember the caller's tzinfo so naive results can be re-localized below.
    time_zone = start_date.tzinfo

    abs_delta: timedelta | relativedelta
    if isinstance(delta, str):
        delta_iscron = True
        # The cron iterator is fed a naive datetime in start_date's own zone.
        if timezone.is_localized(start_date):
            start_date = timezone.make_naive(start_date, time_zone)
        cron = croniter(cron_presets.get(delta, delta), start_date)
    elif isinstance(delta, timedelta):
        abs_delta = abs(delta)
    elif isinstance(delta, relativedelta):
        abs_delta = abs(delta)
    else:
        raise TypeError("Wait. delta must be either datetime.timedelta or cron expression as str")

    dates = []
    if end_date:
        # start_date may have been made naive above; align end_date so the
        # loop comparison does not mix naive and aware datetimes.
        if timezone.is_naive(start_date) and not timezone.is_naive(end_date):
            end_date = timezone.make_naive(end_date, time_zone)
        while start_date <= end_date:  # type: ignore
            if timezone.is_naive(start_date):
                dates.append(timezone.make_aware(start_date, time_zone))
            else:
                dates.append(start_date)

            if delta_iscron:
                start_date = cron.get_next(datetime)
            else:
                start_date += abs_delta
    else:
        num_entries: int = num  # type: ignore
        for _ in range(abs(num_entries)):
            if timezone.is_naive(start_date):
                dates.append(timezone.make_aware(start_date, time_zone))
            else:
                dates.append(start_date)

            # Positive num walks forward, negative num walks backward.
            if delta_iscron and num_entries > 0:
                start_date = cron.get_next(datetime)
            elif delta_iscron:
                start_date = cron.get_prev(datetime)
            elif num_entries > 0:
                start_date += abs_delta
            else:
                start_date -= abs_delta

    # Backward walks append in descending order; sorting normalizes the output.
    return sorted(dates)
def round_time(
    dt: datetime,
    delta: str | timedelta | relativedelta,
    start_date: datetime = timezone.make_aware(datetime.min),
) -> datetime:
    """
    Return ``start_date + i * delta`` for given ``i`` where the result is closest to ``dt``.

    For a ``timedelta``/``relativedelta`` step:

    * the microseconds of ``dt`` are ignored;
    * when ``dt`` is exactly halfway between two candidates, the later one is
      returned;
    * when ``start_date`` is after ``dt``, ``start_date`` itself is returned.

    For a cron-expression ``delta`` (``str``), the result is the previous fire
    time of the cron schedule relative to ``start_date``. It is re-attached to
    ``start_date``'s tzinfo.
    NOTE(review): ``dt`` is not consulted at all in the cron branch — confirm
    whether that is intended.

    NOTE(review): the default ``start_date`` is computed once, when the function
    is defined, via ``timezone.make_aware`` (presumably timezone-aware). The
    examples below pass naive datetimes without a ``start_date``. Python raises
    ``TypeError`` when comparing naive and aware datetimes, so those calls
    presumably need matching naive/aware-ness — TODO confirm.

    .. code-block:: pycon

        >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 1, 2), relativedelta(months=1))
        datetime.datetime(2015, 1, 1, 0, 0)
        >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 16, 0, 0)
        >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 15, 0, 0)
        >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)
        >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
        datetime.datetime(2015, 9, 14, 0, 0)

    :param dt: the datetime to round
    :param delta: spacing between candidates: a ``timedelta``, a
        ``relativedelta``, or a cron expression string
    :param start_date: anchor of the candidate series (``i = 0``)
    :return: the candidate closest to ``dt`` (see above for the cron case)
    """
    if isinstance(delta, str):
        # It's cron based, so it's easy
        time_zone = start_date.tzinfo
        # The cron iterator works on a naive datetime in start_date's own zone.
        start_date = timezone.make_naive(start_date, time_zone)
        cron = croniter(delta, start_date)
        prev = cron.get_prev(datetime)
        # Both branches return a localized datetime; presumably the equality
        # check guards against croniter yielding the anchor itself — TODO confirm.
        if prev == start_date:
            return timezone.make_aware(start_date, time_zone)
        else:
            return timezone.make_aware(prev, time_zone)

    # Ignore the microseconds of dt
    dt -= timedelta(microseconds=dt.microsecond)

    # We are looking for a datetime in the form start_date + i * delta
    # which is as close as possible to dt. Since delta could be a relative
    # delta we don't know its exact length in seconds so we cannot rely on
    # division to find i. Instead we employ a binary search algorithm, first
    # finding an upper and lower limit and then dissecting the interval until
    # we have found the closest match.

    # We first search an upper limit for i for which start_date + upper * delta
    # exceeds dt.
    upper = 1
    while start_date + upper * delta < dt:
        # To speed up finding an upper limit we grow this exponentially by a
        # factor of 2
        upper *= 2

    # Since upper is the first value for which start_date + upper * delta
    # exceeds dt, upper // 2 is below dt and therefore forms a lower limited
    # for the i we are looking for
    lower = upper // 2

    # We now continue to intersect the interval between
    # start_date + lower * delta and start_date + upper * delta
    # until we find the closest value
    while True:
        # Invariant: start + lower * delta < dt <= start + upper * delta
        # If start_date + (lower + 1)*delta exceeds dt, then either lower or
        # lower+1 has to be the solution we are searching for
        if start_date + (lower + 1) * delta >= dt:
            # Check if start_date + (lower + 1)*delta or
            # start_date + lower*delta is closer to dt and return the solution
            # (``<=`` means a tie resolves to the later candidate).
            if (start_date + (lower + 1) * delta) - dt <= dt - (start_date + lower * delta):
                return start_date + (lower + 1) * delta
            else:
                return start_date + lower * delta

        # We intersect the interval and either replace the lower or upper
        # limit with the candidate
        candidate = lower + (upper - lower) // 2
        if start_date + candidate * delta >= dt:
            upper = candidate
        else:
            lower = candidate

    # in the special case when start_date > dt the search for upper will
    # immediately stop for upper == 1 which results in lower = upper // 2 = 0
    # and this function returns start_date.
# Unit names produced by ``infer_time_unit`` and accepted by ``scale_time_units``.
TimeUnit = Literal["days", "hours", "minutes", "seconds"]
  197. def infer_time_unit(time_seconds_arr: Collection[float]) -> TimeUnit:
  198. """
  199. Determine the most appropriate time unit for given durations (in seconds).
  200. e.g. 5400 seconds => 'minutes', 36000 seconds => 'hours'
  201. """
  202. if not time_seconds_arr:
  203. return "hours"
  204. max_time_seconds = max(time_seconds_arr)
  205. if max_time_seconds <= 60 * 2:
  206. return "seconds"
  207. elif max_time_seconds <= 60 * 60 * 2:
  208. return "minutes"
  209. elif max_time_seconds <= 24 * 60 * 60 * 2:
  210. return "hours"
  211. else:
  212. return "days"
  213. def scale_time_units(time_seconds_arr: Collection[float], unit: TimeUnit) -> Collection[float]:
  214. """Convert an array of time durations in seconds to the specified time unit."""
  215. if unit == "minutes":
  216. factor = 60
  217. elif unit == "hours":
  218. factor = 60 * 60
  219. elif unit == "days":
  220. factor = 24 * 60 * 60
  221. else:
  222. factor = 1
  223. return [x / factor for x in time_seconds_arr]
  224. def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
  225. """
  226. Get a datetime object representing *n* days ago.
  227. By default the time is set to midnight.
  228. """
  229. warnings.warn(
  230. "Function `days_ago` is deprecated and will be removed in Airflow 3.0. "
  231. "You can achieve equivalent behavior with `pendulum.today('UTC').add(days=-N, ...)`",
  232. RemovedInAirflow3Warning,
  233. stacklevel=2,
  234. )
  235. today = timezone.utcnow().replace(hour=hour, minute=minute, second=second, microsecond=microsecond)
  236. return today - timedelta(days=n)
  237. def parse_execution_date(execution_date_str):
  238. """Parse execution date string to datetime object."""
  239. return timezone.parse(execution_date_str)
  240. def datetime_to_nano(datetime) -> int:
  241. """Convert datetime to nanoseconds."""
  242. return int(datetime.timestamp() * 1000000000)