generic.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. from __future__ import annotations
  2. import inspect
  3. import logging
  4. import os
  5. import shutil
  6. import uuid
  7. from typing import Optional
  8. from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
  9. from .callbacks import DEFAULT_CALLBACK
  10. from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs
# Registry of filesystem instances keyed by protocol. It is filled by
# set_generic_fs() and read by _resolve_fs() when the method is "generic".
_generic_fs = {}
# Logger shared by every function and class in this module.
logger = logging.getLogger("fsspec.generic")
  13. def set_generic_fs(protocol, **storage_options):
  14. _generic_fs[protocol] = filesystem(protocol, **storage_options)
# Resolution method _resolve_fs() falls back to when the caller passes none.
default_method = "default"
  16. def _resolve_fs(url, method=None, protocol=None, storage_options=None):
  17. """Pick instance of backend FS"""
  18. method = method or default_method
  19. protocol = protocol or split_protocol(url)[0]
  20. storage_options = storage_options or {}
  21. if method == "default":
  22. return filesystem(protocol)
  23. if method == "generic":
  24. return _generic_fs[protocol]
  25. if method == "current":
  26. cls = get_filesystem_class(protocol)
  27. return cls.current()
  28. if method == "options":
  29. fs, _ = url_to_fs(url, **storage_options.get(protocol, {}))
  30. return fs
  31. raise ValueError(f"Unknown FS resolution method: {method}")
  32. def rsync(
  33. source,
  34. destination,
  35. delete_missing=False,
  36. source_field="size",
  37. dest_field="size",
  38. update_cond="different",
  39. inst_kwargs=None,
  40. fs=None,
  41. **kwargs,
  42. ):
  43. """Sync files between two directory trees
  44. (experimental)
  45. Parameters
  46. ----------
  47. source: str
  48. Root of the directory tree to take files from. This must be a directory, but
  49. do not include any terminating "/" character
  50. destination: str
  51. Root path to copy into. The contents of this location should be
  52. identical to the contents of ``source`` when done. This will be made a
  53. directory, and the terminal "/" should not be included.
  54. delete_missing: bool
  55. If there are paths in the destination that don't exist in the
  56. source and this is True, delete them. Otherwise, leave them alone.
  57. source_field: str | callable
  58. If ``update_field`` is "different", this is the key in the info
  59. of source files to consider for difference. Maybe a function of the
  60. info dict.
  61. dest_field: str | callable
  62. If ``update_field`` is "different", this is the key in the info
  63. of destination files to consider for difference. May be a function of
  64. the info dict.
  65. update_cond: "different"|"always"|"never"
  66. If "always", every file is copied, regardless of whether it exists in
  67. the destination. If "never", files that exist in the destination are
  68. not copied again. If "different" (default), only copy if the info
  69. fields given by ``source_field`` and ``dest_field`` (usually "size")
  70. are different. Other comparisons may be added in the future.
  71. inst_kwargs: dict|None
  72. If ``fs`` is None, use this set of keyword arguments to make a
  73. GenericFileSystem instance
  74. fs: GenericFileSystem|None
  75. Instance to use if explicitly given. The instance defines how to
  76. to make downstream file system instances from paths.
  77. Returns
  78. -------
  79. dict of the copy operations that were performed, {source: destination}
  80. """
  81. fs = fs or GenericFileSystem(**(inst_kwargs or {}))
  82. source = fs._strip_protocol(source)
  83. destination = fs._strip_protocol(destination)
  84. allfiles = fs.find(source, withdirs=True, detail=True)
  85. if not fs.isdir(source):
  86. raise ValueError("Can only rsync on a directory")
  87. otherfiles = fs.find(destination, withdirs=True, detail=True)
  88. dirs = [
  89. a
  90. for a, v in allfiles.items()
  91. if v["type"] == "directory" and a.replace(source, destination) not in otherfiles
  92. ]
  93. logger.debug(f"{len(dirs)} directories to create")
  94. if dirs:
  95. fs.make_many_dirs(
  96. [dirn.replace(source, destination) for dirn in dirs], exist_ok=True
  97. )
  98. allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
  99. logger.debug(f"{len(allfiles)} files to consider for copy")
  100. to_delete = [
  101. o
  102. for o, v in otherfiles.items()
  103. if o.replace(destination, source) not in allfiles and v["type"] == "file"
  104. ]
  105. for k, v in allfiles.copy().items():
  106. otherfile = k.replace(source, destination)
  107. if otherfile in otherfiles:
  108. if update_cond == "always":
  109. allfiles[k] = otherfile
  110. elif update_cond == "different":
  111. inf1 = source_field(v) if callable(source_field) else v[source_field]
  112. v2 = otherfiles[otherfile]
  113. inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
  114. if inf1 != inf2:
  115. # details mismatch, make copy
  116. allfiles[k] = otherfile
  117. else:
  118. # details match, don't copy
  119. allfiles.pop(k)
  120. else:
  121. # file not in target yet
  122. allfiles[k] = otherfile
  123. logger.debug(f"{len(allfiles)} files to copy")
  124. if allfiles:
  125. source_files, target_files = zip(*allfiles.items())
  126. fs.cp(source_files, target_files, **kwargs)
  127. logger.debug(f"{len(to_delete)} files to delete")
  128. if delete_missing and to_delete:
  129. fs.rm(to_delete)
  130. return allfiles
  131. class GenericFileSystem(AsyncFileSystem):
  132. """Wrapper over all other FS types
  133. <experimental!>
  134. This implementation is a single unified interface to be able to run FS operations
  135. over generic URLs, and dispatch to the specific implementations using the URL
  136. protocol prefix.
  137. Note: instances of this FS are always async, even if you never use it with any async
  138. backend.
  139. """
  140. protocol = "generic" # there is no real reason to ever use a protocol with this FS
  141. def __init__(self, default_method="default", **kwargs):
  142. """
  143. Parameters
  144. ----------
  145. default_method: str (optional)
  146. Defines how to configure backend FS instances. Options are:
  147. - "default": instantiate like FSClass(), with no
  148. extra arguments; this is the default instance of that FS, and can be
  149. configured via the config system
  150. - "generic": takes instances from the `_generic_fs` dict in this module,
  151. which you must populate before use. Keys are by protocol
  152. - "current": takes the most recently instantiated version of each FS
  153. """
  154. self.method = default_method
  155. super().__init__(**kwargs)
  156. def _parent(self, path):
  157. fs = _resolve_fs(path, self.method)
  158. return fs.unstrip_protocol(fs._parent(path))
  159. def _strip_protocol(self, path):
  160. # normalization only
  161. fs = _resolve_fs(path, self.method)
  162. return fs.unstrip_protocol(fs._strip_protocol(path))
  163. async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
  164. fs = _resolve_fs(path, self.method)
  165. if fs.async_impl:
  166. out = await fs._find(
  167. path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
  168. )
  169. else:
  170. out = fs.find(
  171. path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
  172. )
  173. result = {}
  174. for k, v in out.items():
  175. v = v.copy() # don't corrupt target FS dircache
  176. name = fs.unstrip_protocol(k)
  177. v["name"] = name
  178. result[name] = v
  179. if detail:
  180. return result
  181. return list(result)
  182. async def _info(self, url, **kwargs):
  183. fs = _resolve_fs(url, self.method)
  184. if fs.async_impl:
  185. out = await fs._info(url, **kwargs)
  186. else:
  187. out = fs.info(url, **kwargs)
  188. out = out.copy() # don't edit originals
  189. out["name"] = fs.unstrip_protocol(out["name"])
  190. return out
  191. async def _ls(
  192. self,
  193. url,
  194. detail=True,
  195. **kwargs,
  196. ):
  197. fs = _resolve_fs(url, self.method)
  198. if fs.async_impl:
  199. out = await fs._ls(url, detail=True, **kwargs)
  200. else:
  201. out = fs.ls(url, detail=True, **kwargs)
  202. out = [o.copy() for o in out] # don't edit originals
  203. for o in out:
  204. o["name"] = fs.unstrip_protocol(o["name"])
  205. if detail:
  206. return out
  207. else:
  208. return [o["name"] for o in out]
  209. async def _cat_file(
  210. self,
  211. url,
  212. **kwargs,
  213. ):
  214. fs = _resolve_fs(url, self.method)
  215. if fs.async_impl:
  216. return await fs._cat_file(url, **kwargs)
  217. else:
  218. return fs.cat_file(url, **kwargs)
  219. async def _pipe_file(
  220. self,
  221. path,
  222. value,
  223. **kwargs,
  224. ):
  225. fs = _resolve_fs(path, self.method)
  226. if fs.async_impl:
  227. return await fs._pipe_file(path, value, **kwargs)
  228. else:
  229. return fs.pipe_file(path, value, **kwargs)
  230. async def _rm(self, url, **kwargs):
  231. urls = url
  232. if isinstance(urls, str):
  233. urls = [urls]
  234. fs = _resolve_fs(urls[0], self.method)
  235. if fs.async_impl:
  236. await fs._rm(urls, **kwargs)
  237. else:
  238. fs.rm(url, **kwargs)
  239. async def _makedirs(self, path, exist_ok=False):
  240. logger.debug("Make dir %s", path)
  241. fs = _resolve_fs(path, self.method)
  242. if fs.async_impl:
  243. await fs._makedirs(path, exist_ok=exist_ok)
  244. else:
  245. fs.makedirs(path, exist_ok=exist_ok)
  246. def rsync(self, source, destination, **kwargs):
  247. """Sync files between two directory trees
  248. See `func:rsync` for more details.
  249. """
  250. rsync(source, destination, fs=self, **kwargs)
  251. async def _cp_file(
  252. self,
  253. url,
  254. url2,
  255. blocksize=2**20,
  256. callback=DEFAULT_CALLBACK,
  257. **kwargs,
  258. ):
  259. fs = _resolve_fs(url, self.method)
  260. fs2 = _resolve_fs(url2, self.method)
  261. if fs is fs2:
  262. # pure remote
  263. if fs.async_impl:
  264. return await fs._cp_file(url, url2, **kwargs)
  265. else:
  266. return fs.cp_file(url, url2, **kwargs)
  267. kw = {"blocksize": 0, "cache_type": "none"}
  268. try:
  269. f1 = (
  270. await fs.open_async(url, "rb")
  271. if hasattr(fs, "open_async")
  272. else fs.open(url, "rb", **kw)
  273. )
  274. callback.set_size(await maybe_await(f1.size))
  275. f2 = (
  276. await fs2.open_async(url2, "wb")
  277. if hasattr(fs2, "open_async")
  278. else fs2.open(url2, "wb", **kw)
  279. )
  280. while f1.size is None or f2.tell() < f1.size:
  281. data = await maybe_await(f1.read(blocksize))
  282. if f1.size is None and not data:
  283. break
  284. await maybe_await(f2.write(data))
  285. callback.absolute_update(f2.tell())
  286. finally:
  287. try:
  288. await maybe_await(f2.close())
  289. await maybe_await(f1.close())
  290. except NameError:
  291. # fail while opening f1 or f2
  292. pass
  293. async def _make_many_dirs(self, urls, exist_ok=True):
  294. fs = _resolve_fs(urls[0], self.method)
  295. if fs.async_impl:
  296. coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls]
  297. await _run_coros_in_chunks(coros)
  298. else:
  299. for u in urls:
  300. fs.makedirs(u, exist_ok=exist_ok)
  301. make_many_dirs = sync_wrapper(_make_many_dirs)
  302. async def _copy(
  303. self,
  304. path1: list[str],
  305. path2: list[str],
  306. recursive: bool = False,
  307. on_error: str = "ignore",
  308. maxdepth: Optional[int] = None,
  309. batch_size: Optional[int] = None,
  310. tempdir: Optional[str] = None,
  311. **kwargs,
  312. ):
  313. if recursive:
  314. raise NotImplementedError
  315. fs = _resolve_fs(path1[0], self.method)
  316. fs2 = _resolve_fs(path2[0], self.method)
  317. # not expanding paths atm., assume call is from rsync()
  318. if fs is fs2:
  319. # pure remote
  320. if fs.async_impl:
  321. return await fs._copy(path1, path2, **kwargs)
  322. else:
  323. return fs.copy(path1, path2, **kwargs)
  324. await copy_file_op(
  325. fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
  326. )
  327. async def copy_file_op(
  328. fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
  329. ):
  330. import tempfile
  331. tempdir = tempdir or tempfile.mkdtemp()
  332. try:
  333. coros = [
  334. _copy_file_op(
  335. fs1,
  336. u1,
  337. fs2,
  338. u2,
  339. os.path.join(tempdir, uuid.uuid4().hex),
  340. on_error=on_error,
  341. )
  342. for u1, u2 in zip(url1, url2)
  343. ]
  344. await _run_coros_in_chunks(coros, batch_size=batch_size)
  345. finally:
  346. shutil.rmtree(tempdir)
  347. async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
  348. ex = () if on_error == "raise" else Exception
  349. logger.debug("Copy %s -> %s", url1, url2)
  350. try:
  351. if fs1.async_impl:
  352. await fs1._get_file(url1, local)
  353. else:
  354. fs1.get_file(url1, local)
  355. if fs2.async_impl:
  356. await fs2._put_file(local, url2)
  357. else:
  358. fs2.put_file(local, url2)
  359. os.unlink(local)
  360. logger.debug("Copy %s -> %s; done", url1, url2)
  361. except ex as e:
  362. logger.debug("ignoring cp exception for %s: %s", url1, e)
  363. async def maybe_await(cor):
  364. if inspect.iscoroutine(cor):
  365. return await cor
  366. else:
  367. return cor