_stat.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397
  1. from __future__ import annotations
  2. import os
  3. import warnings
  4. from datetime import datetime
  5. from stat import S_IFDIR
  6. from stat import S_IFLNK
  7. from stat import S_IFREG
  8. from typing import Any
  9. from typing import Iterator
  10. from typing import Mapping
  11. from typing import Sequence
  12. __all__ = [
  13. "UPathStatResult",
  14. ]
  15. def _convert_value_to_timestamp(value: Any) -> int | float:
  16. """Try to convert a datetime-like value to a timestamp."""
  17. if isinstance(value, (int, float)):
  18. return value
  19. elif isinstance(value, str):
  20. if value.endswith("Z"):
  21. value = value[:-1] + "+00:00"
  22. return datetime.fromisoformat(value).timestamp()
  23. elif isinstance(value, datetime):
  24. return value.timestamp()
  25. else:
  26. warnings.warn(
  27. f"Cannot convert {value!r} of type {type(value)!r} to a timestamp."
  28. " Please report this at: https://github.com/fsspec/universal_path/issues",
  29. RuntimeWarning,
  30. stacklevel=2,
  31. )
  32. raise TypeError(f"Cannot convert {value!r} to a timestamp.")
  33. def _get_stat_result_extra_fields() -> tuple[str, ...]:
  34. """retrieve the extra fields of the os.stat_result class."""
  35. # Note:
  36. # The lines below let us provide a dictionary with the additional
  37. # named fields of the stat_result class as keys and the internal
  38. # index of the field as value.
  39. sr = os.stat_result(range(os.stat_result.n_fields))
  40. rd = sr.__reduce__()
  41. assert isinstance(rd, tuple), "unexpected return os.stat_result.__reduce__"
  42. _, (_, extra) = rd
  43. extra_fields = sorted(extra, key=extra.__getitem__)
  44. return tuple(extra_fields)
  45. class UPathStatResult:
  46. """A stat_result compatible class wrapping fsspec info dicts.
  47. **Note**: It is unlikely that you will ever have to instantiate
  48. this class directly. If you want to convert and info dict,
  49. use: `UPathStatResult.from_info(info)`
  50. This object may be accessed either as a tuple of
  51. (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime)
  52. or via the attributes st_mode, st_ino, st_dev, st_nlink, st_uid, and so on.
  53. There's an additional method `as_info()` for accessing the info dict.
  54. This is useful to access additional information provided by the file system
  55. implementation, that's not covered by the stat_result tuple.
  56. """
  57. __slots__ = ("_seq", "_info")
  58. # Note:
  59. # can't derive from os.stat_result at all, and can't derive from
  60. # tuple and have slots. So we duck type the os.stat_result class
  61. # Add the fields and "extra fields" of the os.stat_result class
  62. _fields = (
  63. "st_mode",
  64. "st_ino",
  65. "st_dev",
  66. "st_nlink",
  67. "st_uid",
  68. "st_gid",
  69. "st_size",
  70. "st_atime",
  71. "st_mtime",
  72. "st_ctime",
  73. )
  74. _fields_extra = _get_stat_result_extra_fields()
  75. # Provide the n_ attributes of the os.stat_result class for compatibility
  76. n_sequence_fields = len(_fields)
  77. n_fields = len(_fields) + len(_fields_extra)
  78. n_unnamed_fields = len(set(_fields_extra).intersection(_fields))
  79. if (
  80. n_fields != os.stat_result.n_fields
  81. or n_sequence_fields != os.stat_result.n_sequence_fields
  82. or n_unnamed_fields != os.stat_result.n_unnamed_fields
  83. ):
  84. warnings.warn(
  85. "UPathStatResult: The assumed number of fields in the"
  86. " stat_result class is not correct. Got: "
  87. f" {_fields!r}, {_fields_extra!r}, {os.stat_result.n_fields}"
  88. " This might cause problems? Please report this issue at:"
  89. " https://github.com/fsspec/universal_path/issues",
  90. RuntimeWarning,
  91. stacklevel=2,
  92. )
  93. def __init__(
  94. self,
  95. stat_result_seq: Sequence[int],
  96. info_dict: Mapping[str, Any] | None = None,
  97. ) -> None:
  98. """init compatible with os.stat_result
  99. Use `UPathStatResult.from_info(info)` to instantiate from a fsspec info.
  100. """
  101. seq = tuple(stat_result_seq)
  102. if n := len(seq) < self.n_sequence_fields:
  103. raise TypeError(
  104. f"{self.__name__} takes at least {self.n_fields}-sequence"
  105. " ({n}-sequence given)"
  106. )
  107. elif n > self.n_fields:
  108. raise TypeError(
  109. f"{self.__name__} takes at most {self.n_fields}-sequence"
  110. " ({n}-sequence given)"
  111. )
  112. elif self.n_sequence_fields <= n < self.n_sequence_fields:
  113. warnings.warn(
  114. "UPathStatResult: The seq provided more than"
  115. f" {self.n_sequence_fields} items. Ignoring the extra items...",
  116. UserWarning,
  117. stacklevel=2,
  118. )
  119. self._seq = seq[: self.n_sequence_fields]
  120. self._info = info_dict or {}
  121. def __repr__(self):
  122. cls_name = type(self).__name__
  123. seq_attrs = ", ".join(map("{0[0]}={0[1]}".format, zip(self._fields, self)))
  124. return f"{cls_name}({seq_attrs}, info={self._info!r})"
  125. def __eq__(self, other):
  126. if not isinstance(other, UPathStatResult):
  127. return NotImplemented
  128. else:
  129. return self._info == other._info
  130. # --- access to the fsspec info dict ------------------------------
  131. @classmethod
  132. def from_info(cls, info: Mapping[str, Any]) -> UPathStatResult:
  133. """Create a UPathStatResult from a fsspec info dict."""
  134. # fill all the fallback default values with 0
  135. defaults = [0] * cls.n_sequence_fields
  136. return cls(defaults, info)
  137. def as_info(self) -> Mapping[str, Any]:
  138. """Return the fsspec info dict."""
  139. return self._info
  140. # --- guaranteed fields -------------------------------------------
  141. @property
  142. def st_mode(self) -> int:
  143. """protection bits"""
  144. mode = self._info.get("mode")
  145. if isinstance(mode, int):
  146. return mode
  147. elif isinstance(mode, str):
  148. try:
  149. return int(mode, 8)
  150. except ValueError:
  151. pass
  152. type_ = self._info.get("type")
  153. if type_ == "file":
  154. return S_IFREG # see: stat.S_ISREG
  155. elif type_ == "directory":
  156. return S_IFDIR # see: stat.S_ISDIR
  157. if self._info.get("isLink"):
  158. return S_IFLNK # see: stat.S_ISLNK
  159. return self._seq[0]
  160. @property
  161. def st_ino(self) -> int:
  162. """inode"""
  163. ino = self._info.get("ino")
  164. if isinstance(ino, int):
  165. return ino
  166. return self._seq[1]
  167. @property
  168. def st_dev(self) -> int:
  169. """device"""
  170. dev = self._info.get("dev")
  171. if isinstance(dev, int):
  172. return dev
  173. return self._seq[2]
  174. @property
  175. def st_nlink(self) -> int:
  176. """number of hard links"""
  177. nlink = self._info.get("nlink")
  178. if isinstance(nlink, int):
  179. return nlink
  180. return self._seq[3]
  181. @property
  182. def st_uid(self) -> int:
  183. """user ID of owner"""
  184. for key in ["uid", "owner", "uname", "unix.owner"]:
  185. try:
  186. return int(self._info[key])
  187. except (ValueError, TypeError, KeyError):
  188. pass
  189. return self._seq[4]
  190. @property
  191. def st_gid(self) -> int:
  192. """group ID of owner"""
  193. for key in ["gid", "group", "gname", "unix.group"]:
  194. try:
  195. return int(self._info[key])
  196. except (ValueError, TypeError, KeyError):
  197. pass
  198. return self._seq[5]
  199. @property
  200. def st_size(self) -> int:
  201. """total size, in bytes"""
  202. try:
  203. return int(self._info["size"])
  204. except (ValueError, TypeError, KeyError):
  205. return self._seq[6]
  206. @property
  207. def st_atime(self) -> int | float:
  208. """time of last access"""
  209. for key in ["atime", "time", "last_accessed", "accessTime"]:
  210. try:
  211. raw_value = self._info[key]
  212. except KeyError:
  213. continue
  214. try:
  215. return _convert_value_to_timestamp(raw_value)
  216. except (TypeError, ValueError):
  217. pass
  218. return self._seq[7]
  219. @property
  220. def st_mtime(self) -> int | float:
  221. """time of last modification"""
  222. for key in [
  223. "mtime",
  224. "LastModified",
  225. "last_modified",
  226. "timeModified",
  227. "modificationTime",
  228. "modified_at",
  229. ]:
  230. try:
  231. raw_value = self._info[key]
  232. except KeyError:
  233. continue
  234. try:
  235. return _convert_value_to_timestamp(raw_value)
  236. except (TypeError, ValueError):
  237. pass
  238. return self._seq[8]
  239. @property
  240. def st_ctime(self) -> int | float:
  241. """time of last change"""
  242. try:
  243. raw_value = self._info["ctime"]
  244. except KeyError:
  245. pass
  246. else:
  247. try:
  248. return _convert_value_to_timestamp(raw_value)
  249. except (TypeError, ValueError):
  250. pass
  251. return self._seq[9]
  252. @property
  253. def st_birthtime(self) -> int | float:
  254. """time of creation"""
  255. for key in [
  256. "birthtime",
  257. "created",
  258. "creation_time",
  259. "timeCreated",
  260. "created_at",
  261. ]:
  262. try:
  263. raw_value = self._info[key]
  264. except KeyError:
  265. continue
  266. try:
  267. return _convert_value_to_timestamp(raw_value)
  268. except (TypeError, ValueError):
  269. pass
  270. raise AttributeError("birthtime")
  271. # --- extra fields ------------------------------------------------
  272. def __getattr__(self, item):
  273. if item in self._fields_extra:
  274. return 0 # fallback default value
  275. raise AttributeError(item)
  276. # --- os.stat_result tuple interface ------------------------------
  277. def __len__(self) -> int:
  278. return len(self._fields)
  279. def __iter__(self) -> Iterator[int]:
  280. """the sequence interface iterates over the guaranteed fields.
  281. All values are integers.
  282. """
  283. for field in self._fields:
  284. yield int(getattr(self, field))
  285. def index(self, value: int, start: int = 0, stop: int | None = None, /) -> int:
  286. """the sequence interface index method."""
  287. if stop is None:
  288. stop = len(self._seq)
  289. return self._seq.index(value, start, stop)
  290. def count(self, value: int) -> int:
  291. """the sequence interface count method."""
  292. return self._seq.count(value)
  293. # --- compatibility with the fsspec info dict interface ------------
  294. def __getitem__(self, item: int | str) -> Any:
  295. if isinstance(item, str):
  296. warnings.warn(
  297. "Access the fsspec info via `.as_info()[key]`",
  298. DeprecationWarning,
  299. stacklevel=2,
  300. )
  301. return self._info[item]
  302. # we need to go via the attributes and cast to int
  303. attr = self._fields[item]
  304. return int(getattr(self, attr))
  305. def keys(self):
  306. """compatibility with the fsspec info dict interface."""
  307. warnings.warn(
  308. "Access the fsspec info via `.as_info().keys()`",
  309. DeprecationWarning,
  310. stacklevel=2,
  311. )
  312. return self._info.keys()
  313. def values(self):
  314. """compatibility with the fsspec info dict interface."""
  315. warnings.warn(
  316. "Access the fsspec info via `.as_info().values()`",
  317. DeprecationWarning,
  318. stacklevel=2,
  319. )
  320. return self._info.values()
  321. def items(self):
  322. """compatibility with the fsspec info dict interface."""
  323. warnings.warn(
  324. "Access the fsspec info via `.as_info().items()`",
  325. DeprecationWarning,
  326. stacklevel=2,
  327. )
  328. return self._info.items()
  329. def get(self, key, default=None):
  330. """compatibility with the fsspec info dict interface."""
  331. warnings.warn(
  332. "Access the fsspec info via `.as_info().get(key, default)`",
  333. DeprecationWarning,
  334. stacklevel=2,
  335. )
  336. return self._info.get(key, default)
  337. def copy(self):
  338. """compatibility with the fsspec info dict interface."""
  339. warnings.warn(
  340. "Access the fsspec info via `.as_info().copy()`",
  341. DeprecationWarning,
  342. stacklevel=2,
  343. )
  344. return self._info.copy()