_flavour.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. from __future__ import annotations
  2. import os.path
  3. import posixpath
  4. import sys
  5. import warnings
  6. from functools import lru_cache
  7. from typing import TYPE_CHECKING
  8. from typing import Any
  9. from typing import Mapping
  10. from typing import Sequence
  11. from typing import TypedDict
  12. from typing import Union
  13. from urllib.parse import SplitResult
  14. from urllib.parse import urlsplit
  15. if sys.version_info >= (3, 12):
  16. from typing import TypeAlias
  17. else:
  18. TypeAlias = Any
  19. from fsspec.registry import known_implementations
  20. from fsspec.registry import registry as _class_registry
  21. from fsspec.spec import AbstractFileSystem
  22. from upath._compat import deprecated
  23. from upath._compat import str_remove_prefix
  24. from upath._compat import str_remove_suffix
  25. from upath._flavour_sources import FileSystemFlavourBase
  26. from upath._flavour_sources import flavour_registry
  27. from upath._protocol import get_upath_protocol
  28. from upath._protocol import normalize_empty_netloc
  29. if TYPE_CHECKING:
  30. from upath.core import UPath
# Public names exported by a star-import of this module.
__all__ = [
    "LazyFlavourDescriptor",
    "default_flavour",
    "upath_urijoin",
    "upath_get_kwargs_from_url",
]

# The fsspec class registry, re-exposed under a read-only ``Mapping``
# annotation; it is looked up by protocol name in
# ``WrappedFileSystemFlavour.from_protocol``.
class_registry: Mapping[str, type[AbstractFileSystem]] = _class_registry

# Type accepted wherever this module takes a path-like argument: a plain
# string or an ``os.PathLike`` that yields a string.
PathOrStr: TypeAlias = Union[str, "os.PathLike[str]"]
  39. class AnyProtocolFileSystemFlavour(FileSystemFlavourBase):
  40. sep = "/"
  41. protocol = ()
  42. root_marker = "/"
  43. @classmethod
  44. def _strip_protocol(cls, path: str) -> str:
  45. protocol = get_upath_protocol(path)
  46. if path.startswith(protocol + "://"):
  47. path = path[len(protocol) + 3 :]
  48. elif path.startswith(protocol + "::"):
  49. path = path[len(protocol) + 2 :]
  50. path = path.rstrip("/")
  51. return path or cls.root_marker
  52. @staticmethod
  53. def _get_kwargs_from_urls(path: str) -> dict[str, Any]:
  54. return {}
  55. @classmethod
  56. def _parent(cls, path):
  57. path = cls._strip_protocol(path)
  58. if "/" in path:
  59. parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
  60. return cls.root_marker + parent
  61. else:
  62. return cls.root_marker
