123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- from __future__ import annotations
- from datetime import timedelta
- from typing import TYPE_CHECKING
- from typing import cast
- from typing import overload
- import pendulum
- from pendulum.constants import SECONDS_PER_DAY
- from pendulum.constants import SECONDS_PER_HOUR
- from pendulum.constants import SECONDS_PER_MINUTE
- from pendulum.constants import US_PER_SECOND
- from pendulum.utils._compat import PYPY
- if TYPE_CHECKING:
- from typing_extensions import Self
- def _divide_and_round(a: float, b: float) -> int:
- """divide a by b and round result to the nearest integer
- When the ratio is exactly half-way between two integers,
- the even integer is returned.
- """
- # Based on the reference implementation for divmod_near
- # in Objects/longobject.c.
- q, r = divmod(a, b)
- # The output of divmod() is either a float or an int,
- # but we always want it to be an int.
- q = int(q)
- # round up if either r / b > 0.5, or r / b == 0.5 and q is odd.
- # The expression r / b > 0.5 is equivalent to 2 * r > b if b is
- # positive, 2 * r < b if b negative.
- r *= 2
- greater_than_half = r > b if b > 0 else r < b
- if greater_than_half or r == b and q % 2 == 1:
- q += 1
- return q
- class Duration(timedelta):
- """
- Replacement for the standard timedelta class.
- Provides several improvements over the base class.
- """
- _total: float = 0
- _years: int = 0
- _months: int = 0
- _weeks: int = 0
- _days: int = 0
- _remaining_days: int = 0
- _seconds: int = 0
- _microseconds: int = 0
- _y = None
- _m = None
- _w = None
- _d = None
- _h = None
- _i = None
- _s = None
- _invert = None
- def __new__(
- cls,
- days: float = 0,
- seconds: float = 0,
- microseconds: float = 0,
- milliseconds: float = 0,
- minutes: float = 0,
- hours: float = 0,
- weeks: float = 0,
- years: float = 0,
- months: float = 0,
- ) -> Self:
- if not isinstance(years, int) or not isinstance(months, int):
- raise ValueError("Float year and months are not supported")
- self = timedelta.__new__(
- cls,
- days + years * 365 + months * 30,
- seconds,
- microseconds,
- milliseconds,
- minutes,
- hours,
- weeks,
- )
- # Intuitive normalization
- total = self.total_seconds() - (years * 365 + months * 30) * SECONDS_PER_DAY
- self._total = total
- m = 1
- if total < 0:
- m = -1
- self._microseconds = round(total % m * 1e6)
- self._seconds = abs(int(total)) % SECONDS_PER_DAY * m
- _days = abs(int(total)) // SECONDS_PER_DAY * m
- self._days = _days
- self._remaining_days = abs(_days) % 7 * m
- self._weeks = abs(_days) // 7 * m
- self._months = months
- self._years = years
- self._signature = { # type: ignore[attr-defined]
- "years": years,
- "months": months,
- "weeks": weeks,
- "days": days,
- "hours": hours,
- "minutes": minutes,
- "seconds": seconds,
- "microseconds": microseconds + milliseconds * 1000,
- }
- return self
- def total_minutes(self) -> float:
- return self.total_seconds() / SECONDS_PER_MINUTE
- def total_hours(self) -> float:
- return self.total_seconds() / SECONDS_PER_HOUR
- def total_days(self) -> float:
- return self.total_seconds() / SECONDS_PER_DAY
- def total_weeks(self) -> float:
- return self.total_days() / 7
- if PYPY:
- def total_seconds(self) -> float:
- days = 0
- if hasattr(self, "_years"):
- days += self._years * 365
- if hasattr(self, "_months"):
- days += self._months * 30
- if hasattr(self, "_remaining_days"):
- days += self._weeks * 7 + self._remaining_days
- else:
- days += self._days
- return (
- (days * SECONDS_PER_DAY + self._seconds) * US_PER_SECOND
- + self._microseconds
- ) / US_PER_SECOND
- @property
- def years(self) -> int:
- return self._years
- @property
- def months(self) -> int:
- return self._months
- @property
- def weeks(self) -> int:
- return self._weeks
- if PYPY:
- @property
- def days(self) -> int:
- return self._years * 365 + self._months * 30 + self._days
- @property
- def remaining_days(self) -> int:
- return self._remaining_days
- @property
- def hours(self) -> int:
- if self._h is None:
- seconds = self._seconds
- self._h = 0
- if abs(seconds) >= 3600:
- self._h = (abs(seconds) // 3600 % 24) * self._sign(seconds)
- return self._h
- @property
- def minutes(self) -> int:
- if self._i is None:
- seconds = self._seconds
- self._i = 0
- if abs(seconds) >= 60:
- self._i = (abs(seconds) // 60 % 60) * self._sign(seconds)
- return self._i
- @property
- def seconds(self) -> int:
- return self._seconds
- @property
- def remaining_seconds(self) -> int:
- if self._s is None:
- self._s = self._seconds
- self._s = abs(self._s) % 60 * self._sign(self._s)
- return self._s
- @property
- def microseconds(self) -> int:
- return self._microseconds
- @property
- def invert(self) -> bool:
- if self._invert is None:
- self._invert = self.total_seconds() < 0
- return self._invert
- def in_weeks(self) -> int:
- return int(self.total_weeks())
- def in_days(self) -> int:
- return int(self.total_days())
- def in_hours(self) -> int:
- return int(self.total_hours())
- def in_minutes(self) -> int:
- return int(self.total_minutes())
- def in_seconds(self) -> int:
- return int(self.total_seconds())
- def in_words(self, locale: str | None = None, separator: str = " ") -> str:
- """
- Get the current interval in words in the current locale.
- Ex: 6 jours 23 heures 58 minutes
- :param locale: The locale to use. Defaults to current locale.
- :param separator: The separator to use between each unit
- """
- intervals = [
- ("year", self.years),
- ("month", self.months),
- ("week", self.weeks),
- ("day", self.remaining_days),
- ("hour", self.hours),
- ("minute", self.minutes),
- ("second", self.remaining_seconds),
- ]
- if locale is None:
- locale = pendulum.get_locale()
- loaded_locale = pendulum.locale(locale)
- parts = []
- for interval in intervals:
- unit, interval_count = interval
- if abs(interval_count) > 0:
- translation = loaded_locale.translation(
- f"units.{unit}.{loaded_locale.plural(abs(interval_count))}"
- )
- parts.append(translation.format(interval_count))
- if not parts:
- count: int | str = 0
- if abs(self.microseconds) > 0:
- unit = f"units.second.{loaded_locale.plural(1)}"
- count = f"{abs(self.microseconds) / 1e6:.2f}"
- else:
- unit = f"units.microsecond.{loaded_locale.plural(0)}"
- translation = loaded_locale.translation(unit)
- parts.append(translation.format(count))
- return separator.join(parts)
- def _sign(self, value: float) -> int:
- if value < 0:
- return -1
- return 1
- def as_timedelta(self) -> timedelta:
- """
- Return the interval as a native timedelta.
- """
- return timedelta(seconds=self.total_seconds())
- def __str__(self) -> str:
- return self.in_words()
- def __repr__(self) -> str:
- rep = f"{self.__class__.__name__}("
- if self._years:
- rep += f"years={self._years}, "
- if self._months:
- rep += f"months={self._months}, "
- if self._weeks:
- rep += f"weeks={self._weeks}, "
- if self._days:
- rep += f"days={self._remaining_days}, "
- if self.hours:
- rep += f"hours={self.hours}, "
- if self.minutes:
- rep += f"minutes={self.minutes}, "
- if self.remaining_seconds:
- rep += f"seconds={self.remaining_seconds}, "
- if self.microseconds:
- rep += f"microseconds={self.microseconds}, "
- rep += ")"
- return rep.replace(", )", ")")
- def __add__(self, other: timedelta) -> Self:
- if isinstance(other, timedelta):
- return self.__class__(seconds=self.total_seconds() + other.total_seconds())
- return NotImplemented
- __radd__ = __add__
- def __sub__(self, other: timedelta) -> Self:
- if isinstance(other, timedelta):
- return self.__class__(seconds=self.total_seconds() - other.total_seconds())
- return NotImplemented
- def __neg__(self) -> Self:
- return self.__class__(
- years=-self._years,
- months=-self._months,
- weeks=-self._weeks,
- days=-self._remaining_days,
- seconds=-self._seconds,
- microseconds=-self._microseconds,
- )
- def _to_microseconds(self) -> int:
- return (self._days * (24 * 3600) + self._seconds) * 1000000 + self._microseconds
- def __mul__(self, other: int | float) -> Self:
- if isinstance(other, int):
- return self.__class__(
- years=self._years * other,
- months=self._months * other,
- seconds=self._total * other,
- )
- if isinstance(other, float):
- usec = self._to_microseconds()
- a, b = other.as_integer_ratio()
- return self.__class__(0, 0, _divide_and_round(usec * a, b))
- return NotImplemented
- __rmul__ = __mul__
- @overload
- def __floordiv__(self, other: timedelta) -> int:
- ...
- @overload
- def __floordiv__(self, other: int) -> Self:
- ...
- def __floordiv__(self, other: int | timedelta) -> int | Duration:
- if not isinstance(other, (int, timedelta)):
- return NotImplemented
- usec = self._to_microseconds()
- if isinstance(other, timedelta):
- return cast(
- int, usec // other._to_microseconds() # type: ignore[attr-defined]
- )
- if isinstance(other, int):
- return self.__class__(
- 0,
- 0,
- usec // other,
- years=self._years // other,
- months=self._months // other,
- )
- @overload
- def __truediv__(self, other: timedelta) -> float:
- ...
- @overload
- def __truediv__(self, other: float) -> Self:
- ...
- def __truediv__(self, other: int | float | timedelta) -> Self | float:
- if not isinstance(other, (int, float, timedelta)):
- return NotImplemented
- usec = self._to_microseconds()
- if isinstance(other, timedelta):
- return cast(
- float, usec / other._to_microseconds() # type: ignore[attr-defined]
- )
- if isinstance(other, int):
- return self.__class__(
- 0,
- 0,
- _divide_and_round(usec, other),
- years=_divide_and_round(self._years, other),
- months=_divide_and_round(self._months, other),
- )
- if isinstance(other, float):
- a, b = other.as_integer_ratio()
- return self.__class__(
- 0,
- 0,
- _divide_and_round(b * usec, a),
- years=_divide_and_round(self._years * b, a),
- months=_divide_and_round(self._months, other),
- )
- __div__ = __floordiv__
- def __mod__(self, other: timedelta) -> Self:
- if isinstance(other, timedelta):
- r = self._to_microseconds() % other._to_microseconds() # type: ignore[attr-defined] # noqa: E501
- return self.__class__(0, 0, r)
- return NotImplemented
- def __divmod__(self, other: timedelta) -> tuple[int, Duration]:
- if isinstance(other, timedelta):
- q, r = divmod(
- self._to_microseconds(),
- other._to_microseconds(), # type: ignore[attr-defined]
- )
- return q, self.__class__(0, 0, r)
- return NotImplemented
- def __deepcopy__(self, _: dict[int, Self]) -> Self:
- return self.__class__(
- days=self.remaining_days,
- seconds=self.remaining_seconds,
- microseconds=self.microseconds,
- minutes=self.minutes,
- hours=self.hours,
- years=self.years,
- months=self.months,
- )
- Duration.min = Duration(days=-999999999)
- Duration.max = Duration(
- days=999999999, hours=23, minutes=59, seconds=59, microseconds=999999
- )
- Duration.resolution = Duration(microseconds=1)
- class AbsoluteDuration(Duration):
- """
- Duration that expresses a time difference in absolute values.
- """
- def __new__(
- cls,
- days: float = 0,
- seconds: float = 0,
- microseconds: float = 0,
- milliseconds: float = 0,
- minutes: float = 0,
- hours: float = 0,
- weeks: float = 0,
- years: float = 0,
- months: float = 0,
- ) -> AbsoluteDuration:
- if not isinstance(years, int) or not isinstance(months, int):
- raise ValueError("Float year and months are not supported")
- self = timedelta.__new__(
- cls, days, seconds, microseconds, milliseconds, minutes, hours, weeks
- )
- # We need to compute the total_seconds() value
- # on a native timedelta object
- delta = timedelta(
- days, seconds, microseconds, milliseconds, minutes, hours, weeks
- )
- # Intuitive normalization
- self._total = delta.total_seconds()
- total = abs(self._total)
- self._microseconds = round(total % 1 * 1e6)
- days, self._seconds = divmod(int(total), SECONDS_PER_DAY)
- self._days = abs(days + years * 365 + months * 30)
- self._weeks, self._remaining_days = divmod(days, 7)
- self._months = abs(months)
- self._years = abs(years)
- return self
- def total_seconds(self) -> float:
- return abs(self._total)
- @property
- def invert(self) -> bool:
- if self._invert is None:
- self._invert = self._total < 0
- return self._invert
|