- import dask
- from distributed.client import Client, _get_global_client
- from distributed.worker import Worker
- from fsspec import filesystem
- from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
- from fsspec.utils import infer_storage_options
def _get_client(client):
    """Resolve *client* to a distributed ``Client`` instance.

    ``None`` means "use the current global client"; an existing ``Client``
    is returned unchanged; any other value (e.g. a scheduler connection
    string) is used to construct a new ``Client``.
    """
    if isinstance(client, Client):
        return client
    if client is None:
        return _get_global_client()
    # e.g., connection string
    return Client(client)
def _in_worker():
    """Return True when executing inside a distributed ``Worker`` process."""
    return len(Worker._instances) > 0
class DaskWorkerFileSystem(AbstractFileSystem):
    """View files accessible to a worker as any other remote file-system

    When instances are run on the worker, uses the real filesystem. When
    run on the client, they call the worker to provide information or data.

    **Warning** this implementation is experimental, and read-only for now.
    """

    def __init__(
        self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
    ):
        """
        Parameters
        ----------
        target_protocol: str or None
            Protocol of the filesystem to instantiate on the worker;
            mutually exclusive with ``fs``.
        target_options: dict or None
            Options passed when instantiating the target filesystem.
        fs: AbstractFileSystem or None
            Ready-made filesystem instance; mutually exclusive with
            ``target_protocol``.
        client: distributed.Client, str or None
            How to reach the scheduler; ``None`` uses the current global
            client, a string is treated as a connection address.
        """
        super().__init__(**kwargs)
        # Exactly one of ``fs`` / ``target_protocol`` must be supplied;
        # the XOR is False both when both are given and when neither is.
        if not (fs is None) ^ (target_protocol is None):
            raise ValueError(
                "Please provide exactly one of filesystem instance (fs) or"
                " target_protocol, not both or neither"
            )
        self.target_protocol = target_protocol
        self.target_options = target_options
        self.worker = None
        self.client = client
        self.fs = fs
        self._determine_worker()

    @staticmethod
    def _get_kwargs_from_urls(path):
        # URLs of the form protocol://host:port/... carry the scheduler
        # address, which becomes the ``client`` constructor argument.
        so = infer_storage_options(path)
        if "host" in so and "port" in so:
            return {"client": f"{so['host']}:{so['port']}"}
        else:
            return {}

    def _determine_worker(self):
        """Configure direct (on-worker) or proxied (on-client) operation.

        On a worker, instantiate the real target filesystem locally.
        On the client, resolve the dask client and build a ``dask.delayed``
        proxy of ``self`` whose method calls execute on a worker.
        """
        if _in_worker():
            self.worker = True
            if self.fs is None:
                self.fs = filesystem(
                    self.target_protocol, **(self.target_options or {})
                )
        else:
            self.worker = False
            self.client = _get_client(self.client)
            # Calls like ``self.rfs.ls(...).compute()`` run remotely.
            self.rfs = dask.delayed(self)

    def mkdir(self, *args, **kwargs):
        """Create directory; executed locally on a worker, else proxied."""
        if self.worker:
            self.fs.mkdir(*args, **kwargs)
        else:
            self.rfs.mkdir(*args, **kwargs).compute()

    def rm(self, *args, **kwargs):
        """Remove file(s); executed locally on a worker, else proxied."""
        if self.worker:
            self.fs.rm(*args, **kwargs)
        else:
            self.rfs.rm(*args, **kwargs).compute()

    def copy(self, *args, **kwargs):
        """Copy within the remote filesystem; local on a worker, else proxied."""
        if self.worker:
            self.fs.copy(*args, **kwargs)
        else:
            self.rfs.copy(*args, **kwargs).compute()

    def mv(self, *args, **kwargs):
        """Move/rename; executed locally on a worker, else proxied."""
        if self.worker:
            self.fs.mv(*args, **kwargs)
        else:
            self.rfs.mv(*args, **kwargs).compute()

    def ls(self, *args, **kwargs):
        """List path contents; executed locally on a worker, else proxied."""
        if self.worker:
            return self.fs.ls(*args, **kwargs)
        else:
            return self.rfs.ls(*args, **kwargs).compute()

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open ``path``: the real file on a worker, a ``DaskFile`` proxy
        (which fetches ranges through this filesystem) on the client."""
        if self.worker:
            return self.fs._open(
                path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )
        else:
            return DaskFile(
                fs=self,
                path=path,
                mode=mode,
                block_size=block_size,
                autocommit=autocommit,
                cache_options=cache_options,
                **kwargs,
            )

    def fetch_range(self, path, mode, start, end):
        """Return bytes ``[start, end)`` of ``path``.

        On a worker this reads directly; on the client the whole call is
        shipped to a worker via the delayed proxy.
        """
        if self.worker:
            with self._open(path, mode) as f:
                f.seek(start)
                return f.read(end - start)
        else:
            return self.rfs.fetch_range(path, mode, start, end).compute()
class DaskFile(AbstractBufferedFile):
    """Read-only buffered file whose byte ranges are fetched through the
    owning ``DaskWorkerFileSystem`` (i.e., from a dask worker)."""

    def __init__(self, mode="rb", **kwargs):
        """Open a remote dask file; only binary read ("rb") is supported."""
        if mode != "rb":
            raise ValueError('Remote dask files can only be opened in "rb" mode')
        super().__init__(**kwargs)

    def _upload_chunk(self, final=False):
        """No-op: writing is not supported for remote dask files."""

    def _initiate_upload(self):
        """Create remote file/upload — no-op, implementation is read-only."""

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        filesystem_, remote_path, open_mode = self.fs, self.path, self.mode
        return filesystem_.fetch_range(remote_path, open_mode, start, end)