etcd.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from __future__ import annotations
  2. import time
  3. import urllib.parse
  4. from deprecated.sphinx import deprecated
  5. from limits.errors import ConcurrentUpdateError
  6. from limits.storage.base import Storage
  7. from limits.typing import TYPE_CHECKING
  8. if TYPE_CHECKING:
  9. import etcd3
  10. @deprecated(version="4.4")
  11. class EtcdStorage(Storage):
  12. """
  13. Rate limit storage with etcd as backend.
  14. Depends on :pypi:`etcd3`.
  15. """
  16. STORAGE_SCHEME = ["etcd"]
  17. """The storage scheme for etcd"""
  18. DEPENDENCIES = ["etcd3"]
  19. PREFIX = "limits"
  20. MAX_RETRIES = 5
  21. def __init__(
  22. self,
  23. uri: str,
  24. max_retries: int = MAX_RETRIES,
  25. wrap_exceptions: bool = False,
  26. **options: str,
  27. ) -> None:
  28. """
  29. :param uri: etcd location of the form
  30. ``etcd://host:port``,
  31. :param max_retries: Maximum number of attempts to retry
  32. in the case of concurrent updates to a rate limit key
  33. :param wrap_exceptions: Whether to wrap storage exceptions in
  34. :exc:`limits.errors.StorageError` before raising it.
  35. :param options: all remaining keyword arguments are passed
  36. directly to the constructor of :class:`etcd3.Etcd3Client`
  37. :raise ConfigurationError: when :pypi:`etcd3` is not available
  38. """
  39. parsed = urllib.parse.urlparse(uri)
  40. self.lib = self.dependencies["etcd3"].module
  41. self.storage: etcd3.Etcd3Client = self.lib.client(
  42. parsed.hostname, parsed.port, **options
  43. )
  44. self.max_retries = max_retries
  45. super().__init__(uri, wrap_exceptions=wrap_exceptions)
  46. @property
  47. def base_exceptions(
  48. self,
  49. ) -> type[Exception] | tuple[type[Exception], ...]: # pragma: no cover
  50. return self.lib.Etcd3Exception # type: ignore[no-any-return]
  51. def prefixed_key(self, key: str) -> bytes:
  52. return f"{self.PREFIX}/{key}".encode()
  53. def incr(
  54. self, key: str, expiry: int, elastic_expiry: bool = False, amount: int = 1
  55. ) -> int:
  56. retries = 0
  57. etcd_key = self.prefixed_key(key)
  58. while retries < self.max_retries:
  59. now = time.time()
  60. lease = self.storage.lease(expiry)
  61. window_end = now + expiry
  62. create_attempt = self.storage.transaction(
  63. compare=[self.storage.transactions.create(etcd_key) == "0"],
  64. success=[
  65. self.storage.transactions.put(
  66. etcd_key,
  67. f"{amount}:{window_end}".encode(),
  68. lease=lease.id,
  69. )
  70. ],
  71. failure=[self.storage.transactions.get(etcd_key)],
  72. )
  73. if create_attempt[0]:
  74. return amount
  75. else:
  76. cur, meta = create_attempt[1][0][0]
  77. cur_value, window_end = cur.split(b":")
  78. window_end = float(window_end)
  79. if window_end <= now:
  80. self.storage.revoke_lease(meta.lease_id)
  81. self.storage.delete(etcd_key)
  82. else:
  83. if elastic_expiry:
  84. self.storage.refresh_lease(meta.lease_id)
  85. window_end = now + expiry
  86. new = int(cur_value) + amount
  87. if self.storage.transaction(
  88. compare=[self.storage.transactions.value(etcd_key) == cur],
  89. success=[
  90. self.storage.transactions.put(
  91. etcd_key,
  92. f"{new}:{window_end}".encode(),
  93. lease=meta.lease_id,
  94. )
  95. ],
  96. failure=[],
  97. )[0]:
  98. return new
  99. retries += 1
  100. raise ConcurrentUpdateError(key, retries)
  101. def get(self, key: str) -> int:
  102. value, meta = self.storage.get(self.prefixed_key(key))
  103. if value:
  104. amount, expiry = value.split(b":")
  105. if float(expiry) > time.time():
  106. return int(amount)
  107. return 0
  108. def get_expiry(self, key: str) -> float:
  109. value, _ = self.storage.get(self.prefixed_key(key))
  110. if value:
  111. return float(value.split(b":")[1])
  112. return time.time()
  113. def check(self) -> bool:
  114. try:
  115. self.storage.status()
  116. return True
  117. except: # noqa
  118. return False
  119. def reset(self) -> int | None:
  120. return self.storage.delete_prefix(f"{self.PREFIX}/").deleted
  121. def clear(self, key: str) -> None:
  122. self.storage.delete(self.prefixed_key(key))