class ProtocolConfig(TypedDict):
    """Schema of ``WrappedFileSystemFlavour.protocol_config``."""

    # protocols for which the flavour is created with netloc_is_anchor=True
    netloc_is_anchor: set[str]
    # protocols for which the flavour is created with supports_empty_parts=True
    supports_empty_parts: set[str]
    # protocols for which the flavour is created with
    # meaningful_trailing_slash=True
    meaningful_trailing_slash: set[str]
    # protocol name -> root marker that replaces the filesystem's own one
    root_marker_override: dict[str, str]
  68. class WrappedFileSystemFlavour: # (pathlib_abc.FlavourBase)
  69. """flavour class for universal_pathlib
  70. **INTERNAL AND VERY MUCH EXPERIMENTAL**
  71. Implements the fsspec compatible low-level lexical operations on
  72. PurePathBase-like objects.
  73. Note:
  74. In case you find yourself in need of subclassing this class,
  75. please open an issue in the universal_pathlib issue tracker:
  76. https://github.com/fsspec/universal_pathlib/issues
  77. Ideally we can find a way to make your use-case work by adding
  78. more functionality to this class.
  79. """
  80. # Note:
  81. # It would be ideal if there would be a way to avoid the need for
  82. # indicating the following settings via the protocol. This is a
  83. # workaround to be able to implement the flavour correctly.
  84. # TODO:
  85. # These settings should be configured on the UPath class?!?
  86. protocol_config: ProtocolConfig = {
  87. "netloc_is_anchor": {
  88. "http",
  89. "https",
  90. "s3",
  91. "s3a",
  92. "smb",
  93. "gs",
  94. "gcs",
  95. "az",
  96. "adl",
  97. "abfs",
  98. "abfss",
  99. "webdav+http",
  100. "webdav+https",
  101. },
  102. "supports_empty_parts": {
  103. "http",
  104. "https",
  105. "s3",
  106. "s3a",
  107. "gs",
  108. "gcs",
  109. "az",
  110. "adl",
  111. "abfs",
  112. },
  113. "meaningful_trailing_slash": {
  114. "http",
  115. "https",
  116. },
  117. "root_marker_override": {
  118. "ssh": "/",
  119. "sftp": "/",
  120. },
  121. }
  122. def __init__(
  123. self,
  124. spec: type[AbstractFileSystem | FileSystemFlavourBase] | AbstractFileSystem,
  125. *,
  126. netloc_is_anchor: bool = False,
  127. supports_empty_parts: bool = False,
  128. meaningful_trailing_slash: bool = False,
  129. root_marker_override: str | None = None,
  130. ) -> None:
  131. """initialize the flavour with the given fsspec"""
  132. self._spec = spec
  133. # netloc is considered an anchor, influences:
  134. # - splitdrive
  135. # - join
  136. self.netloc_is_anchor = bool(netloc_is_anchor)
  137. # supports empty parts, influences:
  138. # - join
  139. # - UPath._parse_path
  140. self.supports_empty_parts = bool(supports_empty_parts)
  141. # meaningful trailing slash, influences:
  142. # - join
  143. # - UPath._parse_path
  144. self.has_meaningful_trailing_slash = bool(meaningful_trailing_slash)
  145. # some filesystems require UPath to enforce a specific root marker
  146. if root_marker_override is None:
  147. self.root_marker_override = None
  148. else:
  149. self.root_marker_override = str(root_marker_override)
  150. @classmethod
  151. @lru_cache(maxsize=None)
  152. def from_protocol(
  153. cls,
  154. protocol: str,
  155. ) -> WrappedFileSystemFlavour:
  156. """return the fsspec flavour for the given protocol"""
  157. _c = cls.protocol_config
  158. config: dict[str, Any] = {
  159. "netloc_is_anchor": protocol in _c["netloc_is_anchor"],
  160. "supports_empty_parts": protocol in _c["supports_empty_parts"],
  161. "meaningful_trailing_slash": protocol in _c["meaningful_trailing_slash"],
  162. "root_marker_override": _c["root_marker_override"].get(protocol),
  163. }
  164. # first try to get an already imported fsspec filesystem class
  165. try:
  166. return cls(class_registry[protocol], **config)
  167. except KeyError:
  168. pass
  169. # next try to get the flavour from the generated flavour registry
  170. # to avoid imports
  171. try:
  172. return cls(flavour_registry[protocol], **config)
  173. except KeyError:
  174. pass
  175. # finally fallback to a default flavour for the protocol
  176. if protocol in known_implementations:
  177. warnings.warn(
  178. f"Could not find default for known protocol {protocol!r}."
  179. " Creating a default flavour for it. Please report this"
  180. " to the universal_pathlib issue tracker.",
  181. UserWarning,
  182. stacklevel=2,
  183. )
  184. return cls(AnyProtocolFileSystemFlavour, **config)
  185. def __repr__(self):
  186. if isinstance(self._spec, type):
  187. return f"<wrapped class {self._spec.__name__}>"
  188. else:
  189. return f"<wrapped instance {self._spec.__class__.__name__}>"
  190. # === fsspec.AbstractFileSystem ===================================
  191. @property
  192. def protocol(self) -> tuple[str, ...]:
  193. if isinstance(self._spec.protocol, str):
  194. return (self._spec.protocol,)
  195. else:
  196. return self._spec.protocol
  197. @property
  198. def root_marker(self) -> str:
  199. if self.root_marker_override is not None:
  200. return self.root_marker_override
  201. else:
  202. return self._spec.root_marker
  203. @property
  204. def local_file(self) -> bool:
  205. return bool(getattr(self._spec, "local_file", False))
  206. @staticmethod
  207. def stringify_path(pth: PathOrStr) -> str:
  208. if isinstance(pth, str):
  209. out = pth
  210. elif getattr(pth, "__fspath__", None) is not None:
  211. out = pth.__fspath__()
  212. elif isinstance(pth, os.PathLike):
  213. out = str(pth)
  214. elif hasattr(pth, "path"): # type: ignore[unreachable]
  215. out = pth.path
  216. else:
  217. out = str(pth)
  218. return normalize_empty_netloc(out)
  219. def strip_protocol(self, pth: PathOrStr) -> str:
  220. pth = self.stringify_path(pth)
  221. return self._spec._strip_protocol(pth)
  222. def get_kwargs_from_url(self, url: PathOrStr) -> dict[str, Any]:
  223. # NOTE: the public variant is _from_url not _from_urls
  224. if hasattr(url, "storage_options"):
  225. return dict(url.storage_options)
  226. url = self.stringify_path(url)
  227. return self._spec._get_kwargs_from_urls(url)
  228. def parent(self, path: PathOrStr) -> str:
  229. path = self.stringify_path(path)
  230. return self._spec._parent(path)
  231. # === pathlib_abc.FlavourBase =====================================
  232. @property
  233. def sep(self) -> str:
  234. return self._spec.sep
  235. @property
  236. def altsep(self) -> str | None:
  237. return None
  238. def isabs(self, path: PathOrStr) -> bool:
  239. path = self.strip_protocol(path)
  240. if self.local_file:
  241. return os.path.isabs(path)
  242. else:
  243. return path.startswith(self.root_marker)
  244. def join(self, path: PathOrStr, *paths: PathOrStr) -> str:
  245. if self.netloc_is_anchor:
  246. drv, p0 = self.splitdrive(path)
  247. pN = list(map(self.stringify_path, paths))
  248. if not drv and not p0:
  249. path, *pN = pN
  250. drv, p0 = self.splitdrive(path)
  251. p0 = p0 or self.sep
  252. else:
  253. p0 = str(self.strip_protocol(path)) or self.root_marker
  254. pN = list(map(self.stringify_path, paths))
  255. drv = ""
  256. if self.supports_empty_parts:
  257. return drv + self.sep.join([str_remove_suffix(p0, self.sep), *pN])
  258. else:
  259. return drv + posixpath.join(p0, *pN)
  260. def split(self, path: PathOrStr):
  261. stripped_path = self.strip_protocol(path)
  262. head = self.parent(stripped_path) or self.root_marker
  263. if head:
  264. return head, stripped_path[len(head) + 1 :]
  265. else:
  266. return "", stripped_path
  267. def splitdrive(self, path: PathOrStr) -> tuple[str, str]:
  268. path = self.strip_protocol(path)
  269. if self.netloc_is_anchor:
  270. u = urlsplit(path)
  271. if u.scheme:
  272. # cases like: "http://example.com/foo/bar"
  273. drive = u._replace(path="", query="", fragment="").geturl()
  274. rest = u._replace(scheme="", netloc="").geturl()
  275. if (
  276. u.path.startswith("//")
  277. and SplitResult("", "", "//", "", "").geturl() == "////"
  278. ):
  279. # see: fsspec/universal_pathlib#233
  280. rest = rest[2:]
  281. return drive, rest or self.root_marker or self.sep
  282. else:
  283. # cases like: "bucket/some/special/key
  284. drive, root, tail = path.partition(self.sep)
  285. return drive, root + tail
  286. elif self.local_file:
  287. return os.path.splitdrive(path)
  288. else:
  289. # all other cases don't have a drive
  290. return "", path
  291. def normcase(self, path: PathOrStr) -> str:
  292. if self.local_file:
  293. return os.path.normcase(self.stringify_path(path))
  294. else:
  295. return self.stringify_path(path)
  296. # === Python3.12 pathlib flavour ==================================
  297. def splitroot(self, path: PathOrStr) -> tuple[str, str, str]:
  298. drive, tail = self.splitdrive(path)
  299. if self.netloc_is_anchor:
  300. root_marker = self.root_marker or self.sep
  301. else:
  302. root_marker = self.root_marker
  303. return drive, root_marker, str_remove_prefix(tail, self.sep)
  304. # === deprecated backwards compatibility ===========================
  305. @deprecated(python_version=(3, 12))
  306. def casefold(self, s: str) -> str:
  307. if self.local_file:
  308. return s
  309. else:
  310. return s.lower()
  311. @deprecated(python_version=(3, 12))
  312. def parse_parts(self, parts: Sequence[str]) -> tuple[str, str, list[str]]:
  313. parsed = []
  314. sep = self.sep
  315. drv = root = ""
  316. it = reversed(parts)
  317. for part in it:
  318. if part:
  319. drv, root, rel = self.splitroot(part)
  320. if not root or root and rel:
  321. for x in reversed(rel.split(sep)):
  322. parsed.append(sys.intern(x))
  323. if drv or root:
  324. parsed.append(drv + root)
  325. parsed.reverse()
  326. return drv, root, parsed
  327. @deprecated(python_version=(3, 12))
  328. def join_parsed_parts(
  329. self,
  330. drv: str,
  331. root: str,
  332. parts: list[str],
  333. drv2: str,
  334. root2: str,
  335. parts2: list[str],
  336. ) -> tuple[str, str, list[str]]:
  337. if root2:
  338. if not drv2 and drv:
  339. return drv, root2, [drv + root2] + parts2[1:]
  340. elif drv2:
  341. if drv2 == drv or self.casefold(drv2) == self.casefold(drv):
  342. # Same drive => second path is relative to the first
  343. return drv, root, parts + parts2[1:]
  344. else:
  345. # Second path is non-anchored (common case)
  346. return drv, root, parts + parts2
  347. return drv2, root2, parts2
