123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- from __future__ import annotations
- import os
- import sys
- from inspect import ismemberdescriptor
- from pathlib import Path
- from pathlib import PosixPath
- from pathlib import WindowsPath
- from typing import IO
- from typing import Any
- from typing import Collection
- from typing import MutableMapping
- from urllib.parse import SplitResult
- from upath._protocol import compatible_protocol
- from upath.core import UPath
# Names re-exported by ``from <this module> import *``.
__all__ = ["LocalPath", "FilePath", "PosixUPath", "WindowsUPath"]

# Cached answer of ``_check_listdir_works_on_files``; stays ``None`` until the
# probe has run once.
_LISTDIR_WORKS_ON_FILES: bool | None = None
def _check_listdir_works_on_files() -> bool:
    """Probe whether ``LocalFileSystem.ls`` accepts a path to a regular file.

    The probe lists this module's own ``__file__``.  If that raises
    ``NotADirectoryError`` the answer is ``False``; if it returns normally the
    answer is ``True``.  The answer is stored in the module-level
    ``_LISTDIR_WORKS_ON_FILES`` and also returned.
    """
    global _LISTDIR_WORKS_ON_FILES
    from fsspec.implementations.local import LocalFileSystem

    local_fs = LocalFileSystem()
    try:
        local_fs.ls(__file__)
    except NotADirectoryError:
        works = False
    else:
        works = True
    _LISTDIR_WORKS_ON_FILES = works
    return works
class LocalPath(UPath):
    """``UPath`` subclass that reports its path with forward slashes."""

    __slots__ = ()

    @property
    def path(self):
        """Return the parent class's ``path`` with ``/`` as the separator.

        Every occurrence of the flavour's separator is replaced by ``/``.
        When the path has a drive, a leading ``/`` is prepended first.
        """
        sep = self._flavour.sep
        if not self.drive:
            return super().path.replace(sep, "/")
        return f"/{super().path}".replace(sep, "/")

    @property
    def _url(self):
        """Return a ``SplitResult`` holding only the protocol and the path."""
        return SplitResult(
            scheme=self.protocol,
            netloc="",
            path=self.path,
            query="",
            fragment="",
        )
class FilePath(LocalPath):
    """``LocalPath`` whose ``iterdir`` refuses to iterate over a file."""

    __slots__ = ()

    def iterdir(self):
        """Iterate over the directory's entries.

        Raises:
            NotADirectoryError: if listing a file would not raise on its own
                (per the cached ``_LISTDIR_WORKS_ON_FILES`` probe) and this
                path is a file.
        """
        # Run the probe lazily, the first time its result is needed.
        if _LISTDIR_WORKS_ON_FILES is None:
            _check_listdir_works_on_files()
        if not (_LISTDIR_WORKS_ON_FILES and self.is_file()):
            return super().iterdir()
        raise NotADirectoryError(f"{self}")
# Attribute names that ``_set_class_attributes`` skips by default.
_pathlib_py312_ignore = {
    "__slots__",
    "__module__",
    "__new__",
    "__init__",
    "_from_parts",
    "_from_parsed_parts",
    "with_segments",
}


def _set_class_attributes(
    type_dict: MutableMapping[str, Any],
    src: type[Path],
    *,
    ignore: Collection[str] = frozenset(_pathlib_py312_ignore),
) -> None:
    """Copy the attributes of ``src`` and its bases into ``type_dict``.

    The MRO of ``src`` is walked in order, skipping ``object``.  For each
    class, entries of its ``__dict__`` are copied unless they are member
    descriptors, are named in ``ignore``, or were already copied from a class
    earlier in the MRO (so the most-derived definition wins).

    Args:
        type_dict: mapping that receives the attributes; mutated in place.
        src: class whose attributes are collected.
        ignore: attribute names that are never copied.
    """
    seen: set[str] = set()
    for klass in src.__mro__:
        if klass is object:
            continue
        for name, member in klass.__dict__.items():
            if ismemberdescriptor(member) or name in ignore or name in seen:
                continue
            seen.add(name)
            type_dict[name] = member
def _upath_init(inst: PosixUPath | WindowsUPath) -> None:
    """Give a PosixPath/WindowsPath instance its UPath attributes.

    Sets ``_protocol`` to the empty string and ``_storage_options`` to a new
    empty dict.  On Python older than 3.10, additionally calls ``inst._init()``
    when the instance has such an attribute.
    """
    inst._protocol = ""
    inst._storage_options = {}
    needs_legacy_init = sys.version_info < (3, 10) and hasattr(inst, "_init")
    if needs_legacy_init:
        inst._init()
