123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- import os
- import zipfile
- import fsspec
- from fsspec.archive import AbstractArchiveFileSystem
class ZipFileSystem(AbstractArchiveFileSystem):
    """Read/Write contents of ZIP archive as a file-system

    Keeps file object open while instance lives. When the archive was given
    as a path/URL (so the file was opened by this class), ``close`` also
    releases that file; a file-like object supplied by the caller is left
    open for the caller to manage.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        compression=zipfile.ZIP_STORED,
        allowZip64=True,
        compresslevel=None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains ZIP, and must exist. If a str, will fetch file using
            :meth:`~fsspec.open_files`, which must return one file exactly.
        mode: str
            Accept: "r", "w", "a"
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        compression, allowZip64, compresslevel: passed to ZipFile
            Only relevant when creating a ZIP

        Raises
        ------
        ValueError
            If ``mode`` is not one of "r", "w" or "a".
        """
        # ``self`` is already bound by super(); it must not be passed again.
        super().__init__(**kwargs)
        if mode not in ("r", "w", "a"):
            raise ValueError(f"mode '{mode}' not understood")
        self.mode = mode

        opened_here = isinstance(fo, (str, os.PathLike))
        if opened_here:
            # appending needs an existing file opened for update
            m = "r+b" if mode == "a" else mode + "b"
            fo = fsspec.open(
                fo, mode=m, protocol=target_protocol, **(target_options or {})
            )
        self.force_zip_64 = allowZip64
        self.of = fo
        self.fo = fo.__enter__()  # the whole instance is a context
        # Only a context entered on a file we opened ourselves is ours to exit
        self._owns_fo = opened_here
        try:
            self.zip = zipfile.ZipFile(
                self.fo,
                mode=mode,
                compression=compression,
                allowZip64=allowZip64,
                compresslevel=compresslevel,
            )
        except Exception:
            # do not leak the handle we opened if the archive cannot be used
            self._release_fo()
            raise
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Normalise ``path`` via the parent class and drop leading slashes."""
        # zip file paths are always relative to the archive root
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        # ``zip`` is missing if __init__ failed before creating the ZipFile
        if hasattr(self, "zip"):
            self.close()
            del self.zip

    def _release_fo(self):
        """Exit the context of the underlying file, if this instance opened it.

        Safe to call repeatedly: ownership is dropped on the first call.
        """
        if getattr(self, "_owns_fo", False):
            self._owns_fo = False
            self.of.__exit__(None, None, None)

    def close(self):
        """Commits any write changes to the file. Done on ``del`` too."""
        try:
            # must run first: closing the ZipFile writes to ``self.fo``
            self.zip.close()
        finally:
            self._release_fo()

    def _get_dirs(self):
        """Populate ``self.dir_cache`` with one info dict per archive entry.

        Keys are entry names without trailing slashes. Directories implied by
        member names are added with size 0; explicit members carry every
        ``ZipInfo`` slot plus ``name``, ``size`` and ``type``.
        """
        if self.dir_cache is not None and self.mode not in ("w", "a"):
            return
        # when writing, dir_cache is always in the ZipFile's attributes,
        # not read from the file.
        files = self.zip.infolist()
        self.dir_cache = {
            dirname.rstrip("/"): {
                "name": dirname.rstrip("/"),
                "size": 0,
                "type": "directory",
            }
            for dirname in self._all_dirnames(self.zip.namelist())
        }
        for z in files:
            f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
            f.update(
                {
                    "name": z.filename.rstrip("/"),
                    "size": z.file_size,
                    "type": ("directory" if z.is_dir() else "file"),
                }
            )
            self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` as the archive member ``path`` in a single call.

        Extra keyword arguments are forwarded to ``ZipFile.writestr``.
        """
        # override upstream, because we know the exact file size in this case
        self.zip.writestr(path, value, **kwargs)

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open one archive member for reading or writing.

        ``block_size``, ``autocommit``, ``cache_options`` and ``kwargs`` are
        accepted but not used by this implementation.

        Raises
        ------
        OSError
            If the requested direction conflicts with the mode the archive
            was opened in.
        FileNotFoundError
            If a read is requested on a write/append archive and the member
            does not exist.
        """
        path = self._strip_protocol(path)
        if "r" in mode and self.mode in ("w", "a"):
            if self.exists(path):
                raise OSError("ZipFS can only be open for reading or writing, not both")
            raise FileNotFoundError(path)
        if "r" in self.mode and "w" in mode:
            raise OSError("ZipFS can only be open for reading or writing, not both")
        out = self.zip.open(path, mode.strip("b"), force_zip64=self.force_zip_64)
        if "r" in mode:
            # attach size and name from the member's info to the handle
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out

    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """List the entries at or below ``path``.

        Parameters
        ----------
        path: str
            Start point inside the archive; leading slashes are ignored and
            an empty path means the archive root.
        maxdepth: int or None
            If given, keep only entries at most this many levels below
            ``path`` (the start point itself is level 0).
        withdirs: bool
            Also include directories in the result.
        detail: bool
            If True return ``{name: info}``, otherwise a sorted list of names.

        Raises
        ------
        ValueError
            If ``maxdepth`` is given and is less than 1.
        """
        if maxdepth is not None and maxdepth < 1:
            raise ValueError("maxdepth must be at least 1")

        # Remove the leading slash, as the zip file paths are always
        # given without a leading slash
        path = path.lstrip("/")
        path_parts = [part for part in path.split("/") if part]
        n_parts = len(path_parts)

        def _is_at_or_below(file_path):
            # A true prefix test: an entry with fewer components than the
            # search path (i.e. one of its ancestors) must not match.
            file_parts = [part for part in file_path.split("/") if part]
            return file_parts[:n_parts] == path_parts

        self._get_dirs()

        # To match posix find, if an exact file name is given, we should
        # return only that file
        if path in self.dir_cache and self.dir_cache[path]["type"] == "file":
            return {path: self.dir_cache[path]} if detail else [path]

        result = {}
        for file_path, file_info in self.dir_cache.items():
            if not _is_at_or_below(file_path):
                continue

            if file_info["type"] == "directory":
                if withdirs and file_path not in result:
                    result[file_path.strip("/")] = file_info
                continue

            if file_path not in result:
                result[file_path] = file_info if detail else None

        if maxdepth:
            # Depth is counted in path components relative to the start point,
            # so "a" and "a/" give the same answer.
            result = {
                name: info
                for name, info in result.items()
                if name.count("/") + 1 - n_parts <= maxdepth
            }
        return result if detail else sorted(result)
|