# googlecloudstoragecache.py

import datetime
import json
import logging

from flask_caching.backends.base import BaseCache

logger = logging.getLogger(__name__)

try:
    from google.auth.credentials import AnonymousCredentials
    from google.cloud import storage, exceptions
except ImportError as e:
    raise RuntimeError("no google-cloud-storage module found") from e


class GoogleCloudStorageCache(BaseCache):
    """Uses a Google Cloud Storage bucket as a cache backend.

    Note: User-contributed functionality. This project does not guarantee that
    this functionality will be maintained or functional at any given time.

    Note: Cache keys must meet GCS criteria for a valid object name (a sequence
    of Unicode characters whose UTF-8 encoding is at most 1024 bytes long).

    Note: Expired cache objects are not automatically purged. If
    delete_expired_objects_on_read=True, they will be deleted following an
    attempted read (which reduces performance). Otherwise, you have to delete
    stale objects yourself. Consider a GCS bucket lifecycle rule or another
    out-of-band process. For example, you can use the following rule::

        {"rule": [{"action": {"type": "Delete"}, "condition": {"daysSinceCustomTime": 0}}]}

    https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime
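
    One way to apply such a rule out of band (the bucket name below is a
    placeholder) is to save it to a ``lifecycle.json`` file and run::

        gsutil lifecycle set lifecycle.json gs://your-cache-bucket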

    :param bucket: Required. Name of the bucket to use. It must already exist.
    :param key_prefix: A prefix that should be added to all keys.
    :param default_timeout: the default timeout that is used if no timeout is
                            specified on :meth:`~BaseCache.set`. A timeout of
                            0 indicates that the cache never expires.
    :param delete_expired_objects_on_read: If True, if a read finds a stale
                                           object, it will be deleted before
                                           a response is returned. Will slow
                                           down responses.
    :param anonymous: If True, use anonymous credentials. Useful for testing.

    Any additional keyword arguments will be passed to ``google.cloud.storage.Client``.
    """

    def __init__(
        self,
        bucket,
        key_prefix=None,
        default_timeout=300,
        delete_expired_objects_on_read=False,
        anonymous=False,
        **kwargs
    ):
        super().__init__(default_timeout)
        if not isinstance(bucket, str):
            raise ValueError("GCSCache bucket parameter must be a string")
        if anonymous:
            # With anonymous credentials there is no default project to infer,
            # so a placeholder project name is supplied explicitly.
            self._client = storage.Client(
                credentials=AnonymousCredentials(), project="test", **kwargs
            )
        else:
            self._client = storage.Client(**kwargs)
        self.bucket = self._client.get_bucket(bucket)
        self.key_prefix = key_prefix or ""
        self.default_timeout = default_timeout
        self.delete_expired_objects_on_read = delete_expired_objects_on_read

    @classmethod
    def factory(cls, app, config, args, kwargs):
        # Build the backend from the Flask-Caching config; the bucket name
        # comes from the required CACHE_GCS_BUCKET key.
        args.insert(0, config["CACHE_GCS_BUCKET"])
        key_prefix = config.get("CACHE_KEY_PREFIX")
        if key_prefix:
            kwargs["key_prefix"] = key_prefix
        return cls(*args, **kwargs)

    def get(self, key):
        result = None
        expired = False
        hit_or_miss = "miss"
        full_key = self.key_prefix + key
        blob = self.bucket.get_blob(full_key)
        if blob is not None:
            expired = blob.custom_time and self._now() > blob.custom_time
            if expired:
                # Object is stale
                if self.delete_expired_objects_on_read:
                    self._delete(full_key)
            else:
                try:
                    result = blob.download_as_bytes()
                    hit_or_miss = "hit"
                    if blob.content_type == "application/json":
                        result = json.loads(result)
                except exceptions.NotFound:
                    pass
        expiredstr = "(expired)" if expired else ""
        logger.debug("get key %r -> %s %s", full_key, hit_or_miss, expiredstr)
        return result

    def set(self, key, value, timeout=None):
        result = False
        full_key = self.key_prefix + key
        content_type = "application/json"
        try:
            # Store JSON-serializable values as JSON; fall back to raw bytes
            # for anything json.dumps cannot handle.
            value = json.dumps(value)
        except (UnicodeDecodeError, TypeError):
            content_type = "application/octet-stream"
        blob = self.bucket.blob(full_key)
        if timeout is None:
            timeout = self.default_timeout
        if timeout != 0:
            # Use 'Custom-Time' for expiry
            # https://cloud.google.com/storage/docs/metadata#custom-time
            blob.custom_time = self._now(delta=timeout)
        try:
            blob.upload_from_string(value, content_type=content_type)
            result = True
        except exceptions.TooManyRequests:
            pass
        logger.debug("set key %r -> %s", full_key, result)
        return result

    def add(self, key, value, timeout=None):
        full_key = self.key_prefix + key
        if self._has(full_key):
            logger.debug("add key %r -> not added", full_key)
            return False
        else:
            return self.set(key, value, timeout)

    def delete(self, key):
        full_key = self.key_prefix + key
        return self._delete(full_key)

    def delete_many(self, *keys):
        return self._delete_many(self.key_prefix + key for key in keys)

    def has(self, key):
        full_key = self.key_prefix + key
        return self._has(full_key)

    def clear(self):
        return self._prune(clear_all=True)

    def _prune(self, clear_all=False):
        # Delete in batches of 100, which is much faster than individual deletes
        nremoved = 0
        now = self._now()
        response_iterator = self._client.list_blobs(
            self.bucket,
            prefix=self.key_prefix,
            fields="items(name,customTime),nextPageToken",
        )
        to_delete = []
        for blob in response_iterator:
            if clear_all or (blob.custom_time and blob.custom_time < now):
                to_delete.append(blob.name)
                nremoved += 1
                if len(to_delete) == 100:
                    self._delete_many(to_delete)
                    to_delete = []
        # Delete the remainder
        if to_delete:
            self._delete_many(to_delete)
        logger.debug("evicted %d key(s)", nremoved)
        return True

    def _delete(self, key):
        return self._delete_many([key])

    def _delete_many(self, keys):
        try:
            # Issue the deletes as a single batched request.
            with self._client.batch():
                for key in keys:
                    self.bucket.delete_blob(key)
        except (exceptions.NotFound, exceptions.TooManyRequests):
            pass
        return True

    def _has(self, key):
        result = False
        expired = False
        blob = self.bucket.get_blob(key)
        if blob is not None:
            expired = blob.custom_time and self._now() > blob.custom_time
            if expired:
                # Exists but is stale
                if self.delete_expired_objects_on_read:
                    self._delete(key)
            else:
                result = True
        expiredstr = "(expired)" if expired else ""
        logger.debug("has key %r -> %s %s", key, result, expiredstr)
        return result

    def _now(self, delta=0):
        # Current UTC time, optionally shifted `delta` seconds into the future.
        return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            seconds=delta
        )
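

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the backend). It assumes a
# Flask-Caching version where CACHE_TYPE accepts a dotted import path to a
# BaseCache subclass, whose factory() classmethod is then called; the module
# path and bucket name below are placeholders.
#
#   from flask import Flask
#   from flask_caching import Cache
#
#   app = Flask(__name__)
#   app.config.update(
#       CACHE_TYPE="myapp.googlecloudstoragecache.GoogleCloudStorageCache",
#       CACHE_GCS_BUCKET="my-cache-bucket",  # must already exist
#       CACHE_KEY_PREFIX="cache/",
#       CACHE_DEFAULT_TIMEOUT=300,
#   )
#   cache = Cache(app)
#
# The backend can also be constructed directly:
#
#   cache = GoogleCloudStorageCache(bucket="my-cache-bucket", key_prefix="cache/")
#   cache.set("report:42", {"status": "done"}, timeout=600)
#   cache.get("report:42")  # -> {"status": "done"}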