dask.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import dask
  2. from distributed.client import Client, _get_global_client
  3. from distributed.worker import Worker
  4. from fsspec import filesystem
  5. from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
  6. from fsspec.utils import infer_storage_options
  def _get_client(client):
      """Turn the ``client`` argument into a ``distributed`` client object.

      - an existing ``Client`` instance is handed back unchanged;
      - ``None`` yields whatever ``_get_global_client()`` returns;
      - any other value (e.g. a "host:port" connection string) is passed to
        ``Client(...)`` to create a new client.
      """
      if isinstance(client, Client):
          return client
      if client is None:
          return _get_global_client()
      # Anything else is treated as a connection target for a new client.
      return Client(client)
  def _in_worker():
      """Return True when ``Worker._instances`` is non-empty.

      Used as the signal that the current process appears to be hosting a
      dask ``Worker`` (as opposed to running on the client side).
      """
      return True if Worker._instances else False
  class DaskWorkerFileSystem(AbstractFileSystem):
      """View files accessible to a worker as any other remote file-system

      When instances are run on the worker, uses the real filesystem. When
      run on the client, they call the worker to provide information or data.

      **Warning** this implementation is experimental. Files can only be
      opened for reading ("rb"); the management calls ``mkdir``, ``rm``,
      ``copy`` and ``mv`` are forwarded to the underlying filesystem.
      """

      def __init__(
          self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
      ):
          """
          Parameters
          ----------
          target_protocol : str, optional
              Protocol passed to ``fsspec.filesystem`` to build the real
              filesystem on the worker. Mutually exclusive with ``fs``.
          target_options : mapping, optional
              Keyword arguments for ``fsspec.filesystem`` alongside
              ``target_protocol``.
          fs : AbstractFileSystem, optional
              Ready-made filesystem instance to use on the worker. Mutually
              exclusive with ``target_protocol``.
          client : distributed.Client or str, optional
              Client (or connection target) used when not running inside a
              worker; ``None`` means the global client.
          **kwargs
              Forwarded to ``AbstractFileSystem.__init__``.

          Raises
          ------
          ValueError
              If both, or neither, of ``fs`` and ``target_protocol`` are given.
          """
          super().__init__(**kwargs)
          # Exactly one source for the real filesystem is required: reject the
          # "both given" case and the "neither given" case alike.
          if (fs is None) == (target_protocol is None):
              raise ValueError(
                  "Please provide exactly one of filesystem instance (fs) or"
                  " target_protocol, not both or neither"
              )
          self.target_protocol = target_protocol
          self.target_options = target_options
          self.worker = None
          self.client = client
          self.fs = fs
          self._determine_worker()

      @staticmethod
      def _get_kwargs_from_urls(path):
          """Return ``{"client": "host:port"}`` if *path* carries both, else ``{}``."""
          so = infer_storage_options(path)
          if "host" in so and "port" in so:
              return {"client": f"{so['host']}:{so['port']}"}
          return {}

      def _determine_worker(self):
          """Configure this instance for local (worker) or remote (client) use.

          Inside a worker, ``self.fs`` becomes the real filesystem (built from
          ``target_protocol``/``target_options`` when no instance was given).
          Otherwise the client is resolved and ``self.rfs`` wraps this instance
          in ``dask.delayed`` so method calls can be computed on the cluster.
          """
          if _in_worker():
              self.worker = True
              if self.fs is None:
                  self.fs = filesystem(
                      self.target_protocol, **(self.target_options or {})
                  )
          else:
              self.worker = False
              self.client = _get_client(self.client)
              self.rfs = dask.delayed(self)

      def _call_fs(self, name, args, kwargs):
          """Invoke method *name* with *args*/*kwargs* and return its result.

          On a worker the call goes straight to ``self.fs``. On the client the
          same public method is called on the delayed proxy ``self.rfs`` and
          computed, so it executes on a (deserialized) copy of this instance.
          ``args``/``kwargs`` are passed as a tuple/dict so that no caller
          keyword can clash with this helper's own parameters.
          """
          if self.worker:
              return getattr(self.fs, name)(*args, **kwargs)
          return getattr(self.rfs, name)(*args, **kwargs).compute()

      def mkdir(self, *args, **kwargs):
          """Forward ``mkdir`` to the real filesystem; return its result."""
          return self._call_fs("mkdir", args, kwargs)

      def rm(self, *args, **kwargs):
          """Forward ``rm`` to the real filesystem; return its result."""
          return self._call_fs("rm", args, kwargs)

      def copy(self, *args, **kwargs):
          """Forward ``copy`` to the real filesystem; return its result."""
          return self._call_fs("copy", args, kwargs)

      def mv(self, *args, **kwargs):
          """Forward ``mv`` to the real filesystem; return its result."""
          return self._call_fs("mv", args, kwargs)

      def ls(self, *args, **kwargs):
          """Forward ``ls`` to the real filesystem; return its listing."""
          return self._call_fs("ls", args, kwargs)

      def _open(
          self,
          path,
          mode="rb",
          block_size=None,
          autocommit=True,
          cache_options=None,
          **kwargs,
      ):
          """Open *path*.

          On a worker, delegate to the real filesystem's ``_open``. On the
          client, return a ``DaskFile`` that pulls byte ranges through
          ``fetch_range``.
          """
          if self.worker:
              return self.fs._open(
                  path,
                  mode=mode,
                  block_size=block_size,
                  autocommit=autocommit,
                  cache_options=cache_options,
                  **kwargs,
              )
          return DaskFile(
              fs=self,
              path=path,
              mode=mode,
              block_size=block_size,
              autocommit=autocommit,
              cache_options=cache_options,
              **kwargs,
          )

      def fetch_range(self, path, mode, start, end):
          """Return up to ``end - start`` bytes of *path* starting at *start*.

          On a worker the file is opened locally, read and closed; on the
          client the same call is computed remotely via ``self.rfs``.
          """
          if self.worker:
              with self._open(path, mode) as f:
                  f.seek(start)
                  return f.read(end - start)
          return self.rfs.fetch_range(path, mode, start, end).compute()
  class DaskFile(AbstractBufferedFile):
      """Read-only buffered file whose bytes are fetched via ``fs.fetch_range``.

      Only mode ``"rb"`` is accepted; the upload hooks are no-ops because
      writing is never possible through this class.
      """

      def __init__(self, mode="rb", **kwargs):
          """Validate *mode* and initialise the buffered file.

          Raises
          ------
          ValueError
              If *mode* is anything other than ``"rb"``.
          """
          if mode != "rb":
              raise ValueError('Remote dask files can only be opened in "rb" mode')
          # Forward the validated mode explicitly rather than relying on the
          # base class's default value for ``mode``.
          super().__init__(mode=mode, **kwargs)

      def _upload_chunk(self, final=False):
          """No-op: construction rejects every mode except "rb"."""
          pass

      def _initiate_upload(self):
          """Create remote file/upload (no-op: this file type is read-only)."""
          pass

      def _fetch_range(self, start, end):
          """Get the bytes ``[start, end)`` of this file from the remote side."""
          return self.fs.fetch_range(self.path, self.mode, start, end)