# file.py
  1. import errno
  2. import hashlib
  3. import logging
  4. import os
  5. import platform
  6. import stat
  7. import struct
  8. import tempfile
  9. import typing as _t
  10. from contextlib import contextmanager
  11. from pathlib import Path
  12. from time import sleep
  13. from time import time
  14. from cachelib.base import BaseCache
  15. from cachelib.serializers import FileSystemSerializer
  16. def _lazy_md5(string: bytes = b"") -> _t.Any:
  17. """Don't access ``hashlib.md5`` until runtime. FIPS builds may not include
  18. md5, in which case the import and use as a default would fail before the
  19. developer can configure something else.
  20. """
  21. return hashlib.md5(string)
  22. class FileSystemCache(BaseCache):
  23. """A cache that stores the items on the file system. This cache depends
  24. on being the only user of the `cache_dir`. Make absolutely sure that
  25. nobody but this cache stores files there or otherwise the cache will
  26. randomly delete files therein.
  27. :param cache_dir: the directory where cache files are stored.
  28. :param threshold: the maximum number of items the cache stores before
  29. it starts deleting some. A threshold value of 0
  30. indicates no threshold.
  31. :param default_timeout: the default timeout that is used if no timeout is
  32. specified on :meth:`~BaseCache.set`. A timeout of
  33. 0 indicates that the cache never expires.
  34. :param mode: the file mode wanted for the cache files, default 0600
  35. :param hash_method: Default hashlib.md5. The hash method used to
  36. generate the filename for cached results.
  37. Default is lazy loaded and can be overriden by
  38. seeting `_default_hash_method`
  39. """
  40. #: used for temporary files by the FileSystemCache
  41. _fs_transaction_suffix = ".__wz_cache"
  42. #: keep amount of files in a cache element
  43. _fs_count_file = "__wz_cache_count"
  44. #: default file name hashing method
  45. _default_hash_method = staticmethod(_lazy_md5)
  46. serializer = FileSystemSerializer()
  47. def __init__(
  48. self,
  49. cache_dir: str,
  50. threshold: int = 500,
  51. default_timeout: int = 300,
  52. mode: _t.Optional[int] = None,
  53. hash_method: _t.Any = None,
  54. ):
  55. BaseCache.__init__(self, default_timeout)
  56. self._path = cache_dir
  57. self._threshold = threshold
  58. self._hash_method = self._default_hash_method
  59. if hash_method is not None:
  60. self._hash_method = hash_method
  61. # Mode set by user takes precedence. If no mode has
  62. # been given, we need to set the correct default based
  63. # on user platform.
  64. self._mode = mode
  65. if self._mode is None:
  66. self._mode = self._get_compatible_platform_mode()
  67. try:
  68. os.makedirs(self._path)
  69. except OSError as ex:
  70. if ex.errno != errno.EEXIST:
  71. raise
  72. # If there are many files and a zero threshold,
  73. # the list_dir can slow initialisation massively
  74. if self._threshold != 0:
  75. self._update_count(value=len(list(self._list_dir())))
  76. def _get_compatible_platform_mode(self) -> int:
  77. mode = 0o600 # nix systems
  78. if platform.system() == "Windows":
  79. mode = stat.S_IWRITE
  80. return mode
  81. @property
  82. def _file_count(self) -> int:
  83. return self.get(self._fs_count_file) or 0
  84. def _update_count(
  85. self, delta: _t.Optional[int] = None, value: _t.Optional[int] = None
  86. ) -> None:
  87. # If we have no threshold, don't count files
  88. if self._threshold == 0:
  89. return
  90. if delta:
  91. new_count = self._file_count + delta
  92. else:
  93. new_count = value or 0
  94. self.set(self._fs_count_file, new_count, mgmt_element=True)
  95. def _normalize_timeout(self, timeout: _t.Optional[int]) -> int:
  96. timeout = BaseCache._normalize_timeout(self, timeout)
  97. if timeout != 0:
  98. timeout = int(time()) + timeout
  99. return int(timeout)
  100. def _is_mgmt(self, name: str) -> bool:
  101. fshash = self._get_filename(self._fs_count_file).split(os.sep)[-1]
  102. return name == fshash or name.endswith(self._fs_transaction_suffix)
  103. def _list_dir(self) -> _t.Generator[str, None, None]:
  104. """return a list of (fully qualified) cache filenames"""
  105. return (
  106. os.path.join(self._path, fn)
  107. for fn in os.listdir(self._path)
  108. if not self._is_mgmt(fn)
  109. )
  110. def _over_threshold(self) -> bool:
  111. return self._threshold != 0 and self._file_count > self._threshold
  112. def _remove_expired(self, now: float) -> None:
  113. for fname in self._list_dir():
  114. try:
  115. with self._safe_stream_open(fname, "rb") as f:
  116. expires = struct.unpack("I", f.read(4))[0]
  117. if expires != 0 and expires < now:
  118. os.remove(fname)
  119. self._update_count(delta=-1)
  120. except FileNotFoundError:
  121. pass
  122. except (OSError, EOFError, struct.error):
  123. logging.warning(
  124. "Exception raised while handling cache file '%s'",
  125. fname,
  126. exc_info=True,
  127. )
  128. def _remove_older(self) -> bool:
  129. exp_fname_tuples = []
  130. for fname in self._list_dir():
  131. try:
  132. with self._safe_stream_open(fname, "rb") as f:
  133. timestamp = struct.unpack("I", f.read(4))[0]
  134. exp_fname_tuples.append((timestamp, fname))
  135. except FileNotFoundError:
  136. pass
  137. except (OSError, EOFError, struct.error):
  138. logging.warning(
  139. "Exception raised while handling cache file '%s'",
  140. fname,
  141. exc_info=True,
  142. )
  143. fname_sorted = (
  144. fname for _, fname in sorted(exp_fname_tuples, key=lambda item: item[0])
  145. )
  146. for fname in fname_sorted:
  147. try:
  148. os.remove(fname)
  149. self._update_count(delta=-1)
  150. except FileNotFoundError:
  151. pass
  152. except OSError:
  153. logging.warning(
  154. "Exception raised while handling cache file '%s'",
  155. fname,
  156. exc_info=True,
  157. )
  158. return False
  159. if not self._over_threshold():
  160. break
  161. return True
  162. def _prune(self) -> None:
  163. if self._over_threshold():
  164. now = time()
  165. self._remove_expired(now)
  166. # if still over threshold
  167. if self._over_threshold():
  168. self._remove_older()
  169. def clear(self) -> bool:
  170. for i, fname in enumerate(self._list_dir()):
  171. try:
  172. os.remove(fname)
  173. except FileNotFoundError:
  174. pass
  175. except OSError:
  176. logging.warning(
  177. "Exception raised while handling cache file '%s'",
  178. fname,
  179. exc_info=True,
  180. )
  181. self._update_count(delta=-i)
  182. return False
  183. self._update_count(value=0)
  184. return True
  185. def _get_filename(self, key: str) -> str:
  186. if isinstance(key, str):
  187. bkey = key.encode("utf-8") # XXX unicode review
  188. bkey_hash = self._hash_method(bkey).hexdigest()
  189. else:
  190. raise TypeError(f"Key must be a string, received type {type(key)}")
  191. return os.path.join(self._path, bkey_hash)
  192. def get(self, key: str) -> _t.Any:
  193. filename = self._get_filename(key)
  194. try:
  195. with self._safe_stream_open(filename, "rb") as f:
  196. pickle_time = struct.unpack("I", f.read(4))[0]
  197. if pickle_time == 0 or pickle_time >= time():
  198. return self.serializer.load(f)
  199. except FileNotFoundError:
  200. pass
  201. except (OSError, EOFError, struct.error):
  202. logging.warning(
  203. "Exception raised while handling cache file '%s'",
  204. filename,
  205. exc_info=True,
  206. )
  207. return None
  208. def add(self, key: str, value: _t.Any, timeout: _t.Optional[int] = None) -> bool:
  209. filename = self._get_filename(key)
  210. if not os.path.exists(filename):
  211. return self.set(key, value, timeout)
  212. return False
  213. def set(
  214. self,
  215. key: str,
  216. value: _t.Any,
  217. timeout: _t.Optional[int] = None,
  218. mgmt_element: bool = False,
  219. ) -> bool:
  220. # Management elements have no timeout
  221. if mgmt_element:
  222. timeout = 0
  223. # Don't prune on management element update, to avoid loop
  224. else:
  225. self._prune()
  226. timeout = self._normalize_timeout(timeout)
  227. filename = self._get_filename(key)
  228. overwrite = os.path.isfile(filename)
  229. try:
  230. fd, tmp = tempfile.mkstemp(
  231. suffix=self._fs_transaction_suffix, dir=self._path
  232. )
  233. with os.fdopen(fd, "wb") as f:
  234. f.write(struct.pack("I", timeout))
  235. self.serializer.dump(value, f)
  236. self._run_safely(os.replace, tmp, filename)
  237. self._run_safely(os.chmod, filename, self._mode)
  238. fsize = Path(filename).stat().st_size
  239. except OSError:
  240. logging.warning(
  241. "Exception raised while handling cache file '%s'",
  242. filename,
  243. exc_info=True,
  244. )
  245. return False
  246. else:
  247. # Management elements should not count towards threshold
  248. if not overwrite and not mgmt_element:
  249. self._update_count(delta=1)
  250. return fsize > 0 # function should fail if file is empty
  251. def delete(self, key: str, mgmt_element: bool = False) -> bool:
  252. try:
  253. os.remove(self._get_filename(key))
  254. except FileNotFoundError: # if file doesn't exist we consider it deleted
  255. return True
  256. except OSError:
  257. logging.warning("Exception raised while handling cache file", exc_info=True)
  258. return False
  259. else:
  260. # Management elements should not count towards threshold
  261. if not mgmt_element:
  262. self._update_count(delta=-1)
  263. return True
  264. def has(self, key: str) -> bool:
  265. filename = self._get_filename(key)
  266. try:
  267. with self._safe_stream_open(filename, "rb") as f:
  268. pickle_time = struct.unpack("I", f.read(4))[0]
  269. if pickle_time == 0 or pickle_time >= time():
  270. return True
  271. else:
  272. return False
  273. except FileNotFoundError: # if there is no file there is no key
  274. return False
  275. except (OSError, EOFError, struct.error):
  276. logging.warning(
  277. "Exception raised while handling cache file '%s'",
  278. filename,
  279. exc_info=True,
  280. )
  281. return False
  282. def _run_safely(self, fn: _t.Callable, *args: _t.Any, **kwargs: _t.Any) -> _t.Any:
  283. """On Windows os.replace, os.chmod and open can yield
  284. permission errors if executed by two different processes."""
  285. if platform.system() == "Windows":
  286. output = None
  287. wait_step = 0.001
  288. max_sleep_time = 10.0
  289. total_sleep_time = 0.0
  290. while total_sleep_time < max_sleep_time:
  291. try:
  292. output = fn(*args, **kwargs)
  293. except PermissionError:
  294. sleep(wait_step)
  295. total_sleep_time += wait_step
  296. wait_step *= 2
  297. else:
  298. break
  299. else:
  300. output = fn(*args, **kwargs)
  301. return output
  302. @contextmanager
  303. def _safe_stream_open(self, path: str, mode: str) -> _t.Generator:
  304. fs = self._run_safely(open, path, mode)
  305. if fs is None:
  306. raise OSError
  307. try:
  308. yield fs
  309. finally:
  310. fs.close()