123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397 |
- from __future__ import annotations
- import os
- import warnings
- from datetime import datetime
- from stat import S_IFDIR
- from stat import S_IFLNK
- from stat import S_IFREG
- from typing import Any
- from typing import Iterator
- from typing import Mapping
- from typing import Sequence
- __all__ = [
- "UPathStatResult",
- ]
- def _convert_value_to_timestamp(value: Any) -> int | float:
- """Try to convert a datetime-like value to a timestamp."""
- if isinstance(value, (int, float)):
- return value
- elif isinstance(value, str):
- if value.endswith("Z"):
- value = value[:-1] + "+00:00"
- return datetime.fromisoformat(value).timestamp()
- elif isinstance(value, datetime):
- return value.timestamp()
- else:
- warnings.warn(
- f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
- " Please report this at: https://github.com/fsspec/universal_path/issues",
- RuntimeWarning,
- stacklevel=2,
- )
- raise TypeError(f"Cannot convert {value!r} to a timestamp.")
- def _get_stat_result_extra_fields() -> tuple[str, ...]:
- """retrieve the extra fields of the os.stat_result class."""
- # Note:
- # The lines below let us provide a dictionary with the additional
- # named fields of the stat_result class as keys and the internal
- # index of the field as value.
- sr = os.stat_result(range(os.stat_result.n_fields))
- rd = sr.__reduce__()
- assert isinstance(rd, tuple), "unexpected return os.stat_result.__reduce__"
- _, (_, extra) = rd
- extra_fields = sorted(extra, key=extra.__getitem__)
- return tuple(extra_fields)
- class UPathStatResult:
- """A stat_result compatible class wrapping fsspec info dicts.
- **Note**: It is unlikely that you will ever have to instantiate
- this class directly. If you want to convert and info dict,
- use: `UPathStatResult.from_info(info)`
- This object may be accessed either as a tuple of
- (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
- or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.
- There's an additional method `as_info()` for accessing the info dict.
- This is useful to access additional information provided by the file system
- implementation, that's not covered by the stat_result tuple.
- """
- __slots__ = ("_seq", "_info")
- # Note:
- # can't derive from os.stat_result at all, and can't derive from
- # tuple and have slots. So we duck type the os.stat_result class
- # Add the fields and "extra fields" of the os.stat_result class
- _fields = (
- "st_mode",
- "st_ino",
- "st_dev",
- "st_nlink",
- "st_uid",
- "st_gid",
- "st_size",
- "st_atime",
- "st_mtime",
- "st_ctime",
- )
- _fields_extra = _get_stat_result_extra_fields()
- # Provide the n_ attributes of the os.stat_result class for compatibility
- n_sequence_fields = len(_fields)
- n_fields = len(_fields) + len(_fields_extra)
- n_unnamed_fields = len(set(_fields_extra).intersection(_fields))
- if (
- n_fields != os.stat_result.n_fields
- or n_sequence_fields != os.stat_result.n_sequence_fields
- or n_unnamed_fields != os.stat_result.n_unnamed_fields
- ):
- warnings.warn(
- "UPathStatResult: The assumed number of fields in the"
- " stat_result class is not correct. Got: "
- f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
- " This might cause problems? Please report this issue at:"
- " https://github.com/fsspec/universal_path/issues",
- RuntimeWarning,
- stacklevel=2,
- )
- def __init__(
- self,
- stat_result_seq: Sequence[int],
- info_dict: Mapping[str, Any] | None = None,
- ) -> None:
- """init compatible with os.stat_result
- Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.
- """
- seq = tuple(stat_result_seq)
- if n := len(seq) < self.n_sequence_fields:
- raise TypeError(
- f"{self.__name__} takes at least {self.n_fields}-sequence"
- " ({n}-sequence given)"
- )
- elif n > self.n_fields:
- raise TypeError(
- f"{self.__name__} takes at most {self.n_fields}-sequence"
- " ({n}-sequence given)"
- )
- elif self.n_sequence_fields <= n < self.n_sequence_fields:
- warnings.warn(
- "UPathStatResult: The seq provided more than"
- f" {self.n_sequence_fields} items. Ignoring the extra items...",
- UserWarning,
- stacklevel=2,
- )
- self._seq = seq[: self.n_sequence_fields]
- self._info = info_dict or {}
- def __repr__(self):
- cls_name = type(self).__name__
- seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self)))
- return f"{cls_name}({seq_attrs}, info={self._info!r})"
- def __eq__(self, other):
- if not isinstance(other, UPathStatResult):
- return NotImplemented
- else:
- return self._info == other._info
- # --- access to the fsspec info dict ------------------------------
- @classmethod
- def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
- """Create a UPathStatResult from a fsspec info dict."""
- # fill all the fallback default values with 0
- defaults = [0] * cls.n_sequence_fields
- return cls(defaults, info)
- def as_info(self) -> Mapping[str, Any]:
- """Return the fsspec info dict."""
- return self._info
- # --- guaranteed fields -------------------------------------------
- @property
- def st_mode(self) -> int:
- """protection bits"""
- mode = self._info.get("mode")
- if isinstance(mode, int):
- return mode
- elif isinstance(mode, str):
- try:
- return int(mode, 8)
- except ValueError:
- pass
- type_ = self._info.get("type")
- if type_ == "file":
- return S_IFREG # see: stat.S_ISREG
- elif type_ == "directory":
- return S_IFDIR # see: stat.S_ISDIR
- if self._info.get("isLink"):
- return S_IFLNK # see: stat.S_ISLNK
- return self._seq[0]
- @property
- def st_ino(self) -> int:
- """inode"""
- ino = self._info.get("ino")
- if isinstance(ino, int):
- return ino
- return self._seq[1]
- @property
- def st_dev(self) -> int:
- """device"""
- dev = self._info.get("dev")
- if isinstance(dev, int):
- return dev
- return self._seq[2]
- @property
- def st_nlink(self) -> int:
- """number of hard links"""
- nlink = self._info.get("nlink")
- if isinstance(nlink, int):
- return nlink
- return self._seq[3]
- @property
- def st_uid(self) -> int:
- """user ID of owner"""
- for key in ["uid", "owner", "uname", "unix.owner"]:
- try:
- return int(self._info[key])
- except (ValueError, TypeError, KeyError):
- pass
- return self._seq[4]
- @property
- def st_gid(self) -> int:
- """group ID of owner"""
- for key in ["gid", "group", "gname", "unix.group"]:
- try:
- return int(self._info[key])
- except (ValueError, TypeError, KeyError):
- pass
- return self._seq[5]
- @property
- def st_size(self) -> int:
- """total size, in bytes"""
- try:
- return int(self._info["size"])
- except (ValueError, TypeError, KeyError):
- return self._seq[6]
- @property
- def st_atime(self) -> int | float:
- """time of last access"""
- for key in ["atime", "time", "last_accessed", "accessTime"]:
- try:
- raw_value = self._info[key]
- except KeyError:
- continue
- try:
- return _convert_value_to_timestamp(raw_value)
- except (TypeError, ValueError):
- pass
- return self._seq[7]
- @property
- def st_mtime(self) -> int | float:
- """time of last modification"""
- for key in [
- "mtime",
- "LastModified",
- "last_modified",
- "timeModified",
- "modificationTime",
- "modified_at",
- ]:
- try:
- raw_value = self._info[key]
- except KeyError:
- continue
- try:
- return _convert_value_to_timestamp(raw_value)
- except (TypeError, ValueError):
- pass
- return self._seq[8]
- @property
- def st_ctime(self) -> int | float:
- """time of last change"""
- try:
- raw_value = self._info["ctime"]
- except KeyError:
- pass
- else:
- try:
- return _convert_value_to_timestamp(raw_value)
- except (TypeError, ValueError):
- pass
- return self._seq[9]
- @property
- def st_birthtime(self) -> int | float:
- """time of creation"""
- for key in [
- "birthtime",
- "created",
- "creation_time",
- "timeCreated",
- "created_at",
- ]:
- try:
- raw_value = self._info[key]
- except KeyError:
- continue
- try:
- return _convert_value_to_timestamp(raw_value)
- except (TypeError, ValueError):
- pass
- raise AttributeError("birthtime")
- # --- extra fields ------------------------------------------------
- def __getattr__(self, item):
- if item in self._fields_extra:
- return 0 # fallback default value
- raise AttributeError(item)
- # --- os.stat_result tuple interface ------------------------------
- def __len__(self) -> int:
- return len(self._fields)
- def __iter__(self) -> Iterator[int]:
- """the sequence interface iterates over the guaranteed fields.
- All values are integers.
- """
- for field in self._fields:
- yield int(getattr(self, field))
- def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
- """the sequence interface index method."""
- if stop is None:
- stop = len(self._seq)
- return self._seq.index(value, start, stop)
- def count(self, value: int) -> int:
- """the sequence interface count method."""
- return self._seq.count(value)
- # --- compatibility with the fsspec info dict interface ------------
- def __getitem__(self, item: int | str) -> Any:
- if isinstance(item, str):
- warnings.warn(
- "Access the fsspec info via `.as_info()[key]`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info[item]
- # we need to go via the attributes and cast to int
- attr = self._fields[item]
- return int(getattr(self, attr))
- def keys(self):
- """compatibility with the fsspec info dict interface."""
- warnings.warn(
- "Access the fsspec info via `.as_info().keys()`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info.keys()
- def values(self):
- """compatibility with the fsspec info dict interface."""
- warnings.warn(
- "Access the fsspec info via `.as_info().values()`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info.values()
- def items(self):
- """compatibility with the fsspec info dict interface."""
- warnings.warn(
- "Access the fsspec info via `.as_info().items()`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info.items()
- def get(self, key, default=None):
- """compatibility with the fsspec info dict interface."""
- warnings.warn(
- "Access the fsspec info via `.as_info().get(key, default)`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info.get(key, default)
- def copy(self):
- """compatibility with the fsspec info dict interface."""
- warnings.warn(
- "Access the fsspec info via `.as_info().copy()`",
- DeprecationWarning,
- stacklevel=2,
- )
- return self._info.copy()
|