duration.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533
  1. from __future__ import annotations
  2. from datetime import timedelta
  3. from typing import TYPE_CHECKING
  4. from typing import cast
  5. from typing import overload
  6. import pendulum
  7. from pendulum.constants import SECONDS_PER_DAY
  8. from pendulum.constants import SECONDS_PER_HOUR
  9. from pendulum.constants import SECONDS_PER_MINUTE
  10. from pendulum.constants import US_PER_SECOND
  11. from pendulum.utils._compat import PYPY
  12. if TYPE_CHECKING:
  13. from typing_extensions import Self
  14. def _divide_and_round(a: float, b: float) -> int:
  15. """divide a by b and round result to the nearest integer
  16. When the ratio is exactly half-way between two integers,
  17. the even integer is returned.
  18. """
  19. # Based on the reference implementation for divmod_near
  20. # in Objects/longobject.c.
  21. q, r = divmod(a, b)
  22. # The output of divmod() is either a float or an int,
  23. # but we always want it to be an int.
  24. q = int(q)
  25. # round up if either r / b > 0.5, or r / b == 0.5 and q is odd.
  26. # The expression r / b > 0.5 is equivalent to 2 * r > b if b is
  27. # positive, 2 * r < b if b negative.
  28. r *= 2
  29. greater_than_half = r > b if b > 0 else r < b
  30. if greater_than_half or r == b and q % 2 == 1:
  31. q += 1
  32. return q
  33. class Duration(timedelta):
  34. """
  35. Replacement for the standard timedelta class.
  36. Provides several improvements over the base class.
  37. """
  38. _total: float = 0
  39. _years: int = 0
  40. _months: int = 0
  41. _weeks: int = 0
  42. _days: int = 0
  43. _remaining_days: int = 0
  44. _seconds: int = 0
  45. _microseconds: int = 0
  46. _y = None
  47. _m = None
  48. _w = None
  49. _d = None
  50. _h = None
  51. _i = None
  52. _s = None
  53. _invert = None
  54. def __new__(
  55. cls,
  56. days: float = 0,
  57. seconds: float = 0,
  58. microseconds: float = 0,
  59. milliseconds: float = 0,
  60. minutes: float = 0,
  61. hours: float = 0,
  62. weeks: float = 0,
  63. years: float = 0,
  64. months: float = 0,
  65. ) -> Self:
  66. if not isinstance(years, int) or not isinstance(months, int):
  67. raise ValueError("Float year and months are not supported")
  68. self = timedelta.__new__(
  69. cls,
  70. days + years * 365 + months * 30,
  71. seconds,
  72. microseconds,
  73. milliseconds,
  74. minutes,
  75. hours,
  76. weeks,
  77. )
  78. # Intuitive normalization
  79. total = self.total_seconds() - (years * 365 + months * 30) * SECONDS_PER_DAY
  80. self._total = total
  81. m = 1
  82. if total < 0:
  83. m = -1
  84. self._microseconds = round(total % m * 1e6)
  85. self._seconds = abs(int(total)) % SECONDS_PER_DAY * m
  86. _days = abs(int(total)) // SECONDS_PER_DAY * m
  87. self._days = _days
  88. self._remaining_days = abs(_days) % 7 * m
  89. self._weeks = abs(_days) // 7 * m
  90. self._months = months
  91. self._years = years
  92. self._signature = { # type: ignore[attr-defined]
  93. "years": years,
  94. "months": months,
  95. "weeks": weeks,
  96. "days": days,
  97. "hours": hours,
  98. "minutes": minutes,
  99. "seconds": seconds,
  100. "microseconds": microseconds + milliseconds * 1000,
  101. }
  102. return self
  103. def total_minutes(self) -> float:
  104. return self.total_seconds() / SECONDS_PER_MINUTE
  105. def total_hours(self) -> float:
  106. return self.total_seconds() / SECONDS_PER_HOUR
  107. def total_days(self) -> float:
  108. return self.total_seconds() / SECONDS_PER_DAY
  109. def total_weeks(self) -> float:
  110. return self.total_days() / 7
  111. if PYPY:
  112. def total_seconds(self) -> float:
  113. days = 0
  114. if hasattr(self, "_years"):
  115. days += self._years * 365
  116. if hasattr(self, "_months"):
  117. days += self._months * 30
  118. if hasattr(self, "_remaining_days"):
  119. days += self._weeks * 7 + self._remaining_days
  120. else:
  121. days += self._days
  122. return (
  123. (days * SECONDS_PER_DAY + self._seconds) * US_PER_SECOND
  124. + self._microseconds
  125. ) / US_PER_SECOND
  126. @property
  127. def years(self) -> int:
  128. return self._years
  129. @property
  130. def months(self) -> int:
  131. return self._months
  132. @property
  133. def weeks(self) -> int:
  134. return self._weeks
  135. if PYPY:
  136. @property
  137. def days(self) -> int:
  138. return self._years * 365 + self._months * 30 + self._days
  139. @property
  140. def remaining_days(self) -> int:
  141. return self._remaining_days
  142. @property
  143. def hours(self) -> int:
  144. if self._h is None:
  145. seconds = self._seconds
  146. self._h = 0
  147. if abs(seconds) >= 3600:
  148. self._h = (abs(seconds) // 3600 % 24) * self._sign(seconds)
  149. return self._h
  150. @property
  151. def minutes(self) -> int:
  152. if self._i is None:
  153. seconds = self._seconds
  154. self._i = 0
  155. if abs(seconds) >= 60:
  156. self._i = (abs(seconds) // 60 % 60) * self._sign(seconds)
  157. return self._i
  158. @property
  159. def seconds(self) -> int:
  160. return self._seconds
  161. @property
  162. def remaining_seconds(self) -> int:
  163. if self._s is None:
  164. self._s = self._seconds
  165. self._s = abs(self._s) % 60 * self._sign(self._s)
  166. return self._s
  167. @property
  168. def microseconds(self) -> int:
  169. return self._microseconds
  170. @property
  171. def invert(self) -> bool:
  172. if self._invert is None:
  173. self._invert = self.total_seconds() < 0
  174. return self._invert
  175. def in_weeks(self) -> int:
  176. return int(self.total_weeks())
  177. def in_days(self) -> int:
  178. return int(self.total_days())
  179. def in_hours(self) -> int:
  180. return int(self.total_hours())
  181. def in_minutes(self) -> int:
  182. return int(self.total_minutes())
  183. def in_seconds(self) -> int:
  184. return int(self.total_seconds())
  185. def in_words(self, locale: str | None = None, separator: str = " ") -> str:
  186. """
  187. Get the current interval in words in the current locale.
  188. Ex: 6 jours 23 heures 58 minutes
  189. :param locale: The locale to use. Defaults to current locale.
  190. :param separator: The separator to use between each unit
  191. """
  192. intervals = [
  193. ("year", self.years),
  194. ("month", self.months),
  195. ("week", self.weeks),
  196. ("day", self.remaining_days),
  197. ("hour", self.hours),
  198. ("minute", self.minutes),
  199. ("second", self.remaining_seconds),
  200. ]
  201. if locale is None:
  202. locale = pendulum.get_locale()
  203. loaded_locale = pendulum.locale(locale)
  204. parts = []
  205. for interval in intervals:
  206. unit, interval_count = interval
  207. if abs(interval_count) > 0:
  208. translation = loaded_locale.translation(
  209. f"units.{unit}.{loaded_locale.plural(abs(interval_count))}"
  210. )
  211. parts.append(translation.format(interval_count))
  212. if not parts:
  213. count: int | str = 0
  214. if abs(self.microseconds) > 0:
  215. unit = f"units.second.{loaded_locale.plural(1)}"
  216. count = f"{abs(self.microseconds) / 1e6:.2f}"
  217. else:
  218. unit = f"units.microsecond.{loaded_locale.plural(0)}"
  219. translation = loaded_locale.translation(unit)
  220. parts.append(translation.format(count))
  221. return separator.join(parts)
  222. def _sign(self, value: float) -> int:
  223. if value < 0:
  224. return -1
  225. return 1
  226. def as_timedelta(self) -> timedelta:
  227. """
  228. Return the interval as a native timedelta.
  229. """
  230. return timedelta(seconds=self.total_seconds())
  231. def __str__(self) -> str:
  232. return self.in_words()
  233. def __repr__(self) -> str:
  234. rep = f"{self.__class__.__name__}("
  235. if self._years:
  236. rep += f"years={self._years}, "
  237. if self._months:
  238. rep += f"months={self._months}, "
  239. if self._weeks:
  240. rep += f"weeks={self._weeks}, "
  241. if self._days:
  242. rep += f"days={self._remaining_days}, "
  243. if self.hours:
  244. rep += f"hours={self.hours}, "
  245. if self.minutes:
  246. rep += f"minutes={self.minutes}, "
  247. if self.remaining_seconds:
  248. rep += f"seconds={self.remaining_seconds}, "
  249. if self.microseconds:
  250. rep += f"microseconds={self.microseconds}, "
  251. rep += ")"
  252. return rep.replace(", )", ")")
  253. def __add__(self, other: timedelta) -> Self:
  254. if isinstance(other, timedelta):
  255. return self.__class__(seconds=self.total_seconds() + other.total_seconds())
  256. return NotImplemented
  257. __radd__ = __add__
  258. def __sub__(self, other: timedelta) -> Self:
  259. if isinstance(other, timedelta):
  260. return self.__class__(seconds=self.total_seconds() - other.total_seconds())
  261. return NotImplemented
  262. def __neg__(self) -> Self:
  263. return self.__class__(
  264. years=-self._years,
  265. months=-self._months,
  266. weeks=-self._weeks,
  267. days=-self._remaining_days,
  268. seconds=-self._seconds,
  269. microseconds=-self._microseconds,
  270. )
  271. def _to_microseconds(self) -> int:
  272. return (self._days * (24 * 3600) + self._seconds) * 1000000 + self._microseconds
  273. def __mul__(self, other: int | float) -> Self:
  274. if isinstance(other, int):
  275. return self.__class__(
  276. years=self._years * other,
  277. months=self._months * other,
  278. seconds=self._total * other,
  279. )
  280. if isinstance(other, float):
  281. usec = self._to_microseconds()
  282. a, b = other.as_integer_ratio()
  283. return self.__class__(0, 0, _divide_and_round(usec * a, b))
  284. return NotImplemented
  285. __rmul__ = __mul__
  286. @overload
  287. def __floordiv__(self, other: timedelta) -> int:
  288. ...
  289. @overload
  290. def __floordiv__(self, other: int) -> Self:
  291. ...
  292. def __floordiv__(self, other: int | timedelta) -> int | Duration:
  293. if not isinstance(other, (int, timedelta)):
  294. return NotImplemented
  295. usec = self._to_microseconds()
  296. if isinstance(other, timedelta):
  297. return cast(
  298. int, usec // other._to_microseconds() # type: ignore[attr-defined]
  299. )
  300. if isinstance(other, int):
  301. return self.__class__(
  302. 0,
  303. 0,
  304. usec // other,
  305. years=self._years // other,
  306. months=self._months // other,
  307. )
  308. @overload
  309. def __truediv__(self, other: timedelta) -> float:
  310. ...
  311. @overload
  312. def __truediv__(self, other: float) -> Self:
  313. ...
  314. def __truediv__(self, other: int | float | timedelta) -> Self | float:
  315. if not isinstance(other, (int, float, timedelta)):
  316. return NotImplemented
  317. usec = self._to_microseconds()
  318. if isinstance(other, timedelta):
  319. return cast(
  320. float, usec / other._to_microseconds() # type: ignore[attr-defined]
  321. )
  322. if isinstance(other, int):
  323. return self.__class__(
  324. 0,
  325. 0,
  326. _divide_and_round(usec, other),
  327. years=_divide_and_round(self._years, other),
  328. months=_divide_and_round(self._months, other),
  329. )
  330. if isinstance(other, float):
  331. a, b = other.as_integer_ratio()
  332. return self.__class__(
  333. 0,
  334. 0,
  335. _divide_and_round(b * usec, a),
  336. years=_divide_and_round(self._years * b, a),
  337. months=_divide_and_round(self._months, other),
  338. )
  339. __div__ = __floordiv__
  340. def __mod__(self, other: timedelta) -> Self:
  341. if isinstance(other, timedelta):
  342. r = self._to_microseconds() % other._to_microseconds() # type: ignore[attr-defined] # noqa: E501
  343. return self.__class__(0, 0, r)
  344. return NotImplemented
  345. def __divmod__(self, other: timedelta) -> tuple[int, Duration]:
  346. if isinstance(other, timedelta):
  347. q, r = divmod(
  348. self._to_microseconds(),
  349. other._to_microseconds(), # type: ignore[attr-defined]
  350. )
  351. return q, self.__class__(0, 0, r)
  352. return NotImplemented
  353. def __deepcopy__(self, _: dict[int, Self]) -> Self:
  354. return self.__class__(
  355. days=self.remaining_days,
  356. seconds=self.remaining_seconds,
  357. microseconds=self.microseconds,
  358. minutes=self.minutes,
  359. hours=self.hours,
  360. years=self.years,
  361. months=self.months,
  362. )
  363. Duration.min = Duration(days=-999999999)
  364. Duration.max = Duration(
  365. days=999999999, hours=23, minutes=59, seconds=59, microseconds=999999
  366. )
  367. Duration.resolution = Duration(microseconds=1)
  368. class AbsoluteDuration(Duration):
  369. """
  370. Duration that expresses a time difference in absolute values.
  371. """
  372. def __new__(
  373. cls,
  374. days: float = 0,
  375. seconds: float = 0,
  376. microseconds: float = 0,
  377. milliseconds: float = 0,
  378. minutes: float = 0,
  379. hours: float = 0,
  380. weeks: float = 0,
  381. years: float = 0,
  382. months: float = 0,
  383. ) -> AbsoluteDuration:
  384. if not isinstance(years, int) or not isinstance(months, int):
  385. raise ValueError("Float year and months are not supported")
  386. self = timedelta.__new__(
  387. cls, days, seconds, microseconds, milliseconds, minutes, hours, weeks
  388. )
  389. # We need to compute the total_seconds() value
  390. # on a native timedelta object
  391. delta = timedelta(
  392. days, seconds, microseconds, milliseconds, minutes, hours, weeks
  393. )
  394. # Intuitive normalization
  395. self._total = delta.total_seconds()
  396. total = abs(self._total)
  397. self._microseconds = round(total % 1 * 1e6)
  398. days, self._seconds = divmod(int(total), SECONDS_PER_DAY)
  399. self._days = abs(days + years * 365 + months * 30)
  400. self._weeks, self._remaining_days = divmod(days, 7)
  401. self._months = abs(months)
  402. self._years = abs(years)
  403. return self
  404. def total_seconds(self) -> float:
  405. return abs(self._total)
  406. @property
  407. def invert(self) -> bool:
  408. if self._invert is None:
  409. self._invert = self._total < 0
  410. return self._invert