123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- from __future__ import annotations
- import logging
- from datetime import datetime, timezone
- from errno import ENOTEMPTY
- from io import BytesIO
- from pathlib import PurePath, PureWindowsPath
- from typing import Any, ClassVar
- from fsspec import AbstractFileSystem
- from fsspec.implementations.local import LocalFileSystem
- from fsspec.utils import stringify_path
# Module-level logger used by the filesystem and file classes below for
# debug tracing; it is registered under the fixed name "fsspec.memoryfs".
logger = logging.getLogger("fsspec.memoryfs")
class MemoryFileSystem(AbstractFileSystem):
    """A filesystem based on a dict of BytesIO objects

    This is a global filesystem so instances of this class all point to the same
    in memory filesystem: ``store`` and ``pseudo_dirs`` are class attributes
    that are mutated in place and must never be rebound.

    Paths are kept in the normalised form produced by ``_strip_protocol``:
    the root is ``""`` and every other entry looks like ``"/a/b"``.
    Directories exist either explicitly (listed in ``pseudo_dirs``) or
    implicitly (some stored key or pseudo-dir lies underneath them).
    """

    # normalised path -> file object (global, do not overwrite!)
    store: ClassVar[dict[str, Any]] = {}
    # explicitly created directories; "" is the root (global, do not overwrite!)
    pseudo_dirs: ClassVar[list[str]] = [""]
    protocol = "memory"
    root_marker = "/"

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path`` to the key form used by ``store``/``pseudo_dirs``.

        A leading ``memory://`` is removed, surrounding slashes are stripped
        and a single leading ``/`` is put back, so the root becomes ``""``.
        Strings that still contain ``"::"`` or ``"://"`` afterwards only lose
        their trailing slashes.  ``PureWindowsPath`` objects are delegated to
        ``LocalFileSystem._strip_protocol``; any other ``PurePath`` is first
        converted with ``stringify_path``.
        """
        if isinstance(path, PurePath):
            if isinstance(path, PureWindowsPath):
                return LocalFileSystem._strip_protocol(path)
            path = stringify_path(path)
        if path.startswith("memory://"):
            path = path[len("memory://") :]
        if "::" in path or "://" in path:
            return path.rstrip("/")
        path = path.strip("/")
        return "/" + path if path else ""

    def ls(self, path, detail=True, **kwargs):
        """List a file or the direct children of a directory.

        Parameters
        ----------
        path : str
            File or directory to list.
        detail : bool
            If true return info dicts, otherwise return names only.

        Returns
        -------
        For an exact file: a one-element list.  For a directory: one entry
        per direct child (files, then implied/explicit directories); the
        names-only form is sorted.  File entries carry ``"created"`` as a
        POSIX timestamp.  An existing empty pseudo-dir gives ``[]``.

        Raises
        ------
        FileNotFoundError
            If nothing is stored at or below ``path`` and it is not a
            pseudo-dir.
        """
        path = self._strip_protocol(path)

        def file_entry(name):
            stored = self.store[name]
            return {
                "name": name,
                "size": stored.size,
                "type": "file",
                "created": stored.created.timestamp(),
            }

        if path in self.store:
            # there is a key with this exact name
            return [file_entry(path)] if detail else [path]

        starter = path + "/"
        seen_dirs = set()
        out = []

        def add_dir(name):
            # each directory is reported once, however many entries imply it
            if name not in seen_dirs:
                out.append({"name": name, "size": 0, "type": "directory"})
                seen_dirs.add(name)

        # iterate over a snapshot of the keys rather than the live dict
        for key in tuple(self.store):
            if not key.startswith(starter):
                continue
            rest = key[len(starter) :]
            if "/" not in rest:
                # exact child
                out.append(file_entry(key))
            else:
                # implied child directory: first path segment below ``path``
                add_dir(starter + rest.split("/", 1)[0])

        for pdir in self.pseudo_dirs:
            if pdir.startswith(starter):
                # An exact child pdir is its own first segment, and a deeper
                # pdir implies the directory named by its first segment, so
                # both cases reduce to the same expression.
                add_dir(starter + pdir[len(starter) :].split("/", 1)[0])

        if not out:
            if path in self.pseudo_dirs:
                # empty dir
                return []
            raise FileNotFoundError(path)
        if detail:
            return out
        return sorted(entry["name"] for entry in out)

    def mkdir(self, path, create_parents=True, **kwargs):
        """Register ``path`` as a pseudo-directory.

        Parameters
        ----------
        path : str
            Directory to create.
        create_parents : bool
            If true, missing parent directories are registered first.

        Raises
        ------
        FileExistsError
            If ``path`` is already a stored file or a pseudo-dir.
        NotADirectoryError
            If the (non-root) parent of ``path`` is a stored file.
        """
        path = self._strip_protocol(path)
        if path in self.store or path in self.pseudo_dirs:
            raise FileExistsError(path)
        parent = self._parent(path)
        parent_is_not_root = bool(parent.strip("/"))
        if parent_is_not_root and self.isfile(parent):
            raise NotADirectoryError(parent)
        if create_parents and parent_is_not_root:
            try:
                self.mkdir(parent, create_parents, **kwargs)
            except FileExistsError:
                # the parent already being there is exactly what we want
                pass
        if path and path not in self.pseudo_dirs:
            self.pseudo_dirs.append(path)

    def makedirs(self, path, exist_ok=False):
        """Create ``path`` and any missing parents.

        ``FileExistsError`` from ``mkdir`` is suppressed when ``exist_ok``
        is true and re-raised otherwise.
        """
        try:
            self.mkdir(path, create_parents=True)
        except FileExistsError:
            if not exist_ok:
                raise

    def pipe_file(self, path, value, mode="overwrite", **kwargs):
        """Set the bytes of given file

        Avoids copies of the data if possible.  ``mode="create"`` opens the
        file with ``"xb"``; every other value opens it with ``"wb"``.  The
        payload is handed to ``open`` through its ``data`` keyword.
        """
        mode = "xb" if mode == "create" else "wb"
        self.open(path, mode=mode, data=value)

    def rmdir(self, path):
        """Remove an empty pseudo-directory.

        Removing the root (``""``) is silently ignored.

        Raises
        ------
        FileNotFoundError
            If ``path`` is not a pseudo-dir.
        OSError
            With ``errno.ENOTEMPTY`` if the directory still has children.
        """
        path = self._strip_protocol(path)
        if path == "":
            # silently avoid deleting FS root
            return
        if path not in self.pseudo_dirs:
            raise FileNotFoundError(path)
        if self.ls(path):
            raise OSError(ENOTEMPTY, "Directory not empty", path)
        self.pseudo_dirs.remove(path)

    def info(self, path, **kwargs):
        """Return the info dict for a single path.

        The directory test runs first: a path counts as a directory when it
        is a pseudo-dir or when any stored key or pseudo-dir lies beneath
        it.  Otherwise a stored key yields a file entry.

        NOTE: the file entry's ``"created"`` is whatever ``created``
        attribute the stored object has (``None`` if it has none), whereas
        ``ls`` reports ``created.timestamp()``.  The difference is kept
        as-is so existing callers are unaffected.

        Raises
        ------
        FileNotFoundError
            If ``path`` is neither a directory nor a stored file.
        """
        logger.debug("info: %s", path)
        path = self._strip_protocol(path)
        prefix = path + "/"
        if path in self.pseudo_dirs or any(
            p.startswith(prefix) for p in list(self.store) + self.pseudo_dirs
        ):
            return {
                "name": path,
                "size": 0,
                "type": "directory",
            }
        if path in self.store:
            filelike = self.store[path]
            return {
                "name": path,
                "size": filelike.size,
                "type": "file",
                "created": getattr(filelike, "created", None),
            }
        raise FileNotFoundError(path)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open ``path`` and return a file object.

        Read/append modes (``"rb"``, ``"ab"``, ``"r+b"``) return the stored
        object itself, positioned at the end for ``"ab"`` and at the start
        otherwise.  Write modes (``"wb"``, ``"xb"``) create a new
        ``MemoryFile``, optionally seeded from the ``data`` keyword, and
        commit it immediately unless ``self._intrans`` is set.
        ``block_size``, ``autocommit`` and ``cache_options`` are accepted
        but not used by this implementation.

        Raises
        ------
        FileExistsError
            If the mode contains ``"x"`` and ``path`` already exists, or if
            any ancestor of ``path`` is a stored file.
        IsADirectoryError
            If ``path`` is a pseudo-dir.
        FileNotFoundError
            If a read/append mode is used on a path that is not stored.
        ValueError
            For any other mode string.
        """
        path = self._strip_protocol(path)
        if "x" in mode and self.exists(path):
            # exclusive creation must not clobber anything already there
            raise FileExistsError(path)
        if path in self.pseudo_dirs:
            raise IsADirectoryError(path)
        # a file cannot live underneath another file
        parent = path
        while len(parent) > 1:
            parent = self._parent(parent)
            if self.isfile(parent):
                raise FileExistsError(parent)
        if mode in ["rb", "ab", "r+b"]:
            if path not in self.store:
                raise FileNotFoundError(path)
            f = self.store[path]
            if mode == "ab":
                # position at the end of file (whence=2)
                f.seek(0, 2)
            else:
                # position at the beginning of file
                f.seek(0)
            return f
        if mode in {"wb", "xb"}:
            # "xb" on an existing path was already rejected above
            m = MemoryFile(self, path, kwargs.get("data"))
            if not self._intrans:
                m.commit()
            return m
        name = self.__class__.__name__
        raise ValueError(f"unsupported file mode for {name}: {mode!r}")

    def cp_file(self, path1, path2, **kwargs):
        """Copy a single file or directory entry from ``path1`` to ``path2``.

        A file is duplicated into a fresh ``MemoryFile`` built from the
        source's bytes.  A directory is only registered as a pseudo-dir at
        the destination.

        Raises
        ------
        FileNotFoundError
            If ``path1`` is neither a file nor a directory.
        """
        path1 = self._strip_protocol(path1)
        path2 = self._strip_protocol(path2)
        if self.isfile(path1):
            self.store[path2] = MemoryFile(
                self, path2, self.store[path1].getvalue()
            )  # implicit copy
        elif self.isdir(path1):
            if path2 not in self.pseudo_dirs:
                self.pseudo_dirs.append(path2)
        else:
            raise FileNotFoundError(path1)

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Return the bytes of ``path``, sliced as ``[start:end]``.

        Raises ``FileNotFoundError`` if ``path`` is not a stored file.
        """
        logger.debug("cat: %s", path)
        path = self._strip_protocol(path)
        try:
            return bytes(self.store[path].getbuffer()[start:end])
        except KeyError as e:
            raise FileNotFoundError(path) from e

    def _rm(self, path):
        """Delete the stored file at ``path``.

        Raises ``FileNotFoundError`` if ``path`` is not a stored file.
        """
        path = self._strip_protocol(path)
        try:
            del self.store[path]
        except KeyError as e:
            raise FileNotFoundError(path) from e

    def modified(self, path):
        """Return the ``modified`` attribute of the stored file at ``path``.

        Raises ``FileNotFoundError`` if ``path`` is not a stored file.
        """
        path = self._strip_protocol(path)
        try:
            return self.store[path].modified
        except KeyError as e:
            raise FileNotFoundError(path) from e

    def created(self, path):
        """Return the ``created`` attribute of the stored file at ``path``.

        Raises ``FileNotFoundError`` if ``path`` is not a stored file.
        """
        path = self._strip_protocol(path)
        try:
            return self.store[path].created
        except KeyError as e:
            raise FileNotFoundError(path) from e

    def isfile(self, path):
        """Return whether ``path`` (after normalisation) is a key of ``store``."""
        path = self._strip_protocol(path)
        return path in self.store

    def rm(self, path, recursive=False, maxdepth=None):
        """Remove one or more files or directories.

        ``path`` may be a single string or an iterable of paths.  The
        normalised input is expanded with the inherited ``expand_path`` and
        the result is processed in reverse order: files go through
        ``rm_file``, existing directories through ``rmdir``.
        """
        if isinstance(path, str):
            path = self._strip_protocol(path)
        else:
            path = [self._strip_protocol(p) for p in path]

        paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
        for p in reversed(paths):
            if self.isfile(p):
                self.rm_file(p)
            # If the expanded path doesn't exist, it is only because the expanded
            # path was a directory that does not exist in self.pseudo_dirs. This
            # is possible if you directly create files without making the
            # directories first.
            elif not self.exists(p):
                continue
            else:
                self.rmdir(p)
class MemoryFile(BytesIO):
    """In-memory file built on ``BytesIO`` whose ``close`` does nothing.

    Works as a context manager and may be seeded with initial bytes.  Each
    path should only be active once at any moment.  ``fs`` and ``path`` are
    only read by ``commit``, which registers this object in ``fs.store``.
    """

    def __init__(self, fs=None, path=None, data=None):
        """Record owner, path and timestamps; load ``data`` when it is truthy."""
        logger.debug("open file %s", path)
        self.fs, self.path = fs, path
        self.created = datetime.now(tz=timezone.utc)
        self.modified = datetime.now(tz=timezone.utc)
        if not data:
            # nothing to preload: the buffer stays empty
            return
        super().__init__(data)
        self.seek(0)

    @property
    def size(self):
        """Number of bytes currently held in the buffer."""
        view = self.getbuffer()
        return view.nbytes

    def __enter__(self):
        """Return the file itself for use in a ``with`` block."""
        return self

    def close(self):
        """Do nothing, so the contents remain readable after ``close``."""

    def discard(self):
        """Do nothing; present as the counterpart of ``commit``."""

    def commit(self):
        """Store this file under ``self.path`` in ``self.fs.store`` and bump ``modified``."""
        registry = self.fs.store
        registry[self.path] = self
        self.modified = datetime.now(tz=timezone.utc)
|