- import asyncio
- import asyncio.events
- import functools
- import inspect
- import io
- import numbers
- import os
- import re
- import threading
- from contextlib import contextmanager
- from glob import has_magic
- from typing import TYPE_CHECKING, Iterable
- from .callbacks import DEFAULT_CALLBACK
- from .exceptions import FSTimeoutError
- from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
- from .spec import AbstractBufferedFile, AbstractFileSystem
- from .utils import glob_translate, is_exception, other_paths
- private = re.compile("_[^_]")
- iothread = [None] # dedicated fsspec IO thread
- loop = [None] # global event loop for any non-async instance
- _lock = None # global lock placeholder
- get_running_loop = asyncio.get_running_loop
- def get_lock():
- """Allocate or return a threading lock.
- The lock is allocated on first use to allow setting one lock per forked process.
- """
- global _lock
- if not _lock:
- _lock = threading.Lock()
- return _lock
- def reset_lock():
- """Reset the global lock.
- This should be called only on the init of a forked process to reset the lock to
- None, enabling the new forked process to get a new lock.
- """
- global _lock
- iothread[0] = None
- loop[0] = None
- _lock = None
- async def _runner(event, coro, result, timeout=None):
- timeout = timeout if timeout else None # convert 0 or 0.0 to None
- if timeout is not None:
- coro = asyncio.wait_for(coro, timeout=timeout)
- try:
- result[0] = await coro
- except Exception as ex:
- result[0] = ex
- finally:
- event.set()
- def sync(loop, func, *args, timeout=None, **kwargs):
- """
- Make the given loop run the coroutine until it returns. Runs in another thread.
- Examples
- --------
- >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
- timeout=timeout, **kwargs)
- """
- timeout = timeout if timeout else None # convert 0 or 0.0 to None
- # NB: if the loop is not running *yet*, it is OK to submit work
- # and we will wait for it
- if loop is None or loop.is_closed():
- raise RuntimeError("Loop is not running")
- try:
- loop0 = asyncio.events.get_running_loop()
- if loop0 is loop:
- raise NotImplementedError("Calling sync() from within a running loop")
- except NotImplementedError:
- raise
- except RuntimeError:
- pass
- coro = func(*args, **kwargs)
- result = [None]
- event = threading.Event()
- asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
- while True:
- # this loop allows the thread to be interrupted
- if event.wait(1):
- break
- if timeout is not None:
- timeout -= 1
- if timeout < 0:
- raise FSTimeoutError
- return_result = result[0]
- if isinstance(return_result, asyncio.TimeoutError):
- # suppress asyncio.TimeoutError, raise FSTimeoutError
- raise FSTimeoutError from return_result
- elif isinstance(return_result, BaseException):
- raise return_result
- else:
- return return_result
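As a usage illustration (a minimal sketch, not part of the module itself): submit a coroutine to the dedicated fsspec IO loop from ordinary blocking code. `_double` is a hypothetical stand-in for any backend coroutine.

```python
import fsspec.asyn

async def _double(x):
    # hypothetical stand-in for any filesystem coroutine
    return x * 2

# run the coroutine on the dedicated fsspec IO thread and block for the result
result = fsspec.asyn.sync(fsspec.asyn.get_loop(), _double, 21, timeout=5)
assert result == 42
```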
- def sync_wrapper(func, obj=None):
- """Given a function, make so can be called in blocking contexts
- Leave obj=None if defining within a class. Pass the instance if attaching
- as an attribute of the instance.
- """
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- self = obj or args[0]
- return sync(self.loop, func, *args, **kwargs)
- return wrapper
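A hedged sketch of attaching a generated blocking method: the wrapped instance only needs a ``loop`` attribute pointing at a running event loop. ``Client`` and ``_fetch`` are hypothetical names.

```python
import fsspec.asyn

class Client:
    def __init__(self):
        self.loop = fsspec.asyn.get_loop()  # the wrapper reads self.loop

    async def _fetch(self, key):
        return f"value-for-{key}"

# defined on the class, so leave obj=None; self is taken from args[0]
Client.fetch = fsspec.asyn.sync_wrapper(Client._fetch)

assert Client().fetch("a") == "value-for-a"
```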
- @contextmanager
- def _selector_policy():
- original_policy = asyncio.get_event_loop_policy()
- try:
- if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
- asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
- yield
- finally:
- asyncio.set_event_loop_policy(original_policy)
- def get_loop():
- """Create or return the default fsspec IO loop
- The loop will be running on a separate thread.
- """
- if loop[0] is None:
- with get_lock():
- # repeat the check just in case the loop got filled between the
- # previous two calls from another thread
- if loop[0] is None:
- with _selector_policy():
- loop[0] = asyncio.new_event_loop()
- th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
- th.daemon = True
- th.start()
- iothread[0] = th
- return loop[0]
- def reset_after_fork():
- # reset the module-level lock, loop and IO thread so the child allocates its own
- global _lock
- loop[0] = None
- iothread[0] = None
- _lock = None
- if hasattr(os, "register_at_fork"):
- # should be posix; this will do nothing for spawn or forkserver subprocesses
- os.register_at_fork(after_in_child=reset_after_fork)
- if TYPE_CHECKING:
- import resource
- ResourceError = resource.error
- else:
- try:
- import resource
- except ImportError:
- resource = None
- ResourceError = OSError
- else:
- ResourceError = getattr(resource, "error", OSError)
- _DEFAULT_BATCH_SIZE = 128
- _NOFILES_DEFAULT_BATCH_SIZE = 1280
- def _get_batch_size(nofiles=False):
- from fsspec.config import conf
- if nofiles:
- if "nofiles_gather_batch_size" in conf:
- return conf["nofiles_gather_batch_size"]
- else:
- if "gather_batch_size" in conf:
- return conf["gather_batch_size"]
- if nofiles:
- return _NOFILES_DEFAULT_BATCH_SIZE
- if resource is None:
- return _DEFAULT_BATCH_SIZE
- try:
- soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
- except (ImportError, ValueError, ResourceError):
- return _DEFAULT_BATCH_SIZE
- if soft_limit == resource.RLIM_INFINITY:
- return -1
- else:
- return soft_limit // 8
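For illustration, the inferred default can be overridden globally through ``fsspec.config.conf``; the keys below are exactly the ones this function checks, while the values are arbitrary examples.

```python
from fsspec.config import conf

# explicit overrides take precedence over the RLIMIT_NOFILE-derived default
conf["gather_batch_size"] = 64           # operations that hold local files open
conf["nofiles_gather_batch_size"] = 512  # purely remote operations
```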
- def running_async() -> bool:
- """Being executed by an event loop?"""
- try:
- asyncio.get_running_loop()
- return True
- except RuntimeError:
- return False
- async def _run_coros_in_chunks(
- coros,
- batch_size=None,
- callback=DEFAULT_CALLBACK,
- timeout=None,
- return_exceptions=False,
- nofiles=False,
- ):
- """Run the given coroutines in chunks.
- Parameters
- ----------
- coros: list of coroutines to run
- batch_size: int or None
- Number of coroutines to submit/wait on simultaneously.
- If -1, there is no throttling. If
- None, it will be inferred from _get_batch_size()
- callback: fsspec.callbacks.Callback instance
- Gets a relative_update when each coroutine completes
- timeout: number or None
- If given, each coroutine times out after this time. Note that, since
- there are multiple batches, the total run time of this function will in
- general be longer than this value
- return_exceptions: bool
- Same meaning as in asyncio.gather
- nofiles: bool
- If inferring the batch_size, does this operation involve local files?
- If yes, you normally expect smaller batches.
- """
- if batch_size is None:
- batch_size = _get_batch_size(nofiles=nofiles)
- if batch_size == -1:
- batch_size = len(coros)
- assert batch_size > 0
- async def _run_coro(coro, i):
- try:
- return await asyncio.wait_for(coro, timeout=timeout), i
- except Exception as e:
- if not return_exceptions:
- raise
- return e, i
- finally:
- callback.relative_update(1)
- i = 0
- n = len(coros)
- results = [None] * n
- pending = set()
- while pending or i < n:
- while len(pending) < batch_size and i < n:
- pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
- i += 1
- if not pending:
- break
- done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
- while done:
- result, k = await done.pop()
- results[k] = result
- return results
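A small self-contained sketch of the chunked runner, using plain ``asyncio.sleep`` coroutines as stand-ins: results come back in submission order even though completion order varies, and at most ``batch_size`` coroutines are in flight at once.

```python
import asyncio
from fsspec.asyn import _run_coros_in_chunks

async def main():
    # later coroutines finish sooner, but input ordering is still preserved
    coros = [asyncio.sleep((10 - i) / 100, result=i) for i in range(10)]
    results = await _run_coros_in_chunks(coros, batch_size=4)
    assert results == list(range(10))

asyncio.run(main())
```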
- # these methods should be implemented as async by any async-able backend
- async_methods = [
- "_ls",
- "_cat_file",
- "_get_file",
- "_put_file",
- "_rm_file",
- "_cp_file",
- "_pipe_file",
- "_expand_path",
- "_info",
- "_isfile",
- "_isdir",
- "_exists",
- "_walk",
- "_glob",
- "_find",
- "_du",
- "_size",
- "_mkdir",
- "_makedirs",
- ]
- class AsyncFileSystem(AbstractFileSystem):
- """Async file operations, default implementations
- Passes bulk operations to asyncio.gather for concurrent operation.
- Implementations that have concurrent batch operations and/or async methods
- should inherit from this class instead of AbstractFileSystem. Docstrings are
- copied from the un-underscored method in AbstractFileSystem, if not given.
- """
- # note that methods do not have docstring here; they will be copied
- # for _* methods and inferred for overridden methods.
- async_impl = True
- mirror_sync_methods = True
- disable_throttling = False
- def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
- self.asynchronous = asynchronous
- self._pid = os.getpid()
- if not asynchronous:
- self._loop = loop or get_loop()
- else:
- self._loop = None
- self.batch_size = batch_size
- super().__init__(*args, **kwargs)
- @property
- def loop(self):
- if self._pid != os.getpid():
- raise RuntimeError("This class is not fork-safe")
- return self._loop
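To make the two construction modes concrete, a hedged sketch with a hypothetical minimal subclass: synchronous mode attaches the shared IO loop and exposes blocking twins, while ``asynchronous=True`` leaves the loop unset because the caller is already running inside one.

```python
import asyncio
from fsspec.asyn import AsyncFileSystem

class DemoFS(AsyncFileSystem):
    # hypothetical backend: just enough to demonstrate the two modes
    async def _info(self, path, **kwargs):
        return {"name": path, "size": 0, "type": "file"}

def sync_mode():
    fs = DemoFS()        # borrows the shared fsspec IO loop
    return fs.info("x")  # blocking twin mirrored from _info

async def async_mode():
    fs = DemoFS(asynchronous=True)  # no loop attached
    return await fs._info("x")      # await the coroutine directly

print(sync_mode())
print(asyncio.run(async_mode()))
```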
- async def _rm_file(self, path, **kwargs):
- raise NotImplementedError
- async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
- # TODO: implement on_error
- batch_size = batch_size or self.batch_size
- path = await self._expand_path(path, recursive=recursive)
- return await _run_coros_in_chunks(
- [self._rm_file(p, **kwargs) for p in reversed(path)],
- batch_size=batch_size,
- nofiles=True,
- )
- async def _cp_file(self, path1, path2, **kwargs):
- raise NotImplementedError
- async def _mv_file(self, path1, path2):
- await self._cp_file(path1, path2)
- await self._rm_file(path1)
- async def _copy(
- self,
- path1,
- path2,
- recursive=False,
- on_error=None,
- maxdepth=None,
- batch_size=None,
- **kwargs,
- ):
- if on_error is None and recursive:
- on_error = "ignore"
- elif on_error is None:
- on_error = "raise"
- if isinstance(path1, list) and isinstance(path2, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- paths1 = path1
- paths2 = path2
- else:
- source_is_str = isinstance(path1, str)
- paths1 = await self._expand_path(
- path1, maxdepth=maxdepth, recursive=recursive
- )
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- paths1 = [
- p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
- ]
- if not paths1:
- return
- source_is_file = len(paths1) == 1
- dest_is_dir = isinstance(path2, str) and (
- trailing_sep(path2) or await self._isdir(path2)
- )
- exists = source_is_str and (
- (has_magic(path1) and source_is_file)
- or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
- )
- paths2 = other_paths(
- paths1,
- path2,
- exists=exists,
- flatten=not source_is_str,
- )
- batch_size = batch_size or self.batch_size
- coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
- result = await _run_coros_in_chunks(
- coros, batch_size=batch_size, return_exceptions=True, nofiles=True
- )
- for ex in filter(is_exception, result):
- if on_error == "ignore" and isinstance(ex, FileNotFoundError):
- continue
- raise ex
- async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
- raise NotImplementedError
- async def _pipe(self, path, value=None, batch_size=None, **kwargs):
- if isinstance(path, str):
- path = {path: value}
- batch_size = batch_size or self.batch_size
- return await _run_coros_in_chunks(
- [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
- batch_size=batch_size,
- nofiles=True,
- )
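For example, ``_pipe`` accepts either a mapping of paths to bytes or a single path plus value; the paths below are hypothetical.

```python
async def upload_many(fs):
    # mapping form: each entry becomes one _pipe_file coroutine
    await fs._pipe({"bucket/a": b"alpha", "bucket/b": b"beta"})
    # single-path form is shorthand for a one-entry mapping
    await fs._pipe("bucket/c", b"gamma")
```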
- async def _process_limits(self, url, start, end):
- """Helper for "Range"-based _cat_file"""
- size = None
- suff = False
- if start is not None and start < 0:
- # if start is negative and end None, end is the "suffix length"
- if end is None:
- end = -start
- start = ""
- suff = True
- else:
- size = size or (await self._info(url))["size"]
- start = size + start
- elif start is None:
- start = 0
- if not suff:
- if end is not None and end < 0:
- if start is not None:
- size = size or (await self._info(url))["size"]
- end = size + end
- elif end is None:
- end = ""
- if isinstance(end, numbers.Integral):
- end -= 1 # bytes range is inclusive
- return f"bytes={start}-{end}"
- async def _cat_file(self, path, start=None, end=None, **kwargs):
- raise NotImplementedError
- async def _cat(
- self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
- ):
- paths = await self._expand_path(path, recursive=recursive)
- coros = [self._cat_file(path, **kwargs) for path in paths]
- batch_size = batch_size or self.batch_size
- out = await _run_coros_in_chunks(
- coros, batch_size=batch_size, nofiles=True, return_exceptions=True
- )
- if on_error == "raise":
- ex = next(filter(is_exception, out), False)
- if ex:
- raise ex
- if (
- len(paths) > 1
- or isinstance(path, list)
- or paths[0] != self._strip_protocol(path)
- ):
- return {
- k: v
- for k, v in zip(paths, out)
- if on_error != "omit" or not is_exception(v)
- }
- else:
- return out[0]
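The return shape of ``_cat`` depends on the request: a single concrete path yields bare bytes, while a list or glob pattern yields a dict keyed by expanded path. A hedged sketch with hypothetical paths:

```python
async def demo(fs):
    blob = await fs._cat("bucket/one")  # -> b"..." (bare bytes)
    many = await fs._cat(["bucket/one", "bucket/two"])
    # -> {"bucket/one": b"...", "bucket/two": b"..."}
    safe = await fs._cat("bucket/*.json", on_error="omit")
    # entries whose fetch raised are dropped from the dict instead of raising
    return blob, many, safe
```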
- async def _cat_ranges(
- self,
- paths,
- starts,
- ends,
- max_gap=None,
- batch_size=None,
- on_error="return",
- **kwargs,
- ):
- """Get the contents of byte ranges from one or more files
- Parameters
- ----------
- paths: list
- A list of filepaths on this filesystem
- starts, ends: int or list
- Bytes limits of the read. If using a single int, the same value will be
- used to read all the specified files.
- """
- # TODO: on_error
- if max_gap is not None:
- # use utils.merge_offset_ranges
- raise NotImplementedError
- if not isinstance(paths, list):
- raise TypeError
- if not isinstance(starts, Iterable):
- starts = [starts] * len(paths)
- if not isinstance(ends, Iterable):
- ends = [ends] * len(paths)
- if len(starts) != len(paths) or len(ends) != len(paths):
- raise ValueError
- coros = [
- self._cat_file(p, start=s, end=e, **kwargs)
- for p, s, e in zip(paths, starts, ends)
- ]
- batch_size = batch_size or self.batch_size
- return await _run_coros_in_chunks(
- coros, batch_size=batch_size, nofiles=True, return_exceptions=True
- )
- async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
- raise NotImplementedError
- async def _put(
- self,
- lpath,
- rpath,
- recursive=False,
- callback=DEFAULT_CALLBACK,
- batch_size=None,
- maxdepth=None,
- **kwargs,
- ):
- """Copy file(s) from local.
- Copies a specific file or tree of files (if recursive=True). If rpath
- ends with a "/", it will be assumed to be a directory, and target files
- will go within.
- The put_file method will be called concurrently on a batch of files. The
- batch_size option can configure the number of futures that can be executed
- at the same time. If it is -1, then all the files will be uploaded concurrently.
- The default can be set for this instance by passing "batch_size" in the
- constructor, or for all instances by setting the "gather_batch_size" key
- in ``fsspec.config.conf``, falling back to 1/8th of the system open-file limit.
- """
- if isinstance(lpath, list) and isinstance(rpath, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- rpaths = rpath
- lpaths = lpath
- else:
- source_is_str = isinstance(lpath, str)
- if source_is_str:
- lpath = make_path_posix(lpath)
- fs = LocalFileSystem()
- lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
- if not lpaths:
- return
- source_is_file = len(lpaths) == 1
- dest_is_dir = isinstance(rpath, str) and (
- trailing_sep(rpath) or await self._isdir(rpath)
- )
- rpath = self._strip_protocol(rpath)
- exists = source_is_str and (
- (has_magic(lpath) and source_is_file)
- or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
- )
- rpaths = other_paths(
- lpaths,
- rpath,
- exists=exists,
- flatten=not source_is_str,
- )
- is_dir = {l: os.path.isdir(l) for l in lpaths}
- rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
- file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]
- await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
- batch_size = batch_size or self.batch_size
- coros = []
- callback.set_size(len(file_pairs))
- for lfile, rfile in file_pairs:
- put_file = callback.branch_coro(self._put_file)
- coros.append(put_file(lfile, rfile, **kwargs))
- return await _run_coros_in_chunks(
- coros, batch_size=batch_size, callback=callback
- )
- async def _get_file(self, rpath, lpath, **kwargs):
- raise NotImplementedError
- async def _get(
- self,
- rpath,
- lpath,
- recursive=False,
- callback=DEFAULT_CALLBACK,
- maxdepth=None,
- **kwargs,
- ):
- """Copy file(s) to local.
- Copies a specific file or tree of files (if recursive=True). If lpath
- ends with a "/", it will be assumed to be a directory, and target files
- will go within. Can submit a list of paths, which may be glob-patterns
- and will be expanded.
- The get_file method will be called concurrently on a batch of files. The
- batch_size option can configure the number of futures that can be executed
- at the same time. If it is -1, then all the files will be downloaded concurrently.
- The default can be set for this instance by passing "batch_size" in the
- constructor, or for all instances by setting the "gather_batch_size" key
- in ``fsspec.config.conf``, falling back to 1/8th of the system open-file limit.
- """
- if isinstance(lpath, list) and isinstance(rpath, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- rpaths = rpath
- lpaths = lpath
- else:
- source_is_str = isinstance(rpath, str)
- # First check for rpath trailing slash as _strip_protocol removes it.
- source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
- rpath = self._strip_protocol(rpath)
- rpaths = await self._expand_path(
- rpath, recursive=recursive, maxdepth=maxdepth
- )
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- rpaths = [
- p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
- ]
- if not rpaths:
- return
- lpath = make_path_posix(lpath)
- source_is_file = len(rpaths) == 1
- dest_is_dir = isinstance(lpath, str) and (
- trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
- )
- exists = source_is_str and (
- (has_magic(rpath) and source_is_file)
- or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
- )
- lpaths = other_paths(
- rpaths,
- lpath,
- exists=exists,
- flatten=not source_is_str,
- )
- for lp in lpaths:
- os.makedirs(os.path.dirname(lp), exist_ok=True)
- batch_size = kwargs.pop("batch_size", self.batch_size)
- coros = []
- callback.set_size(len(lpaths))
- for lpath, rpath in zip(lpaths, rpaths):
- get_file = callback.branch_coro(self._get_file)
- coros.append(get_file(rpath, lpath, **kwargs))
- return await _run_coros_in_chunks(
- coros, batch_size=batch_size, callback=callback
- )
- async def _isfile(self, path):
- try:
- return (await self._info(path))["type"] == "file"
- except: # noqa: E722
- return False
- async def _isdir(self, path):
- try:
- return (await self._info(path))["type"] == "directory"
- except OSError:
- return False
- async def _size(self, path):
- return (await self._info(path)).get("size", None)
- async def _sizes(self, paths, batch_size=None):
- batch_size = batch_size or self.batch_size
- return await _run_coros_in_chunks(
- [self._size(p) for p in paths], batch_size=batch_size
- )
- async def _exists(self, path, **kwargs):
- try:
- await self._info(path, **kwargs)
- return True
- except FileNotFoundError:
- return False
- async def _info(self, path, **kwargs):
- raise NotImplementedError
- async def _ls(self, path, detail=True, **kwargs):
- raise NotImplementedError
- async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- path = self._strip_protocol(path)
- full_dirs = {}
- dirs = {}
- files = {}
- detail = kwargs.pop("detail", False)
- try:
- listing = await self._ls(path, detail=True, **kwargs)
- except (FileNotFoundError, OSError) as e:
- if on_error == "raise":
- raise
- elif callable(on_error):
- on_error(e)
- if detail:
- yield path, {}, {}
- else:
- yield path, [], []
- return
- for info in listing:
- # each info name must be at least [path]/part, but here
- # we check also for names like [path]/part/
- pathname = info["name"].rstrip("/")
- name = pathname.rsplit("/", 1)[-1]
- if info["type"] == "directory" and pathname != path:
- # do not include "self" path
- full_dirs[name] = pathname
- dirs[name] = info
- elif pathname == path:
- # file-like with the same name as the given path
- files[""] = info
- else:
- files[name] = info
- if detail:
- yield path, dirs, files
- else:
- yield path, list(dirs), list(files)
- if maxdepth is not None:
- maxdepth -= 1
- if maxdepth < 1:
- return
- for d in dirs:
- async for _ in self._walk(
- full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
- ):
- yield _
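``_walk`` is an async generator; a short consumption sketch (hypothetical root path):

```python
async def count_files(fs):
    total = 0
    # each iteration yields (current path, subdirectory names, file names)
    async for _root, _dirs, files in fs._walk("bucket/data", maxdepth=2):
        total += len(files)
    return total
```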
- async def _glob(self, path, maxdepth=None, **kwargs):
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
- ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
- path = self._strip_protocol(path)
- append_slash_to_dirname = ends_with_sep or path.endswith(
- tuple(sep + "**" for sep in seps)
- )
- idx_star = path.find("*") if path.find("*") >= 0 else len(path)
- idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
- idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
- min_idx = min(idx_star, idx_qmark, idx_brace)
- detail = kwargs.pop("detail", False)
- if not has_magic(path):
- if await self._exists(path, **kwargs):
- if not detail:
- return [path]
- else:
- return {path: await self._info(path, **kwargs)}
- else:
- if not detail:
- return [] # glob of non-existent returns empty
- else:
- return {}
- elif "/" in path[:min_idx]:
- min_idx = path[:min_idx].rindex("/")
- root = path[: min_idx + 1]
- depth = path[min_idx + 1 :].count("/") + 1
- else:
- root = ""
- depth = path[min_idx + 1 :].count("/") + 1
- if "**" in path:
- if maxdepth is not None:
- idx_double_stars = path.find("**")
- depth_double_stars = path[idx_double_stars:].count("/") + 1
- depth = depth - depth_double_stars + maxdepth
- else:
- depth = None
- allpaths = await self._find(
- root, maxdepth=depth, withdirs=True, detail=True, **kwargs
- )
- pattern = glob_translate(path + ("/" if ends_with_sep else ""))
- pattern = re.compile(pattern)
- out = {
- p: info
- for p, info in sorted(allpaths.items())
- if pattern.match(
- p + "/"
- if append_slash_to_dirname and info["type"] == "directory"
- else p
- )
- }
- if detail:
- return out
- else:
- return list(out)
- async def _du(self, path, total=True, maxdepth=None, **kwargs):
- sizes = {}
- # async for?
- for f in await self._find(path, maxdepth=maxdepth, **kwargs):
- info = await self._info(f)
- sizes[info["name"]] = info["size"]
- if total:
- return sum(sizes.values())
- else:
- return sizes
- async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
- path = self._strip_protocol(path)
- out = {}
- detail = kwargs.pop("detail", False)
- # Add the root directory if withdirs is requested
- # This is needed for posix glob compliance
- if withdirs and path != "" and await self._isdir(path):
- out[path] = await self._info(path)
- # async for?
- async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
- if withdirs:
- files.update(dirs)
- out.update({info["name"]: info for name, info in files.items()})
- if not out and (await self._isfile(path)):
- # walk works on directories, but find should also return [path]
- # when path happens to be a file
- out[path] = {}
- names = sorted(out)
- if not detail:
- return names
- else:
- return {name: out[name] for name in names}
- async def _expand_path(self, path, recursive=False, maxdepth=None):
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- if isinstance(path, str):
- out = await self._expand_path([path], recursive, maxdepth)
- else:
- out = set()
- path = [self._strip_protocol(p) for p in path]
- for p in path: # can gather here
- if has_magic(p):
- bit = set(await self._glob(p, maxdepth=maxdepth))
- out |= bit
- if recursive:
- # glob call above expanded one depth so if maxdepth is defined
- # then decrement it in expand_path call below. If it is zero
- # after decrementing then avoid expand_path call.
- if maxdepth is not None and maxdepth <= 1:
- continue
- out |= set(
- await self._expand_path(
- list(bit),
- recursive=recursive,
- maxdepth=maxdepth - 1 if maxdepth is not None else None,
- )
- )
- continue
- elif recursive:
- rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
- out |= rec
- if p not in out and (recursive is False or (await self._exists(p))):
- # should only check once, for the root
- out.add(p)
- if not out:
- raise FileNotFoundError(path)
- return sorted(out)
- async def _mkdir(self, path, create_parents=True, **kwargs):
- pass # not necessary to implement, may not have directories
- async def _makedirs(self, path, exist_ok=False):
- pass # not necessary to implement, may not have directories
- async def open_async(self, path, mode="rb", **kwargs):
- if "b" not in mode or kwargs.get("compression"):
- raise ValueError
- raise NotImplementedError
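Backends that support streaming would override ``open_async`` to return an ``AbstractAsyncStreamedFile`` (defined below); a hedged reading sketch, assuming such a backend:

```python
async def read_remote(fs, path):
    # open_async must be awaited; the returned file is an async context manager
    f = await fs.open_async(path, mode="rb")
    async with f:
        return await f.read()
```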
- def mirror_sync_methods(obj):
- """Populate sync and async methods for obj
- For each method, this will create a sync version if the name refers to an async
- method (coroutine) and there is no override in the child class; it will create
- an async method for the corresponding sync method if there is no implementation.
- Uses the methods specified in
- - async_methods: the set that an implementation is expected to provide
- - default_async_methods: that can be derived from their sync version in
- AbstractFileSystem
- - AsyncFileSystem: async-specific default coroutines
- """
- from fsspec import AbstractFileSystem
- for method in async_methods + dir(AsyncFileSystem):
- if not method.startswith("_"):
- continue
- smethod = method[1:]
- if private.match(method):
- isco = inspect.iscoroutinefunction(getattr(obj, method, None))
- unsync = getattr(getattr(obj, smethod, False), "__func__", None)
- is_default = unsync is getattr(AbstractFileSystem, smethod, "")
- if isco and is_default:
- mth = sync_wrapper(getattr(obj, method), obj=obj)
- setattr(obj, smethod, mth)
- if not mth.__doc__:
- mth.__doc__ = getattr(
- getattr(AbstractFileSystem, smethod, None), "__doc__", ""
- )
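To illustrate the mirroring, a minimal hypothetical backend; fsspec wires ``mirror_sync_methods`` into filesystem construction, so defining only the coroutine is enough for instances to expose the blocking name as well:

```python
from fsspec.asyn import AsyncFileSystem

class EchoFS(AsyncFileSystem):
    # hypothetical backend with a single coroutine implementation
    async def _cat_file(self, path, **kwargs):
        return path.encode()

fs = EchoFS()                    # mirror_sync_methods runs during construction
assert fs.cat_file("x") == b"x"  # generated blocking twin of _cat_file
assert fs.cat_file.__doc__       # docstring copied from AbstractFileSystem.cat_file
```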
- class FSSpecCoroutineCancel(Exception):
- pass
- def _dump_running_tasks(
- printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
- ):
- import traceback
- tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
- if printout:
- [task.print_stack() for task in tasks]
- out = [
- {
- "locals": task._coro.cr_frame.f_locals,
- "file": task._coro.cr_frame.f_code.co_filename,
- "firstline": task._coro.cr_frame.f_code.co_firstlineno,
- "linelo": task._coro.cr_frame.f_lineno,
- "stack": traceback.format_stack(task._coro.cr_frame),
- "task": task if with_task else None,
- }
- for task in tasks
- ]
- if cancel:
- for t in tasks:
- cbs = t._callbacks
- t.cancel()
- asyncio.futures.Future.set_exception(t, exc)
- asyncio.futures.Future.cancel(t)
- [cb[0](t) for cb in cbs] # cancels any dependent concurrent.futures
- try:
- t._coro.throw(exc) # exits coro, unless explicitly handled
- except exc:
- pass
- return out
- class AbstractAsyncStreamedFile(AbstractBufferedFile):
- # no read buffering, and always auto-commit
- # TODO: readahead might still be useful here, but needs async version
- async def read(self, length=-1):
- """
- Return data from cache, or fetch pieces as necessary
- Parameters
- ----------
- length: int (-1)
- Number of bytes to read; if <0, all remaining bytes.
- """
- length = -1 if length is None else int(length)
- if self.mode != "rb":
- raise ValueError("File not in read mode")
- if length < 0:
- length = self.size - self.loc
- if self.closed:
- raise ValueError("I/O operation on closed file.")
- if length == 0:
- # don't even bother calling fetch
- return b""
- out = await self._fetch_range(self.loc, self.loc + length)
- self.loc += len(out)
- return out
- async def write(self, data):
- """
- Write data to buffer.
- Buffer only sent on flush() or if buffer is greater than
- or equal to blocksize.
- Parameters
- ----------
- data: bytes
- Set of bytes to be written.
- """
- if self.mode not in {"wb", "ab"}:
- raise ValueError("File not in write mode")
- if self.closed:
- raise ValueError("I/O operation on closed file.")
- if self.forced:
- raise ValueError("This file has been force-flushed, can only close")
- out = self.buffer.write(data)
- self.loc += out
- if self.buffer.tell() >= self.blocksize:
- await self.flush()
- return out
- async def close(self):
- """Close file
- Finalizes writes, discards cache
- """
- if getattr(self, "_unclosable", False):
- return
- if self.closed:
- return
- if self.mode == "rb":
- self.cache = None
- else:
- if not self.forced:
- await self.flush(force=True)
- if self.fs is not None:
- self.fs.invalidate_cache(self.path)
- self.fs.invalidate_cache(self.fs._parent(self.path))
- self.closed = True
- async def flush(self, force=False):
- if self.closed:
- raise ValueError("Flush on closed file")
- if force and self.forced:
- raise ValueError("Force flush cannot be called more than once")
- if force:
- self.forced = True
- if self.mode not in {"wb", "ab"}:
- # no-op to flush on read-mode
- return
- if not force and self.buffer.tell() < self.blocksize:
- # Defer write on small block
- return
- if self.offset is None:
- # Initialize a multipart upload
- self.offset = 0
- try:
- await self._initiate_upload()
- except:
- self.closed = True
- raise
- if await self._upload_chunk(final=force) is not False:
- self.offset += self.buffer.seek(0, 2)
- self.buffer = io.BytesIO()
- async def __aenter__(self):
- return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.close()
- async def _fetch_range(self, start, end):
- raise NotImplementedError
- async def _initiate_upload(self):
- pass
- async def _upload_chunk(self, final=False):
- raise NotImplementedError
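Finally, a hedged writing sketch against this streamed-file interface, again assuming a backend whose ``open_async`` returns a subclass of ``AbstractAsyncStreamedFile``:

```python
async def write_remote(fs, path, chunks):
    f = await fs.open_async(path, mode="wb")
    async with f:
        for chunk in chunks:
            # data is buffered; an upload is triggered once blocksize is reached
            await f.write(chunk)
    # __aexit__ awaits close(), which force-flushes via _upload_chunk(final=True)
```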
|