from __future__ import annotations

import os
import pickle
import time
from typing import TYPE_CHECKING

from fsspec.utils import atomic_write

try:
    import ujson as json
except ImportError:
    if not TYPE_CHECKING:
        import json

if TYPE_CHECKING:
    from typing import Any, Dict, Iterator, Literal

    from typing_extensions import TypeAlias

    from .cached import CachingFileSystem

    Detail: TypeAlias = Dict[str, Any]


class CacheMetadata:
    """Cache metadata.

    All reading and writing of cache metadata is performed by this class;
    accessing the cached files and blocks is not.

    Metadata is stored in a single file per storage directory in JSON format.
    For backward compatibility, metadata stored in pickle format is also read,
    and is converted to JSON when next saved.
    """

    def __init__(self, storage: list[str]):
        """

        Parameters
        ----------
        storage: list[str]
            Directories containing cached files; there must be at least one.
            Metadata is stored in the last of these directories by convention.
        """
        if not storage:
            raise ValueError("CacheMetadata expects at least one storage location")

        self._storage = storage
        self.cached_files: list[Detail] = [{}]

        # Private attribute to force saving of metadata in pickle format rather
        # than JSON, for use in tests to confirm that both pickle and JSON
        # formats can be read.
        self._force_save_pickle = False

    def _load(self, fn: str) -> Detail:
        """Low-level function to load metadata from a specific file"""
        try:
            with open(fn, "r") as f:
                loaded = json.load(f)
        except ValueError:
            with open(fn, "rb") as f:
                loaded = pickle.load(f)
        for c in loaded.values():
            if isinstance(c.get("blocks"), list):
                c["blocks"] = set(c["blocks"])
        return loaded

    def _save(self, metadata_to_save: Detail, fn: str) -> None:
        """Low-level function to save metadata to a specific file"""
        if self._force_save_pickle:
            with atomic_write(fn) as f:
                pickle.dump(metadata_to_save, f)
        else:
            with atomic_write(fn, mode="w") as f:
                json.dump(metadata_to_save, f)

    def _scan_locations(
        self, writable_only: bool = False
    ) -> Iterator[tuple[str, str, bool]]:
        """Yield locations (filenames) where metadata is stored, and whether
        each is writable or not.

        Parameters
        ----------
        writable_only: bool
            Set to True to only yield writable locations.

        Returns
        -------
        Yields (str, str, bool)
        """
        n = len(self._storage)
        for i, storage in enumerate(self._storage):
            writable = i == n - 1
            if writable_only and not writable:
                continue
            yield os.path.join(storage, "cache"), storage, writable

    def check_file(
        self, path: str, cfs: CachingFileSystem | None
    ) -> Literal[False] | tuple[Detail, str]:
        """If path is in cache return its details, otherwise return ``False``.

        If the optional CachingFileSystem is specified then it is used to
        perform extra checks to reject possible matches, such as if they are
        too old.
        """
        for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files):
            if path not in cache:
                continue
            detail = cache[path].copy()

            if cfs is not None:
                if cfs.check_files and detail["uid"] != cfs.fs.ukey(path):
                    # Wrong file as determined by hash of file properties
                    continue

                if cfs.expiry and time.time() - detail["time"] > cfs.expiry:
                    # Cached file has expired
                    continue

            fn = os.path.join(base, detail["fn"])
            if os.path.exists(fn):
                return detail, fn

        return False
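
    # A minimal usage sketch (the ``metadata`` instance and path are hypothetical;
    # passing ``cfs=None`` skips the uid and expiry checks, so only presence in the
    # metadata and on disk is tested):
    #
    #     hit = metadata.check_file("https://example.org/data.bin", None)
    #     if hit:
    #         detail, local_fn = hit
    #         # local_fn is the path of the cached copy in one of the storage dirs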

    def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
        """Remove expired metadata from the cache.

        Returns names of files corresponding to expired metadata and a boolean
        flag indicating whether the writable cache is empty. Caller is
        responsible for deleting the expired files.
        """
        expired_files = []
        for path, detail in self.cached_files[-1].copy().items():
            if time.time() - detail["time"] > expiry_time:
                fn = detail.get("fn", "")
                if not fn:
                    raise RuntimeError(
                        f"Cache metadata does not contain 'fn' for {path}"
                    )
                fn = os.path.join(self._storage[-1], fn)
                expired_files.append(fn)
                self.cached_files[-1].pop(path)

        if self.cached_files[-1]:
            cache_path = os.path.join(self._storage[-1], "cache")
            self._save(self.cached_files[-1], cache_path)

        writable_cache_empty = not self.cached_files[-1]
        return expired_files, writable_cache_empty
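
    # A hedged sketch of the expected calling pattern (``metadata`` and the expiry
    # value are hypothetical; this class only drops metadata and never deletes the
    # cached files themselves):
    #
    #     expired, empty = metadata.clear_expired(expiry_time=3600)
    #     for fn in expired:
    #         if os.path.exists(fn):
    #             os.remove(fn)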

    def load(self) -> None:
        """Load all metadata from disk and store in ``self.cached_files``"""
        cached_files = []
        for fn, _, _ in self._scan_locations():
            if os.path.exists(fn):
                # TODO: consolidate blocks here
                cached_files.append(self._load(fn))
            else:
                cached_files.append({})
        self.cached_files = cached_files or [{}]

    def on_close_cached_file(self, f: Any, path: str) -> None:
        """Perform side-effect actions on closing a cached file.

        The actual closing of the file is the responsibility of the caller.
        """
        # File must be writable, so it lives in self.cached_files[-1]
        c = self.cached_files[-1][path]
        if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size:
            c["blocks"] = True

    def pop_file(self, path: str) -> str | None:
        """Remove metadata of cached file.

        If path is in the cache, return the filename of the cached file,
        otherwise return ``None``. Caller is responsible for deleting the
        cached file.
        """
        details = self.check_file(path, None)
        if not details:
            return None
        _, fn = details
        if fn.startswith(self._storage[-1]):
            self.cached_files[-1].pop(path)
            self.save()
        else:
            raise PermissionError(
                "Can only delete cached file in last, writable cache location"
            )
        return fn

    def save(self) -> None:
        """Save metadata to disk"""
        for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files):
            if not writable:
                continue

            if os.path.exists(fn):
                cached_files = self._load(fn)
                for k, c in cached_files.items():
                    if k in cache:
                        if c["blocks"] is True or cache[k]["blocks"] is True:
                            c["blocks"] = True
                        else:
                            # self.cached_files[*][*]["blocks"] must continue to
                            # point to the same set object so that updates
                            # performed by MMapCache are propagated back to
                            # self.cached_files.
                            blocks = cache[k]["blocks"]
                            blocks.update(c["blocks"])
                            c["blocks"] = blocks
                        c["time"] = max(c["time"], cache[k]["time"])
                        c["uid"] = cache[k]["uid"]

                # Files can be added to cache after it was written once
                for k, c in cache.items():
                    if k not in cached_files:
                        cached_files[k] = c
            else:
                cached_files = cache

            cache = {k: v.copy() for k, v in cached_files.items()}
            for c in cache.values():
                if isinstance(c["blocks"], set):
                    c["blocks"] = list(c["blocks"])
            self._save(cache, fn)

            self.cached_files[-1] = cached_files

    def update_file(self, path: str, detail: Detail) -> None:
        """Update metadata for specific file in memory, do not save"""
        self.cached_files[-1][path] = detail
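

# A minimal end-to-end sketch of how this class might be exercised (hedged:
# ``tmp``, the path and the detail values are hypothetical; in practice the
# class is driven by CachingFileSystem rather than used directly):
#
#     tmp = "/tmp/fsspec-cache"
#     os.makedirs(tmp, exist_ok=True)
#     metadata = CacheMetadata([tmp])
#     metadata.load()  # leaves cached_files as [{}] when no metadata file exists yet
#     metadata.update_file(
#         "memory://afile",
#         {"fn": "0a1b2c3d", "blocks": True, "time": time.time(), "uid": "xyz"},
#     )
#     metadata.save()  # writes JSON metadata to /tmp/fsspec-cache/cache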
|