_helpers.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. from __future__ import annotations
  2. import datetime
  3. import math
  4. from typing import NamedTuple
  5. from typing import cast
  6. from pendulum.constants import DAY_OF_WEEK_TABLE
  7. from pendulum.constants import DAYS_PER_L_YEAR
  8. from pendulum.constants import DAYS_PER_MONTHS
  9. from pendulum.constants import DAYS_PER_N_YEAR
  10. from pendulum.constants import EPOCH_YEAR
  11. from pendulum.constants import MONTHS_OFFSETS
  12. from pendulum.constants import SECS_PER_4_YEARS
  13. from pendulum.constants import SECS_PER_100_YEARS
  14. from pendulum.constants import SECS_PER_400_YEARS
  15. from pendulum.constants import SECS_PER_DAY
  16. from pendulum.constants import SECS_PER_HOUR
  17. from pendulum.constants import SECS_PER_MIN
  18. from pendulum.constants import SECS_PER_YEAR
  19. from pendulum.constants import TM_DECEMBER
  20. from pendulum.constants import TM_JANUARY
  21. from pendulum.tz.timezone import Timezone
  22. from pendulum.utils._compat import zoneinfo
  23. class PreciseDiff(NamedTuple):
  24. years: int
  25. months: int
  26. days: int
  27. hours: int
  28. minutes: int
  29. seconds: int
  30. microseconds: int
  31. total_days: int
  32. def __repr__(self) -> str:
  33. return (
  34. f"{self.years} years "
  35. f"{self.months} months "
  36. f"{self.days} days "
  37. f"{self.hours} hours "
  38. f"{self.minutes} minutes "
  39. f"{self.seconds} seconds "
  40. f"{self.microseconds} microseconds"
  41. )
  42. def is_leap(year: int) -> bool:
  43. return year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
  44. def is_long_year(year: int) -> bool:
  45. def p(y: int) -> int:
  46. return y + y // 4 - y // 100 + y // 400
  47. return p(year) % 7 == 4 or p(year - 1) % 7 == 3
  48. def week_day(year: int, month: int, day: int) -> int:
  49. if month < 3:
  50. year -= 1
  51. w = (
  52. year
  53. + year // 4
  54. - year // 100
  55. + year // 400
  56. + DAY_OF_WEEK_TABLE[month - 1]
  57. + day
  58. ) % 7
  59. if not w:
  60. w = 7
  61. return w
  62. def days_in_year(year: int) -> int:
  63. if is_leap(year):
  64. return DAYS_PER_L_YEAR
  65. return DAYS_PER_N_YEAR
  66. def local_time(
  67. unix_time: int, utc_offset: int, microseconds: int
  68. ) -> tuple[int, int, int, int, int, int, int]:
  69. """
  70. Returns a UNIX time as a broken-down time
  71. for a particular transition type.
  72. """
  73. year = EPOCH_YEAR
  74. seconds = math.floor(unix_time)
  75. # Shift to a base year that is 400-year aligned.
  76. if seconds >= 0:
  77. seconds -= 10957 * SECS_PER_DAY
  78. year += 30 # == 2000
  79. else:
  80. seconds += (146097 - 10957) * SECS_PER_DAY
  81. year -= 370 # == 1600
  82. seconds += utc_offset
  83. # Handle years in chunks of 400/100/4/1
  84. year += 400 * (seconds // SECS_PER_400_YEARS)
  85. seconds %= SECS_PER_400_YEARS
  86. if seconds < 0:
  87. seconds += SECS_PER_400_YEARS
  88. year -= 400
  89. leap_year = 1 # 4-century aligned
  90. sec_per_100years = SECS_PER_100_YEARS[leap_year]
  91. while seconds >= sec_per_100years:
  92. seconds -= sec_per_100years
  93. year += 100
  94. leap_year = 0 # 1-century, non 4-century aligned
  95. sec_per_100years = SECS_PER_100_YEARS[leap_year]
  96. sec_per_4years = SECS_PER_4_YEARS[leap_year]
  97. while seconds >= sec_per_4years:
  98. seconds -= sec_per_4years
  99. year += 4
  100. leap_year = 1 # 4-year, non century aligned
  101. sec_per_4years = SECS_PER_4_YEARS[leap_year]
  102. sec_per_year = SECS_PER_YEAR[leap_year]
  103. while seconds >= sec_per_year:
  104. seconds -= sec_per_year
  105. year += 1
  106. leap_year = 0 # non 4-year aligned
  107. sec_per_year = SECS_PER_YEAR[leap_year]
  108. # Handle months and days
  109. month = TM_DECEMBER + 1
  110. day = seconds // SECS_PER_DAY + 1
  111. seconds %= SECS_PER_DAY
  112. while month != TM_JANUARY + 1:
  113. month_offset = MONTHS_OFFSETS[leap_year][month]
  114. if day > month_offset:
  115. day -= month_offset
  116. break
  117. month -= 1
  118. # Handle hours, minutes, seconds and microseconds
  119. hour, seconds = divmod(seconds, SECS_PER_HOUR)
  120. minute, second = divmod(seconds, SECS_PER_MIN)
  121. return year, month, day, hour, minute, second, microseconds
def precise_diff(
    d1: datetime.datetime | datetime.date, d2: datetime.datetime | datetime.date
) -> PreciseDiff:
    """
    Calculate a precise difference between two datetimes.

    The difference is expressed as years, months, days, hours, minutes,
    seconds and microseconds, plus a total number of days. When ``d1``
    compares greater than ``d2`` the operands are swapped and every
    component of the result is negated.

    :param d1: The first datetime
    :param d2: The second datetime

    :return: The difference as a PreciseDiff; all components are zero
        when both values compare equal.

    :raises ValueError: If exactly one of the two values carries
        a ``tzinfo``.
    """
    sign = 1

    if d1 == d2:
        return PreciseDiff(0, 0, 0, 0, 0, 0, 0, 0)

    # Plain dates have no tzinfo attribute and are treated as naive.
    tzinfo1: datetime.tzinfo | None = (
        d1.tzinfo if isinstance(d1, datetime.datetime) else None
    )
    tzinfo2: datetime.tzinfo | None = (
        d2.tzinfo if isinstance(d2, datetime.datetime) else None
    )

    # Reject the mix of one naive and one aware value.
    if (
        tzinfo1 is None
        and tzinfo2 is not None
        or tzinfo2 is None
        and tzinfo1 is not None
    ):
        raise ValueError(
            "Comparison between naive and aware datetimes is not supported"
        )

    # From here on d1 is the earlier value; the sign restores the direction.
    if d1 > d2:
        d1, d2 = d2, d1
        sign = -1

    d_diff = 0
    hour_diff = 0
    min_diff = 0
    sec_diff = 0
    mic_diff = 0
    # Computed from the calendar dates as given, before any of the
    # UTC adjustments applied below.
    total_days = _day_number(d2.year, d2.month, d2.day) - _day_number(
        d1.year, d1.month, d1.day
    )
    in_same_tz = False
    tz1 = None
    tz2 = None

    # Trying to figure out the timezone names
    # If we can't find them, we assume different timezones
    if tzinfo1 and tzinfo2:
        tz1 = _get_tzinfo_name(tzinfo1)
        tz2 = _get_tzinfo_name(tzinfo2)

        in_same_tz = tz1 == tz2 and tz1 is not None

    if isinstance(d2, datetime.datetime):
        if isinstance(d1, datetime.datetime):
            # If we are not in the same timezone
            # we need to adjust
            #
            # We also need to adjust if we do not
            # have variable-length units
            if not in_same_tz or total_days == 0:
                offset1 = d1.utcoffset()
                offset2 = d2.utcoffset()

                # Subtracting the UTC offset moves the wall-clock fields
                # of each value to UTC.
                if offset1:
                    d1 = d1 - offset1

                if offset2:
                    d2 = d2 - offset2

            hour_diff = d2.hour - d1.hour
            min_diff = d2.minute - d1.minute
            sec_diff = d2.second - d1.second
            mic_diff = d2.microsecond - d1.microsecond
        else:
            # d1 is a plain date: only d2 contributes clock fields.
            hour_diff = d2.hour
            min_diff = d2.minute
            sec_diff = d2.second
            mic_diff = d2.microsecond

        # Borrow from the next larger unit whenever a component is negative,
        # from microseconds up to days.
        if mic_diff < 0:
            mic_diff += 1000000
            sec_diff -= 1

        if sec_diff < 0:
            sec_diff += 60
            min_diff -= 1

        if min_diff < 0:
            min_diff += 60
            hour_diff -= 1

        if hour_diff < 0:
            hour_diff += 24
            d_diff -= 1

    y_diff = d2.year - d1.year
    m_diff = d2.month - d1.month
    # d_diff may already hold a -1 borrowed by the hours above.
    d_diff += d2.day - d1.day

    if d_diff < 0:
        # Find the month (and its year) that precedes d2's month.
        year = d2.year
        month = d2.month

        if month == 1:
            month = 12
            year -= 1
        else:
            month -= 1

        leap = int(is_leap(year))

        # NOTE(review): presumably DAYS_PER_MONTHS[leap][month] is the
        # length of that month -- confirm against pendulum.constants.
        days_in_last_month = DAYS_PER_MONTHS[leap][month]
        days_in_month = DAYS_PER_MONTHS[int(is_leap(d2.year))][d2.month]

        if d_diff < days_in_month - days_in_last_month:
            # We don't have a full month, we calculate days
            if days_in_last_month < d1.day:
                d_diff += d1.day
            else:
                d_diff += days_in_last_month
        elif d_diff == days_in_month - days_in_last_month:
            # We have exactly a full month
            # We remove the days difference
            # and add one to the months difference
            d_diff = 0
            m_diff += 1
        else:
            # We have a full month
            d_diff += days_in_last_month

        # One month was borrowed to make the days non-negative.
        m_diff -= 1

    # Borrow a year when the months went negative.
    if m_diff < 0:
        m_diff += 12
        y_diff -= 1

    return PreciseDiff(
        sign * y_diff,
        sign * m_diff,
        sign * d_diff,
        sign * hour_diff,
        sign * min_diff,
        sign * sec_diff,
        sign * mic_diff,
        sign * total_days,
    )
  246. def _day_number(year: int, month: int, day: int) -> int:
  247. month = (month + 9) % 12
  248. year = year - month // 10
  249. return (
  250. 365 * year
  251. + year // 4
  252. - year // 100
  253. + year // 400
  254. + (month * 306 + 5) // 10
  255. + (day - 1)
  256. )
  257. def _get_tzinfo_name(tzinfo: datetime.tzinfo | None) -> str | None:
  258. if tzinfo is None:
  259. return None
  260. if hasattr(tzinfo, "key"):
  261. # zoneinfo timezone
  262. return cast(zoneinfo.ZoneInfo, tzinfo).key
  263. elif hasattr(tzinfo, "name"):
  264. # Pendulum timezone
  265. return cast(Timezone, tzinfo).name
  266. elif hasattr(tzinfo, "zone"):
  267. # pytz timezone
  268. return tzinfo.zone # type: ignore[no-any-return]
  269. return None