123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- from __future__ import annotations
- import inspect
- import logging
- import os
- import shutil
- import uuid
- from typing import Optional
- from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
- from .callbacks import DEFAULT_CALLBACK
- from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs
# Backend instances handed out when the resolution method is "generic",
# keyed by protocol. Filled in by set_generic_fs().
_generic_fs = {}
logger = logging.getLogger("fsspec.generic")


def set_generic_fs(protocol, **storage_options):
    """Register the backend instance to use for ``protocol`` with method "generic"

    Parameters
    ----------
    protocol: str
        Protocol name under which the instance is stored in ``_generic_fs``.
    **storage_options:
        Passed to ``filesystem()`` when creating the instance.
    """
    instance = filesystem(protocol, **storage_options)
    _generic_fs[protocol] = instance
# Resolution method used by _resolve_fs() when the caller passes none.
default_method = "default"


def _resolve_fs(url, method=None, protocol=None, storage_options=None):
    """Pick instance of backend FS

    Parameters
    ----------
    url: str
        URL whose protocol selects the backend (unless ``protocol`` is given).
    method: str | None
        One of "default", "generic", "current" or "options"; falls back to the
        module-level ``default_method`` when falsy.
    protocol: str | None
        Explicit protocol; when falsy it is taken from ``split_protocol(url)``.
    storage_options: dict | None
        Only used by method "options": maps protocol -> kwargs for ``url_to_fs``.

    Raises
    ------
    ValueError
        If ``method`` is not one of the known resolution methods.
    """
    chosen = method or default_method
    proto = protocol or split_protocol(url)[0]
    options = storage_options or {}
    if chosen == "default":
        # default instance of the backend class, no extra arguments
        backend = filesystem(proto)
    elif chosen == "generic":
        # instance previously registered in this module's _generic_fs dict
        backend = _generic_fs[proto]
    elif chosen == "current":
        backend_cls = get_filesystem_class(proto)
        backend = backend_cls.current()
    elif chosen == "options":
        backend, _ = url_to_fs(url, **options.get(proto, {}))
    else:
        raise ValueError(f"Unknown FS resolution method: {chosen}")
    return backend