# Module-level flavour wrapping the generic AnyProtocolFileSystemFlavour with
# every option left at its default; LazyFlavourDescriptor.__get__ returns it
# when no protocol is available.
default_flavour = WrappedFileSystemFlavour(AnyProtocolFileSystemFlavour)
  349. class LazyFlavourDescriptor:
  350. """descriptor to lazily get the flavour for a given protocol"""
  351. def __init__(self) -> None:
  352. self._owner: type[UPath] | None = None
  353. def __set_name__(self, owner: type[UPath], name: str) -> None:
  354. # helper to provide a more informative repr
  355. self._owner = owner
  356. self._default_protocol: str | None
  357. try:
  358. self._default_protocol = self._owner.protocols[0] # type: ignore
  359. except (AttributeError, IndexError):
  360. self._default_protocol = None
  361. def __get__(self, instance: UPath, owner: type[UPath]) -> WrappedFileSystemFlavour:
  362. if instance is not None:
  363. return WrappedFileSystemFlavour.from_protocol(instance.protocol)
  364. elif self._default_protocol: # type: ignore
  365. return WrappedFileSystemFlavour.from_protocol(self._default_protocol)
  366. else:
  367. return default_flavour
  368. def __repr__(self):
  369. cls_name = f"{type(self).__name__}"
  370. if self._owner is None:
  371. return f"<unbound {cls_name}>"
  372. else:
  373. return f"<{cls_name} of {self._owner.__name__}>"
  374. def upath_strip_protocol(pth: PathOrStr) -> str:
  375. if protocol := get_upath_protocol(pth):
  376. return WrappedFileSystemFlavour.from_protocol(protocol).strip_protocol(pth)
  377. return WrappedFileSystemFlavour.stringify_path(pth)
  378. def upath_get_kwargs_from_url(url: PathOrStr) -> dict[str, Any]:
  379. if protocol := get_upath_protocol(url):
  380. return WrappedFileSystemFlavour.from_protocol(protocol).get_kwargs_from_url(url)
  381. return {}
  382. def upath_urijoin(base: str, uri: str) -> str:
  383. """Join a base URI and a possibly relative URI to form an absolute
  384. interpretation of the latter."""
  385. # see:
  386. # https://github.com/python/cpython/blob/ae6c01d9d2/Lib/urllib/parse.py#L539-L605
  387. # modifications:
  388. # - removed allow_fragments parameter
  389. # - all schemes are considered to allow relative paths
  390. # - all schemes are considered to allow netloc (revisit this)
  391. # - no bytes support (removes encoding and decoding)
  392. if not base:
  393. return uri
  394. if not uri:
  395. return base
  396. bs = urlsplit(base, scheme="")
  397. us = urlsplit(uri, scheme=bs.scheme)
  398. if us.scheme != bs.scheme: # or us.scheme not in uses_relative:
  399. return uri
  400. # if us.scheme in uses_netloc:
  401. if us.netloc:
  402. return us.geturl()
  403. else:
  404. us = us._replace(netloc=bs.netloc)
  405. # end if
  406. if not us.path and not us.fragment:
  407. us = us._replace(path=bs.path, fragment=bs.fragment)
  408. if not us.query:
  409. us = us._replace(query=bs.query)
  410. return us.geturl()
  411. base_parts = bs.path.split("/")
  412. if base_parts[-1] != "":
  413. del base_parts[-1]
  414. if us.path[:1] == "/":
  415. segments = us.path.split("/")
  416. else:
  417. segments = base_parts + us.path.split("/")
  418. segments[1:-1] = filter(None, segments[1:-1])
  419. resolved_path: list[str] = []
  420. for seg in segments:
  421. if seg == "..":
  422. try:
  423. resolved_path.pop()
  424. except IndexError:
  425. pass
  426. elif seg == ".":
  427. continue
  428. else:
  429. resolved_path.append(seg)
  430. if segments[-1] in (".", ".."):
  431. resolved_path.append("")
  432. return us._replace(path="/".join(resolved_path) or "/").geturl()