registry.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. """upath.registry -- registry for file system specific implementations
  2. Retrieve UPath implementations via `get_upath_class`.
  3. Register custom UPath subclasses in one of two ways:
  4. ### directly from Python
  5. >>> from upath import UPath
  6. >>> from upath.registry import register_implementation
  7. >>> my_protocol = "myproto"
  8. >>> class MyPath(UPath):
  9. ... pass
  10. >>> register_implementation(my_protocol, MyPath)
  11. ### via entry points
  12. ```toml
  13. # pyproject.toml
  14. [project.entry-points."universal_pathlib.implementations"]
  15. myproto = "my_module.submodule:MyPath"
  16. ```
  17. ```ini
  18. # setup.cfg
  19. [options.entry_points]
  20. universal_pathlib.implementations =
  21. myproto = my_module.submodule:MyPath
  22. ```
  23. """
  24. from __future__ import annotations
  25. import os
  26. import re
  27. import sys
  28. import warnings
  29. from collections import ChainMap
  30. from functools import lru_cache
  31. from importlib import import_module
  32. from importlib.metadata import entry_points
  33. from typing import TYPE_CHECKING
  34. from typing import Iterator
  35. from typing import MutableMapping
  36. from fsspec.core import get_filesystem_class
  37. from fsspec.registry import known_implementations as _fsspec_known_implementations
  38. import upath
# Names exported by `from upath.registry import *`.
__all__ = [
    "get_upath_class",
    "available_implementations",
    "register_implementation",
]

# Entry-point group name under which installed packages can advertise
# UPath subclasses (the module docstring shows pyproject.toml / setup.cfg
# examples that use this group).
_ENTRY_POINT_GROUP = "universal_pathlib.implementations"
  45. class _Registry(MutableMapping[str, "type[upath.UPath]"]):
  46. """internal registry for UPath subclasses"""
  47. known_implementations: dict[str, str] = {
  48. "abfs": "upath.implementations.cloud.AzurePath",
  49. "abfss": "upath.implementations.cloud.AzurePath",
  50. "adl": "upath.implementations.cloud.AzurePath",
  51. "az": "upath.implementations.cloud.AzurePath",
  52. "data": "upath.implementations.data.DataPath",
  53. "file": "upath.implementations.local.FilePath",
  54. "local": "upath.implementations.local.FilePath",
  55. "gcs": "upath.implementations.cloud.GCSPath",
  56. "gs": "upath.implementations.cloud.GCSPath",
  57. "hdfs": "upath.implementations.hdfs.HDFSPath",
  58. "http": "upath.implementations.http.HTTPPath",
  59. "https": "upath.implementations.http.HTTPPath",
  60. "memory": "upath.implementations.memory.MemoryPath",
  61. "s3": "upath.implementations.cloud.S3Path",
  62. "s3a": "upath.implementations.cloud.S3Path",
  63. "sftp": "upath.implementations.sftp.SFTPPath",
  64. "ssh": "upath.implementations.sftp.SFTPPath",
  65. "webdav": "upath.implementations.webdav.WebdavPath",
  66. "webdav+http": "upath.implementations.webdav.WebdavPath",
  67. "webdav+https": "upath.implementations.webdav.WebdavPath",
  68. "github": "upath.implementations.github.GitHubPath",
  69. "smb": "upath.implementations.smb.SMBPath",
  70. }
  71. if TYPE_CHECKING:
  72. _m: MutableMapping[str, str | type[upath.UPath]]
  73. def __init__(self) -> None:
  74. if sys.version_info >= (3, 10):
  75. eps = entry_points(group=_ENTRY_POINT_GROUP)
  76. else:
  77. eps = entry_points().get(_ENTRY_POINT_GROUP, [])
  78. self._entries = {ep.name: ep for ep in eps}
  79. self._m = ChainMap({}, self.known_implementations) # type: ignore
  80. def __contains__(self, item: object) -> bool:
  81. return item in set().union(self._m, self._entries)
  82. def __getitem__(self, item: str) -> type[upath.UPath]:
  83. fqn: str | type[upath.UPath] | None = self._m.get(item)
  84. if fqn is None:
  85. if item in self._entries:
  86. fqn = self._m[item] = self._entries[item].load()
  87. if fqn is None:
  88. raise KeyError(f"{item} not in registry")
  89. if isinstance(fqn, str):
  90. module_name, name = fqn.rsplit(".", 1)
  91. mod = import_module(module_name)
  92. cls = getattr(mod, name) # type: ignore
  93. else:
  94. cls = fqn
  95. return cls
  96. def __setitem__(self, item: str, value: type[upath.UPath] | str) -> None:
  97. if not (
  98. (isinstance(value, type) and issubclass(value, upath.UPath))
  99. or isinstance(value, str)
  100. ):
  101. raise ValueError(
  102. f"expected UPath subclass or FQN-string, got: {type(value).__name__!r}"
  103. )
  104. if not item or item in self._m:
  105. get_upath_class.cache_clear()
  106. self._m[item] = value
  107. def __delitem__(self, __v: str) -> None:
  108. raise NotImplementedError("removal is unsupported")
  109. def __len__(self) -> int:
  110. return len(set().union(self._m, self._entries))
  111. def __iter__(self) -> Iterator[str]:
  112. return iter(set().union(self._m, self._entries))
