util.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. """ """
  2. from __future__ import annotations
  3. import dataclasses
  4. import importlib.resources
  5. import re
  6. import sys
  7. from collections import UserDict
  8. from types import ModuleType
  9. from typing import TYPE_CHECKING
  10. from packaging.version import Version
  11. from limits.typing import NamedTuple
  12. from .errors import ConfigurationError
  13. from .limits import GRANULARITIES, RateLimitItem
  14. SEPARATORS = re.compile(r"[,;|]{1}")
  15. SINGLE_EXPR = re.compile(
  16. r"""
  17. \s*([0-9]+)
  18. \s*(/|\s*per\s*)
  19. \s*([0-9]+)
  20. *\s*(hour|minute|second|day|month|year)s?\s*""",
  21. re.IGNORECASE | re.VERBOSE,
  22. )
  23. EXPR = re.compile(
  24. rf"^{SINGLE_EXPR.pattern}(:?{SEPARATORS.pattern}{SINGLE_EXPR.pattern})*$",
  25. re.IGNORECASE | re.VERBOSE,
  26. )
  27. class WindowStats(NamedTuple):
  28. """
  29. tuple to describe a rate limited window
  30. """
  31. #: Time as seconds since the Epoch when this window will be reset
  32. reset_time: float
  33. #: Quantity remaining in this window
  34. remaining: int
  35. @dataclasses.dataclass
  36. class Dependency:
  37. name: str
  38. version_required: Version | None
  39. version_found: Version | None
  40. module: ModuleType
  41. MissingModule = ModuleType("Missing")
  42. if TYPE_CHECKING:
  43. _UserDict = UserDict[str, Dependency]
  44. else:
  45. _UserDict = UserDict
  46. class DependencyDict(_UserDict):
  47. def __getitem__(self, key: str) -> Dependency:
  48. dependency = super().__getitem__(key)
  49. if dependency.module is MissingModule:
  50. message = f"'{dependency.name}' prerequisite not available."
  51. if dependency.version_required:
  52. message += (
  53. f" A minimum version of {dependency.version_required} is required."
  54. if dependency.version_required
  55. else ""
  56. )
  57. message += (
  58. " See https://limits.readthedocs.io/en/stable/storage.html#supported-versions"
  59. " for more details."
  60. )
  61. raise ConfigurationError(message)
  62. elif dependency.version_required and (
  63. not dependency.version_found
  64. or dependency.version_found < dependency.version_required
  65. ):
  66. raise ConfigurationError(
  67. f"The minimum version of {dependency.version_required}"
  68. f" for '{dependency.name}' could not be found. Found version: {dependency.version_found}"
  69. )
  70. return dependency
  71. class LazyDependency:
  72. """
  73. Simple utility that provides an :attr:`dependency`
  74. to the child class to fetch any dependencies
  75. without having to import them explicitly.
  76. """
  77. DEPENDENCIES: dict[str, Version | None] | list[str] = []
  78. """
  79. The python modules this class has a dependency on.
  80. Used to lazily populate the :attr:`dependencies`
  81. """
  82. def __init__(self) -> None:
  83. self._dependencies: DependencyDict = DependencyDict()
  84. @property
  85. def dependencies(self) -> DependencyDict:
  86. """
  87. Cached mapping of the modules this storage depends on.
  88. This is done so that the module is only imported lazily
  89. when the storage is instantiated.
  90. :meta private:
  91. """
  92. if not getattr(self, "_dependencies", None):
  93. dependencies = DependencyDict()
  94. mapping: dict[str, Version | None]
  95. if isinstance(self.DEPENDENCIES, list):
  96. mapping = {dependency: None for dependency in self.DEPENDENCIES}
  97. else:
  98. mapping = self.DEPENDENCIES
  99. for name, minimum_version in mapping.items():
  100. dependency, version = get_dependency(name)
  101. dependencies[name] = Dependency(
  102. name, minimum_version, version, dependency
  103. )
  104. self._dependencies = dependencies
  105. return self._dependencies
  106. def get_dependency(module_path: str) -> tuple[ModuleType, Version | None]:
  107. """
  108. safe function to import a module at runtime
  109. """
  110. try:
  111. if module_path not in sys.modules:
  112. __import__(module_path)
  113. root = module_path.split(".")[0]
  114. version = getattr(sys.modules[root], "__version__", "0.0.0")
  115. return sys.modules[module_path], Version(version)
  116. except ImportError: # pragma: no cover
  117. return MissingModule, None
  118. def get_package_data(path: str) -> bytes:
  119. return importlib.resources.files("limits").joinpath(path).read_bytes()
  120. def parse_many(limit_string: str) -> list[RateLimitItem]:
  121. """
  122. parses rate limits in string notation containing multiple rate limits
  123. (e.g. ``1/second; 5/minute``)
  124. :param limit_string: rate limit string using :ref:`ratelimit-string`
  125. :raise ValueError: if the string notation is invalid.
  126. """
  127. if not (isinstance(limit_string, str) and EXPR.match(limit_string)):
  128. raise ValueError(f"couldn't parse rate limit string '{limit_string}'")
  129. limits = []
  130. for limit in SEPARATORS.split(limit_string):
  131. match = SINGLE_EXPR.match(limit)
  132. if match:
  133. amount, _, multiples, granularity_string = match.groups()
  134. granularity = granularity_from_string(granularity_string)
  135. limits.append(
  136. granularity(int(amount), multiples and int(multiples) or None)
  137. )
  138. return limits
  139. def parse(limit_string: str) -> RateLimitItem:
  140. """
  141. parses a single rate limit in string notation
  142. (e.g. ``1/second`` or ``1 per second``)
  143. :param limit_string: rate limit string using :ref:`ratelimit-string`
  144. :raise ValueError: if the string notation is invalid.
  145. """
  146. return list(parse_many(limit_string))[0]
  147. def granularity_from_string(granularity_string: str) -> type[RateLimitItem]:
  148. """
  149. :param granularity_string:
  150. :raise ValueError:
  151. """
  152. for granularity in GRANULARITIES.values():
  153. if granularity.check_granularity_string(granularity_string):
  154. return granularity
  155. raise ValueError(f"no granularity matched for {granularity_string}")