callbacks.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. from functools import wraps
  2. class Callback:
  3. """
  4. Base class and interface for callback mechanism
  5. This class can be used directly for monitoring file transfers by
  6. providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
  7. below), or subclassed for more specialised behaviour.
  8. Parameters
  9. ----------
  10. size: int (optional)
  11. Nominal quantity for the value that corresponds to a complete
  12. transfer, e.g., total number of tiles or total number of
  13. bytes
  14. value: int (0)
  15. Starting internal counter value
  16. hooks: dict or None
  17. A dict of named functions to be called on each update. The signature
  18. of these must be ``f(size, value, **kwargs)``
  19. """
  20. def __init__(self, size=None, value=0, hooks=None, **kwargs):
  21. self.size = size
  22. self.value = value
  23. self.hooks = hooks or {}
  24. self.kw = kwargs
  25. def __enter__(self):
  26. return self
  27. def __exit__(self, *exc_args):
  28. self.close()
  29. def close(self):
  30. """Close callback."""
  31. def branched(self, path_1, path_2, **kwargs):
  32. """
  33. Return callback for child transfers
  34. If this callback is operating at a higher level, e.g., put, which may
  35. trigger transfers that can also be monitored. The function returns a callback
  36. that has to be passed to the child method, e.g., put_file,
  37. as `callback=` argument.
  38. The implementation uses `callback.branch` for compatibility.
  39. When implementing callbacks, it is recommended to override this function instead
  40. of `branch` and avoid calling `super().branched(...)`.
  41. Prefer using this function over `branch`.
  42. Parameters
  43. ----------
  44. path_1: str
  45. Child's source path
  46. path_2: str
  47. Child's destination path
  48. **kwargs:
  49. Arbitrary keyword arguments
  50. Returns
  51. -------
  52. callback: Callback
  53. A callback instance to be passed to the child method
  54. """
  55. self.branch(path_1, path_2, kwargs)
  56. # mutate kwargs so that we can force the caller to pass "callback=" explicitly
  57. return kwargs.pop("callback", DEFAULT_CALLBACK)
  58. def branch_coro(self, fn):
  59. """
  60. Wraps a coroutine, and pass a new child callback to it.
  61. """
  62. @wraps(fn)
  63. async def func(path1, path2: str, **kwargs):
  64. with self.branched(path1, path2, **kwargs) as child:
  65. return await fn(path1, path2, callback=child, **kwargs)
  66. return func
  67. def set_size(self, size):
  68. """
  69. Set the internal maximum size attribute
  70. Usually called if not initially set at instantiation. Note that this
  71. triggers a ``call()``.
  72. Parameters
  73. ----------
  74. size: int
  75. """
  76. self.size = size
  77. self.call()
  78. def absolute_update(self, value):
  79. """
  80. Set the internal value state
  81. Triggers ``call()``
  82. Parameters
  83. ----------
  84. value: int
  85. """
  86. self.value = value
  87. self.call()
  88. def relative_update(self, inc=1):
  89. """
  90. Delta increment the internal counter
  91. Triggers ``call()``
  92. Parameters
  93. ----------
  94. inc: int
  95. """
  96. self.value += inc
  97. self.call()
  98. def call(self, hook_name=None, **kwargs):
  99. """
  100. Execute hook(s) with current state
  101. Each function is passed the internal size and current value
  102. Parameters
  103. ----------
  104. hook_name: str or None
  105. If given, execute on this hook
  106. kwargs: passed on to (all) hook(s)
  107. """
  108. if not self.hooks:
  109. return
  110. kw = self.kw.copy()
  111. kw.update(kwargs)
  112. if hook_name:
  113. if hook_name not in self.hooks:
  114. return
  115. return self.hooks[hook_name](self.size, self.value, **kw)
  116. for hook in self.hooks.values() or []:
  117. hook(self.size, self.value, **kw)
  118. def wrap(self, iterable):
  119. """
  120. Wrap an iterable to call ``relative_update`` on each iterations
  121. Parameters
  122. ----------
  123. iterable: Iterable
  124. The iterable that is being wrapped
  125. """
  126. for item in iterable:
  127. self.relative_update()
  128. yield item
  129. def branch(self, path_1, path_2, kwargs):
  130. """
  131. Set callbacks for child transfers
  132. If this callback is operating at a higher level, e.g., put, which may
  133. trigger transfers that can also be monitored. The passed kwargs are
  134. to be *mutated* to add ``callback=``, if this class supports branching
  135. to children.
  136. Parameters
  137. ----------
  138. path_1: str
  139. Child's source path
  140. path_2: str
  141. Child's destination path
  142. kwargs: dict
  143. arguments passed to child method, e.g., put_file.
  144. Returns
  145. -------
  146. """
  147. return None
  148. def no_op(self, *_, **__):
  149. pass
  150. def __getattr__(self, item):
  151. """
  152. If undefined methods are called on this class, nothing happens
  153. """
  154. return self.no_op
  155. @classmethod
  156. def as_callback(cls, maybe_callback=None):
  157. """Transform callback=... into Callback instance
  158. For the special value of ``None``, return the global instance of
  159. ``NoOpCallback``. This is an alternative to including
  160. ``callback=DEFAULT_CALLBACK`` directly in a method signature.
  161. """
  162. if maybe_callback is None:
  163. return DEFAULT_CALLBACK
  164. return maybe_callback
  165. class NoOpCallback(Callback):
  166. """
  167. This implementation of Callback does exactly nothing
  168. """
  169. def call(self, *args, **kwargs):
  170. return None
  171. class DotPrinterCallback(Callback):
  172. """
  173. Simple example Callback implementation
  174. Almost identical to Callback with a hook that prints a char; here we
  175. demonstrate how the outer layer may print "#" and the inner layer "."
  176. """
  177. def __init__(self, chr_to_print="#", **kwargs):
  178. self.chr = chr_to_print
  179. super().__init__(**kwargs)
  180. def branch(self, path_1, path_2, kwargs):
  181. """Mutate kwargs to add new instance with different print char"""
  182. kwargs["callback"] = DotPrinterCallback(".")
  183. def call(self, **kwargs):
  184. """Just outputs a character"""
  185. print(self.chr, end="")
  186. class TqdmCallback(Callback):
  187. """
  188. A callback to display a progress bar using tqdm
  189. Parameters
  190. ----------
  191. tqdm_kwargs : dict, (optional)
  192. Any argument accepted by the tqdm constructor.
  193. See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
  194. Will be forwarded to `tqdm_cls`.
  195. tqdm_cls: (optional)
  196. subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.
  197. Examples
  198. --------
  199. >>> import fsspec
  200. >>> from fsspec.callbacks import TqdmCallback
  201. >>> fs = fsspec.filesystem("memory")
  202. >>> path2distant_data = "/your-path"
  203. >>> fs.upload(
  204. ".",
  205. path2distant_data,
  206. recursive=True,
  207. callback=TqdmCallback(),
  208. )
  209. You can forward args to tqdm using the ``tqdm_kwargs`` parameter.
  210. >>> fs.upload(
  211. ".",
  212. path2distant_data,
  213. recursive=True,
  214. callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
  215. )
  216. You can also customize the progress bar by passing a subclass of `tqdm`.
  217. .. code-block:: python
  218. class TqdmFormat(tqdm):
  219. '''Provides a `total_time` format parameter'''
  220. @property
  221. def format_dict(self):
  222. d = super().format_dict
  223. total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
  224. d.update(total_time=self.format_interval(total_time) + " in total")
  225. return d
  226. >>> with TqdmCallback(
  227. tqdm_kwargs={
  228. "desc": "desc",
  229. "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
  230. },
  231. tqdm_cls=TqdmFormat,
  232. ) as callback:
  233. fs.upload(".", path2distant_data, recursive=True, callback=callback)
  234. """
  235. def __init__(self, tqdm_kwargs=None, *args, **kwargs):
  236. try:
  237. from tqdm import tqdm
  238. except ImportError as exce:
  239. raise ImportError(
  240. "Using TqdmCallback requires tqdm to be installed"
  241. ) from exce
  242. self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
  243. self._tqdm_kwargs = tqdm_kwargs or {}
  244. self.tqdm = None
  245. super().__init__(*args, **kwargs)
  246. def call(self, *args, **kwargs):
  247. if self.tqdm is None:
  248. self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
  249. self.tqdm.total = self.size
  250. self.tqdm.update(self.value - self.tqdm.n)
  251. def close(self):
  252. if self.tqdm is not None:
  253. self.tqdm.close()
  254. self.tqdm = None
  255. def __del__(self):
  256. return self.close()
  257. DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback()