123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- """ """
- from __future__ import annotations
- import dataclasses
- import importlib.resources
- import re
- import sys
- from collections import UserDict
- from types import ModuleType
- from typing import TYPE_CHECKING
- from packaging.version import Version
- from limits.typing import NamedTuple
- from .errors import ConfigurationError
- from .limits import GRANULARITIES, RateLimitItem
# Characters accepted between the individual limits of a combined string
# such as ``1/second; 5/minute``.
SEPARATORS = re.compile(r"[,;|]{1}")

# One limit: ``<amount> / [<multiples>] <granularity>`` or the same with
# ``per`` instead of ``/``. Groups, in order: amount, the ``/`` or ``per``
# token, the optional multiples (``None`` when absent) and the granularity.
# The multiples group is made optional with ``?`` rather than by repeating
# the group with ``*``: a repeated ``([0-9]+)*`` is a nested quantifier that
# backtracks exponentially on a long run of digits that ends up not matching.
SINGLE_EXPR = re.compile(
    r"""
    \s*([0-9]+)
    \s*(/|\s*per\s*)
    \s*([0-9]+)?
    \s*(hour|minute|second|day|month|year)s?\s*""",
    re.IGNORECASE | re.VERBOSE,
)

# A full rate limit string: one limit followed by any number of
# ``<separator><limit>`` pairs. The repeated part is a non-capturing group
# (``(?:``) so that nothing but a separator is accepted between limits.
EXPR = re.compile(
    rf"^{SINGLE_EXPR.pattern}(?:{SEPARATORS.pattern}{SINGLE_EXPR.pattern})*$",
    re.IGNORECASE | re.VERBOSE,
)
class WindowStats(NamedTuple):
    """
    Two-field tuple describing the state of a rate limited window.
    """

    #: Seconds since the Epoch at which this window will be reset
    reset_time: float
    #: Quantity still remaining in this window
    remaining: int
@dataclasses.dataclass
class Dependency:
    """
    Record kept for each module listed in a class's ``DEPENDENCIES``.
    """

    #: Module path the dependency was looked up by
    name: str
    #: Minimum version declared for the module, or ``None`` if none was given
    version_required: Version | None
    #: Version reported by the lookup, or ``None`` if none was reported
    version_found: Version | None
    #: The imported module, or the ``MissingModule`` placeholder
    module: ModuleType
# Placeholder module returned by ``get_dependency`` when the import fails;
# ``DependencyDict`` compares against it by identity.
MissingModule = ModuleType("Missing")

# The subscripted ``UserDict`` is only used while type checking; at runtime
# the plain, unparameterised class serves as the base of ``DependencyDict``.
if TYPE_CHECKING:
    _UserDict = UserDict[str, Dependency]
else:
    _UserDict = UserDict
class DependencyDict(_UserDict):
    """
    Mapping of module name to :class:`Dependency` that validates an entry
    every time it is read.
    """

    def __getitem__(self, key: str) -> Dependency:
        """
        Return the dependency stored under ``key`` after checking that it
        is usable.

        :param key: name the dependency was stored under
        :raise ConfigurationError: if the module could not be imported, or
         a minimum version is required and the version found is unknown or
         lower than that minimum.
        """
        entry = super().__getitem__(key)

        # The module failed to import: explain what is missing and where
        # the supported versions are documented.
        if entry.module is MissingModule:
            parts = [f"'{entry.name}' prerequisite not available."]
            if entry.version_required:
                parts.append(
                    f" A minimum version of {entry.version_required} is required."
                )
            parts.append(
                " See https://limits.readthedocs.io/en/stable/storage.html#supported-versions"
                " for more details."
            )
            raise ConfigurationError("".join(parts))

        # The module imported, but a minimum version was declared and is
        # not satisfied (or no version could be determined at all).
        minimum = entry.version_required
        if minimum:
            found = entry.version_found
            if not found or found < minimum:
                raise ConfigurationError(
                    f"The minimum version of {minimum}"
                    f" for '{entry.name}' could not be found. Found version: {found}"
                )

        return entry
