import datetime
import json
import logging

from flask_caching.backends.base import BaseCache

logger = logging.getLogger(__name__)

try:
    from google.auth.credentials import AnonymousCredentials
    from google.cloud import storage, exceptions
except ImportError as e:
    raise RuntimeError("no google-cloud-storage module found") from e


class GoogleCloudStorageCache(BaseCache):
    """Uses a Google Cloud Storage bucket as a cache backend.

    Note: User-contributed functionality. This project does not guarantee that
    this functionality will be maintained or functional at any given time.

    Note: Cache keys must meet GCS criteria for a valid object name (a sequence
    of Unicode characters whose UTF-8 encoding is at most 1024 bytes long).

    Note: Expired cache objects are not automatically purged. If
    delete_expired_objects_on_read=True, they will be deleted following an
    attempted read (which reduces performance). Otherwise, you have to delete
    stale objects yourself. Consider a GCS bucket lifecycle rule or another
    out-of-band process. For example, you can use the following rule:

    {"rule": [{"action": {"type": "Delete"}, "condition": {"daysSinceCustomTime": 0}}]}

    https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime

    :param bucket: Required. Name of the bucket to use. It must already exist.
    :param key_prefix: A prefix that should be added to all keys.
    :param default_timeout: the default timeout that is used if no timeout is
                            specified on :meth:`~BaseCache.set`. A timeout of
                            0 indicates that the cache never expires.
    :param delete_expired_objects_on_read: If True, if a read finds a stale
                                           object, it will be deleted before
                                           a response is returned. Will slow
                                           down responses.
    :param anonymous: If True, use anonymous credentials. Useful for testing.

    Any additional keyword arguments will be passed to
    ``google.cloud.storage.Client``.
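
    A minimal usage sketch (the bucket name and keys below are placeholders;
    the bucket must already exist)::

        cache = GoogleCloudStorageCache("my-cache-bucket", key_prefix="myapp/")
        cache.set("greeting", {"hello": "world"}, timeout=600)
        cache.get("greeting")  # -> {'hello': 'world'}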
- """

    def __init__(
        self,
        bucket,
        key_prefix=None,
        default_timeout=300,
        delete_expired_objects_on_read=False,
        anonymous=False,
        **kwargs
    ):
        super().__init__(default_timeout)
        if not isinstance(bucket, str):
            raise ValueError("GCSCache bucket parameter must be a string")
        if anonymous:
            self._client = storage.Client(
                credentials=AnonymousCredentials(), project="test", **kwargs
            )
        else:
            self._client = storage.Client(**kwargs)
        self.bucket = self._client.get_bucket(bucket)
        self.key_prefix = key_prefix or ""
        self.default_timeout = default_timeout
        self.delete_expired_objects_on_read = delete_expired_objects_on_read

    @classmethod
    def factory(cls, app, config, args, kwargs):
        args.insert(0, config["CACHE_GCS_BUCKET"])
        key_prefix = config.get("CACHE_KEY_PREFIX")
        if key_prefix:
            kwargs["key_prefix"] = key_prefix
        return cls(*args, **kwargs)
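
    # Hedged configuration sketch: with flask-caching, this backend can
    # typically be selected by pointing CACHE_TYPE at this class's import
    # path. The import path and values below are illustrative placeholders:
    #
    #   CACHE_TYPE = "flask_caching.contrib.googlecloudstoragecache.GoogleCloudStorageCache"
    #   CACHE_GCS_BUCKET = "my-cache-bucket"  # must already exist
    #   CACHE_KEY_PREFIX = "myapp/"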

    def get(self, key):
        result = None
        expired = False
        hit_or_miss = "miss"
        full_key = self.key_prefix + key
        blob = self.bucket.get_blob(full_key)
        if blob is not None:
            expired = blob.custom_time and self._now() > blob.custom_time
            if expired:
                # Object is stale
                if self.delete_expired_objects_on_read:
                    self._delete(full_key)
            else:
                try:
                    result = blob.download_as_bytes()
                    hit_or_miss = "hit"
                    if blob.content_type == "application/json":
                        result = json.loads(result)
                except exceptions.NotFound:
                    pass
        expiredstr = "(expired)" if expired else ""
        logger.debug("get key %r -> %s %s", full_key, hit_or_miss, expiredstr)
        return result

    def set(self, key, value, timeout=None):
        result = False
        full_key = self.key_prefix + key
        content_type = "application/json"
        try:
            # Store JSON-serializable values as JSON; anything else
            # (e.g. raw bytes) falls back to an opaque octet stream.
            value = json.dumps(value)
        except (UnicodeDecodeError, TypeError):
            content_type = "application/octet-stream"
        blob = self.bucket.blob(full_key)
        if timeout is None:
            timeout = self.default_timeout
        if timeout != 0:
            # Use 'Custom-Time' for expiry
            # https://cloud.google.com/storage/docs/metadata#custom-time
            blob.custom_time = self._now(delta=timeout)
        try:
            blob.upload_from_string(value, content_type=content_type)
            result = True
        except exceptions.TooManyRequests:
            pass
        logger.debug("set key %r -> %s", full_key, result)
        return result

    def add(self, key, value, timeout=None):
        full_key = self.key_prefix + key
        if self._has(full_key):
            logger.debug("add key %r -> not added", full_key)
            return False
        else:
            return self.set(key, value, timeout)

    def delete(self, key):
        full_key = self.key_prefix + key
        return self._delete(full_key)

    def delete_many(self, *keys):
        return self._delete_many(self.key_prefix + key for key in keys)

    def has(self, key):
        full_key = self.key_prefix + key
        return self._has(full_key)

    def clear(self):
        return self._prune(clear_all=True)

    def _prune(self, clear_all=False):
        # Delete in batches of 100, which is much faster than individual deletes
        nremoved = 0
        now = self._now()
        response_iterator = self._client.list_blobs(
            self.bucket,
            prefix=self.key_prefix,
            fields="items(name,customTime),nextPageToken",
        )
        to_delete = []
        for blob in response_iterator:
            if clear_all or (blob.custom_time and blob.custom_time < now):
                to_delete.append(blob.name)
                nremoved += 1
                if len(to_delete) == 100:
                    self._delete_many(to_delete)
                    to_delete = []
        # Delete the remainder
        if to_delete:
            self._delete_many(to_delete)
        logger.debug("evicted %d key(s)", nremoved)
        return True

    def _delete(self, key):
        return self._delete_many([key])

    def _delete_many(self, keys):
        try:
            # Issue the deletes together in a single batch request
            with self._client.batch():
                for key in keys:
                    self.bucket.delete_blob(key)
        except (exceptions.NotFound, exceptions.TooManyRequests):
            pass
        return True

    def _has(self, key):
        result = False
        expired = False
        blob = self.bucket.get_blob(key)
        if blob is not None:
            expired = blob.custom_time and self._now() > blob.custom_time
            if expired:
                # Exists but is stale
                if self.delete_expired_objects_on_read:
                    self._delete(key)
            else:
                result = True
        expiredstr = "(expired)" if expired else ""
        logger.debug("has key %r -> %s %s", key, result, expiredstr)
        return result

    def _now(self, delta=0):
        return datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            seconds=delta
        )
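

# A minimal smoke-test sketch, assuming a GCS emulator such as fake-gcs-server
# is reachable at the (placeholder) endpoint below and "test-bucket" already
# exists there:
#
#   cache = GoogleCloudStorageCache(
#       "test-bucket",
#       anonymous=True,
#       client_options={"api_endpoint": "http://localhost:4443"},
#   )
#   cache.set("k", {"v": 1}, timeout=60)
#   assert cache.get("k") == {"v": 1}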