cached.py

from __future__ import annotations

import inspect
import logging
import os
import tempfile
import time
import weakref
from shutil import rmtree
from typing import TYPE_CHECKING, Any, Callable, ClassVar

from fsspec import AbstractFileSystem, filesystem
from fsspec.callbacks import DEFAULT_CALLBACK
from fsspec.compression import compr
from fsspec.core import BaseCache, MMapCache
from fsspec.exceptions import BlocksizeMismatchError
from fsspec.implementations.cache_mapper import create_cache_mapper
from fsspec.implementations.cache_metadata import CacheMetadata
from fsspec.spec import AbstractBufferedFile
from fsspec.transaction import Transaction
from fsspec.utils import infer_compression

if TYPE_CHECKING:
    from fsspec.implementations.cache_mapper import AbstractCacheMapper

logger = logging.getLogger("fsspec.cached")
class WriteCachedTransaction(Transaction):
    def complete(self, commit=True):
        rpaths = [f.path for f in self.files]
        lpaths = [f.fn for f in self.files]
        if commit:
            self.fs.put(lpaths, rpaths)
        self.files.clear()
        self.fs._intrans = False
        self.fs._transaction = None
        self.fs = None  # break cycle
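
# A minimal sketch of how this transaction type is exercised; it is wired in
# below as SimpleCacheFileSystem.transaction_type. The "memory" target and the
# path are illustrative assumptions, not part of this module:
#
#   import fsspec
#
#   fs = fsspec.filesystem("simplecache", target_protocol="memory")
#   with fs.transaction:  # entering starts the transaction
#       with fs.open("/staged.bin", "wb") as f:
#           f.write(b"payload")  # data lands only in the local cache
#   # leaving the block calls complete(commit=True), which uploads all
#   # buffered files in a single fs.put(lpaths, rpaths) call
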
class CachingFileSystem(AbstractFileSystem):
    """Locally caching filesystem, layer over any other FS

    This class implements chunk-wise local storage of remote files, for quick
    access after the initial download. The files are stored in a given
    directory with hashes of URLs for the filenames. If no directory is given,
    a temporary one is used, which should be cleaned up by the OS after the
    process ends. The files themselves are sparse (as implemented in
    :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
    takes up space.

    Restrictions:

    - the block-size must be the same for each access of a given file, unless
      all blocks of the file have already been read
    - caching can only be applied to file-systems which produce files
      derived from fsspec.spec.AbstractBufferedFile; LocalFileSystem is also
      allowed, for testing
    """

    protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")
    def __init__(
        self,
        target_protocol=None,
        cache_storage="TMP",
        cache_check=10,
        check_files=False,
        expiry_time=604800,
        target_options=None,
        fs=None,
        same_names: bool | None = None,
        compression=None,
        cache_mapper: AbstractCacheMapper | None = None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        target_protocol: str (optional)
            Target filesystem protocol. Provide either this or ``fs``.
        cache_storage: str or list(str)
            Location to store files. If "TMP", this is a temporary directory,
            and will be cleaned up by the OS when this process ends (or later).
            If a list, each location will be tried in the order given, but
            only the last will be considered writable.
        cache_check: int
            Number of seconds between reloads of cache metadata.
        check_files: bool
            Whether to explicitly see if the UID of the remote file matches
            the stored one before using. Warning: some file systems such as
            HTTP cannot reliably give a unique hash of the contents of some
            path, so be sure to set this option to False.
        expiry_time: int
            The time in seconds after which a local copy is considered useless.
            Set to falsy to prevent expiry. The default is equivalent to one
            week.
        target_options: dict or None
            Passed to the instantiation of the FS, if fs is None.
        fs: filesystem instance
            The target filesystem to run against. Provide this or
            ``target_protocol``.
        same_names: bool (optional)
            By default, target URLs are hashed using a ``HashCacheMapper`` so
            that files from different backends with the same basename do not
            conflict. If this argument is ``True``, a ``BasenameCacheMapper``
            is used instead. Other cache mapper options are available by using
            the ``cache_mapper`` keyword argument. Only one of this and
            ``cache_mapper`` should be specified; see the commented sketch
            after this method for a comparison of the two.
        compression: str (optional)
            To decompress on download. Can be 'infer' (guess from the URL
            name), one of the entries in ``fsspec.compression.compr``, or None
            for no decompression.
        cache_mapper: AbstractCacheMapper (optional)
            The object used to map from original filenames to cached
            filenames. Only one of this and ``same_names`` should be
            specified.
        """
        super().__init__(**kwargs)
        if fs is None and target_protocol is None:
            raise ValueError(
                "Please provide a filesystem instance (fs) or target_protocol"
            )
        if not (fs is None) ^ (target_protocol is None):
            raise ValueError(
                "fs and target_protocol may not both be given."
            )
        if cache_storage == "TMP":
            tempdir = tempfile.mkdtemp()
            storage = [tempdir]
            weakref.finalize(self, self._remove_tempdir, tempdir)
        else:
            if isinstance(cache_storage, str):
                storage = [cache_storage]
            else:
                storage = cache_storage
        os.makedirs(storage[-1], exist_ok=True)
        self.storage = storage
        self.kwargs = target_options or {}
        self.cache_check = cache_check
        self.check_files = check_files
        self.expiry = expiry_time
        self.compression = compression

        # Size of cache in bytes. If None then the size is unknown and will be
        # recalculated the next time cache_size() is called. On writes to the
        # cache this is reset to None.
        self._cache_size = None

        if same_names is not None and cache_mapper is not None:
            raise ValueError(
                "Cannot specify both same_names and cache_mapper in "
                "CachingFileSystem.__init__"
            )
        if cache_mapper is not None:
            self._mapper = cache_mapper
        else:
            self._mapper = create_cache_mapper(
                same_names if same_names is not None else False
            )

        self.target_protocol = (
            target_protocol
            if isinstance(target_protocol, str)
            else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
        )
        self._metadata = CacheMetadata(self.storage)
        self.load_cache()
        self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)

        def _strip_protocol(path):
            # acts as a method, since each instance has a different target
            return self.fs._strip_protocol(type(self)._strip_protocol(path))

        self._strip_protocol: Callable = _strip_protocol
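
    # A commented sketch contrasting the two mutually exclusive naming options
    # above (the "memory" target is an illustrative assumption): by default a
    # HashCacheMapper hashes the whole URL, while same_names=True
    # (equivalently, passing BasenameCacheMapper()) keeps only the basename,
    # which can collide across different remote directories.
    #
    #   import fsspec
    #   from fsspec.implementations.cache_mapper import BasenameCacheMapper
    #
    #   fs_hashed = fsspec.filesystem("filecache", target_protocol="memory")
    #   fs_named = fsspec.filesystem(
    #       "filecache",
    #       target_protocol="memory",
    #       cache_mapper=BasenameCacheMapper(),
    #   )
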
    @staticmethod
    def _remove_tempdir(tempdir):
        try:
            rmtree(tempdir)
        except Exception:
            pass

    def _mkcache(self):
        os.makedirs(self.storage[-1], exist_ok=True)

    def cache_size(self):
        """Return size of cache in bytes.

        If more than one cache directory is in use, only the size of the last
        one (the writable cache directory) is returned.
        """
        if self._cache_size is None:
            cache_dir = self.storage[-1]
            self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
        return self._cache_size

    def load_cache(self):
        """Read set of stored blocks from file"""
        self._metadata.load()
        self._mkcache()
        self.last_cache = time.time()

    def save_cache(self):
        """Save set of stored blocks to file"""
        self._mkcache()
        self._metadata.save()
        self.last_cache = time.time()
        self._cache_size = None
    def _check_cache(self):
        """Reload caches if time elapsed or any disappeared"""
        self._mkcache()
        if not self.cache_check:
            # explicitly told not to bother checking
            return
        timecond = time.time() - self.last_cache > self.cache_check
        existcond = all(os.path.exists(storage) for storage in self.storage)
        if timecond or not existcond:
            self.load_cache()

    def _check_file(self, path):
        """Is path in cache and still valid"""
        path = self._strip_protocol(path)
        self._check_cache()
        return self._metadata.check_file(path, self)

    def clear_cache(self):
        """Remove all files and metadata from the cache

        In the case of multiple cache locations, this clears only the last
        one, which is assumed to be the read/write one.
        """
        rmtree(self.storage[-1])
        self.load_cache()
        self._cache_size = None

    def clear_expired_cache(self, expiry_time=None):
        """Remove all expired files and metadata from the cache

        In the case of multiple cache locations, this clears only the last
        one, which is assumed to be the read/write one.

        Parameters
        ----------
        expiry_time: int
            The time in seconds after which a local copy is considered
            useless. If not defined, defaults to the attribute given at
            instantiation.
        """
        if not expiry_time:
            expiry_time = self.expiry

        self._check_cache()

        expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
        for fn in expired_files:
            if os.path.exists(fn):
                os.remove(fn)

        if writable_cache_empty:
            rmtree(self.storage[-1])
            self.load_cache()

        self._cache_size = None

    def pop_from_cache(self, path):
        """Remove cached version of given file

        Deletes local copy of the given (remote) path. If it is found in a
        cache location which is not the last, it is assumed to be read-only,
        and raises PermissionError
        """
        path = self._strip_protocol(path)
        fn = self._metadata.pop_file(path)
        if fn is not None:
            os.remove(fn)
        self._cache_size = None
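
    # A brief maintenance sketch using the three methods above, given an
    # instance ``fs`` of this class (the URL is an illustrative assumption):
    #
    #   fs.cache_size()                # bytes used by the writable cache dir
    #   fs.clear_expired_cache(3600)   # drop local copies older than an hour
    #   fs.pop_from_cache("https://example.com/big.bin")  # evict one file
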
    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Wrap the target _open

        If the whole file exists in the cache, just open it locally and
        return that.

        Otherwise, open the file on the target FS, and make it have a mmap
        cache pointing to the location which we determine, in our cache.
        The ``blocks`` instance is shared, so as the mmap cache instance
        updates, so does the entry in our ``cached_files`` attribute.
        We monkey-patch this file, so that when it closes, we call
        ``close_and_update`` to save the state of the blocks.
        """
        path = self._strip_protocol(path)
        path = self.fs._strip_protocol(path)
        if "r" not in mode:
            return self.fs._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )
        detail = self._check_file(path)
        if detail:
            # file is in cache
            detail, fn = detail
            hash, blocks = detail["fn"], detail["blocks"]
            if blocks is True:
                # stored file is complete
                logger.debug("Opening local copy of %s", path)
                return open(fn, mode)
            # TODO: action where partial file exists in read-only cache
            logger.debug("Opening partially cached copy of %s", path)
        else:
            hash = self._mapper(path)
            fn = os.path.join(self.storage[-1], hash)
            blocks = set()
            detail = {
                "original": path,
                "fn": hash,
                "blocks": blocks,
                "time": time.time(),
                "uid": self.fs.ukey(path),
            }
            self._metadata.update_file(path, detail)
            logger.debug("Creating local sparse file for %s", path)

        # call the target filesystem's open
        self._mkcache()
        f = self.fs._open(
            path,
            mode=mode,
            block_size=block_size,
            autocommit=autocommit,
            cache_options=cache_options,
            cache_type="none",
            **kwargs,
        )
        if self.compression:
            comp = (
                infer_compression(path)
                if self.compression == "infer"
                else self.compression
            )
            f = compr[comp](f, mode="rb")
        if "blocksize" in detail:
            if detail["blocksize"] != f.blocksize:
                raise BlocksizeMismatchError(
                    f"Cached file must be reopened with same block"
                    f" size as original (old: {detail['blocksize']},"
                    f" new {f.blocksize})"
                )
        else:
            detail["blocksize"] = f.blocksize

        def _fetch_ranges(ranges):
            return self.fs.cat_ranges(
                [path] * len(ranges),
                [r[0] for r in ranges],
                [r[1] for r in ranges],
                **kwargs,
            )

        multi_fetcher = None if self.compression else _fetch_ranges
        f.cache = MMapCache(
            f.blocksize, f._fetch_range, f.size, fn, blocks, multi_fetcher=multi_fetcher
        )
        close = f.close
        f.close = lambda: self.close_and_update(f, close)
        self.save_cache()
        return f
    def _parent(self, path):
        return self.fs._parent(path)

    def hash_name(self, path: str, *args: Any) -> str:
        # Kept for backward compatibility with downstream libraries.
        # Ignores extra arguments, previously the same_name boolean.
        return self._mapper(path)

    def close_and_update(self, f, close):
        """Called when a file is closing, so store the set of blocks"""
        if f.closed:
            return
        path = self._strip_protocol(f.path)
        self._metadata.on_close_cached_file(f, path)
        try:
            logger.debug("going to save")
            self.save_cache()
            logger.debug("saved")
        except OSError:
            logger.debug("Cache saving failed while closing file")
        except NameError:
            logger.debug("Cache save failed due to interpreter shutdown")
        close()
        f.closed = True

    def ls(self, path, detail=True):
        return self.fs.ls(path, detail)
    def __getattribute__(self, item):
        if item in {
            "load_cache",
            "_open",
            "save_cache",
            "close_and_update",
            "__init__",
            "__getattribute__",
            "__reduce__",
            "_make_local_details",
            "open",
            "cat",
            "cat_file",
            "cat_ranges",
            "get",
            "read_block",
            "tail",
            "head",
            "info",
            "ls",
            "exists",
            "isfile",
            "isdir",
            "_check_file",
            "_check_cache",
            "_mkcache",
            "clear_cache",
            "clear_expired_cache",
            "pop_from_cache",
            "local_file",
            "_paths_from_path",
            "get_mapper",
            "open_many",
            "commit_many",
            "hash_name",
            "__hash__",
            "__eq__",
            "to_json",
            "to_dict",
            "cache_size",
            "pipe_file",
            "pipe",
            "start_transaction",
            "end_transaction",
        }:
            # all the methods defined in this class. Note `open` here, since
            # it calls `_open`, but is actually in the superclass
            return lambda *args, **kw: getattr(type(self), item).__get__(self)(
                *args, **kw
            )
        if item in ["__reduce_ex__"]:
            raise AttributeError
        if item in ["transaction"]:
            # property
            return type(self).transaction.__get__(self)
        if item in ["_cache", "transaction_type"]:
            # class attributes
            return getattr(type(self), item)
        if item == "__class__":
            return type(self)
        d = object.__getattribute__(self, "__dict__")
        fs = d.get("fs", None)  # fs is not immediately defined
        if item in d:
            return d[item]
        elif fs is not None:
            if item in fs.__dict__:
                # attribute of instance
                return fs.__dict__[item]
            # attribute belonging to the target filesystem
            cls = type(fs)
            m = getattr(cls, item)
            if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
                not hasattr(m, "__self__") or m.__self__ is None
            ):
                # instance method
                return m.__get__(fs, cls)
            return m  # class method or attribute
        else:
            # attributes of the superclass, while target is being set up
            return super().__getattribute__(item)
    def __eq__(self, other):
        """Test for equality."""
        if self is other:
            return True
        if not isinstance(other, type(self)):
            return False
        return (
            self.storage == other.storage
            and self.kwargs == other.kwargs
            and self.cache_check == other.cache_check
            and self.check_files == other.check_files
            and self.expiry == other.expiry
            and self.compression == other.compression
            and self._mapper == other._mapper
            and self.target_protocol == other.target_protocol
        )

    def __hash__(self):
        """Calculate hash."""
        return (
            hash(tuple(self.storage))
            ^ hash(str(self.kwargs))
            ^ hash(self.cache_check)
            ^ hash(self.check_files)
            ^ hash(self.expiry)
            ^ hash(self.compression)
            ^ hash(self._mapper)
            ^ hash(self.target_protocol)
        )
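
# A minimal usage sketch for the block-wise cache above (the URL and cache
# directory are illustrative assumptions): only the byte ranges actually read
# are downloaded into the local sparse file.
#
#   import fsspec
#
#   fs = fsspec.filesystem(
#       "blockcache",
#       target_protocol="https",
#       cache_storage="/tmp/fsspec-blocks",
#   )
#   with fs.open("https://example.com/big.bin", block_size=2**20) as f:
#       header = f.read(4096)  # fetches and caches only the first block
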
class WholeFileCacheFileSystem(CachingFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This is similar to ``CachingFileSystem``, but without
    the block-wise functionality and so can work even when sparse files
    are not allowed. See its docstring for definition of the init
    arguments.

    The class still needs access to the remote store for listing files,
    and may refresh cached files.
    """

    protocol = "filecache"
    local_file = True
    def open_many(self, open_files, **kwargs):
        paths = [of.path for of in open_files]
        if "r" in open_files.mode:
            self._mkcache()
        else:
            return [
                LocalTempFile(
                    self.fs,
                    path,
                    mode=open_files.mode,
                    fn=os.path.join(self.storage[-1], self._mapper(path)),
                    **kwargs,
                )
                for path in paths
            ]

        if self.compression:
            raise NotImplementedError
        details = [self._check_file(sp) for sp in paths]
        downpath = [p for p, d in zip(paths, details) if not d]
        downfn0 = [
            os.path.join(self.storage[-1], self._mapper(p))
            for p, d in zip(paths, details)
        ]  # keep these path names for opening later
        downfn = [fn for fn, d in zip(downfn0, details) if not d]
        if downpath:
            # skip if all files are already cached and up to date
            self.fs.get(downpath, downfn)

            # update metadata - only happens when downloads are successful
            newdetail = [
                {
                    "original": path,
                    "fn": self._mapper(path),
                    "blocks": True,
                    "time": time.time(),
                    "uid": self.fs.ukey(path),
                }
                for path in downpath
            ]
            for path, detail in zip(downpath, newdetail):
                self._metadata.update_file(path, detail)
            self.save_cache()

        def firstpart(fn):
            # helper to adapt both whole-file and simple-cache
            return fn[1] if isinstance(fn, tuple) else fn

        return [
            open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
            for fn0, fn1 in zip(details, downfn0)
        ]
    def commit_many(self, open_files):
        self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
        [f.close() for f in open_files]
        for f in open_files:
            # in case autocommit is off, and so close did not already delete
            try:
                os.remove(f.name)
            except FileNotFoundError:
                pass
        self._cache_size = None

    def _make_local_details(self, path):
        hash = self._mapper(path)
        fn = os.path.join(self.storage[-1], hash)
        detail = {
            "original": path,
            "fn": hash,
            "blocks": True,
            "time": time.time(),
            "uid": self.fs.ukey(path),
        }
        self._metadata.update_file(path, detail)
        logger.debug("Copying %s to local cache", path)
        return fn
    def cat(
        self,
        path,
        recursive=False,
        on_error="raise",
        callback=DEFAULT_CALLBACK,
        **kwargs,
    ):
        paths = self.expand_path(
            path, recursive=recursive, maxdepth=kwargs.get("maxdepth")
        )
        getpaths = []
        storepaths = []
        fns = []
        out = {}
        for p in paths.copy():
            try:
                detail = self._check_file(p)
                if not detail:
                    fn = self._make_local_details(p)
                    getpaths.append(p)
                    storepaths.append(fn)
                else:
                    detail, fn = detail if isinstance(detail, tuple) else (None, detail)
                fns.append(fn)
            except Exception as e:
                if on_error == "raise":
                    raise
                if on_error == "return":
                    out[p] = e
                paths.remove(p)

        if getpaths:
            self.fs.get(getpaths, storepaths)
            self.save_cache()

        callback.set_size(len(paths))
        for p, fn in zip(paths, fns):
            with open(fn, "rb") as f:
                out[p] = f.read()
            callback.relative_update(1)
        if isinstance(path, str) and len(paths) == 1 and recursive is False:
            out = out[paths[0]]
        return out
    def _open(self, path, mode="rb", **kwargs):
        path = self._strip_protocol(path)
        if "r" not in mode:
            hash = self._mapper(path)
            fn = os.path.join(self.storage[-1], hash)
            user_specified_kwargs = {
                k: v
                for k, v in kwargs.items()
                # those kwargs were added by open(), we don't want them
                if k not in ["autocommit", "block_size", "cache_options"]
            }
            return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
        detail = self._check_file(path)
        if detail:
            detail, fn = detail
            _, blocks = detail["fn"], detail["blocks"]
            if blocks is True:
                logger.debug("Opening local copy of %s", path)

                # To let downstream filesystems (like `TarFileSystem`) infer
                # the compression from the original filename, extend the
                # `io.BufferedReader` file object with a dedicated `original`
                # attribute.
                f = open(fn, mode)
                f.original = detail.get("original")
                return f
            else:
                raise ValueError(
                    f"Attempt to open partially cached file {path}"
                    f" as a wholly cached file"
                )
        else:
            fn = self._make_local_details(path)
        kwargs["mode"] = mode

        # call the target filesystem's open
        self._mkcache()
        if self.compression:
            with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
                if isinstance(f, AbstractBufferedFile):
                    # want no type of caching if just downloading whole thing
                    f.cache = BaseCache(0, f.cache.fetcher, f.size)
                comp = (
                    infer_compression(path)
                    if self.compression == "infer"
                    else self.compression
                )
                f = compr[comp](f, mode="rb")
                data = True
                while data:
                    block = getattr(f, "blocksize", 5 * 2**20)
                    data = f.read(block)
                    f2.write(data)
        else:
            self.fs.get_file(path, fn)
        self.save_cache()
        return self._open(path, mode)
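
# A minimal usage sketch for whole-file caching (the bucket and paths are
# illustrative assumptions): the first open() downloads the complete file,
# and all subsequent reads are purely local.
#
#   import fsspec
#
#   fs = fsspec.filesystem(
#       "filecache",
#       target_protocol="s3",
#       target_options={"anon": True},
#       cache_storage="/tmp/fsspec-files",
#       compression="infer",  # e.g. transparently decompress *.gz on download
#   )
#   with fs.open("s3://bucket/data.csv.gz") as f:
#       text = f.read()
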
class SimpleCacheFileSystem(WholeFileCacheFileSystem):
    """Caches whole remote files on first access

    This class is intended as a layer over any other file system, and
    will make a local copy of each file accessed, so that all subsequent
    reads are local. This implementation only copies whole files, and
    does not keep any metadata about the download time or file details.
    It is therefore safer to use in multi-threaded/concurrent situations.

    This is the only one of the caching filesystems that supports write: you
    will be given a real local open file, and upon close and commit, it will
    be uploaded to the target filesystem; the writability of the target URL
    is not checked until that time.
    """

    protocol = "simplecache"
    local_file = True
    transaction_type = WriteCachedTransaction
    def __init__(self, **kwargs):
        kw = kwargs.copy()
        for key in ["cache_check", "expiry_time", "check_files"]:
            kw[key] = False
        super().__init__(**kw)
        for storage in self.storage:
            if not os.path.exists(storage):
                os.makedirs(storage, exist_ok=True)

    def _check_file(self, path):
        self._check_cache()
        sha = self._mapper(path)
        for storage in self.storage:
            fn = os.path.join(storage, sha)
            if os.path.exists(fn):
                return fn

    def save_cache(self):
        pass

    def load_cache(self):
        pass

    def pipe_file(self, path, value=None, **kwargs):
        if self._intrans:
            with self.open(path, "wb") as f:
                f.write(value)
        else:
            super().pipe_file(path, value)
    def ls(self, path, detail=True, **kwargs):
        path = self._strip_protocol(path)
        details = []
        try:
            details = self.fs.ls(
                path, detail=True, **kwargs
            ).copy()  # don't edit original!
        except FileNotFoundError as e:
            ex = e
        else:
            ex = None
        if self._intrans:
            path1 = path.rstrip("/") + "/"
            for f in self.transaction.files:
                if f.path == path:
                    details.append(
                        {"name": path, "size": f.size or f.tell(), "type": "file"}
                    )
                elif f.path.startswith(path1):
                    if f.path.count("/") == path1.count("/"):
                        details.append(
                            {"name": f.path, "size": f.size or f.tell(), "type": "file"}
                        )
                    else:
                        dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
                        details.append({"name": dname, "size": 0, "type": "directory"})
        if ex is not None and not details:
            raise ex
        if detail:
            return details
        return sorted(_["name"] for _ in details)
    def info(self, path, **kwargs):
        path = self._strip_protocol(path)
        if self._intrans:
            f = [_ for _ in self.transaction.files if _.path == path]
            if f:
                size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
                return {"name": path, "size": size, "type": "file"}
            f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
            if f:
                return {"name": path, "size": 0, "type": "directory"}
        return self.fs.info(path, **kwargs)

    def pipe(self, path, value=None, **kwargs):
        if isinstance(path, str):
            self.pipe_file(self._strip_protocol(path), value, **kwargs)
        elif isinstance(path, dict):
            for k, v in path.items():
                self.pipe_file(self._strip_protocol(k), v, **kwargs)
        else:
            raise ValueError("path must be str or dict")
    def cat_ranges(
        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
    ):
        # ensure all requested files are cached locally before delegating;
        # _check_file returns the local filename, or None when not yet cached
        lpaths = [self._check_file(p) for p in paths]
        rpaths = [p for l, p in zip(lpaths, paths) if l is None]
        lpaths = [os.path.join(self.storage[-1], self._mapper(p)) for p in rpaths]
        if rpaths:
            self.fs.get(rpaths, lpaths)
        return super().cat_ranges(
            paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
        )
    def _open(self, path, mode="rb", **kwargs):
        path = self._strip_protocol(path)
        sha = self._mapper(path)

        if "r" not in mode:
            fn = os.path.join(self.storage[-1], sha)
            user_specified_kwargs = {
                k: v
                for k, v in kwargs.items()
                if k not in ["autocommit", "block_size", "cache_options"]
            }  # those were added by open()
            return LocalTempFile(
                self,
                path,
                mode=mode,
                autocommit=not self._intrans,
                fn=fn,
                **user_specified_kwargs,
            )
        fn = self._check_file(path)
        if fn:
            return open(fn, mode)

        fn = os.path.join(self.storage[-1], sha)
        logger.debug("Copying %s to local cache", path)
        kwargs["mode"] = mode

        self._mkcache()
        self._cache_size = None
        if self.compression:
            with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
                if isinstance(f, AbstractBufferedFile):
                    # want no type of caching if just downloading whole thing
                    f.cache = BaseCache(0, f.cache.fetcher, f.size)
                comp = (
                    infer_compression(path)
                    if self.compression == "infer"
                    else self.compression
                )
                f = compr[comp](f, mode="rb")
                data = True
                while data:
                    block = getattr(f, "blocksize", 5 * 2**20)
                    data = f.read(block)
                    f2.write(data)
        else:
            self.fs.get_file(path, fn)
        return self._open(path, mode)
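
# A minimal write-path sketch for the class above (the target and path are
# illustrative assumptions): opening with "w" returns a LocalTempFile, and
# closing it uploads the result via fs.put().
#
#   import fsspec
#
#   fs = fsspec.filesystem("simplecache", target_protocol="memory")
#   with fs.open("/out/result.bin", "wb") as f:
#       f.write(b"abc")  # written to a local temp file in the cache dir
#   # close() triggers LocalTempFile.commit() -> fs.put(fn, path)
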
class LocalTempFile:
    """A temporary local file, which will be uploaded on commit"""

    def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
        self.fn = fn
        self.fh = open(fn, mode)
        self.mode = mode
        if seek:
            self.fh.seek(seek)
        self.path = path
        self.size = None
        self.fs = fs
        self.closed = False
        self.autocommit = autocommit
        self.kwargs = kwargs

    def __reduce__(self):
        # always open in r+b to allow continuing writing at a location
        return (
            LocalTempFile,
            (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
        )

    def __enter__(self):
        return self.fh

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        # self.size = self.fh.tell()
        if self.closed:
            return
        self.fh.close()
        self.closed = True
        if self.autocommit:
            self.commit()

    def discard(self):
        self.fh.close()
        os.remove(self.fn)

    def commit(self):
        self.fs.put(self.fn, self.path, **self.kwargs)
        # we do not delete the local copy - it's still in the cache

    @property
    def name(self):
        return self.fn

    def __repr__(self) -> str:
        return f"LocalTempFile: {self.path}"

    def __getattr__(self, item):
        return getattr(self.fh, item)