123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- from functools import wraps
- class Callback:
- """
- Base class and interface for callback mechanism
- This class can be used directly for monitoring file transfers by
- providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
- below), or subclassed for more specialised behaviour.
- Parameters
- ----------
- size: int (optional)
- Nominal quantity for the value that corresponds to a complete
- transfer, e.g., total number of tiles or total number of
- bytes
- value: int (0)
- Starting internal counter value
- hooks: dict or None
- A dict of named functions to be called on each update. The signature
- of these must be ``f(size, value, **kwargs)``
- """
- def __init__(self, size=None, value=0, hooks=None, **kwargs):
- self.size = size
- self.value = value
- self.hooks = hooks or {}
- self.kw = kwargs
- def __enter__(self):
- return self
- def __exit__(self, *exc_args):
- self.close()
- def close(self):
- """Close callback."""
- def branched(self, path_1, path_2, **kwargs):
- """
- Return callback for child transfers
- If this callback is operating at a higher level, e.g., put, which may
- trigger transfers that can also be monitored. The function returns a callback
- that has to be passed to the child method, e.g., put_file,
- as `callback=` argument.
- The implementation uses `callback.branch` for compatibility.
- When implementing callbacks, it is recommended to override this function instead
- of `branch` and avoid calling `super().branched(...)`.
- Prefer using this function over `branch`.
- Parameters
- ----------
- path_1: str
- Child's source path
- path_2: str
- Child's destination path
- **kwargs:
- Arbitrary keyword arguments
- Returns
- -------
- callback: Callback
- A callback instance to be passed to the child method
- """
- self.branch(path_1, path_2, kwargs)
- # mutate kwargs so that we can force the caller to pass "callback=" explicitly
- return kwargs.pop("callback", DEFAULT_CALLBACK)
- def branch_coro(self, fn):
- """
- Wraps a coroutine, and pass a new child callback to it.
- """
- @wraps(fn)
- async def func(path1, path2: str, **kwargs):
- with self.branched(path1, path2, **kwargs) as child:
- return await fn(path1, path2, callback=child, **kwargs)
- return func
- def set_size(self, size):
- """
- Set the internal maximum size attribute
- Usually called if not initially set at instantiation. Note that this
- triggers a ``call()``.
- Parameters
- ----------
- size: int
- """
- self.size = size
- self.call()
- def absolute_update(self, value):
- """
- Set the internal value state
- Triggers ``call()``
- Parameters
- ----------
- value: int
- """
- self.value = value
- self.call()
- def relative_update(self, inc=1):
- """
- Delta increment the internal counter
- Triggers ``call()``
- Parameters
- ----------
- inc: int
- """
- self.value += inc
- self.call()
- def call(self, hook_name=None, **kwargs):
- """
- Execute hook(s) with current state
- Each function is passed the internal size and current value
- Parameters
- ----------
- hook_name: str or None
- If given, execute on this hook
- kwargs: passed on to (all) hook(s)
- """
- if not self.hooks:
- return
- kw = self.kw.copy()
- kw.update(kwargs)
- if hook_name:
- if hook_name not in self.hooks:
- return
- return self.hooks[hook_name](self.size, self.value, **kw)
- for hook in self.hooks.values() or []:
- hook(self.size, self.value, **kw)
- def wrap(self, iterable):
- """
- Wrap an iterable to call ``relative_update`` on each iterations
- Parameters
- ----------
- iterable: Iterable
- The iterable that is being wrapped
- """
- for item in iterable:
- self.relative_update()
- yield item
- def branch(self, path_1, path_2, kwargs):
- """
- Set callbacks for child transfers
- If this callback is operating at a higher level, e.g., put, which may
- trigger transfers that can also be monitored. The passed kwargs are
- to be *mutated* to add ``callback=``, if this class supports branching
- to children.
- Parameters
- ----------
- path_1: str
- Child's source path
- path_2: str
- Child's destination path
- kwargs: dict
- arguments passed to child method, e.g., put_file.
- Returns
- -------
- """
- return None
- def no_op(self, *_, **__):
- pass
- def __getattr__(self, item):
- """
- If undefined methods are called on this class, nothing happens
- """
- return self.no_op
- @classmethod
- def as_callback(cls, maybe_callback=None):
- """Transform callback=... into Callback instance
- For the special value of ``None``, return the global instance of
- ``NoOpCallback``. This is an alternative to including
- ``callback=DEFAULT_CALLBACK`` directly in a method signature.
- """
- if maybe_callback is None:
- return DEFAULT_CALLBACK
- return maybe_callback
- class NoOpCallback(Callback):
- """
- This implementation of Callback does exactly nothing
- """
- def call(self, *args, **kwargs):
- return None
- class DotPrinterCallback(Callback):
- """
- Simple example Callback implementation
- Almost identical to Callback with a hook that prints a char; here we
- demonstrate how the outer layer may print "#" and the inner layer "."
- """
- def __init__(self, chr_to_print="#", **kwargs):
- self.chr = chr_to_print
- super().__init__(**kwargs)
- def branch(self, path_1, path_2, kwargs):
- """Mutate kwargs to add new instance with different print char"""
- kwargs["callback"] = DotPrinterCallback(".")
- def call(self, **kwargs):
- """Just outputs a character"""
- print(self.chr, end="")
- class TqdmCallback(Callback):
- """
- A callback to display a progress bar using tqdm
- Parameters
- ----------
- tqdm_kwargs : dict, (optional)
- Any argument accepted by the tqdm constructor.
- See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
- Will be forwarded to `tqdm_cls`.
- tqdm_cls: (optional)
- subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.
- Examples
- --------
- >>> import fsspec
- >>> from fsspec.callbacks import TqdmCallback
- >>> fs = fsspec.filesystem("memory")
- >>> path2distant_data = "/your-path"
- >>> fs.upload(
- ".",
- path2distant_data,
- recursive=True,
- callback=TqdmCallback(),
- )
- You can forward args to tqdm using the ``tqdm_kwargs`` parameter.
- >>> fs.upload(
- ".",
- path2distant_data,
- recursive=True,
- callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
- )
- You can also customize the progress bar by passing a subclass of `tqdm`.
- .. code-block:: python
- class TqdmFormat(tqdm):
- '''Provides a `total_time` format parameter'''
- @property
- def format_dict(self):
- d = super().format_dict
- total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
- d.update(total_time=self.format_interval(total_time) + " in total")
- return d
- >>> with TqdmCallback(
- tqdm_kwargs={
- "desc": "desc",
- "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
- },
- tqdm_cls=TqdmFormat,
- ) as callback:
- fs.upload(".", path2distant_data, recursive=True, callback=callback)
- """
- def __init__(self, tqdm_kwargs=None, *args, **kwargs):
- try:
- from tqdm import tqdm
- except ImportError as exce:
- raise ImportError(
- "Using TqdmCallback requires tqdm to be installed"
- ) from exce
- self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
- self._tqdm_kwargs = tqdm_kwargs or {}
- self.tqdm = None
- super().__init__(*args, **kwargs)
- def call(self, *args, **kwargs):
- if self.tqdm is None:
- self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
- self.tqdm.total = self.size
- self.tqdm.update(self.value - self.tqdm.n)
- def close(self):
- if self.tqdm is not None:
- self.tqdm.close()
- self.tqdm = None
- def __del__(self):
- return self.close()
- DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback()
|