limits.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
"""Rate limit granularities and the rate limit item classes built on them."""
  2. from __future__ import annotations
  3. from functools import total_ordering
  4. from limits.typing import ClassVar, NamedTuple, cast
  5. def safe_string(value: bytes | str | int | float) -> str:
  6. """
  7. normalize a byte/str/int or float to a str
  8. """
  9. if isinstance(value, bytes):
  10. return value.decode()
  11. return str(value)
  12. class Granularity(NamedTuple):
  13. seconds: int
  14. name: str
  15. TIME_TYPES = dict(
  16. day=Granularity(60 * 60 * 24, "day"),
  17. month=Granularity(60 * 60 * 24 * 30, "month"),
  18. year=Granularity(60 * 60 * 24 * 30 * 12, "year"),
  19. hour=Granularity(60 * 60, "hour"),
  20. minute=Granularity(60, "minute"),
  21. second=Granularity(1, "second"),
  22. )
  23. GRANULARITIES: dict[str, type[RateLimitItem]] = {}
  24. class RateLimitItemMeta(type):
  25. def __new__(
  26. cls,
  27. name: str,
  28. parents: tuple[type, ...],
  29. dct: dict[str, Granularity | list[str]],
  30. ) -> RateLimitItemMeta:
  31. if "__slots__" not in dct:
  32. dct["__slots__"] = []
  33. granularity = super().__new__(cls, name, parents, dct)
  34. if "GRANULARITY" in dct:
  35. GRANULARITIES[dct["GRANULARITY"][1]] = cast(
  36. type[RateLimitItem], granularity
  37. )
  38. return granularity
  39. # pylint: disable=no-member
  40. @total_ordering
  41. class RateLimitItem(metaclass=RateLimitItemMeta):
  42. """
  43. defines a Rate limited resource which contains the characteristic
  44. namespace, amount and granularity multiples of the rate limiting window.
  45. :param amount: the rate limit amount
  46. :param multiples: multiple of the 'per' :attr:`GRANULARITY`
  47. (e.g. 'n' per 'm' seconds)
  48. :param namespace: category for the specific rate limit
  49. """
  50. __slots__ = ["namespace", "amount", "multiples"]
  51. GRANULARITY: ClassVar[Granularity]
  52. """
  53. A tuple describing the granularity of this limit as
  54. (number of seconds, name)
  55. """
  56. def __init__(
  57. self, amount: int, multiples: int | None = 1, namespace: str = "LIMITER"
  58. ):
  59. self.namespace = namespace
  60. self.amount = int(amount)
  61. self.multiples = int(multiples or 1)
  62. @classmethod
  63. def check_granularity_string(cls, granularity_string: str) -> bool:
  64. """
  65. Checks if this instance matches a *granularity_string*
  66. of type ``n per hour``, ``n per minute`` etc,
  67. by comparing with :attr:`GRANULARITY`
  68. """
  69. return granularity_string.lower() in cls.GRANULARITY.name
  70. def get_expiry(self) -> int:
  71. """
  72. :return: the duration the limit is enforced for in seconds.
  73. """
  74. return self.GRANULARITY.seconds * self.multiples
  75. def key_for(self, *identifiers: bytes | str | int | float) -> str:
  76. """
  77. Constructs a key for the current limit and any additional
  78. identifiers provided.
  79. :param identifiers: a list of strings to append to the key
  80. :return: a string key identifying this resource with
  81. each identifier separated with a '/' delimiter.
  82. """
  83. remainder = "/".join(
  84. [safe_string(k) for k in identifiers]
  85. + [
  86. safe_string(self.amount),
  87. safe_string(self.multiples),
  88. self.GRANULARITY.name,
  89. ]
  90. )
  91. return f"{self.namespace}/{remainder}"
  92. def __eq__(self, other: object) -> bool:
  93. if isinstance(other, RateLimitItem):
  94. return (
  95. self.amount == other.amount
  96. and self.GRANULARITY == other.GRANULARITY
  97. and self.multiples == other.multiples
  98. )
  99. return False
  100. def __repr__(self) -> str:
  101. return f"{self.amount} per {self.multiples} {self.GRANULARITY.name}"
  102. def __lt__(self, other: RateLimitItem) -> bool:
  103. return self.GRANULARITY.seconds < other.GRANULARITY.seconds
  104. def __hash__(self) -> int:
  105. return hash((self.namespace, self.amount, self.multiples, self.GRANULARITY))
  106. class RateLimitItemPerYear(RateLimitItem):
  107. """
  108. per year rate limited resource.
  109. """
  110. GRANULARITY = TIME_TYPES["year"]
  111. """A year"""
  112. class RateLimitItemPerMonth(RateLimitItem):
  113. """
  114. per month rate limited resource.
  115. """
  116. GRANULARITY = TIME_TYPES["month"]
  117. """A month"""
  118. class RateLimitItemPerDay(RateLimitItem):
  119. """
  120. per day rate limited resource.
  121. """
  122. GRANULARITY = TIME_TYPES["day"]
  123. """A day"""
  124. class RateLimitItemPerHour(RateLimitItem):
  125. """
  126. per hour rate limited resource.
  127. """
  128. GRANULARITY = TIME_TYPES["hour"]
  129. """An hour"""
  130. class RateLimitItemPerMinute(RateLimitItem):
  131. """
  132. per minute rate limited resource.
  133. """
  134. GRANULARITY = TIME_TYPES["minute"]
  135. """A minute"""
  136. class RateLimitItemPerSecond(RateLimitItem):
  137. """
  138. per second rate limited resource.
  139. """
  140. GRANULARITY = TIME_TYPES["second"]
  141. """A second"""