dynamodb.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
import datetime
import logging
import typing as _t

from cachelib.base import BaseCache
from cachelib.serializers import DynamoDbSerializer
  5. CREATED_AT_FIELD = "created_at"
  6. RESPONSE_FIELD = "response"
  7. class DynamoDbCache(BaseCache):
  8. """
  9. Implementation of cachelib.BaseCache that uses an AWS DynamoDb table
  10. as the backend.
  11. Your server process will require dynamodb:GetItem and dynamodb:PutItem
  12. IAM permissions on the cache table.
  13. Limitations: DynamoDB table items are limited to 400 KB in size. Since
  14. this class stores cached items in a table, the max size of a cache entry
  15. will be slightly less than 400 KB, since the cache key and expiration
  16. time fields are also part of the item.
  17. :param table_name: The name of the DynamoDB table to use
  18. :param default_timeout: Set the timeout in seconds after which cache entries
  19. expire
  20. :param key_field: The name of the hash_key attribute in the DynamoDb
  21. table. This must be a string attribute.
  22. :param expiration_time_field: The name of the table attribute to store the
  23. expiration time in. This will be an int
  24. attribute. The timestamp will be stored as
  25. seconds past the epoch. If you configure
  26. this as the TTL field, then DynamoDB will
  27. automatically delete expired entries.
  28. :param key_prefix: A prefix that should be added to all keys.
  29. """
  30. serializer = DynamoDbSerializer()
  31. def __init__(
  32. self,
  33. table_name: _t.Optional[str] = "python-cache",
  34. default_timeout: int = 300,
  35. key_field: _t.Optional[str] = "cache_key",
  36. expiration_time_field: _t.Optional[str] = "expiration_time",
  37. key_prefix: _t.Optional[str] = None,
  38. **kwargs: _t.Any
  39. ):
  40. super().__init__(default_timeout)
  41. try:
  42. import boto3 # type: ignore
  43. except ImportError as err:
  44. raise RuntimeError("no boto3 module found") from err
  45. self._table_name = table_name
  46. self._key_field = key_field
  47. self._expiration_time_field = expiration_time_field
  48. self.key_prefix = key_prefix or ""
  49. self._dynamo = boto3.resource("dynamodb", **kwargs)
  50. self._attr = boto3.dynamodb.conditions.Attr
  51. try:
  52. self._table = self._dynamo.Table(table_name)
  53. self._table.load()
  54. # catch this exception (triggered if the table doesn't exist)
  55. except Exception:
  56. table = self._dynamo.create_table(
  57. AttributeDefinitions=[
  58. {"AttributeName": key_field, "AttributeType": "S"}
  59. ],
  60. TableName=table_name,
  61. KeySchema=[
  62. {"AttributeName": key_field, "KeyType": "HASH"},
  63. ],
  64. BillingMode="PAY_PER_REQUEST",
  65. )
  66. table.wait_until_exists()
  67. dynamo = boto3.client("dynamodb", **kwargs)
  68. dynamo.update_time_to_live(
  69. TableName=table_name,
  70. TimeToLiveSpecification={
  71. "Enabled": True,
  72. "AttributeName": expiration_time_field,
  73. },
  74. )
  75. self._table = self._dynamo.Table(table_name)
  76. self._table.load()
  77. def _utcnow(self) -> _t.Any:
  78. """Return a tz-aware UTC datetime representing the current time"""
  79. return datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
  80. def _get_item(self, key: str, attributes: _t.Optional[list] = None) -> _t.Any:
  81. """
  82. Get an item from the cache table, optionally limiting the returned
  83. attributes.
  84. :param key: The cache key of the item to fetch
  85. :param attributes: An optional list of attributes to fetch. If not
  86. given, all attributes are fetched. The
  87. expiration_time field will always be added to the
  88. list of fetched attributes.
  89. :return: The table item for key if it exists and is not expired, else
  90. None
  91. """
  92. kwargs = {}
  93. if attributes:
  94. if self._expiration_time_field not in attributes:
  95. attributes = list(attributes) + [self._expiration_time_field]
  96. kwargs = dict(ProjectionExpression=",".join(attributes))
  97. response = self._table.get_item(Key={self._key_field: key}, **kwargs)
  98. cache_item = response.get("Item")
  99. if cache_item:
  100. now = int(self._utcnow().timestamp())
  101. if cache_item.get(self._expiration_time_field, now + 100) > now:
  102. return cache_item
  103. return None
  104. def get(self, key: str) -> _t.Any:
  105. """
  106. Get a cache item
  107. :param key: The cache key of the item to fetch
  108. :return: cache value if not expired, else None
  109. """
  110. cache_item = self._get_item(self.key_prefix + key)
  111. if cache_item:
  112. response = cache_item[RESPONSE_FIELD]
  113. value = self.serializer.loads(response)
  114. return value
  115. return None
  116. def delete(self, key: str) -> bool:
  117. """
  118. Deletes an item from the cache. This is a no-op if the item doesn't
  119. exist
  120. :param key: Key of the item to delete.
  121. :return: True if the key existed and was deleted
  122. """
  123. try:
  124. self._table.delete_item(
  125. Key={self._key_field: self.key_prefix + key},
  126. ConditionExpression=self._attr(self._key_field).exists(),
  127. )
  128. return True
  129. except self._dynamo.meta.client.exceptions.ConditionalCheckFailedException:
  130. return False
  131. def _set(
  132. self,
  133. key: str,
  134. value: _t.Any,
  135. timeout: _t.Optional[int] = None,
  136. overwrite: _t.Optional[bool] = True,
  137. ) -> _t.Any:
  138. """
  139. Store a cache item, with the option to not overwrite existing items
  140. :param key: Cache key to use
  141. :param value: a serializable object
  142. :param timeout: The timeout in seconds for the cached item, to override
  143. the default
  144. :param overwrite: If true, overwrite any existing cache item with key.
  145. If false, the new value will only be stored if no
  146. non-expired cache item exists with key.
  147. :return: True if the new item was stored.
  148. """
  149. timeout = self._normalize_timeout(timeout)
  150. now = self._utcnow()
  151. kwargs = {}
  152. if not overwrite:
  153. # Cause the put to fail if a non-expired item with this key
  154. # already exists
  155. cond = self._attr(self._key_field).not_exists() | self._attr(
  156. self._expiration_time_field
  157. ).lte(int(now.timestamp()))
  158. kwargs = dict(ConditionExpression=cond)
  159. try:
  160. dump = self.serializer.dumps(value)
  161. item = {
  162. self._key_field: key,
  163. CREATED_AT_FIELD: now.isoformat(),
  164. RESPONSE_FIELD: dump,
  165. }
  166. if timeout > 0:
  167. expiration_time = now + datetime.timedelta(seconds=timeout)
  168. item[self._expiration_time_field] = int(expiration_time.timestamp())
  169. self._table.put_item(Item=item, **kwargs)
  170. return True
  171. except Exception:
  172. return False
  173. def set(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> _t.Any:
  174. return self._set(self.key_prefix + key, value, timeout=timeout, overwrite=True)
  175. def add(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> _t.Any:
  176. return self._set(self.key_prefix + key, value, timeout=timeout, overwrite=False)
  177. def has(self, key: str) -> bool:
  178. return (
  179. self._get_item(self.key_prefix + key, [self._expiration_time_field])
  180. is not None
  181. )
  182. def clear(self) -> bool:
  183. paginator = self._dynamo.meta.client.get_paginator("scan")
  184. pages = paginator.paginate(
  185. TableName=self._table_name, ProjectionExpression=self._key_field
  186. )
  187. with self._table.batch_writer() as batch:
  188. for page in pages:
  189. for item in page["Items"]:
  190. batch.delete_item(Key=item)
  191. return True