http.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from __future__ import annotations
  2. import os
  3. import warnings
  4. from itertools import chain
  5. from typing import Any
  6. from fsspec.asyn import sync
  7. from upath._compat import FSSpecAccessorShim as _FSSpecAccessorShim
  8. from upath._stat import UPathStatResult
  9. from upath.core import UPath
# Public names exported by this module.
__all__ = ["HTTPPath"]
# Accessors are deprecated; `_HTTPAccessor` is only an alias of the shim
# imported from `upath._compat` (presumably kept for backward compatibility).
_HTTPAccessor = _FSSpecAccessorShim
  13. class HTTPPath(UPath):
  14. @classmethod
  15. def _transform_init_args(
  16. cls,
  17. args: tuple[str | os.PathLike, ...],
  18. protocol: str,
  19. storage_options: dict[str, Any],
  20. ) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
  21. # allow initialization via a path argument and protocol keyword
  22. if args and not str(args[0]).startswith(protocol):
  23. args = (f"{protocol}://{str(args[0]).lstrip('/')}", *args[1:])
  24. return args, protocol, storage_options
  25. @property
  26. def root(self) -> str: # type: ignore[override]
  27. return super().root or "/"
  28. def __str__(self):
  29. return super(UPath, self).__str__()
  30. def is_file(self):
  31. try:
  32. next(super().iterdir())
  33. except (StopIteration, NotADirectoryError):
  34. return True
  35. except FileNotFoundError:
  36. return False
  37. else:
  38. return False
  39. def is_dir(self):
  40. try:
  41. next(super().iterdir())
  42. except (StopIteration, NotADirectoryError):
  43. return False
  44. except FileNotFoundError:
  45. return False
  46. else:
  47. return True
  48. def stat(self, follow_symlinks: bool = True):
  49. if not follow_symlinks:
  50. warnings.warn(
  51. f"{type(self).__name__}.stat(follow_symlinks=False):"
  52. " is currently ignored.",
  53. UserWarning,
  54. stacklevel=2,
  55. )
  56. info = self.fs.info(self.path)
  57. if "url" in info:
  58. info["type"] = "directory" if info["url"].endswith("/") else "file"
  59. return UPathStatResult.from_info(info)
  60. def iterdir(self):
  61. if self.parts[-1:] == ("",):
  62. yield from self.parent.iterdir()
  63. else:
  64. it = iter(super().iterdir())
  65. try:
  66. item0 = next(it)
  67. except (StopIteration, NotADirectoryError):
  68. raise NotADirectoryError(str(self))
  69. except FileNotFoundError:
  70. raise FileNotFoundError(str(self))
  71. else:
  72. yield from chain([item0], it)
  73. def resolve(
  74. self: HTTPPath,
  75. strict: bool = False,
  76. follow_redirects: bool = True,
  77. ) -> HTTPPath:
  78. """Normalize the path and resolve redirects."""
  79. # Normalise the path
  80. resolved_path = super().resolve(strict=strict)
  81. # if the last part is "..", then it's a directory
  82. if self.parts[-1:] == ("..",):
  83. resolved_path = resolved_path.joinpath("")
  84. if follow_redirects:
  85. # Get the fsspec fs
  86. fs = self.fs
  87. url = str(self)
  88. # Ensure we have a session
  89. session = sync(fs.loop, fs.set_session)
  90. # Use HEAD requests if the server allows it, falling back to GETs
  91. for method in (session.head, session.get):
  92. r = sync(fs.loop, method, url, allow_redirects=True)
  93. try:
  94. r.raise_for_status()
  95. except Exception as exc:
  96. if method == session.get:
  97. raise FileNotFoundError(self) from exc
  98. else:
  99. resolved_path = HTTPPath(str(r.url))
  100. break
  101. return resolved_path