123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- from .. import filesystem
- from ..asyn import AsyncFileSystem
- class DirFileSystem(AsyncFileSystem):
- """Directory prefix filesystem
- The DirFileSystem is a filesystem-wrapper. It assumes every path it is dealing with
- is relative to the `path`. After performing the necessary paths operation it
- delegates everything to the wrapped filesystem.
- """
- protocol = "dir"
- def __init__(
- self,
- path=None,
- fs=None,
- fo=None,
- target_protocol=None,
- target_options=None,
- **storage_options,
- ):
- """
- Parameters
- ----------
- path: str
- Path to the directory.
- fs: AbstractFileSystem
- An instantiated filesystem to wrap.
- target_protocol, target_options:
- if fs is none, construct it from these
- fo: str
- Alternate for path; do not provide both
- """
- super().__init__(**storage_options)
- if fs is None:
- fs = filesystem(protocol=target_protocol, **(target_options or {}))
- path = path or fo
- if self.asynchronous and not fs.async_impl:
- raise ValueError("can't use asynchronous with non-async fs")
- if fs.async_impl and self.asynchronous != fs.asynchronous:
- raise ValueError("both dirfs and fs should be in the same sync/async mode")
- self.path = fs._strip_protocol(path)
- self.fs = fs
- def _join(self, path):
- if isinstance(path, str):
- if not self.path:
- return path
- if not path:
- return self.path
- return self.fs.sep.join((self.path, self._strip_protocol(path)))
- if isinstance(path, dict):
- return {self._join(_path): value for _path, value in path.items()}
- return [self._join(_path) for _path in path]
- def _relpath(self, path):
- if isinstance(path, str):
- if not self.path:
- return path
- # We need to account for S3FileSystem returning paths that do not
- # start with a '/'
- if path == self.path or (
- self.path.startswith(self.fs.sep) and path == self.path[1:]
- ):
- return ""
- prefix = self.path + self.fs.sep
- if self.path.startswith(self.fs.sep) and not path.startswith(self.fs.sep):
- prefix = prefix[1:]
- assert path.startswith(prefix)
- return path[len(prefix) :]
- return [self._relpath(_path) for _path in path]
- # Wrappers below
- @property
- def sep(self):
- return self.fs.sep
- async def set_session(self, *args, **kwargs):
- return await self.fs.set_session(*args, **kwargs)
- async def _rm_file(self, path, **kwargs):
- return await self.fs._rm_file(self._join(path), **kwargs)
- def rm_file(self, path, **kwargs):
- return self.fs.rm_file(self._join(path), **kwargs)
- async def _rm(self, path, *args, **kwargs):
- return await self.fs._rm(self._join(path), *args, **kwargs)
- def rm(self, path, *args, **kwargs):
- return self.fs.rm(self._join(path), *args, **kwargs)
- async def _cp_file(self, path1, path2, **kwargs):
- return await self.fs._cp_file(self._join(path1), self._join(path2), **kwargs)
- def cp_file(self, path1, path2, **kwargs):
- return self.fs.cp_file(self._join(path1), self._join(path2), **kwargs)
- async def _copy(
- self,
- path1,
- path2,
- *args,
- **kwargs,
- ):
- return await self.fs._copy(
- self._join(path1),
- self._join(path2),
- *args,
- **kwargs,
- )
- def copy(self, path1, path2, *args, **kwargs):
- return self.fs.copy(
- self._join(path1),
- self._join(path2),
- *args,
- **kwargs,
- )
- async def _pipe(self, path, *args, **kwargs):
- return await self.fs._pipe(self._join(path), *args, **kwargs)
- def pipe(self, path, *args, **kwargs):
- return self.fs.pipe(self._join(path), *args, **kwargs)
- async def _pipe_file(self, path, *args, **kwargs):
- return await self.fs._pipe_file(self._join(path), *args, **kwargs)
- def pipe_file(self, path, *args, **kwargs):
- return self.fs.pipe_file(self._join(path), *args, **kwargs)
- async def _cat_file(self, path, *args, **kwargs):
- return await self.fs._cat_file(self._join(path), *args, **kwargs)
- def cat_file(self, path, *args, **kwargs):
- return self.fs.cat_file(self._join(path), *args, **kwargs)
- async def _cat(self, path, *args, **kwargs):
- ret = await self.fs._cat(
- self._join(path),
- *args,
- **kwargs,
- )
- if isinstance(ret, dict):
- return {self._relpath(key): value for key, value in ret.items()}
- return ret
- def cat(self, path, *args, **kwargs):
- ret = self.fs.cat(
- self._join(path),
- *args,
- **kwargs,
- )
- if isinstance(ret, dict):
- return {self._relpath(key): value for key, value in ret.items()}
- return ret
- async def _put_file(self, lpath, rpath, **kwargs):
- return await self.fs._put_file(lpath, self._join(rpath), **kwargs)
- def put_file(self, lpath, rpath, **kwargs):
- return self.fs.put_file(lpath, self._join(rpath), **kwargs)
- async def _put(
- self,
- lpath,
- rpath,
- *args,
- **kwargs,
- ):
- return await self.fs._put(
- lpath,
- self._join(rpath),
- *args,
- **kwargs,
- )
- def put(self, lpath, rpath, *args, **kwargs):
- return self.fs.put(
- lpath,
- self._join(rpath),
- *args,
- **kwargs,
- )
- async def _get_file(self, rpath, lpath, **kwargs):
- return await self.fs._get_file(self._join(rpath), lpath, **kwargs)
- def get_file(self, rpath, lpath, **kwargs):
- return self.fs.get_file(self._join(rpath), lpath, **kwargs)
- async def _get(self, rpath, *args, **kwargs):
- return await self.fs._get(self._join(rpath), *args, **kwargs)
- def get(self, rpath, *args, **kwargs):
- return self.fs.get(self._join(rpath), *args, **kwargs)
- async def _isfile(self, path):
- return await self.fs._isfile(self._join(path))
- def isfile(self, path):
- return self.fs.isfile(self._join(path))
- async def _isdir(self, path):
- return await self.fs._isdir(self._join(path))
- def isdir(self, path):
- return self.fs.isdir(self._join(path))
- async def _size(self, path):
- return await self.fs._size(self._join(path))
- def size(self, path):
- return self.fs.size(self._join(path))
- async def _exists(self, path):
- return await self.fs._exists(self._join(path))
- def exists(self, path):
- return self.fs.exists(self._join(path))
- async def _info(self, path, **kwargs):
- info = await self.fs._info(self._join(path), **kwargs)
- info = info.copy()
- info["name"] = self._relpath(info["name"])
- return info
- def info(self, path, **kwargs):
- info = self.fs.info(self._join(path), **kwargs)
- info = info.copy()
- info["name"] = self._relpath(info["name"])
- return info
- async def _ls(self, path, detail=True, **kwargs):
- ret = (await self.fs._ls(self._join(path), detail=detail, **kwargs)).copy()
- if detail:
- out = []
- for entry in ret:
- entry = entry.copy()
- entry["name"] = self._relpath(entry["name"])
- out.append(entry)
- return out
- return self._relpath(ret)
- def ls(self, path, detail=True, **kwargs):
- ret = self.fs.ls(self._join(path), detail=detail, **kwargs).copy()
- if detail:
- out = []
- for entry in ret:
- entry = entry.copy()
- entry["name"] = self._relpath(entry["name"])
- out.append(entry)
- return out
- return self._relpath(ret)
- async def _walk(self, path, *args, **kwargs):
- async for root, dirs, files in self.fs._walk(self._join(path), *args, **kwargs):
- yield self._relpath(root), dirs, files
- def walk(self, path, *args, **kwargs):
- for root, dirs, files in self.fs.walk(self._join(path), *args, **kwargs):
- yield self._relpath(root), dirs, files
- async def _glob(self, path, **kwargs):
- detail = kwargs.get("detail", False)
- ret = await self.fs._glob(self._join(path), **kwargs)
- if detail:
- return {self._relpath(path): info for path, info in ret.items()}
- return self._relpath(ret)
- def glob(self, path, **kwargs):
- detail = kwargs.get("detail", False)
- ret = self.fs.glob(self._join(path), **kwargs)
- if detail:
- return {self._relpath(path): info for path, info in ret.items()}
- return self._relpath(ret)
- async def _du(self, path, *args, **kwargs):
- total = kwargs.get("total", True)
- ret = await self.fs._du(self._join(path), *args, **kwargs)
- if total:
- return ret
- return {self._relpath(path): size for path, size in ret.items()}
- def du(self, path, *args, **kwargs):
- total = kwargs.get("total", True)
- ret = self.fs.du(self._join(path), *args, **kwargs)
- if total:
- return ret
- return {self._relpath(path): size for path, size in ret.items()}
- async def _find(self, path, *args, **kwargs):
- detail = kwargs.get("detail", False)
- ret = await self.fs._find(self._join(path), *args, **kwargs)
- if detail:
- return {self._relpath(path): info for path, info in ret.items()}
- return self._relpath(ret)
- def find(self, path, *args, **kwargs):
- detail = kwargs.get("detail", False)
- ret = self.fs.find(self._join(path), *args, **kwargs)
- if detail:
- return {self._relpath(path): info for path, info in ret.items()}
- return self._relpath(ret)
- async def _expand_path(self, path, *args, **kwargs):
- return self._relpath(
- await self.fs._expand_path(self._join(path), *args, **kwargs)
- )
- def expand_path(self, path, *args, **kwargs):
- return self._relpath(self.fs.expand_path(self._join(path), *args, **kwargs))
- async def _mkdir(self, path, *args, **kwargs):
- return await self.fs._mkdir(self._join(path), *args, **kwargs)
- def mkdir(self, path, *args, **kwargs):
- return self.fs.mkdir(self._join(path), *args, **kwargs)
- async def _makedirs(self, path, *args, **kwargs):
- return await self.fs._makedirs(self._join(path), *args, **kwargs)
- def makedirs(self, path, *args, **kwargs):
- return self.fs.makedirs(self._join(path), *args, **kwargs)
- def rmdir(self, path):
- return self.fs.rmdir(self._join(path))
- def mv(self, path1, path2, **kwargs):
- return self.fs.mv(
- self._join(path1),
- self._join(path2),
- **kwargs,
- )
- def touch(self, path, **kwargs):
- return self.fs.touch(self._join(path), **kwargs)
- def created(self, path):
- return self.fs.created(self._join(path))
- def modified(self, path):
- return self.fs.modified(self._join(path))
- def sign(self, path, *args, **kwargs):
- return self.fs.sign(self._join(path), *args, **kwargs)
- def __repr__(self):
- return f"{self.__class__.__qualname__}(path='{self.path}', fs={self.fs})"
- def open(
- self,
- path,
- *args,
- **kwargs,
- ):
- return self.fs.open(
- self._join(path),
- *args,
- **kwargs,
- )
- async def open_async(
- self,
- path,
- *args,
- **kwargs,
- ):
- return await self.fs.open_async(
- self._join(path),
- *args,
- **kwargs,
- )
|