123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- import array
- import logging
- import posixpath
- import warnings
- from collections.abc import MutableMapping
- from functools import cached_property
- from fsspec.core import url_to_fs
- logger = logging.getLogger("fsspec.mapping")
- class FSMap(MutableMapping):
- """Wrap a FileSystem instance as a mutable wrapping.
- The keys of the mapping become files under the given root, and the
- values (which must be bytes) the contents of those files.
- Parameters
- ----------
- root: string
- prefix for all the files
- fs: FileSystem instance
- check: bool (=True)
- performs a touch at the location, to check for write access.
- Examples
- --------
- >>> fs = FileSystem(**parameters) # doctest: +SKIP
- >>> d = FSMap('my-data/path/', fs) # doctest: +SKIP
- or, more likely
- >>> d = fs.get_mapper('my-data/path/')
- >>> d['loc1'] = b'Hello World' # doctest: +SKIP
- >>> list(d.keys()) # doctest: +SKIP
- ['loc1']
- >>> d['loc1'] # doctest: +SKIP
- b'Hello World'
- """
- def __init__(self, root, fs, check=False, create=False, missing_exceptions=None):
- self.fs = fs
- self.root = fs._strip_protocol(root)
- self._root_key_to_str = fs._strip_protocol(posixpath.join(root, "x"))[:-1]
- if missing_exceptions is None:
- missing_exceptions = (
- FileNotFoundError,
- IsADirectoryError,
- NotADirectoryError,
- )
- self.missing_exceptions = missing_exceptions
- self.check = check
- self.create = create
- if create:
- if not self.fs.exists(root):
- self.fs.mkdir(root)
- if check:
- if not self.fs.exists(root):
- raise ValueError(
- f"Path {root} does not exist. Create "
- f" with the ``create=True`` keyword"
- )
- self.fs.touch(root + "/a")
- self.fs.rm(root + "/a")
- @cached_property
- def dirfs(self):
- """dirfs instance that can be used with the same keys as the mapper"""
- from .implementations.dirfs import DirFileSystem
- return DirFileSystem(path=self._root_key_to_str, fs=self.fs)
- def clear(self):
- """Remove all keys below root - empties out mapping"""
- logger.info("Clear mapping at %s", self.root)
- try:
- self.fs.rm(self.root, True)
- self.fs.mkdir(self.root)
- except: # noqa: E722
- pass
- def getitems(self, keys, on_error="raise"):
- """Fetch multiple items from the store
- If the backend is async-able, this might proceed concurrently
- Parameters
- ----------
- keys: list(str)
- They keys to be fetched
- on_error : "raise", "omit", "return"
- If raise, an underlying exception will be raised (converted to KeyError
- if the type is in self.missing_exceptions); if omit, keys with exception
- will simply not be included in the output; if "return", all keys are
- included in the output, but the value will be bytes or an exception
- instance.
- Returns
- -------
- dict(key, bytes|exception)
- """
- keys2 = [self._key_to_str(k) for k in keys]
- oe = on_error if on_error == "raise" else "return"
- try:
- out = self.fs.cat(keys2, on_error=oe)
- if isinstance(out, bytes):
- out = {keys2[0]: out}
- except self.missing_exceptions as e:
- raise KeyError from e
- out = {
- k: (KeyError() if isinstance(v, self.missing_exceptions) else v)
- for k, v in out.items()
- }
- return {
- key: out[k2] if on_error == "raise" else out.get(k2, KeyError(k2))
- for key, k2 in zip(keys, keys2)
- if on_error == "return" or not isinstance(out[k2], BaseException)
- }
- def setitems(self, values_dict):
- """Set the values of multiple items in the store
- Parameters
- ----------
- values_dict: dict(str, bytes)
- """
- values = {self._key_to_str(k): maybe_convert(v) for k, v in values_dict.items()}
- self.fs.pipe(values)
- def delitems(self, keys):
- """Remove multiple keys from the store"""
- self.fs.rm([self._key_to_str(k) for k in keys])
- def _key_to_str(self, key):
- """Generate full path for the key"""
- if not isinstance(key, str):
- # raise TypeError("key must be of type `str`, got `{type(key).__name__}`"
- warnings.warn(
- "from fsspec 2023.5 onward FSMap non-str keys will raise TypeError",
- DeprecationWarning,
- )
- if isinstance(key, list):
- key = tuple(key)
- key = str(key)
- return f"{self._root_key_to_str}{key}".rstrip("/")
- def _str_to_key(self, s):
- """Strip path of to leave key name"""
- return s[len(self.root) :].lstrip("/")
- def __getitem__(self, key, default=None):
- """Retrieve data"""
- k = self._key_to_str(key)
- try:
- result = self.fs.cat(k)
- except self.missing_exceptions as exc:
- if default is not None:
- return default
- raise KeyError(key) from exc
- return result
- def pop(self, key, default=None):
- """Pop data"""
- result = self.__getitem__(key, default)
- try:
- del self[key]
- except KeyError:
- pass
- return result
- def __setitem__(self, key, value):
- """Store value in key"""
- key = self._key_to_str(key)
- self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
- self.fs.pipe_file(key, maybe_convert(value))
- def __iter__(self):
- return (self._str_to_key(x) for x in self.fs.find(self.root))
- def __len__(self):
- return len(self.fs.find(self.root))
- def __delitem__(self, key):
- """Remove key"""
- try:
- self.fs.rm(self._key_to_str(key))
- except Exception as exc:
- raise KeyError from exc
- def __contains__(self, key):
- """Does key exist in mapping?"""
- path = self._key_to_str(key)
- return self.fs.isfile(path)
- def __reduce__(self):
- return FSMap, (self.root, self.fs, False, False, self.missing_exceptions)
- def maybe_convert(value):
- if isinstance(value, array.array) or hasattr(value, "__array__"):
- # bytes-like things
- if hasattr(value, "dtype") and value.dtype.kind in "Mm":
- # The buffer interface doesn't support datetime64/timdelta64 numpy
- # arrays
- value = value.view("int64")
- value = bytes(memoryview(value))
- return value
- def get_mapper(
- url="",
- check=False,
- create=False,
- missing_exceptions=None,
- alternate_root=None,
- **kwargs,
- ):
- """Create key-value interface for given URL and options
- The URL will be of the form "protocol://location" and point to the root
- of the mapper required. All keys will be file-names below this location,
- and their values the contents of each key.
- Also accepts compound URLs like zip::s3://bucket/file.zip , see ``fsspec.open``.
- Parameters
- ----------
- url: str
- Root URL of mapping
- check: bool
- Whether to attempt to read from the location before instantiation, to
- check that the mapping does exist
- create: bool
- Whether to make the directory corresponding to the root before
- instantiating
- missing_exceptions: None or tuple
- If given, these exception types will be regarded as missing keys and
- return KeyError when trying to read data. By default, you get
- (FileNotFoundError, IsADirectoryError, NotADirectoryError)
- alternate_root: None or str
- In cases of complex URLs, the parser may fail to pick the correct part
- for the mapper root, so this arg can override
- Returns
- -------
- ``FSMap`` instance, the dict-like key-value store.
- """
- # Removing protocol here - could defer to each open() on the backend
- fs, urlpath = url_to_fs(url, **kwargs)
- root = alternate_root if alternate_root is not None else urlpath
- return FSMap(root, fs, check, create, missing_exceptions=missing_exceptions)
|