libarchive.py

from contextlib import contextmanager
from ctypes import (
    CFUNCTYPE,
    POINTER,
    c_int,
    c_longlong,
    c_void_p,
    cast,
    create_string_buffer,
)

import libarchive
import libarchive.ffi as ffi

from fsspec import open_files
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.implementations.memory import MemoryFile
from fsspec.utils import DEFAULT_BLOCK_SIZE

# Libarchive requires seekable files or memory only for certain archive
# types. However, since we read the directory first to cache the contents
# and also allow random access to any file, the file-like object needs
# to be seekable no matter what.

# Seek call-backs (not provided in the libarchive python wrapper)
SEEK_CALLBACK = CFUNCTYPE(c_longlong, c_int, c_void_p, c_longlong, c_int)
read_set_seek_callback = ffi.ffi(
    "read_set_seek_callback", [ffi.c_archive_p, SEEK_CALLBACK], c_int, ffi.check_int
)
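# For reference, these roughly mirror libarchive's C-level seek support (the
# ``archive_seek_callback`` typedef and ``archive_read_set_seek_callback()``),
# which the python wrapper does not expose, hence the manual ffi binding above.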

# Newer releases of the python libarchive wrapper provide NO_OPEN_CB/NO_CLOSE_CB
new_api = hasattr(ffi, "NO_OPEN_CB")


@contextmanager
def custom_reader(file, format_name="all", filter_name="all", block_size=ffi.page_size):
    """Read an archive from a seekable file-like object.

    The `file` object must support the standard `readinto` and `seek` methods.
    """
    buf = create_string_buffer(block_size)
    buf_p = cast(buf, c_void_p)

    def read_func(archive_p, context, ptrptr):
        # readinto the buffer, returns number of bytes read
        length = file.readinto(buf)
        # write the address of the buffer into the pointer
        ptrptr = cast(ptrptr, POINTER(c_void_p))
        ptrptr[0] = buf_p
        # tell libarchive how much data was written into the buffer
        return length

    def seek_func(archive_p, context, offset, whence):
        file.seek(offset, whence)
        # tell libarchive the current position
        return file.tell()

    read_cb = ffi.READ_CALLBACK(read_func)
    seek_cb = SEEK_CALLBACK(seek_func)

    if new_api:
        open_cb = ffi.NO_OPEN_CB
        close_cb = ffi.NO_CLOSE_CB
    else:
        open_cb = libarchive.read.OPEN_CALLBACK(ffi.VOID_CB)
        close_cb = libarchive.read.CLOSE_CALLBACK(ffi.VOID_CB)

    with libarchive.read.new_archive_read(format_name, filter_name) as archive_p:
        read_set_seek_callback(archive_p, seek_cb)
        ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
        yield libarchive.read.ArchiveRead(archive_p)
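

# Illustrative sketch (an assumption, not part of the module above): driving
# custom_reader with a plain local file. "example.tar" is a hypothetical path;
# entry.pathname and entry.size are the attributes used elsewhere in this file.
def _custom_reader_example():  # pragma: no cover - example only, never called
    with open("example.tar", "rb") as f:
        with custom_reader(f) as arc:
            for entry in arc:
                print(entry.pathname, entry.size)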


class LibArchiveFileSystem(AbstractArchiveFileSystem):
    """Compressed archives as a file-system (read-only)

    Supports the following formats:
    tar, pax, cpio, ISO9660, zip, mtree, shar, ar, raw, xar, lha/lzh, rar,
    Microsoft CAB, 7-Zip, WARC

    See the libarchive documentation for further restrictions.
    https://www.libarchive.org/

    Keeps the file object open while the instance lives. It only works with
    seekable file-like objects. If the target filesystem does not provide such
    file objects, it is recommended to cache locally.

    This class is pickleable, but not necessarily thread-safe (depends on the
    platform). See the libarchive documentation for details.
    """

    root_marker = ""
    protocol = "libarchive"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        block_size=DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains the archive, and must exist. If a str, will fetch the file
            using :meth:`~fsspec.open_files`, which must return one file exactly.
        mode: str
            Currently, only 'r' is accepted
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        """
        super().__init__(**kwargs)
        if mode != "r":
            raise ValueError("Only read from archive files accepted")
        if isinstance(fo, str):
            files = open_files(fo, protocol=target_protocol, **(target_options or {}))
            if len(files) != 1:
                raise ValueError(
                    f'Path "{fo}" did not resolve to exactly one file: "{files}"'
                )
            fo = files[0]
        self.of = fo
        self.fo = fo.__enter__()  # the whole instance is a context
        self.block_size = block_size
        self.dir_cache = None

    @contextmanager
    def _open_archive(self):
        self.fo.seek(0)
        with custom_reader(self.fo, block_size=self.block_size) as arc:
            yield arc

    @classmethod
    def _strip_protocol(cls, path):
        # file paths are always relative to the archive root
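        # (e.g. both "libarchive://dir/file.txt" and "/dir/file.txt" become
        # "dir/file.txt"; an assumed illustration, not from the original source)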
        return super()._strip_protocol(path).lstrip("/")

    def _get_dirs(self):
        fields = {
            "name": "pathname",
            "size": "size",
            "created": "ctime",
            "mode": "mode",
            "uid": "uid",
            "gid": "gid",
            "mtime": "mtime",
        }

        if self.dir_cache is not None:
            return

        self.dir_cache = {}
        list_names = []
        with self._open_archive() as arc:
            for entry in arc:
                if not entry.isdir and not entry.isfile:
                    # Skip symbolic links, fifo entries, etc.
                    continue
                self.dir_cache.update(
                    {
                        dirname: {"name": dirname, "size": 0, "type": "directory"}
                        for dirname in self._all_dirnames({entry.name})
                    }
                )
                f = {key: getattr(entry, fields[key]) for key in fields}
                f["type"] = "directory" if entry.isdir else "file"
                list_names.append(entry.name)

                self.dir_cache[f["name"]] = f
        # libarchive does not seem to return an entry for the directories (at
        # least not in all formats), so infer the directory names from the
        # file names
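        # (e.g. a member named "a/b/c.txt" contributes synthetic directory
        # entries "a" and "a/b")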
        self.dir_cache.update(
            {
                dirname: {"name": dirname, "size": 0, "type": "directory"}
                for dirname in self._all_dirnames(list_names)
            }
        )

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        path = self._strip_protocol(path)
        if mode != "rb":
            raise NotImplementedError

        data = bytes()
        with self._open_archive() as arc:
            for entry in arc:
                if entry.pathname != path:
                    continue

                if entry.size == 0:
                    # empty file, so there are no blocks
                    break

                # the whole entry is requested as a single block, so only the
                # first block needs to be kept
                for block in entry.get_blocks(entry.size):
                    data = block
                    break
                else:
                    # get_blocks yielded nothing for a non-empty entry
                    raise ValueError
        return MemoryFile(fs=self, path=path, data=data)
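

# Illustrative usage sketch (an assumption, not part of the module above).
# "archive.7z" and "inner/data.csv" are hypothetical names; a remote archive
# could be given instead, e.g. fo="s3://bucket/archive.tar" with target_options.
if __name__ == "__main__":  # pragma: no cover - example only
    fs = LibArchiveFileSystem(fo="archive.7z")
    print(fs.ls(""))  # list the cached directory at the archive root
    with fs.open("inner/data.csv") as f:  # returns an in-memory, read-only file
        print(f.read()[:100])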