traveller.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. from __future__ import annotations
  2. from typing import TYPE_CHECKING
  3. from typing import cast
  4. from pendulum.datetime import DateTime
  5. from pendulum.utils._compat import PYPY
  6. if TYPE_CHECKING:
  7. from types import TracebackType
  8. from typing_extensions import Self
  9. class BaseTraveller:
  10. def __init__(self, datetime_class: type[DateTime] = DateTime) -> None:
  11. self._datetime_class: type[DateTime] = datetime_class
  12. def freeze(self) -> Self:
  13. raise self._not_implemented()
  14. def travel_back(self) -> Self:
  15. raise self._not_implemented()
  16. def travel(
  17. self,
  18. years: int = 0,
  19. months: int = 0,
  20. weeks: int = 0,
  21. days: int = 0,
  22. hours: int = 0,
  23. minutes: int = 0,
  24. seconds: int = 0,
  25. microseconds: int = 0,
  26. ) -> Self:
  27. raise self._not_implemented()
  28. def travel_to(self, dt: DateTime, *, freeze: bool = False) -> Self:
  29. raise self._not_implemented()
  30. def __enter__(self) -> Self:
  31. return self
  32. def __exit__(
  33. self,
  34. exc_type: type[BaseException] | None,
  35. exc_val: BaseException | None,
  36. exc_tb: TracebackType,
  37. ) -> None:
  38. ...
  39. def _not_implemented(self) -> NotImplementedError:
  40. return NotImplementedError()
  41. if not PYPY:
  42. try:
  43. import time_machine
  44. except ImportError:
  45. time_machine = None # type: ignore[assignment]
  46. if time_machine is not None:
  47. class Traveller(BaseTraveller):
  48. def __init__(self, datetime_class: type[DateTime] = DateTime) -> None:
  49. super().__init__(datetime_class)
  50. self._started: bool = False
  51. self._traveller: time_machine.travel | None = None
  52. self._coordinates: time_machine.Coordinates | None = None
  53. def freeze(self) -> Self:
  54. if self._started:
  55. cast(time_machine.Coordinates, self._coordinates).move_to(
  56. self._datetime_class.now(), tick=False
  57. )
  58. else:
  59. self._start(freeze=True)
  60. return self
  61. def travel_back(self) -> Self:
  62. if not self._started:
  63. return self
  64. cast(time_machine.travel, self._traveller).stop()
  65. self._coordinates = None
  66. self._traveller = None
  67. self._started = False
  68. return self
  69. def travel(
  70. self,
  71. years: int = 0,
  72. months: int = 0,
  73. weeks: int = 0,
  74. days: int = 0,
  75. hours: int = 0,
  76. minutes: int = 0,
  77. seconds: int = 0,
  78. microseconds: int = 0,
  79. *,
  80. freeze: bool = False,
  81. ) -> Self:
  82. self._start(freeze=freeze)
  83. cast(time_machine.Coordinates, self._coordinates).move_to(
  84. self._datetime_class.now().add(
  85. years=years,
  86. months=months,
  87. weeks=weeks,
  88. days=days,
  89. hours=hours,
  90. minutes=minutes,
  91. seconds=seconds,
  92. microseconds=microseconds,
  93. )
  94. )
  95. return self
  96. def travel_to(self, dt: DateTime, *, freeze: bool = False) -> Self:
  97. self._start(freeze=freeze)
  98. cast(time_machine.Coordinates, self._coordinates).move_to(dt)
  99. return self
  100. def _start(self, freeze: bool = False) -> None:
  101. if self._started:
  102. return
  103. if not self._traveller:
  104. self._traveller = time_machine.travel(
  105. self._datetime_class.now(), tick=not freeze
  106. )
  107. self._coordinates = self._traveller.start()
  108. self._started = True
  109. def __enter__(self) -> Self:
  110. self._start()
  111. return self
  112. def __exit__(
  113. self,
  114. exc_type: type[BaseException] | None,
  115. exc_val: BaseException | None,
  116. exc_tb: TracebackType,
  117. ) -> None:
  118. self.travel_back()
  119. else:
  120. class Traveller(BaseTraveller): # type: ignore[no-redef]
  121. def _not_implemented(self) -> NotImplementedError:
  122. return NotImplementedError(
  123. "Time travelling is an optional feature. "
  124. 'You can add it by installing Pendulum with the "test" extra.'
  125. )
  126. else:
  127. class Traveller(BaseTraveller): # type: ignore[no-redef]
  128. def _not_implemented(self) -> NotImplementedError:
  129. return NotImplementedError(
  130. "Time travelling is not supported on the PyPy Python implementation."
  131. )