class PosixUPath(PosixPath, LocalPath):  # type: ignore[misc]
    """``PosixPath`` that also inherits from ``LocalPath``.

    Construction ignores the ``protocol`` and ``storage_options`` arguments;
    instances always get an empty protocol and empty storage options.
    """

    __slots__ = ()

    # assign all PosixPath methods/attrs to prevent multi inheritance issues
    _set_class_attributes(locals(), src=PosixPath)

    def open(  # type: ignore[override]
        self,
        mode="r",
        buffering=-1,
        encoding=None,
        errors=None,
        newline=None,
        **fsspec_kwargs,
    ) -> IO[Any]:
        """Open the file this path points to.

        Without extra keyword arguments this is plain ``PosixPath.open``.
        When any ``fsspec_kwargs`` are given, the call is forwarded, with all
        arguments, to the ``open`` found after ``LocalPath`` in the MRO.
        """
        if not fsspec_kwargs:
            return PosixPath.open(self, mode, buffering, encoding, errors, newline)
        return super(LocalPath, self).open(
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
            **fsspec_kwargs,
        )

    # The definitions below exist only on Python older than 3.12.
    if sys.version_info < (3, 12):

        def __new__(
            cls, *args, protocol: str | None = None, **storage_options: Any
        ) -> PosixUPath:
            """Create the instance from ``args`` with an empty protocol.

            Raises:
                NotImplementedError: when ``os.name`` is ``"nt"``.
                ValueError: when ``compatible_protocol`` rejects ``args``.
            """
            if os.name == "nt":
                raise NotImplementedError(
                    f"cannot instantiate {cls.__name__} on your system"
                )
            if not compatible_protocol("", *args):
                raise ValueError("can't combine incompatible UPath protocols")
            instance = super().__new__(cls, *args)
            instance._protocol = ""
            return instance  # type: ignore[return-value]

        def __init__(
            self, *args, protocol: str | None = None, **storage_options: Any
        ) -> None:
            """Parse ``args`` into drive/root/parts and set the UPath attrs."""
            super(Path, self).__init__()
            drv, root, parts = type(self)._parse_args(args)
            self._drv, self._root, self._parts = drv, root, parts
            _upath_init(self)

        def _make_child(self, args):
            """Build a child path after checking protocol compatibility."""
            if not compatible_protocol(self._protocol, *args):
                raise ValueError("can't combine incompatible UPath protocols")
            return super()._make_child(args)

        @classmethod
        def _from_parts(cls, *args, **kwargs):
            """Build via the parent ``_from_parts`` and add the UPath attrs."""
            built = super(Path, cls)._from_parts(*args, **kwargs)
            _upath_init(built)
            return built

        @classmethod
        def _from_parsed_parts(cls, drv, root, parts):
            """Build via the parent ``_from_parsed_parts`` and add UPath attrs."""
            built = super(Path, cls)._from_parsed_parts(drv, root, parts)
            _upath_init(built)
            return built

        @property
        def path(self) -> str:
            """Return the path as produced by ``PosixPath.__str__``."""
            return PosixPath.__str__(self)
class WindowsUPath(WindowsPath, LocalPath):  # type: ignore[misc]
    """``WindowsPath`` that also inherits from ``LocalPath``.

    Construction ignores the ``protocol`` and ``storage_options`` arguments;
    instances always get an empty protocol and empty storage options.
    """

    __slots__ = ()

    # assign all WindowsPath methods/attrs to prevent multi inheritance issues
    _set_class_attributes(locals(), src=WindowsPath)

    def open(  # type: ignore[override]
        self,
        mode="r",
        buffering=-1,
        encoding=None,
        errors=None,
        newline=None,
        **fsspec_kwargs,
    ) -> IO[Any]:
        """Open the file this path points to.

        Without extra keyword arguments this is plain ``WindowsPath.open``.
        When any ``fsspec_kwargs`` are given, the call is forwarded, with all
        arguments, to the ``open`` found after ``LocalPath`` in the MRO.
        """
        if not fsspec_kwargs:
            return WindowsPath.open(self, mode, buffering, encoding, errors, newline)
        return super(LocalPath, self).open(
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            errors=errors,
            newline=newline,
            **fsspec_kwargs,
        )

    # The definitions below exist only on Python older than 3.12.
    if sys.version_info < (3, 12):

        def __new__(
            cls, *args, protocol: str | None = None, **storage_options: Any
        ) -> WindowsUPath:
            """Create the instance from ``args`` with an empty protocol.

            Raises:
                NotImplementedError: when ``os.name`` is not ``"nt"``.
                ValueError: when ``compatible_protocol`` rejects ``args``.
            """
            if os.name != "nt":
                raise NotImplementedError(
                    f"cannot instantiate {cls.__name__} on your system"
                )
            if not compatible_protocol("", *args):
                raise ValueError("can't combine incompatible UPath protocols")
            instance = super().__new__(cls, *args)
            instance._protocol = ""
            return instance  # type: ignore[return-value]

        def __init__(
            self, *args, protocol: str | None = None, **storage_options: Any
        ) -> None:
            """Parse ``args`` into drive/root/parts and set the UPath attrs."""
            super(Path, self).__init__()
            drv, root, parts = self._parse_args(args)
            self._drv, self._root, self._parts = drv, root, parts
            _upath_init(self)

        def _make_child(self, args):
            """Build a child path after checking protocol compatibility."""
            if not compatible_protocol(self._protocol, *args):
                raise ValueError("can't combine incompatible UPath protocols")
            return super()._make_child(args)

        @classmethod
        def _from_parts(cls, *args, **kwargs):
            """Build via the parent ``_from_parts`` and add the UPath attrs."""
            built = super(Path, cls)._from_parts(*args, **kwargs)
            _upath_init(built)
            return built

        @classmethod
        def _from_parsed_parts(cls, drv, root, parts):
            """Build via the parent ``_from_parsed_parts`` and add UPath attrs."""
            built = super(Path, cls)._from_parsed_parts(drv, root, parts)
            _upath_init(built)
            return built

        @property
        def path(self) -> str:
            """Return the path as produced by ``WindowsPath.as_posix``."""
            return WindowsPath.as_posix(self)
|