# Module-level registry instance used by the functions below. Creating it
# reads the entry points of the `_ENTRY_POINT_GROUP` group once, at import.
_registry = _Registry()
  114. def available_implementations(*, fallback: bool = False) -> list[str]:
  115. """return a list of protocols for available implementations
  116. Parameters
  117. ----------
  118. fallback:
  119. If True, also return protocols for fsspec filesystems without
  120. an implementation in universal_pathlib.
  121. """
  122. impl = list(_registry)
  123. if not fallback:
  124. return impl
  125. else:
  126. return list({*impl, *list(_fsspec_known_implementations)})
  127. def register_implementation(
  128. protocol: str,
  129. cls: type[upath.UPath] | str,
  130. *,
  131. clobber: bool = False,
  132. ) -> None:
  133. """register a UPath implementation with a protocol
  134. Parameters
  135. ----------
  136. protocol:
  137. Protocol name to associate with the class
  138. cls:
  139. The UPath subclass for the protocol or a str representing the
  140. full path to an implementation class like package.module.class.
  141. clobber:
  142. Whether to overwrite a protocol with the same name; if False,
  143. will raise instead.
  144. """
  145. if not re.match(r"^[a-z][a-z0-9+_.]+$", protocol):
  146. raise ValueError(f"{protocol!r} is not a valid URI scheme")
  147. if not clobber and protocol in _registry:
  148. raise ValueError(f"{protocol!r} is already in registry and clobber is False!")
  149. _registry[protocol] = cls
  150. @lru_cache
  151. def get_upath_class(
  152. protocol: str,
  153. *,
  154. fallback: bool = True,
  155. ) -> type[upath.UPath] | None:
  156. """Return the upath cls for the given protocol.
  157. Returns `None` if no matching protocol can be found.
  158. Parameters
  159. ----------
  160. protocol:
  161. The protocol string
  162. fallback:
  163. If fallback is False, don't return UPath instances for fsspec
  164. filesystems that don't have an implementation registered.
  165. """
  166. try:
  167. return _registry[protocol]
  168. except KeyError:
  169. if not protocol:
  170. if os.name == "nt":
  171. from upath.implementations.local import WindowsUPath
  172. return WindowsUPath
  173. else:
  174. from upath.implementations.local import PosixUPath
  175. return PosixUPath
  176. if not fallback:
  177. return None
  178. try:
  179. _ = get_filesystem_class(protocol)
  180. except ValueError:
  181. return None # this is an unknown protocol
  182. else:
  183. warnings.warn(
  184. f"UPath {protocol!r} filesystem not explicitly implemented."
  185. " Falling back to default implementation."
  186. " This filesystem may not be tested.",
  187. UserWarning,
  188. stacklevel=2,
  189. )
  190. return upath.UPath