redis_cluster.py

from __future__ import annotations

import urllib.parse

from deprecated.sphinx import versionchanged
from packaging.version import Version

from limits.storage.redis import RedisStorage


@versionchanged(
    version="3.14.0",
    reason="""
    Dropped support for the :pypi:`redis-py-cluster` library
    which has been abandoned/deprecated.
    """,
)
@versionchanged(
    version="2.5.0",
    reason="""
    Cluster support was provided by the :pypi:`redis-py-cluster` library
    which has been absorbed into the official :pypi:`redis` client. By
    default the :class:`redis.cluster.RedisCluster` client will be used,
    however if the version of the package is lower than ``4.2.0`` the
    implementation will fall back to trying to use
    :class:`rediscluster.RedisCluster`.
    """,
)
@versionchanged(
    version="4.3",
    reason=(
        "Added support for using the redis client from :pypi:`valkey`"
        " if :paramref:`uri` has the ``valkey+cluster://`` schema"
    ),
)
class RedisClusterStorage(RedisStorage):
    """
    Rate limit storage with redis cluster as backend

    Depends on :pypi:`redis` (or :pypi:`valkey` if :paramref:`uri`
    starts with ``valkey+cluster://``).
    """

    STORAGE_SCHEME = ["redis+cluster", "valkey+cluster"]
    """The storage scheme for redis cluster"""

    DEFAULT_OPTIONS: dict[str, float | str | bool] = {
        "max_connections": 1000,
    }
    "Default options passed to the :class:`~redis.cluster.RedisCluster`"

    DEPENDENCIES = {
        "redis": Version("4.2.0"),
        "valkey": Version("6.0"),
    }
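    "Minimum supported versions of the :pypi:`redis` and :pypi:`valkey` client libraries"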

    def __init__(
        self,
        uri: str,
        wrap_exceptions: bool = False,
        **options: float | str | bool,
    ) -> None:
        """
        :param uri: url of the form
         ``redis+cluster://[:password]@host:port,host:port``

         If the uri scheme is ``valkey+cluster`` the implementation used will be
         from :pypi:`valkey`.
        :param wrap_exceptions: Whether to wrap storage exceptions in
         :exc:`limits.errors.StorageError` before raising it.
        :param options: all remaining keyword arguments are passed
         directly to the constructor of :class:`redis.cluster.RedisCluster`
        :raise ConfigurationError: when the :pypi:`redis` library is not
         available or if the redis cluster cannot be reached.
        """
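        # Pull credentials and the comma-separated host:port pairs out of the cluster URI.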
        parsed = urllib.parse.urlparse(uri)
        parsed_auth: dict[str, float | str | bool] = {}

        if parsed.username:
            parsed_auth["username"] = parsed.username
        if parsed.password:
            parsed_auth["password"] = parsed.password

        sep = parsed.netloc.find("@") + 1
        cluster_hosts = []

        for loc in parsed.netloc[sep:].split(","):
            host, port = loc.split(":")
            cluster_hosts.append((host, int(port)))

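        # Select the client library from the URI scheme and merge the default,
        # URI-derived, and caller-supplied options.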
        self.storage = None
        self.target_server = "valkey" if uri.startswith("valkey") else "redis"
        merged_options = {**self.DEFAULT_OPTIONS, **parsed_auth, **options}

        self.dependency = self.dependencies[self.target_server].module
        startup_nodes = [self.dependency.cluster.ClusterNode(*c) for c in cluster_hosts]

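        # valkey-py mirrors redis-py's API, so both cluster clients are constructed
        # the same way from the resolved dependency module.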
        if self.target_server == "redis":
            self.storage = self.dependency.cluster.RedisCluster(
                startup_nodes=startup_nodes, **merged_options
            )
        else:
            self.storage = self.dependency.cluster.ValkeyCluster(
                startup_nodes=startup_nodes, **merged_options
            )

        assert self.storage
        self.initialize_storage(uri)
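        # Bypass RedisStorage.__init__ in the MRO (the cluster client has already been
        # created above) and call the next base class initializer directly.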
        super(RedisStorage, self).__init__(uri, wrap_exceptions, **options)

    def reset(self) -> int | None:
        """
        Redis Clusters are sharded and deleting across shards
        can't be done atomically. Because of this, this reset loops over all
        keys that are prefixed with ``self.PREFIX`` and calls delete on them,
        one at a time.

        .. warning::
           This operation was not tested with extremely large data sets.
           On a large production system, care should be taken with its
           usage as it could be slow on very large data sets.
        """
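        # Keys are sharded across the cluster, so enumerate and delete them on each
        # primary node individually.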
        prefix = self.prefixed_key("*")
        count = 0

        for primary in self.storage.get_primaries():
            node = self.storage.get_redis_connection(primary)
            keys = node.keys(prefix)
            count += sum([node.delete(k.decode("utf-8")) for k in keys])

        return count
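

# Illustrative usage sketch (an assumption, not part of the original module): wiring
# RedisClusterStorage into a limits strategy. The node addresses below are placeholders
# for a local test cluster.
if __name__ == "__main__":
    from limits import RateLimitItemPerMinute
    from limits.strategies import FixedWindowRateLimiter

    # Assumes a reachable redis cluster on localhost:7000 and localhost:7001.
    storage = RedisClusterStorage("redis+cluster://localhost:7000,localhost:7001")
    limiter = FixedWindowRateLimiter(storage)

    if limiter.hit(RateLimitItemPerMinute(100), "client-id"):
        print("request allowed")
    else:
        print("rate limit exceeded")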