__init__.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. """
  2. A Path-like interface for zipfiles.
  3. This codebase is shared between zipfile.Path in the stdlib
  4. and zipp in PyPI. See
  5. https://github.com/python/importlib_metadata/wiki/Development-Methodology
  6. for more detail.
  7. """
  8. import functools
  9. import io
  10. import itertools
  11. import pathlib
  12. import posixpath
  13. import re
  14. import stat
  15. import sys
  16. import zipfile
  17. from .compat.py310 import text_encoding
  18. from .glob import Translator
  19. from ._functools import save_method_args
  20. __all__ = ['Path']
  def _parents(path):
      """
      Given a path with elements separated by
      posixpath.sep, generate all parents of that path.

      The path itself is not produced, only its ancestors,
      nearest ancestor first.

      >>> list(_parents('b/d'))
      ['b']
      >>> list(_parents('/b/d/'))
      ['/b']
      >>> list(_parents('b/d/f/'))
      ['b/d', 'b']
      >>> list(_parents('b'))
      []
      >>> list(_parents(''))
      []
      """
      lineage = _ancestry(path)
      # The ancestry starts with the path itself (minus trailing
      # separators); skip that first element to leave only parents.
      return itertools.islice(lineage, 1, None)
  37. def _ancestry(path):
  38. """
  39. Given a path with elements separated by
  40. posixpath.sep, generate all elements of that path.
  41. >>> list(_ancestry('b/d'))
  42. ['b/d', 'b']
  43. >>> list(_ancestry('/b/d/'))
  44. ['/b/d', '/b']
  45. >>> list(_ancestry('b/d/f/'))
  46. ['b/d/f', 'b/d', 'b']
  47. >>> list(_ancestry('b'))
  48. ['b']
  49. >>> list(_ancestry(''))
  50. []
  51. Multiple separators are treated like a single.
  52. >>> list(_ancestry('//b//d///f//'))
  53. ['//b//d///f', '//b//d', '//b']
  54. """
  55. path = path.rstrip(posixpath.sep)
  56. while path.rstrip(posixpath.sep):
  57. yield path
  58. path, tail = posixpath.split(path)
  59. _dedupe = dict.fromkeys
  60. """Deduplicate an iterable in original order"""
  61. def _difference(minuend, subtrahend):
  62. """
  63. Return items in minuend not in subtrahend, retaining order
  64. with O(1) lookup.
  65. """
  66. return itertools.filterfalse(set(subtrahend).__contains__, minuend)
  class InitializedState:
      """
      Mix-in to save the initialization state for pickling.

      The pickled state is the pair ``(args, kwargs)`` that ``__init__``
      was called with; restoring re-runs the next ``__init__`` in the
      MRO with those same arguments.
      """

      # NOTE(review): save_method_args is defined elsewhere; presumably it
      # records the call arguments on ``self._saved___init__`` -- the
      # attribute read back in __getstate__.
      @save_method_args
      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)

      def __getstate__(self):
          saved = self._saved___init__
          return saved.args, saved.kwargs

      def __setstate__(self, state):
          init_args, init_kwargs = state
          super().__init__(*init_args, **init_kwargs)
  class CompleteDirs(InitializedState, zipfile.ZipFile):
      """
      A ZipFile subclass that ensures that implied directories
      are always included in the namelist.

      >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
      ['foo/', 'foo/bar/']
      >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
      ['foo/']
      """

      @staticmethod
      def _implied_dirs(names):
          """
          Return the directory names (with trailing separator) that are
          parents of entries in ``names`` but are not themselves in
          ``names``, deduplicated in first-seen order.
          """
          ancestors = itertools.chain.from_iterable(
              _parents(name) for name in names
          )
          candidate_dirs = (parent + posixpath.sep for parent in ancestors)
          missing = _difference(candidate_dirs, names)
          return _dedupe(missing)

      def namelist(self):
          """
          Return the archive's names followed by any implied directories.
          """
          explicit = super().namelist()
          return [*explicit, *self._implied_dirs(explicit)]

      def _name_set(self):
          """
          Return the full namelist as a set for membership tests.
          """
          return set(self.namelist())

      def resolve_dir(self, name):
          """
          If the name represents a directory, return that name
          as a directory (with the trailing slash).
          """
          known = self._name_set()
          if name in known:
              # An exact entry wins over a same-named directory.
              return name
          as_dir = name + '/'
          return as_dir if as_dir in known else name

      def getinfo(self, name):
          """
          Supplement getinfo for implied dirs.

          When the archive has no entry for ``name`` but ``name`` is a
          directory present in the completed name set, a fresh
          ``ZipInfo`` is returned for it; otherwise the original
          ``KeyError`` propagates.
          """
          try:
              return super().getinfo(name)
          except KeyError:
              is_implied_dir = name.endswith('/') and name in self._name_set()
              if is_implied_dir:
                  return zipfile.ZipInfo(filename=name)
              raise

      @classmethod
      def make(cls, source):
          """
          Given a source (filename or zipfile), return an
          appropriate CompleteDirs subclass.

          An existing ``CompleteDirs`` is returned unchanged. Anything
          that is not a ``ZipFile`` is passed to the ``cls`` constructor.
          A plain ``ZipFile`` has its ``__class__`` reassigned in place
          and the same object is returned.
          """
          if isinstance(source, CompleteDirs):
              return source

          if not isinstance(source, zipfile.ZipFile):
              return cls(source)

          # Only allow for FastLookup when supplied zipfile is read-only
          target = cls if 'r' in source.mode else CompleteDirs
          source.__class__ = target
          return source

      @classmethod
      def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
          """
          Given a writable zip file zf, inject directory entries for
          any directories implied by the presence of children.
          """
          # The implied names are fully computed before any write happens.
          missing_dirs = cls._implied_dirs(zf.namelist())
          for dir_name in missing_dirs:
              zf.writestr(dir_name, b"")
          return zf
  class FastLookup(CompleteDirs):
      """
      ZipFile subclass to ensure implicit
      dirs exist and are resolved rapidly.

      The completed namelist and its set form are each computed once
      per instance and then reused on later calls.
      """

      @functools.cached_property
      def _namelist(self):
          # Computed on first access, then stored on the instance.
          return super().namelist()

      @functools.cached_property
      def _name_set_prop(self):
          # Computed on first access, then stored on the instance.
          return super()._name_set()

      def namelist(self):
          """
          Return the cached, completed list of names.
          """
          return self._namelist

      def _name_set(self):
          """
          Return the cached set of names.
          """
          return self._name_set_prop
  def _extract_text_encoding(encoding=None, *args, **kwargs):
      """
      Split ``encoding`` off the front of the arguments and pass it
      through ``text_encoding``.

      Returns a 3-tuple ``(encoding, args, kwargs)`` where ``args`` and
      ``kwargs`` are the remaining, untouched arguments.
      """
      # compute stack level so that the caller of the caller sees any warning.
      # PyPy gets one extra level.
      stack_level = 4 if sys.implementation.name == 'pypy' else 3
      resolved = text_encoding(encoding, stack_level)
      return resolved, args, kwargs
  class Path:
      """
      A :class:`importlib.resources.abc.Traversable` interface for zip files.

      Implements many of the features users enjoy from
      :class:`pathlib.Path`.

      Consider a zip file with this structure::

          .
          ├── a.txt
          └── b
              ├── c.txt
              └── d
                  └── e.txt

      >>> data = io.BytesIO()
      >>> zf = zipfile.ZipFile(data, 'w')
      >>> zf.writestr('a.txt', 'content of a')
      >>> zf.writestr('b/c.txt', 'content of c')
      >>> zf.writestr('b/d/e.txt', 'content of e')
      >>> zf.filename = 'mem/abcde.zip'

      Path accepts the zipfile object itself or a filename

      >>> path = Path(zf)

      From there, several path operations are available.

      Directory iteration (including the zip file itself):

      >>> a, b = path.iterdir()
      >>> a
      Path('mem/abcde.zip', 'a.txt')
      >>> b
      Path('mem/abcde.zip', 'b/')

      name property:

      >>> b.name
      'b'

      join with divide operator:

      >>> c = b / 'c.txt'
      >>> c
      Path('mem/abcde.zip', 'b/c.txt')
      >>> c.name
      'c.txt'

      Read text:

      >>> c.read_text(encoding='utf-8')
      'content of c'

      existence:

      >>> c.exists()
      True
      >>> (b / 'missing.txt').exists()
      False

      Coercion to string:

      >>> import os
      >>> str(c).replace(os.sep, posixpath.sep)
      'mem/abcde.zip/b/c.txt'

      At the root, ``name``, ``filename``, and ``parent``
      resolve to the zipfile.

      >>> str(path)
      'mem/abcde.zip/'
      >>> path.name
      'abcde.zip'
      >>> path.filename == pathlib.Path('mem/abcde.zip')
      True
      >>> str(path.parent)
      'mem'

      If the zipfile has no filename, such attributes are not
      valid and accessing them will raise an Exception.

      >>> zf.filename = None
      >>> path.name
      Traceback (most recent call last):
      ...
      TypeError: ...

      >>> path.filename
      Traceback (most recent call last):
      ...
      TypeError: ...

      >>> path.parent
      Traceback (most recent call last):
      ...
      TypeError: ...

      # workaround python/cpython#106763
      >>> pass
      """

      # Template used by __repr__; formatted with ``self`` as the only field.
      __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

      def __init__(self, root, at=""):
          """
          Construct a Path from a ZipFile or filename.

          Note: When the source is an existing ZipFile object,
          its type (__class__) will be mutated to a
          specialized type. If the caller wishes to retain the
          original type, the caller should either create a
          separate ZipFile object or pass a filename.

          ``at`` is the location inside the archive; the empty string
          denotes the archive root.
          """
          self.root = FastLookup.make(root)
          self.at = at

      def __eq__(self, other):
          """
          >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
          False
          """
          if other.__class__ is not self.__class__:
              return NotImplemented
          mine = (self.root, self.at)
          theirs = (other.root, other.at)
          return mine == theirs

      def __hash__(self):
          key = (self.root, self.at)
          return hash(key)

      def open(self, mode='r', *args, pwd=None, **kwargs):
          """
          Open this entry as text or binary following the semantics
          of ``pathlib.Path.open()`` by passing arguments through
          to io.TextIOWrapper().

          Raises IsADirectoryError for a directory, FileNotFoundError
          when reading an entry that does not exist, and ValueError when
          extra (encoding) arguments accompany a binary mode.
          """
          if self.is_dir():
              raise IsADirectoryError(self)
          # Only the leading character of the mode is handed to the archive.
          zip_mode = mode[0]
          if zip_mode == 'r' and not self.exists():
              raise FileNotFoundError(self)
          raw = self.root.open(self.at, zip_mode, pwd=pwd)
          if 'b' not in mode:
              # Text mode:
              encoding, text_args, text_kwargs = _extract_text_encoding(
                  *args, **kwargs
              )
              return io.TextIOWrapper(raw, encoding, *text_args, **text_kwargs)
          if args or kwargs:
              raise ValueError("encoding args invalid for binary operation")
          return raw

      def _base(self):
          # At the root (empty ``at``) fall back to the archive's filename.
          source = self.at if self.at else self.root.filename
          return pathlib.PurePosixPath(source)

      @property
      def name(self):
          base = self._base()
          return base.name

      @property
      def suffix(self):
          base = self._base()
          return base.suffix

      @property
      def suffixes(self):
          base = self._base()
          return base.suffixes

      @property
      def stem(self):
          base = self._base()
          return base.stem

      @property
      def filename(self):
          archive = pathlib.Path(self.root.filename)
          return archive.joinpath(self.at)

      def read_text(self, *args, **kwargs):
          """
          Open the entry in text mode and return its entire content.
          """
          encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
          with self.open('r', encoding, *args, **kwargs) as text_stream:
              return text_stream.read()

      def read_bytes(self):
          """
          Open the entry in binary mode and return its entire content.
          """
          with self.open('rb') as byte_stream:
              return byte_stream.read()

      def _is_child(self, path):
          # Compare with trailing slashes removed on both sides so that
          # directory and file entries are treated alike.
          candidate_parent = posixpath.dirname(path.at.rstrip("/"))
          here = self.at.rstrip("/")
          return candidate_parent == here

      def _next(self, at):
          # Build a sibling object of the same class over the same archive.
          return self.__class__(self.root, at)

      def is_dir(self):
          if not self.at:
              # The archive root counts as a directory.
              return True
          return self.at.endswith("/")

      def is_file(self):
          if not self.exists():
              return False
          return not self.is_dir()

      def exists(self):
          known = self.root._name_set()
          return self.at in known

      def iterdir(self):
          """
          Return an iterator over the direct children of this directory.

          Raises ValueError when this path is not a directory.
          """
          if not self.is_dir():
              raise ValueError("Can't listdir a file")
          candidates = map(self._next, self.root.namelist())
          return filter(self._is_child, candidates)

      def match(self, path_pattern):
          inner = pathlib.PurePosixPath(self.at)
          return inner.match(path_pattern)

      def is_symlink(self):
          """
          Return whether this path is a symlink.
          """
          entry = self.root.getinfo(self.at)
          # The bits above the low 16 of external_attr are tested as a
          # stat-style mode.
          unix_mode = entry.external_attr >> 16
          return stat.S_ISLNK(unix_mode)

      def glob(self, pattern):
          """
          Return an iterator of paths whose names match ``pattern``
          beneath this path.

          Raises ValueError for an empty pattern.
          """
          if not pattern:
              raise ValueError(f"Unacceptable pattern: {pattern!r}")

          # NOTE(review): Translator is defined elsewhere; presumably it
          # converts the glob pattern into regular-expression source.
          translator = Translator(seps='/')
          expression = re.escape(self.at) + translator.translate(pattern)
          is_match = re.compile(expression).fullmatch
          selected = filter(is_match, self.root.namelist())
          return map(self._next, selected)

      def rglob(self, pattern):
          """
          Glob with ``pattern`` prefixed by ``**/``.
          """
          return self.glob(f'**/{pattern}')

      def relative_to(self, other, *extra):
          """
          Return the string form of this path relative to ``other``
          joined with ``extra``.
          """
          start = str(other.joinpath(*extra))
          return posixpath.relpath(str(self), start)

      def __str__(self):
          return posixpath.join(self.root.filename, self.at)

      def __repr__(self):
          return self.__repr.format(self=self)

      def joinpath(self, *other):
          """
          Join ``other`` onto this path; a result naming a directory
          gets its trailing slash via the archive's ``resolve_dir``.
          """
          joined = posixpath.join(self.at, *other)
          resolved = self.root.resolve_dir(joined)
          return self._next(resolved)

      __truediv__ = joinpath

      @property
      def parent(self):
          if not self.at:
              # At the root, the parent lives on the filesystem side.
              return self.filename.parent
          parent_at = posixpath.dirname(self.at.rstrip('/'))
          if parent_at:
              # Non-root parents are directories and carry a trailing slash.
              parent_at += '/'
          return self._next(parent_at)