class LazyDependency:
    """
    Simple utility that provides a :attr:`dependencies` mapping
    to the child class, so it can fetch the modules it declares
    in :attr:`DEPENDENCIES` without importing them explicitly.
    """

    DEPENDENCIES: dict[str, Version | None] | list[str] = []
    """
    The python modules this class has a dependency on, given either as
    a list of module names or as a mapping of module name to minimum
    version. Used to lazily populate :attr:`dependencies`.
    """

    def __init__(self) -> None:
        # Starts out empty; the first read of ``dependencies`` fills it.
        self._dependencies: DependencyDict = DependencyDict()

    @property
    def dependencies(self) -> DependencyDict:
        """
        Cached mapping of the modules this storage depends on.
        The modules are only imported the first time this property
        is read rather than when the class is defined.

        :meta private:
        """
        cached = getattr(self, "_dependencies", None)
        # An empty mapping is falsy, so it is treated as "not built yet".
        if cached:
            return cached

        declared = self.DEPENDENCIES
        requirements: dict[str, Version | None]
        if isinstance(declared, list):
            # A plain list declares no minimum versions.
            requirements = dict.fromkeys(declared)
        else:
            requirements = declared

        resolved = DependencyDict()
        for module_name, required in requirements.items():
            module, found = get_dependency(module_name)
            resolved[module_name] = Dependency(module_name, required, found, module)

        self._dependencies = resolved
        return self._dependencies
def get_dependency(module_path: str) -> tuple[ModuleType, Version | None]:
    """
    safe function to import a module at runtime

    :param module_path: dotted path of the module to import
    :return: a ``(module, version)`` pair. ``module`` is ``MissingModule``
     and ``version`` is ``None`` when the import fails. ``version`` is also
     ``None`` when the package's ``__version__`` cannot be parsed.
    """
    try:
        if module_path not in sys.modules:
            __import__(module_path)
        # The version is read from the top-level package, falling back to
        # "0.0.0" when it does not define ``__version__``.
        root = module_path.split(".")[0]
        raw_version = getattr(sys.modules[root], "__version__", "0.0.0")
        module = sys.modules[module_path]
    except ImportError:  # pragma: no cover
        return MissingModule, None

    try:
        return module, Version(raw_version)
    except (TypeError, ValueError):
        # ``Version`` rejects strings that are not valid versions with
        # ``InvalidVersion`` (a ``ValueError``) and non-string values with
        # ``TypeError``. The module itself imported fine, so hand it back
        # with an unknown version instead of letting the parse error escape.
        return module, None
def get_package_data(path: str) -> bytes:
    """
    Read a resource shipped inside the ``limits`` package.

    :param path: location of the resource, joined onto the package root
    :return: the raw bytes of the resource
    """
    package_root = importlib.resources.files("limits")
    resource = package_root.joinpath(path)
    return resource.read_bytes()
def parse_many(limit_string: str) -> list[RateLimitItem]:
    """
    parses a string holding one or more rate limits in string notation
    (e.g. ``1/second; 5/minute``)

    :param limit_string: rate limit string using :ref:`ratelimit-string`
    :return: one rate limit item per limit found in the string
    :raise ValueError: if the string notation is invalid.
    """
    if not isinstance(limit_string, str) or not EXPR.match(limit_string):
        raise ValueError(f"couldn't parse rate limit string '{limit_string}'")

    parsed = []
    for chunk in SEPARATORS.split(limit_string):
        found = SINGLE_EXPR.match(chunk)
        if found is None:
            continue
        amount, _, multiples, unit = found.groups()
        item_type = granularity_from_string(unit)
        # A missing multiplier and a multiplier of zero are both handed
        # on as ``None``.
        multiplier = (int(multiples) or None) if multiples else None
        parsed.append(item_type(int(amount), multiplier))
    return parsed
def parse(limit_string: str) -> RateLimitItem:
    """
    parses one rate limit given in string notation
    (e.g. ``1/second`` or ``1 per second``)

    If the string holds several limits, only the first one is returned.

    :param limit_string: rate limit string using :ref:`ratelimit-string`
    :raise ValueError: if the string notation is invalid.
    """
    parsed = parse_many(limit_string)
    return parsed[0]
def granularity_from_string(granularity_string: str) -> type[RateLimitItem]:
    """
    Find the rate limit class registered in ``GRANULARITIES`` that accepts
    the given granularity string.

    :param granularity_string: granularity name to look up
    :return: the first registered class whose ``check_granularity_string``
     accepts the string
    :raise ValueError: if no registered class accepts the string
    """
    matched = next(
        (
            candidate
            for candidate in GRANULARITIES.values()
            if candidate.check_granularity_string(granularity_string)
        ),
        None,
    )
    if matched is None:
        raise ValueError(f"no granularity matched for {granularity_string}")
    return matched
|