redis_sentinel.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. from __future__ import annotations
  2. import urllib.parse
  3. from typing import TYPE_CHECKING
  4. from deprecated.sphinx import versionchanged
  5. from packaging.version import Version
  6. from limits.errors import ConfigurationError
  7. from limits.storage.redis import RedisStorage
  8. from limits.typing import RedisClient
  9. if TYPE_CHECKING:
  10. pass
  11. @versionchanged(
  12. version="4.3",
  13. reason=(
  14. "Added support for using the redis client from :pypi:`valkey`"
  15. " if :paramref:`uri` has the ``valkey+sentinel://`` schema"
  16. ),
  17. )
  18. class RedisSentinelStorage(RedisStorage):
  19. """
  20. Rate limit storage with redis sentinel as backend
  21. Depends on :pypi:`redis` package (or :pypi:`valkey` if :paramref:`uri` starts with
  22. ``valkey+sentinel://``)
  23. """
  24. STORAGE_SCHEME = ["redis+sentinel", "valkey+sentinel"]
  25. """The storage scheme for redis accessed via a redis sentinel installation"""
  26. DEPENDENCIES = {
  27. "redis": Version("3.0"),
  28. "redis.sentinel": Version("3.0"),
  29. "valkey": Version("6.0"),
  30. "valkey.sentinel": Version("6.0"),
  31. }
  32. def __init__(
  33. self,
  34. uri: str,
  35. service_name: str | None = None,
  36. use_replicas: bool = True,
  37. sentinel_kwargs: dict[str, float | str | bool] | None = None,
  38. wrap_exceptions: bool = False,
  39. **options: float | str | bool,
  40. ) -> None:
  41. """
  42. :param uri: url of the form
  43. ``redis+sentinel://host:port,host:port/service_name``
  44. If the uri scheme is ``valkey+sentinel`` the implementation used will be from
  45. :pypi:`valkey`.
  46. :param service_name: sentinel service name
  47. (if not provided in :attr:`uri`)
  48. :param use_replicas: Whether to use replicas for read only operations
  49. :param sentinel_kwargs: kwargs to pass as
  50. :attr:`sentinel_kwargs` to :class:`redis.sentinel.Sentinel`
  51. :param wrap_exceptions: Whether to wrap storage exceptions in
  52. :exc:`limits.errors.StorageError` before raising it.
  53. :param options: all remaining keyword arguments are passed
  54. directly to the constructor of :class:`redis.sentinel.Sentinel`
  55. :raise ConfigurationError: when the redis library is not available
  56. or if the redis master host cannot be pinged.
  57. """
  58. super(RedisStorage, self).__init__(
  59. uri, wrap_exceptions=wrap_exceptions, **options
  60. )
  61. parsed = urllib.parse.urlparse(uri)
  62. sentinel_configuration = []
  63. sentinel_options = sentinel_kwargs.copy() if sentinel_kwargs else {}
  64. parsed_auth: dict[str, float | str | bool] = {}
  65. if parsed.username:
  66. parsed_auth["username"] = parsed.username
  67. if parsed.password:
  68. parsed_auth["password"] = parsed.password
  69. sep = parsed.netloc.find("@") + 1
  70. for loc in parsed.netloc[sep:].split(","):
  71. host, port = loc.split(":")
  72. sentinel_configuration.append((host, int(port)))
  73. self.service_name = (
  74. parsed.path.replace("/", "") if parsed.path else service_name
  75. )
  76. if self.service_name is None:
  77. raise ConfigurationError("'service_name' not provided")
  78. self.target_server = "valkey" if uri.startswith("valkey") else "redis"
  79. sentinel_dep = self.dependencies[f"{self.target_server}.sentinel"].module
  80. self.sentinel = sentinel_dep.Sentinel(
  81. sentinel_configuration,
  82. sentinel_kwargs={**parsed_auth, **sentinel_options},
  83. **{**parsed_auth, **options},
  84. )
  85. self.storage: RedisClient = self.sentinel.master_for(self.service_name)
  86. self.storage_slave: RedisClient = self.sentinel.slave_for(self.service_name)
  87. self.use_replicas = use_replicas
  88. self.initialize_storage(uri)
  89. @property
  90. def base_exceptions(
  91. self,
  92. ) -> type[Exception] | tuple[type[Exception], ...]: # pragma: no cover
  93. return ( # type: ignore[no-any-return]
  94. self.dependencies["redis"].module.RedisError
  95. if self.target_server == "redis"
  96. else self.dependencies["valkey"].module.ValkeyError
  97. )
  98. def get_connection(self, readonly: bool = False) -> RedisClient:
  99. return self.storage_slave if (readonly and self.use_replicas) else self.storage