asyn_wrapper.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import asyncio
  2. import functools
  3. import inspect
  4. from fsspec.asyn import AsyncFileSystem, running_async
  5. def async_wrapper(func, obj=None):
  6. """
  7. Wraps a synchronous function to make it awaitable.
  8. Parameters
  9. ----------
  10. func : callable
  11. The synchronous function to wrap.
  12. obj : object, optional
  13. The instance to bind the function to, if applicable.
  14. Returns
  15. -------
  16. coroutine
  17. An awaitable version of the function.
  18. """
  19. @functools.wraps(func)
  20. async def wrapper(*args, **kwargs):
  21. return await asyncio.to_thread(func, *args, **kwargs)
  22. return wrapper
  23. class AsyncFileSystemWrapper(AsyncFileSystem):
  24. """
  25. A wrapper class to convert a synchronous filesystem into an asynchronous one.
  26. This class takes an existing synchronous filesystem implementation and wraps all
  27. its methods to provide an asynchronous interface.
  28. Parameters
  29. ----------
  30. sync_fs : AbstractFileSystem
  31. The synchronous filesystem instance to wrap.
  32. """
  33. protocol = "async_wrapper"
  34. cachable = False
  35. def __init__(self, fs, *args, asynchronous=None, **kwargs):
  36. if asynchronous is None:
  37. asynchronous = running_async()
  38. super().__init__(*args, asynchronous=asynchronous, **kwargs)
  39. self.sync_fs = fs
  40. self.protocol = self.sync_fs.protocol
  41. self._wrap_all_sync_methods()
  42. @property
  43. def fsid(self):
  44. return f"async_{self.sync_fs.fsid}"
  45. def _wrap_all_sync_methods(self):
  46. """
  47. Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
  48. """
  49. excluded_methods = {"open"}
  50. for method_name in dir(self.sync_fs):
  51. if method_name.startswith("_") or method_name in excluded_methods:
  52. continue
  53. attr = inspect.getattr_static(self.sync_fs, method_name)
  54. if isinstance(attr, property):
  55. continue
  56. method = getattr(self.sync_fs, method_name)
  57. if callable(method) and not asyncio.iscoroutinefunction(method):
  58. async_method = async_wrapper(method, obj=self)
  59. setattr(self, f"_{method_name}", async_method)
  60. @classmethod
  61. def wrap_class(cls, sync_fs_class):
  62. """
  63. Create a new class that can be used to instantiate an AsyncFileSystemWrapper
  64. with lazy instantiation of the underlying synchronous filesystem.
  65. Parameters
  66. ----------
  67. sync_fs_class : type
  68. The class of the synchronous filesystem to wrap.
  69. Returns
  70. -------
  71. type
  72. A new class that wraps the provided synchronous filesystem class.
  73. """
  74. class GeneratedAsyncFileSystemWrapper(cls):
  75. def __init__(self, *args, **kwargs):
  76. sync_fs = sync_fs_class(*args, **kwargs)
  77. super().__init__(sync_fs)
  78. GeneratedAsyncFileSystemWrapper.__name__ = (
  79. f"Async{sync_fs_class.__name__}Wrapper"
  80. )
  81. return GeneratedAsyncFileSystemWrapper