123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- import asyncio
- import functools
- import inspect
- from fsspec.asyn import AsyncFileSystem, running_async
- def async_wrapper(func, obj=None):
- """
- Wraps a synchronous function to make it awaitable.
- Parameters
- ----------
- func : callable
- The synchronous function to wrap.
- obj : object, optional
- The instance to bind the function to, if applicable.
- Returns
- -------
- coroutine
- An awaitable version of the function.
- """
- @functools.wraps(func)
- async def wrapper(*args, **kwargs):
- return await asyncio.to_thread(func, *args, **kwargs)
- return wrapper
- class AsyncFileSystemWrapper(AsyncFileSystem):
- """
- A wrapper class to convert a synchronous filesystem into an asynchronous one.
- This class takes an existing synchronous filesystem implementation and wraps all
- its methods to provide an asynchronous interface.
- Parameters
- ----------
- sync_fs : AbstractFileSystem
- The synchronous filesystem instance to wrap.
- """
- protocol = "async_wrapper"
- cachable = False
- def __init__(self, fs, *args, asynchronous=None, **kwargs):
- if asynchronous is None:
- asynchronous = running_async()
- super().__init__(*args, asynchronous=asynchronous, **kwargs)
- self.sync_fs = fs
- self.protocol = self.sync_fs.protocol
- self._wrap_all_sync_methods()
- @property
- def fsid(self):
- return f"async_{self.sync_fs.fsid}"
- def _wrap_all_sync_methods(self):
- """
- Wrap all synchronous methods of the underlying filesystem with asynchronous versions.
- """
- excluded_methods = {"open"}
- for method_name in dir(self.sync_fs):
- if method_name.startswith("_") or method_name in excluded_methods:
- continue
- attr = inspect.getattr_static(self.sync_fs, method_name)
- if isinstance(attr, property):
- continue
- method = getattr(self.sync_fs, method_name)
- if callable(method) and not asyncio.iscoroutinefunction(method):
- async_method = async_wrapper(method, obj=self)
- setattr(self, f"_{method_name}", async_method)
- @classmethod
- def wrap_class(cls, sync_fs_class):
- """
- Create a new class that can be used to instantiate an AsyncFileSystemWrapper
- with lazy instantiation of the underlying synchronous filesystem.
- Parameters
- ----------
- sync_fs_class : type
- The class of the synchronous filesystem to wrap.
- Returns
- -------
- type
- A new class that wraps the provided synchronous filesystem class.
- """
- class GeneratedAsyncFileSystemWrapper(cls):
- def __init__(self, *args, **kwargs):
- sync_fs = sync_fs_class(*args, **kwargs)
- super().__init__(sync_fs)
- GeneratedAsyncFileSystemWrapper.__name__ = (
- f"Async{sync_fs_class.__name__}Wrapper"
- )
- return GeneratedAsyncFileSystemWrapper
|