def rsync(
    source,
    destination,
    delete_missing=False,
    source_field="size",
    dest_field="size",
    update_cond="different",
    inst_kwargs=None,
    fs=None,
    **kwargs,
):
    """Sync files between two directory trees

    (experimental)

    Parameters
    ----------
    source: str
        Root of the directory tree to take files from. This must be a directory, but
        do not include any terminating "/" character
    destination: str
        Root path to copy into. The contents of this location should be
        identical to the contents of ``source`` when done. This will be made a
        directory, and the terminal "/" should not be included.
    delete_missing: bool
        If there are paths in the destination that don't exist in the
        source and this is True, delete them. Otherwise, leave them alone.
    source_field: str | callable
        If ``update_cond`` is "different", this is the key in the info
        of source files to consider for difference. May be a function of the
        info dict.
    dest_field: str | callable
        If ``update_cond`` is "different", this is the key in the info
        of destination files to consider for difference. May be a function of
        the info dict.
    update_cond: "different"|"always"|"never"
        If "always", every file is copied, regardless of whether it exists in
        the destination. If "never", files that exist in the destination are
        not copied again. If "different" (default), only copy if the info
        fields given by ``source_field`` and ``dest_field`` (usually "size")
        are different. Other comparisons may be added in the future.
    inst_kwargs: dict|None
        If ``fs`` is None, use this set of keyword arguments to make a
        GenericFileSystem instance
    fs: GenericFileSystem|None
        Instance to use if explicitly given. The instance defines how to
        make downstream file system instances from paths.
    **kwargs:
        Passed on to ``fs.cp``.

    Returns
    -------
    dict of the copy operations that were performed, {source: destination}

    Raises
    ------
    ValueError
        If ``source`` is not a directory, or if a file already exists in the
        destination and ``update_cond`` is not one of the known values.
    """
    fs = fs or GenericFileSystem(**(inst_kwargs or {}))
    source = fs._strip_protocol(source)
    destination = fs._strip_protocol(destination)
    allfiles = fs.find(source, withdirs=True, detail=True)
    if not fs.isdir(source):
        raise ValueError("Can only rsync on a directory")
    otherfiles = fs.find(destination, withdirs=True, detail=True)

    # Paths are mapped between the trees by swapping the root. Only the first
    # occurrence is replaced so that a path which happens to contain the root
    # string again further down is not rewritten twice.
    dirs = [
        a
        for a, v in allfiles.items()
        if v["type"] == "directory"
        and a.replace(source, destination, 1) not in otherfiles
    ]
    logger.debug("%d directories to create", len(dirs))
    if dirs:
        fs.make_many_dirs(
            [dirn.replace(source, destination, 1) for dirn in dirs], exist_ok=True
        )

    # From here on ``allfiles`` starts as {source path: info} for plain files and
    # is rewritten in place to {source path: destination path} for files to copy.
    allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
    logger.debug("%d files to consider for copy", len(allfiles))
    to_delete = [
        o
        for o, v in otherfiles.items()
        if o.replace(destination, source, 1) not in allfiles and v["type"] == "file"
    ]
    for k, v in allfiles.copy().items():
        otherfile = k.replace(source, destination, 1)
        if otherfile not in otherfiles:
            # file not in target yet
            allfiles[k] = otherfile
        elif update_cond == "always":
            allfiles[k] = otherfile
        elif update_cond == "never":
            # target already has this file: never overwrite it
            allfiles.pop(k)
        elif update_cond == "different":
            inf1 = source_field(v) if callable(source_field) else v[source_field]
            v2 = otherfiles[otherfile]
            inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
            if inf1 != inf2:
                # details mismatch, make copy
                allfiles[k] = otherfile
            else:
                # details match, don't copy
                allfiles.pop(k)
        else:
            # without this, the info dict would be left in place and handed to
            # fs.cp as if it were a destination path
            raise ValueError(
                f"Unknown update_cond: {update_cond!r}; "
                'expected "different", "always" or "never"'
            )
    logger.debug("%d files to copy", len(allfiles))
    if allfiles:
        source_files, target_files = zip(*allfiles.items())
        fs.cp(source_files, target_files, **kwargs)
    logger.debug("%d files to delete", len(to_delete))
    if delete_missing and to_delete:
        fs.rm(to_delete)
    return allfiles
