dirfs.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. from .. import filesystem
  2. from ..asyn import AsyncFileSystem
  3. class DirFileSystem(AsyncFileSystem):
  4. """Directory prefix filesystem
  5. The DirFileSystem is a filesystem-wrapper. It assumes every path it is dealing with
  6. is relative to the `path`. After performing the necessary paths operation it
  7. delegates everything to the wrapped filesystem.
  8. """
  9. protocol = "dir"
  10. def __init__(
  11. self,
  12. path=None,
  13. fs=None,
  14. fo=None,
  15. target_protocol=None,
  16. target_options=None,
  17. **storage_options,
  18. ):
  19. """
  20. Parameters
  21. ----------
  22. path: str
  23. Path to the directory.
  24. fs: AbstractFileSystem
  25. An instantiated filesystem to wrap.
  26. target_protocol, target_options:
  27. if fs is none, construct it from these
  28. fo: str
  29. Alternate for path; do not provide both
  30. """
  31. super().__init__(**storage_options)
  32. if fs is None:
  33. fs = filesystem(protocol=target_protocol, **(target_options or {}))
  34. path = path or fo
  35. if self.asynchronous and not fs.async_impl:
  36. raise ValueError("can't use asynchronous with non-async fs")
  37. if fs.async_impl and self.asynchronous != fs.asynchronous:
  38. raise ValueError("both dirfs and fs should be in the same sync/async mode")
  39. self.path = fs._strip_protocol(path)
  40. self.fs = fs
  41. def _join(self, path):
  42. if isinstance(path, str):
  43. if not self.path:
  44. return path
  45. if not path:
  46. return self.path
  47. return self.fs.sep.join((self.path, self._strip_protocol(path)))
  48. if isinstance(path, dict):
  49. return {self._join(_path): value for _path, value in path.items()}
  50. return [self._join(_path) for _path in path]
  51. def _relpath(self, path):
  52. if isinstance(path, str):
  53. if not self.path:
  54. return path
  55. # We need to account for S3FileSystem returning paths that do not
  56. # start with a '/'
  57. if path == self.path or (
  58. self.path.startswith(self.fs.sep) and path == self.path[1:]
  59. ):
  60. return ""
  61. prefix = self.path + self.fs.sep
  62. if self.path.startswith(self.fs.sep) and not path.startswith(self.fs.sep):
  63. prefix = prefix[1:]
  64. assert path.startswith(prefix)
  65. return path[len(prefix) :]
  66. return [self._relpath(_path) for _path in path]
  67. # Wrappers below
  68. @property
  69. def sep(self):
  70. return self.fs.sep
  71. async def set_session(self, *args, **kwargs):
  72. return await self.fs.set_session(*args, **kwargs)
  73. async def _rm_file(self, path, **kwargs):
  74. return await self.fs._rm_file(self._join(path), **kwargs)
  75. def rm_file(self, path, **kwargs):
  76. return self.fs.rm_file(self._join(path), **kwargs)
  77. async def _rm(self, path, *args, **kwargs):
  78. return await self.fs._rm(self._join(path), *args, **kwargs)
  79. def rm(self, path, *args, **kwargs):
  80. return self.fs.rm(self._join(path), *args, **kwargs)
  81. async def _cp_file(self, path1, path2, **kwargs):
  82. return await self.fs._cp_file(self._join(path1), self._join(path2), **kwargs)
  83. def cp_file(self, path1, path2, **kwargs):
  84. return self.fs.cp_file(self._join(path1), self._join(path2), **kwargs)
  85. async def _copy(
  86. self,
  87. path1,
  88. path2,
  89. *args,
  90. **kwargs,
  91. ):
  92. return await self.fs._copy(
  93. self._join(path1),
  94. self._join(path2),
  95. *args,
  96. **kwargs,
  97. )
  98. def copy(self, path1, path2, *args, **kwargs):
  99. return self.fs.copy(
  100. self._join(path1),
  101. self._join(path2),
  102. *args,
  103. **kwargs,
  104. )
  105. async def _pipe(self, path, *args, **kwargs):
  106. return await self.fs._pipe(self._join(path), *args, **kwargs)
  107. def pipe(self, path, *args, **kwargs):
  108. return self.fs.pipe(self._join(path), *args, **kwargs)
  109. async def _pipe_file(self, path, *args, **kwargs):
  110. return await self.fs._pipe_file(self._join(path), *args, **kwargs)
  111. def pipe_file(self, path, *args, **kwargs):
  112. return self.fs.pipe_file(self._join(path), *args, **kwargs)
  113. async def _cat_file(self, path, *args, **kwargs):
  114. return await self.fs._cat_file(self._join(path), *args, **kwargs)
  115. def cat_file(self, path, *args, **kwargs):
  116. return self.fs.cat_file(self._join(path), *args, **kwargs)
  117. async def _cat(self, path, *args, **kwargs):
  118. ret = await self.fs._cat(
  119. self._join(path),
  120. *args,
  121. **kwargs,
  122. )
  123. if isinstance(ret, dict):
  124. return {self._relpath(key): value for key, value in ret.items()}
  125. return ret
  126. def cat(self, path, *args, **kwargs):
  127. ret = self.fs.cat(
  128. self._join(path),
  129. *args,
  130. **kwargs,
  131. )
  132. if isinstance(ret, dict):
  133. return {self._relpath(key): value for key, value in ret.items()}
  134. return ret
  135. async def _put_file(self, lpath, rpath, **kwargs):
  136. return await self.fs._put_file(lpath, self._join(rpath), **kwargs)
  137. def put_file(self, lpath, rpath, **kwargs):
  138. return self.fs.put_file(lpath, self._join(rpath), **kwargs)
  139. async def _put(
  140. self,
  141. lpath,
  142. rpath,
  143. *args,
  144. **kwargs,
  145. ):
  146. return await self.fs._put(
  147. lpath,
  148. self._join(rpath),
  149. *args,
  150. **kwargs,
  151. )
  152. def put(self, lpath, rpath, *args, **kwargs):
  153. return self.fs.put(
  154. lpath,
  155. self._join(rpath),
  156. *args,
  157. **kwargs,
  158. )
  159. async def _get_file(self, rpath, lpath, **kwargs):
  160. return await self.fs._get_file(self._join(rpath), lpath, **kwargs)
  161. def get_file(self, rpath, lpath, **kwargs):
  162. return self.fs.get_file(self._join(rpath), lpath, **kwargs)
  163. async def _get(self, rpath, *args, **kwargs):
  164. return await self.fs._get(self._join(rpath), *args, **kwargs)
  165. def get(self, rpath, *args, **kwargs):
  166. return self.fs.get(self._join(rpath), *args, **kwargs)
  167. async def _isfile(self, path):
  168. return await self.fs._isfile(self._join(path))
  169. def isfile(self, path):
  170. return self.fs.isfile(self._join(path))
  171. async def _isdir(self, path):
  172. return await self.fs._isdir(self._join(path))
  173. def isdir(self, path):
  174. return self.fs.isdir(self._join(path))
  175. async def _size(self, path):
  176. return await self.fs._size(self._join(path))
  177. def size(self, path):
  178. return self.fs.size(self._join(path))
  179. async def _exists(self, path):
  180. return await self.fs._exists(self._join(path))
  181. def exists(self, path):
  182. return self.fs.exists(self._join(path))
  183. async def _info(self, path, **kwargs):
  184. info = await self.fs._info(self._join(path), **kwargs)
  185. info = info.copy()
  186. info["name"] = self._relpath(info["name"])
  187. return info
  188. def info(self, path, **kwargs):
  189. info = self.fs.info(self._join(path), **kwargs)
  190. info = info.copy()
  191. info["name"] = self._relpath(info["name"])
  192. return info
  193. async def _ls(self, path, detail=True, **kwargs):
  194. ret = (await self.fs._ls(self._join(path), detail=detail, **kwargs)).copy()
  195. if detail:
  196. out = []
  197. for entry in ret:
  198. entry = entry.copy()
  199. entry["name"] = self._relpath(entry["name"])
  200. out.append(entry)
  201. return out
  202. return self._relpath(ret)
  203. def ls(self, path, detail=True, **kwargs):
  204. ret = self.fs.ls(self._join(path), detail=detail, **kwargs).copy()
  205. if detail:
  206. out = []
  207. for entry in ret:
  208. entry = entry.copy()
  209. entry["name"] = self._relpath(entry["name"])
  210. out.append(entry)
  211. return out
  212. return self._relpath(ret)
  213. async def _walk(self, path, *args, **kwargs):
  214. async for root, dirs, files in self.fs._walk(self._join(path), *args, **kwargs):
  215. yield self._relpath(root), dirs, files
  216. def walk(self, path, *args, **kwargs):
  217. for root, dirs, files in self.fs.walk(self._join(path), *args, **kwargs):
  218. yield self._relpath(root), dirs, files
  219. async def _glob(self, path, **kwargs):
  220. detail = kwargs.get("detail", False)
  221. ret = await self.fs._glob(self._join(path), **kwargs)
  222. if detail:
  223. return {self._relpath(path): info for path, info in ret.items()}
  224. return self._relpath(ret)
  225. def glob(self, path, **kwargs):
  226. detail = kwargs.get("detail", False)
  227. ret = self.fs.glob(self._join(path), **kwargs)
  228. if detail:
  229. return {self._relpath(path): info for path, info in ret.items()}
  230. return self._relpath(ret)
  231. async def _du(self, path, *args, **kwargs):
  232. total = kwargs.get("total", True)
  233. ret = await self.fs._du(self._join(path), *args, **kwargs)
  234. if total:
  235. return ret
  236. return {self._relpath(path): size for path, size in ret.items()}
  237. def du(self, path, *args, **kwargs):
  238. total = kwargs.get("total", True)
  239. ret = self.fs.du(self._join(path), *args, **kwargs)
  240. if total:
  241. return ret
  242. return {self._relpath(path): size for path, size in ret.items()}
  243. async def _find(self, path, *args, **kwargs):
  244. detail = kwargs.get("detail", False)
  245. ret = await self.fs._find(self._join(path), *args, **kwargs)
  246. if detail:
  247. return {self._relpath(path): info for path, info in ret.items()}
  248. return self._relpath(ret)
  249. def find(self, path, *args, **kwargs):
  250. detail = kwargs.get("detail", False)
  251. ret = self.fs.find(self._join(path), *args, **kwargs)
  252. if detail:
  253. return {self._relpath(path): info for path, info in ret.items()}
  254. return self._relpath(ret)
  255. async def _expand_path(self, path, *args, **kwargs):
  256. return self._relpath(
  257. await self.fs._expand_path(self._join(path), *args, **kwargs)
  258. )
  259. def expand_path(self, path, *args, **kwargs):
  260. return self._relpath(self.fs.expand_path(self._join(path), *args, **kwargs))
  261. async def _mkdir(self, path, *args, **kwargs):
  262. return await self.fs._mkdir(self._join(path), *args, **kwargs)
  263. def mkdir(self, path, *args, **kwargs):
  264. return self.fs.mkdir(self._join(path), *args, **kwargs)
  265. async def _makedirs(self, path, *args, **kwargs):
  266. return await self.fs._makedirs(self._join(path), *args, **kwargs)
  267. def makedirs(self, path, *args, **kwargs):
  268. return self.fs.makedirs(self._join(path), *args, **kwargs)
  269. def rmdir(self, path):
  270. return self.fs.rmdir(self._join(path))
  271. def mv(self, path1, path2, **kwargs):
  272. return self.fs.mv(
  273. self._join(path1),
  274. self._join(path2),
  275. **kwargs,
  276. )
  277. def touch(self, path, **kwargs):
  278. return self.fs.touch(self._join(path), **kwargs)
  279. def created(self, path):
  280. return self.fs.created(self._join(path))
  281. def modified(self, path):
  282. return self.fs.modified(self._join(path))
  283. def sign(self, path, *args, **kwargs):
  284. return self.fs.sign(self._join(path), *args, **kwargs)
  285. def __repr__(self):
  286. return f"{self.__class__.__qualname__}(path='{self.path}', fs={self.fs})"
  287. def open(
  288. self,
  289. path,
  290. *args,
  291. **kwargs,
  292. ):
  293. return self.fs.open(
  294. self._join(path),
  295. *args,
  296. **kwargs,
  297. )
  298. async def open_async(
  299. self,
  300. path,
  301. *args,
  302. **kwargs,
  303. ):
  304. return await self.fs.open_async(
  305. self._join(path),
  306. *args,
  307. **kwargs,
  308. )