# archive.py
  1. import operator
  2. from fsspec import AbstractFileSystem
  3. from fsspec.utils import tokenize
  4. class AbstractArchiveFileSystem(AbstractFileSystem):
  5. """
  6. A generic superclass for implementing Archive-based filesystems.
  7. Currently, it is shared amongst
  8. :class:`~fsspec.implementations.zip.ZipFileSystem`,
  9. :class:`~fsspec.implementations.libarchive.LibArchiveFileSystem` and
  10. :class:`~fsspec.implementations.tar.TarFileSystem`.
  11. """
  12. def __str__(self):
  13. return f"<Archive-like object {type(self).__name__} at {id(self)}>"
  14. __repr__ = __str__
  15. def ukey(self, path):
  16. return tokenize(path, self.fo, self.protocol)
  17. def _all_dirnames(self, paths):
  18. """Returns *all* directory names for each path in paths, including intermediate
  19. ones.
  20. Parameters
  21. ----------
  22. paths: Iterable of path strings
  23. """
  24. if len(paths) == 0:
  25. return set()
  26. dirnames = {self._parent(path) for path in paths} - {self.root_marker}
  27. return dirnames | self._all_dirnames(dirnames)
  28. def info(self, path, **kwargs):
  29. self._get_dirs()
  30. path = self._strip_protocol(path)
  31. if path in {"", "/"} and self.dir_cache:
  32. return {"name": "", "type": "directory", "size": 0}
  33. if path in self.dir_cache:
  34. return self.dir_cache[path]
  35. elif path + "/" in self.dir_cache:
  36. return self.dir_cache[path + "/"]
  37. else:
  38. raise FileNotFoundError(path)
  39. def ls(self, path, detail=True, **kwargs):
  40. self._get_dirs()
  41. paths = {}
  42. for p, f in self.dir_cache.items():
  43. p = p.rstrip("/")
  44. if "/" in p:
  45. root = p.rsplit("/", 1)[0]
  46. else:
  47. root = ""
  48. if root == path.rstrip("/"):
  49. paths[p] = f
  50. elif all(
  51. (a == b)
  52. for a, b in zip(path.split("/"), [""] + p.strip("/").split("/"))
  53. ):
  54. # root directory entry
  55. ppath = p.rstrip("/").split("/", 1)[0]
  56. if ppath not in paths:
  57. out = {"name": ppath, "size": 0, "type": "directory"}
  58. paths[ppath] = out
  59. if detail:
  60. out = sorted(paths.values(), key=operator.itemgetter("name"))
  61. return out
  62. else:
  63. return sorted(paths)