class GenericFileSystem(AsyncFileSystem):
    """Wrapper over all other FS types

    <experimental!>

    This implementation is a single unified interface to be able to run FS operations
    over generic URLs, and dispatch to the specific implementations using the URL
    protocol prefix.

    Note: instances of this FS are always async, even if you never use it with any async
    backend.
    """

    protocol = "generic"  # there is no real reason to ever use a protocol with this FS

    def __init__(self, default_method="default", **kwargs):
        """
        Parameters
        ----------
        default_method: str (optional)
            Defines how to configure backend FS instances. Options are:
            - "default": instantiate like FSClass(), with no
              extra arguments; this is the default instance of that FS, and can be
              configured via the config system
            - "generic": takes instances from the `_generic_fs` dict in this module,
              which you must populate before use. Keys are by protocol
            - "current": takes the most recently instantiated version of each FS
        """
        self.method = default_method
        super().__init__(**kwargs)

    def _parent(self, path):
        """Return the backend's parent of ``path``, passed through ``unstrip_protocol``"""
        fs = _resolve_fs(path, self.method)
        return fs.unstrip_protocol(fs._parent(path))

    def _strip_protocol(self, path):
        """Normalize ``path`` with its backend; the protocol is kept, not removed"""
        # normalization only
        fs = _resolve_fs(path, self.method)
        return fs.unstrip_protocol(fs._strip_protocol(path))

    async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
        """Find entries below ``path`` using the backend selected by its protocol

        The backend is always queried with ``detail=True``; names are passed
        through ``unstrip_protocol``. Returns a dict {name: info} if ``detail``,
        else a list of names.
        """
        fs = _resolve_fs(path, self.method)
        if fs.async_impl:
            out = await fs._find(
                path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
            )
        else:
            out = fs.find(
                path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
            )
        result = {}
        for k, v in out.items():
            v = v.copy()  # don't corrupt target FS dircache
            name = fs.unstrip_protocol(k)
            v["name"] = name
            result[name] = v
        if detail:
            return result
        return list(result)

    async def _info(self, url, **kwargs):
        """Return a copy of the backend's info for ``url`` with an unstripped name"""
        fs = _resolve_fs(url, self.method)
        if fs.async_impl:
            out = await fs._info(url, **kwargs)
        else:
            out = fs.info(url, **kwargs)
        out = out.copy()  # don't edit originals
        out["name"] = fs.unstrip_protocol(out["name"])
        return out

    async def _ls(
        self,
        url,
        detail=True,
        **kwargs,
    ):
        """List ``url`` with its backend

        Returns copies of the backend's info dicts with unstripped names if
        ``detail``, else just the list of names.
        """
        fs = _resolve_fs(url, self.method)
        if fs.async_impl:
            out = await fs._ls(url, detail=True, **kwargs)
        else:
            out = fs.ls(url, detail=True, **kwargs)
        out = [o.copy() for o in out]  # don't edit originals
        for o in out:
            o["name"] = fs.unstrip_protocol(o["name"])
        if detail:
            return out
        else:
            return [o["name"] for o in out]

    async def _cat_file(
        self,
        url,
        **kwargs,
    ):
        """Return the backend's ``cat_file`` result for ``url``"""
        fs = _resolve_fs(url, self.method)
        if fs.async_impl:
            return await fs._cat_file(url, **kwargs)
        else:
            return fs.cat_file(url, **kwargs)

    async def _pipe_file(
        self,
        path,
        value,
        **kwargs,
    ):
        """Write ``value`` to ``path`` with the backend's ``pipe_file``"""
        fs = _resolve_fs(path, self.method)
        if fs.async_impl:
            return await fs._pipe_file(path, value, **kwargs)
        else:
            return fs.pipe_file(path, value, **kwargs)

    async def _rm(self, url, **kwargs):
        """Remove one URL or a list of URLs

        The backend is resolved from the first URL only, so all URLs are
        handed to that one backend.
        """
        urls = url
        if isinstance(urls, str):
            urls = [urls]
        fs = _resolve_fs(urls[0], self.method)
        if fs.async_impl:
            await fs._rm(urls, **kwargs)
        else:
            fs.rm(url, **kwargs)

    async def _makedirs(self, path, exist_ok=False):
        """Create directory ``path`` with its backend's ``makedirs``"""
        logger.debug("Make dir %s", path)
        fs = _resolve_fs(path, self.method)
        if fs.async_impl:
            await fs._makedirs(path, exist_ok=exist_ok)
        else:
            fs.makedirs(path, exist_ok=exist_ok)

    def rsync(self, source, destination, **kwargs):
        """Sync files between two directory trees

        See `func:rsync` for more details.

        Returns
        -------
        dict of the copy operations that were performed, {source: destination}
        """
        return rsync(source, destination, fs=self, **kwargs)

    async def _cp_file(
        self,
        url,
        url2,
        blocksize=2**20,
        callback=DEFAULT_CALLBACK,
        **kwargs,
    ):
        """Copy a single file from ``url`` to ``url2``

        If both URLs resolve to the same backend instance, the copy is delegated
        to that backend. Otherwise the data is streamed through this process in
        chunks of ``blocksize`` bytes, reporting progress to ``callback``.
        """
        fs = _resolve_fs(url, self.method)
        fs2 = _resolve_fs(url2, self.method)
        if fs is fs2:
            # pure remote
            if fs.async_impl:
                return await fs._cp_file(url, url2, **kwargs)
            else:
                return fs.cp_file(url, url2, **kwargs)
        kw = {"blocksize": 0, "cache_type": "none"}
        # Track the handles explicitly so that whichever one was opened gets
        # closed, even when opening the other (or closing it) fails.
        f1 = f2 = None
        try:
            f1 = (
                await fs.open_async(url, "rb")
                if hasattr(fs, "open_async")
                else fs.open(url, "rb", **kw)
            )
            callback.set_size(await maybe_await(f1.size))
            f2 = (
                await fs2.open_async(url2, "wb")
                if hasattr(fs2, "open_async")
                else fs2.open(url2, "wb", **kw)
            )
            while f1.size is None or f2.tell() < f1.size:
                data = await maybe_await(f1.read(blocksize))
                if not data:
                    # end of input; also stops us spinning forever when the
                    # source yields less data than its reported size
                    break
                await maybe_await(f2.write(data))
                callback.absolute_update(f2.tell())
        finally:
            try:
                if f2 is not None:
                    await maybe_await(f2.close())
            finally:
                if f1 is not None:
                    await maybe_await(f1.close())

    async def _make_many_dirs(self, urls, exist_ok=True):
        """Create every directory in ``urls``

        The backend is resolved from the first URL only; ``urls`` must not be
        empty.
        """
        fs = _resolve_fs(urls[0], self.method)
        if fs.async_impl:
            coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls]
            await _run_coros_in_chunks(coros)
        else:
            for u in urls:
                fs.makedirs(u, exist_ok=exist_ok)

    make_many_dirs = sync_wrapper(_make_many_dirs)

    async def _copy(
        self,
        path1: list[str],
        path2: list[str],
        recursive: bool = False,
        on_error: str = "ignore",
        maxdepth: Optional[int] = None,
        batch_size: Optional[int] = None,
        tempdir: Optional[str] = None,
        **kwargs,
    ):
        """Copy the files in ``path1`` to the matching entries of ``path2``

        Backends are resolved from the first element of each list. When both
        are the same instance the copy is delegated to it; otherwise each file
        goes through a local temporary file (see ``copy_file_op``).

        Raises
        ------
        NotImplementedError
            If ``recursive`` is true.
        """
        if recursive:
            raise NotImplementedError
        fs = _resolve_fs(path1[0], self.method)
        fs2 = _resolve_fs(path2[0], self.method)
        # not expanding paths atm., assume call is from rsync()
        if fs is fs2:
            # pure remote
            if fs.async_impl:
                return await fs._copy(path1, path2, **kwargs)
            else:
                return fs.copy(path1, path2, **kwargs)
        await copy_file_op(
            fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
        )
async def copy_file_op(
    fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
):
    """Copy files pairwise from ``fs1`` to ``fs2`` through local temporary files

    Parameters
    ----------
    fs1, fs2:
        Source and destination filesystem instances.
    url1, url2:
        Iterables of source and destination paths, paired up with ``zip``.
    tempdir: str | None
        Local directory for the intermediate files; a fresh one is made with
        ``tempfile.mkdtemp`` when falsy. Note that the directory is removed
        when the copies finish, whether or not it was supplied by the caller.
    batch_size: int
        Passed to ``_run_coros_in_chunks``.
    on_error: str
        Passed to each ``_copy_file_op`` call.
    """
    import tempfile

    if not tempdir:
        tempdir = tempfile.mkdtemp()
    try:
        pending = []
        for src, dst in zip(url1, url2):
            # every transfer gets its own uniquely named scratch file
            scratch = os.path.join(tempdir, uuid.uuid4().hex)
            pending.append(
                _copy_file_op(fs1, src, fs2, dst, scratch, on_error=on_error)
            )
        await _run_coros_in_chunks(pending, batch_size=batch_size)
    finally:
        shutil.rmtree(tempdir)
async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
    """Copy one file from ``fs1`` to ``fs2`` via the local path ``local``

    The file is downloaded to ``local``, uploaded to ``url2`` and the local
    copy is then deleted. Unless ``on_error`` is "raise", any ``Exception``
    along the way is logged at debug level and swallowed.
    """
    if on_error == "raise":
        # an empty tuple in ``except`` matches nothing, so errors propagate
        swallowed = ()
    else:
        swallowed = Exception
    logger.debug("Copy %s -> %s", url1, url2)
    try:
        # download from the source backend
        if not fs1.async_impl:
            fs1.get_file(url1, local)
        else:
            await fs1._get_file(url1, local)
        # upload to the destination backend
        if not fs2.async_impl:
            fs2.put_file(local, url2)
        else:
            await fs2._put_file(local, url2)
        os.unlink(local)
        logger.debug("Copy %s -> %s; done", url1, url2)
    except swallowed as err:
        logger.debug("ignoring cp exception for %s: %s", url1, err)
async def maybe_await(cor):
    """Return the result of ``cor`` if it is a coroutine object, else ``cor`` itself

    Lets callers treat the results of sync and async file methods uniformly.
    """
    return (await cor) if inspect.iscoroutine(cor) else cor
|