http_sync.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931
  1. """This file is largely copied from http.py"""
  2. import io
  3. import logging
  4. import re
  5. import urllib.error
  6. import urllib.parse
  7. from copy import copy
  8. from json import dumps, loads
  9. from urllib.parse import urlparse
  10. try:
  11. import yarl
  12. except (ImportError, ModuleNotFoundError, OSError):
  13. yarl = False
  14. from fsspec.callbacks import _DEFAULT_CALLBACK
  15. from fsspec.registry import register_implementation
  16. from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
  17. from fsspec.utils import DEFAULT_BLOCK_SIZE, isfilelike, nullcontext, tokenize
  18. from ..caching import AllBytes
# Regex extracting the href target of HTML anchor tags, taken from
# https://stackoverflow.com/a/15926317/3821154
ex = re.compile(r"""<(a|A)\s+(?:[^>]*?\s+)?(href|HREF)=["'](?P<url>[^"']+)""")
# Regex matching anything that simply looks like an absolute http(s) URL
ex2 = re.compile(r"""(?P<url>http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
# Shares the "fsspec.http" logger with the async implementation
logger = logging.getLogger("fsspec.http")
  23. class JsHttpException(urllib.error.HTTPError): ...
  24. class StreamIO(io.BytesIO):
  25. # fake class, so you can set attributes on it
  26. # will eventually actually stream
  27. ...
  28. class ResponseProxy:
  29. """Looks like a requests response"""
  30. def __init__(self, req, stream=False):
  31. self.request = req
  32. self.stream = stream
  33. self._data = None
  34. self._headers = None
  35. @property
  36. def raw(self):
  37. if self._data is None:
  38. b = self.request.response.to_bytes()
  39. if self.stream:
  40. self._data = StreamIO(b)
  41. else:
  42. self._data = b
  43. return self._data
  44. def close(self):
  45. if hasattr(self, "_data"):
  46. del self._data
  47. @property
  48. def headers(self):
  49. if self._headers is None:
  50. self._headers = dict(
  51. [
  52. _.split(": ")
  53. for _ in self.request.getAllResponseHeaders().strip().split("\r\n")
  54. ]
  55. )
  56. return self._headers
  57. @property
  58. def status_code(self):
  59. return int(self.request.status)
  60. def raise_for_status(self):
  61. if not self.ok:
  62. raise JsHttpException(
  63. self.url, self.status_code, self.reason, self.headers, None
  64. )
  65. def iter_content(self, chunksize, *_, **__):
  66. while True:
  67. out = self.raw.read(chunksize)
  68. if out:
  69. yield out
  70. else:
  71. break
  72. @property
  73. def reason(self):
  74. return self.request.statusText
  75. @property
  76. def ok(self):
  77. return self.status_code < 400
  78. @property
  79. def url(self):
  80. return self.request.response.responseURL
  81. @property
  82. def text(self):
  83. # TODO: encoding from headers
  84. return self.content.decode()
  85. @property
  86. def content(self):
  87. self.stream = False
  88. return self.raw
  89. def json(self):
  90. return loads(self.text)
  91. class RequestsSessionShim:
  92. def __init__(self):
  93. self.headers = {}
  94. def request(
  95. self,
  96. method,
  97. url,
  98. params=None,
  99. data=None,
  100. headers=None,
  101. cookies=None,
  102. files=None,
  103. auth=None,
  104. timeout=None,
  105. allow_redirects=None,
  106. proxies=None,
  107. hooks=None,
  108. stream=None,
  109. verify=None,
  110. cert=None,
  111. json=None,
  112. ):
  113. from js import Blob, XMLHttpRequest
  114. logger.debug("JS request: %s %s", method, url)
  115. if cert or verify or proxies or files or cookies or hooks:
  116. raise NotImplementedError
  117. if data and json:
  118. raise ValueError("Use json= or data=, not both")
  119. req = XMLHttpRequest.new()
  120. extra = auth if auth else ()
  121. if params:
  122. url = f"{url}?{urllib.parse.urlencode(params)}"
  123. req.open(method, url, False, *extra)
  124. if timeout:
  125. req.timeout = timeout
  126. if headers:
  127. for k, v in headers.items():
  128. req.setRequestHeader(k, v)
  129. req.setRequestHeader("Accept", "application/octet-stream")
  130. req.responseType = "arraybuffer"
  131. if json:
  132. blob = Blob.new([dumps(data)], {type: "application/json"})
  133. req.send(blob)
  134. elif data:
  135. if isinstance(data, io.IOBase):
  136. data = data.read()
  137. blob = Blob.new([data], {type: "application/octet-stream"})
  138. req.send(blob)
  139. else:
  140. req.send(None)
  141. return ResponseProxy(req, stream=stream)
  142. def get(self, url, **kwargs):
  143. return self.request("GET", url, **kwargs)
  144. def head(self, url, **kwargs):
  145. return self.request("HEAD", url, **kwargs)
  146. def post(self, url, **kwargs):
  147. return self.request("POST}", url, **kwargs)
  148. def put(self, url, **kwargs):
  149. return self.request("PUT", url, **kwargs)
  150. def patch(self, url, **kwargs):
  151. return self.request("PATCH", url, **kwargs)
  152. def delete(self, url, **kwargs):
  153. return self.request("DELETE", url, **kwargs)
  154. class HTTPFileSystem(AbstractFileSystem):
  155. """
  156. Simple File-System for fetching data via HTTP(S)
  157. This is the BLOCKING version of the normal HTTPFileSystem. It uses
  158. requests in normal python and the JS runtime in pyodide.
  159. ***This implementation is extremely experimental, do not use unless
  160. you are testing pyodide/pyscript integration***
  161. """
  162. protocol = ("http", "https", "sync-http", "sync-https")
  163. sep = "/"
  164. def __init__(
  165. self,
  166. simple_links=True,
  167. block_size=None,
  168. same_scheme=True,
  169. cache_type="readahead",
  170. cache_options=None,
  171. client_kwargs=None,
  172. encoded=False,
  173. **storage_options,
  174. ):
  175. """
  176. Parameters
  177. ----------
  178. block_size: int
  179. Blocks to read bytes; if 0, will default to raw requests file-like
  180. objects instead of HTTPFile instances
  181. simple_links: bool
  182. If True, will consider both HTML <a> tags and anything that looks
  183. like a URL; if False, will consider only the former.
  184. same_scheme: True
  185. When doing ls/glob, if this is True, only consider paths that have
  186. http/https matching the input URLs.
  187. size_policy: this argument is deprecated
  188. client_kwargs: dict
  189. Passed to aiohttp.ClientSession, see
  190. https://docs.aiohttp.org/en/stable/client_reference.html
  191. For example, ``{'auth': aiohttp.BasicAuth('user', 'pass')}``
  192. storage_options: key-value
  193. Any other parameters passed on to requests
  194. cache_type, cache_options: defaults used in open
  195. """
  196. super().__init__(self, **storage_options)
  197. self.block_size = block_size if block_size is not None else DEFAULT_BLOCK_SIZE
  198. self.simple_links = simple_links
  199. self.same_schema = same_scheme
  200. self.cache_type = cache_type
  201. self.cache_options = cache_options
  202. self.client_kwargs = client_kwargs or {}
  203. self.encoded = encoded
  204. self.kwargs = storage_options
  205. try:
  206. import js # noqa: F401
  207. logger.debug("Starting JS session")
  208. self.session = RequestsSessionShim()
  209. self.js = True
  210. except Exception as e:
  211. import requests
  212. logger.debug("Starting cpython session because of: %s", e)
  213. self.session = requests.Session(**(client_kwargs or {}))
  214. self.js = False
  215. request_options = copy(storage_options)
  216. self.use_listings_cache = request_options.pop("use_listings_cache", False)
  217. request_options.pop("listings_expiry_time", None)
  218. request_options.pop("max_paths", None)
  219. request_options.pop("skip_instance_cache", None)
  220. self.kwargs = request_options
  221. @property
  222. def fsid(self):
  223. return "sync-http"
  224. def encode_url(self, url):
  225. if yarl:
  226. return yarl.URL(url, encoded=self.encoded)
  227. return url
  228. @classmethod
  229. def _strip_protocol(cls, path: str) -> str:
  230. """For HTTP, we always want to keep the full URL"""
  231. path = path.replace("sync-http://", "http://").replace(
  232. "sync-https://", "https://"
  233. )
  234. return path
  235. @classmethod
  236. def _parent(cls, path):
  237. # override, since _strip_protocol is different for URLs
  238. par = super()._parent(path)
  239. if len(par) > 7: # "http://..."
  240. return par
  241. return ""
  242. def _ls_real(self, url, detail=True, **kwargs):
  243. # ignoring URL-encoded arguments
  244. kw = self.kwargs.copy()
  245. kw.update(kwargs)
  246. logger.debug(url)
  247. r = self.session.get(self.encode_url(url), **self.kwargs)
  248. self._raise_not_found_for_status(r, url)
  249. text = r.text
  250. if self.simple_links:
  251. links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
  252. else:
  253. links = [u[2] for u in ex.findall(text)]
  254. out = set()
  255. parts = urlparse(url)
  256. for l in links:
  257. if isinstance(l, tuple):
  258. l = l[1]
  259. if l.startswith("/") and len(l) > 1:
  260. # absolute URL on this server
  261. l = parts.scheme + "://" + parts.netloc + l
  262. if l.startswith("http"):
  263. if self.same_schema and l.startswith(url.rstrip("/") + "/"):
  264. out.add(l)
  265. elif l.replace("https", "http").startswith(
  266. url.replace("https", "http").rstrip("/") + "/"
  267. ):
  268. # allowed to cross http <-> https
  269. out.add(l)
  270. else:
  271. if l not in ["..", "../"]:
  272. # Ignore FTP-like "parent"
  273. out.add("/".join([url.rstrip("/"), l.lstrip("/")]))
  274. if not out and url.endswith("/"):
  275. out = self._ls_real(url.rstrip("/"), detail=False)
  276. if detail:
  277. return [
  278. {
  279. "name": u,
  280. "size": None,
  281. "type": "directory" if u.endswith("/") else "file",
  282. }
  283. for u in out
  284. ]
  285. else:
  286. return sorted(out)
  287. def ls(self, url, detail=True, **kwargs):
  288. if self.use_listings_cache and url in self.dircache:
  289. out = self.dircache[url]
  290. else:
  291. out = self._ls_real(url, detail=detail, **kwargs)
  292. self.dircache[url] = out
  293. return out
  294. def _raise_not_found_for_status(self, response, url):
  295. """
  296. Raises FileNotFoundError for 404s, otherwise uses raise_for_status.
  297. """
  298. if response.status_code == 404:
  299. raise FileNotFoundError(url)
  300. response.raise_for_status()
  301. def cat_file(self, url, start=None, end=None, **kwargs):
  302. kw = self.kwargs.copy()
  303. kw.update(kwargs)
  304. logger.debug(url)
  305. if start is not None or end is not None:
  306. if start == end:
  307. return b""
  308. headers = kw.pop("headers", {}).copy()
  309. headers["Range"] = self._process_limits(url, start, end)
  310. kw["headers"] = headers
  311. r = self.session.get(self.encode_url(url), **kw)
  312. self._raise_not_found_for_status(r, url)
  313. return r.content
  314. def get_file(
  315. self, rpath, lpath, chunk_size=5 * 2**20, callback=_DEFAULT_CALLBACK, **kwargs
  316. ):
  317. kw = self.kwargs.copy()
  318. kw.update(kwargs)
  319. logger.debug(rpath)
  320. r = self.session.get(self.encode_url(rpath), **kw)
  321. try:
  322. size = int(
  323. r.headers.get("content-length", None)
  324. or r.headers.get("Content-Length", None)
  325. )
  326. except (ValueError, KeyError, TypeError):
  327. size = None
  328. callback.set_size(size)
  329. self._raise_not_found_for_status(r, rpath)
  330. if not isfilelike(lpath):
  331. lpath = open(lpath, "wb")
  332. for chunk in r.iter_content(chunk_size, decode_unicode=False):
  333. lpath.write(chunk)
  334. callback.relative_update(len(chunk))
  335. def put_file(
  336. self,
  337. lpath,
  338. rpath,
  339. chunk_size=5 * 2**20,
  340. callback=_DEFAULT_CALLBACK,
  341. method="post",
  342. **kwargs,
  343. ):
  344. def gen_chunks():
  345. # Support passing arbitrary file-like objects
  346. # and use them instead of streams.
  347. if isinstance(lpath, io.IOBase):
  348. context = nullcontext(lpath)
  349. use_seek = False # might not support seeking
  350. else:
  351. context = open(lpath, "rb")
  352. use_seek = True
  353. with context as f:
  354. if use_seek:
  355. callback.set_size(f.seek(0, 2))
  356. f.seek(0)
  357. else:
  358. callback.set_size(getattr(f, "size", None))
  359. chunk = f.read(chunk_size)
  360. while chunk:
  361. yield chunk
  362. callback.relative_update(len(chunk))
  363. chunk = f.read(chunk_size)
  364. kw = self.kwargs.copy()
  365. kw.update(kwargs)
  366. method = method.lower()
  367. if method not in ("post", "put"):
  368. raise ValueError(
  369. f"method has to be either 'post' or 'put', not: {method!r}"
  370. )
  371. meth = getattr(self.session, method)
  372. resp = meth(rpath, data=gen_chunks(), **kw)
  373. self._raise_not_found_for_status(resp, rpath)
  374. def _process_limits(self, url, start, end):
  375. """Helper for "Range"-based _cat_file"""
  376. size = None
  377. suff = False
  378. if start is not None and start < 0:
  379. # if start is negative and end None, end is the "suffix length"
  380. if end is None:
  381. end = -start
  382. start = ""
  383. suff = True
  384. else:
  385. size = size or self.info(url)["size"]
  386. start = size + start
  387. elif start is None:
  388. start = 0
  389. if not suff:
  390. if end is not None and end < 0:
  391. if start is not None:
  392. size = size or self.info(url)["size"]
  393. end = size + end
  394. elif end is None:
  395. end = ""
  396. if isinstance(end, int):
  397. end -= 1 # bytes range is inclusive
  398. return f"bytes={start}-{end}"
  399. def exists(self, path, **kwargs):
  400. kw = self.kwargs.copy()
  401. kw.update(kwargs)
  402. try:
  403. logger.debug(path)
  404. r = self.session.get(self.encode_url(path), **kw)
  405. return r.status_code < 400
  406. except Exception:
  407. return False
  408. def isfile(self, path, **kwargs):
  409. return self.exists(path, **kwargs)
  410. def _open(
  411. self,
  412. path,
  413. mode="rb",
  414. block_size=None,
  415. autocommit=None, # XXX: This differs from the base class.
  416. cache_type=None,
  417. cache_options=None,
  418. size=None,
  419. **kwargs,
  420. ):
  421. """Make a file-like object
  422. Parameters
  423. ----------
  424. path: str
  425. Full URL with protocol
  426. mode: string
  427. must be "rb"
  428. block_size: int or None
  429. Bytes to download in one request; use instance value if None. If
  430. zero, will return a streaming Requests file-like instance.
  431. kwargs: key-value
  432. Any other parameters, passed to requests calls
  433. """
  434. if mode != "rb":
  435. raise NotImplementedError
  436. block_size = block_size if block_size is not None else self.block_size
  437. kw = self.kwargs.copy()
  438. kw.update(kwargs)
  439. size = size or self.info(path, **kwargs)["size"]
  440. if block_size and size:
  441. return HTTPFile(
  442. self,
  443. path,
  444. session=self.session,
  445. block_size=block_size,
  446. mode=mode,
  447. size=size,
  448. cache_type=cache_type or self.cache_type,
  449. cache_options=cache_options or self.cache_options,
  450. **kw,
  451. )
  452. else:
  453. return HTTPStreamFile(
  454. self,
  455. path,
  456. mode=mode,
  457. session=self.session,
  458. **kw,
  459. )
  460. def ukey(self, url):
  461. """Unique identifier; assume HTTP files are static, unchanging"""
  462. return tokenize(url, self.kwargs, self.protocol)
  463. def info(self, url, **kwargs):
  464. """Get info of URL
  465. Tries to access location via HEAD, and then GET methods, but does
  466. not fetch the data.
  467. It is possible that the server does not supply any size information, in
  468. which case size will be given as None (and certain operations on the
  469. corresponding file will not work).
  470. """
  471. info = {}
  472. for policy in ["head", "get"]:
  473. try:
  474. info.update(
  475. _file_info(
  476. self.encode_url(url),
  477. size_policy=policy,
  478. session=self.session,
  479. **self.kwargs,
  480. **kwargs,
  481. )
  482. )
  483. if info.get("size") is not None:
  484. break
  485. except Exception as exc:
  486. if policy == "get":
  487. # If get failed, then raise a FileNotFoundError
  488. raise FileNotFoundError(url) from exc
  489. logger.debug(str(exc))
  490. return {"name": url, "size": None, **info, "type": "file"}
  491. def glob(self, path, maxdepth=None, **kwargs):
  492. """
  493. Find files by glob-matching.
  494. This implementation is idntical to the one in AbstractFileSystem,
  495. but "?" is not considered as a character for globbing, because it is
  496. so common in URLs, often identifying the "query" part.
  497. """
  498. import re
  499. ends = path.endswith("/")
  500. path = self._strip_protocol(path)
  501. indstar = path.find("*") if path.find("*") >= 0 else len(path)
  502. indbrace = path.find("[") if path.find("[") >= 0 else len(path)
  503. ind = min(indstar, indbrace)
  504. detail = kwargs.pop("detail", False)
  505. if not has_magic(path):
  506. root = path
  507. depth = 1
  508. if ends:
  509. path += "/*"
  510. elif self.exists(path):
  511. if not detail:
  512. return [path]
  513. else:
  514. return {path: self.info(path)}
  515. else:
  516. if not detail:
  517. return [] # glob of non-existent returns empty
  518. else:
  519. return {}
  520. elif "/" in path[:ind]:
  521. ind2 = path[:ind].rindex("/")
  522. root = path[: ind2 + 1]
  523. depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
  524. else:
  525. root = ""
  526. depth = None if "**" in path else path[ind + 1 :].count("/") + 1
  527. allpaths = self.find(
  528. root, maxdepth=maxdepth or depth, withdirs=True, detail=True, **kwargs
  529. )
  530. # Escape characters special to python regex, leaving our supported
  531. # special characters in place.
  532. # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
  533. # for shell globbing details.
  534. pattern = (
  535. "^"
  536. + (
  537. path.replace("\\", r"\\")
  538. .replace(".", r"\.")
  539. .replace("+", r"\+")
  540. .replace("//", "/")
  541. .replace("(", r"\(")
  542. .replace(")", r"\)")
  543. .replace("|", r"\|")
  544. .replace("^", r"\^")
  545. .replace("$", r"\$")
  546. .replace("{", r"\{")
  547. .replace("}", r"\}")
  548. .rstrip("/")
  549. )
  550. + "$"
  551. )
  552. pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
  553. pattern = re.sub("[*]", "[^/]*", pattern)
  554. pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
  555. out = {
  556. p: allpaths[p]
  557. for p in sorted(allpaths)
  558. if pattern.match(p.replace("//", "/").rstrip("/"))
  559. }
  560. if detail:
  561. return out
  562. else:
  563. return list(out)
  564. def isdir(self, path):
  565. # override, since all URLs are (also) files
  566. try:
  567. return bool(self.ls(path))
  568. except (FileNotFoundError, ValueError):
  569. return False
class HTTPFile(AbstractBufferedFile):
    """
    A file-like object pointing to a remote HTTP(S) resource

    Supports only reading, with read-ahead of a predetermined block-size.
    In the case that the server does not supply the filesize, only reading of
    the complete file in one go is supported.

    Parameters
    ----------
    url: str
        Full URL of the remote resource, including the protocol
    session: requests.Session or None
        All calls will be made within this session, to avoid restarting
        connections where the server allows this
    block_size: int or None
        The amount of read-ahead to do, in bytes. Default is 5MB, or the value
        configured for the FileSystem creating this file
    size: None or int
        If given, this is the size of the file in bytes, and we don't attempt
        to call the server to find the value.
    kwargs: all other key-values are passed to requests calls.
    """

    def __init__(
        self,
        fs,
        url,
        session=None,
        block_size=None,
        mode="rb",
        cache_type="bytes",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        if mode != "rb":
            raise NotImplementedError("File mode not supported")
        self.url = url
        self.session = session
        # Pre-populate details so the base class does not re-query the server.
        self.details = {"name": url, "size": size, "type": "file"}
        super().__init__(
            fs=fs,
            path=url,
            mode=mode,
            block_size=block_size,
            cache_type=cache_type,
            cache_options=cache_options,
            **kwargs,
        )

    def read(self, length=-1):
        """Read bytes from file

        Parameters
        ----------
        length: int
            Read up to this many bytes. If negative, read all content to end of
            file. If the server has not supplied the filesize, attempting to
            read only part of the data will raise a ValueError.
        """
        if (
            (length < 0 and self.loc == 0)  # explicit read all
            # but not when the size is known and fits into a block anyways
            and not (self.size is not None and self.size <= self.blocksize)
        ):
            self._fetch_all()
        if self.size is None:
            if length < 0:
                self._fetch_all()
        else:
            # clamp so the base class never reads past the known end
            length = min(self.size - self.loc, length)
        return super().read(length)

    def _fetch_all(self):
        """Read whole file in one shot, without caching

        This is only called when position is still at zero,
        and read() is called without a byte-count.
        """
        logger.debug(f"Fetch all for {self}")
        if not isinstance(self.cache, AllBytes):
            r = self.session.get(self.fs.encode_url(self.url), **self.kwargs)
            r.raise_for_status()
            out = r.content
            # replace the block cache with one that holds the full body
            self.cache = AllBytes(size=len(out), fetcher=None, blocksize=None, data=out)
            self.size = len(out)

    def _parse_content_range(self, headers):
        """Parse the Content-Range header

        Returns (start, end, total), any of which may be None when the
        header is absent or uses "*" placeholders.
        """
        s = headers.get("Content-Range", "")
        m = re.match(r"bytes (\d+-\d+|\*)/(\d+|\*)", s)
        if not m:
            return None, None, None

        if m[1] == "*":
            start = end = None
        else:
            start, end = [int(x) for x in m[1].split("-")]
        total = None if m[2] == "*" else int(m[2])
        return start, end, total

    def _fetch_range(self, start, end):
        """Download a block of data

        The expectation is that the server returns only the requested bytes,
        with HTTP code 206. If this is not the case, we first check the headers,
        and then stream the output - if the data size is bigger than we
        requested, an exception is raised.
        """
        logger.debug(f"Fetch range for {self}: {start}-{end}")
        kwargs = self.kwargs.copy()
        headers = kwargs.pop("headers", {}).copy()
        # HTTP Range header is inclusive, hence end - 1
        headers["Range"] = f"bytes={start}-{end - 1}"
        logger.debug("%s : %s", self.url, headers["Range"])
        r = self.session.get(self.fs.encode_url(self.url), headers=headers, **kwargs)
        if r.status_code == 416:
            # range request outside file
            return b""
        r.raise_for_status()

        # If the server has handled the range request, it should reply
        # with status 206 (partial content). But we'll guess that a suitable
        # Content-Range header or a Content-Length no more than the
        # requested range also mean we have got the desired range.
        cl = r.headers.get("Content-Length", r.headers.get("content-length", end + 1))
        response_is_range = (
            r.status_code == 206
            or self._parse_content_range(r.headers)[0] == start
            or int(cl) <= end - start
        )

        if response_is_range:
            # partial content, as expected
            out = r.content
        elif start > 0:
            raise ValueError(
                "The HTTP server doesn't appear to support range requests. "
                "Only reading this file from the beginning is supported. "
                "Open with block_size=0 for a streaming file interface."
            )
        else:
            # Response is not a range, but we want the start of the file,
            # so we can read the required amount anyway.
            cl = 0
            out = []
            for chunk in r.iter_content(2**20, False):
                out.append(chunk)
                cl += len(chunk)
            out = b"".join(out)[: end - start]
        return out
  708. magic_check = re.compile("([*[])")
  709. def has_magic(s):
  710. match = magic_check.search(s)
  711. return match is not None
class HTTPStreamFile(AbstractBufferedFile):
    """Streaming, non-seekable file-like object for HTTP resources.

    Used when the size is unknown or block_size==0; bytes are pulled from a
    single streamed GET request as read() is called.
    """

    def __init__(self, fs, url, mode="rb", session=None, **kwargs):
        self.url = url
        self.session = session
        if mode != "rb":
            raise ValueError
        # size is unknown up-front for a pure stream
        self.details = {"name": url, "size": None}
        super().__init__(fs=fs, path=url, mode=mode, cache_type="readahead", **kwargs)
        r = self.session.get(self.fs.encode_url(url), stream=True, **kwargs)
        self.fs._raise_not_found_for_status(r, url)

        # iterator over 1kB chunks of the response body
        self.it = r.iter_content(1024, False)
        self.leftover = b""
        self.r = r

    def seek(self, *args, **kwargs):
        raise ValueError("Cannot seek streaming HTTP file")

    def read(self, num=-1):
        """Read up to *num* bytes (all remaining bytes if negative)."""
        bufs = [self.leftover]
        leng = len(self.leftover)
        # pull chunks until we have enough bytes, or the stream ends
        while leng < num or num < 0:
            try:
                out = self.it.__next__()
            except StopIteration:
                break
            if out:
                bufs.append(out)
            else:
                break
            leng += len(out)
        out = b"".join(bufs)
        if num >= 0:
            # stash any over-read bytes for the next call
            self.leftover = out[num:]
            out = out[:num]
        else:
            self.leftover = b""
        self.loc += len(out)
        return out

    def close(self):
        self.r.close()
        self.closed = True
  751. def get_range(session, url, start, end, **kwargs):
  752. # explicit get a range when we know it must be safe
  753. kwargs = kwargs.copy()
  754. headers = kwargs.pop("headers", {}).copy()
  755. headers["Range"] = f"bytes={start}-{end - 1}"
  756. r = session.get(url, headers=headers, **kwargs)
  757. r.raise_for_status()
  758. return r.content
  759. def _file_info(url, session, size_policy="head", **kwargs):
  760. """Call HEAD on the server to get details about the file (size/checksum etc.)
  761. Default operation is to explicitly allow redirects and use encoding
  762. 'identity' (no compression) to get the true size of the target.
  763. """
  764. logger.debug("Retrieve file size for %s", url)
  765. kwargs = kwargs.copy()
  766. ar = kwargs.pop("allow_redirects", True)
  767. head = kwargs.get("headers", {}).copy()
  768. # TODO: not allowed in JS
  769. # head["Accept-Encoding"] = "identity"
  770. kwargs["headers"] = head
  771. info = {}
  772. if size_policy == "head":
  773. r = session.head(url, allow_redirects=ar, **kwargs)
  774. elif size_policy == "get":
  775. r = session.get(url, allow_redirects=ar, **kwargs)
  776. else:
  777. raise TypeError(f'size_policy must be "head" or "get", got {size_policy}')
  778. r.raise_for_status()
  779. # TODO:
  780. # recognise lack of 'Accept-Ranges',
  781. # or 'Accept-Ranges': 'none' (not 'bytes')
  782. # to mean streaming only, no random access => return None
  783. if "Content-Length" in r.headers:
  784. info["size"] = int(r.headers["Content-Length"])
  785. elif "Content-Range" in r.headers:
  786. info["size"] = int(r.headers["Content-Range"].split("/")[1])
  787. elif "content-length" in r.headers:
  788. info["size"] = int(r.headers["content-length"])
  789. elif "content-range" in r.headers:
  790. info["size"] = int(r.headers["content-range"].split("/")[1])
  791. for checksum_field in ["ETag", "Content-MD5", "Digest"]:
  792. if r.headers.get(checksum_field):
  793. info[checksum_field] = r.headers[checksum_field]
  794. return info
  795. # importing this is enough to register it
  796. def register():
  797. register_implementation("http", HTTPFileSystem, clobber=True)
  798. register_implementation("https", HTTPFileSystem, clobber=True)
  799. register_implementation("sync-http", HTTPFileSystem, clobber=True)
  800. register_implementation("sync-https", HTTPFileSystem, clobber=True)
  801. register()
  802. def unregister():
  803. from fsspec.implementations.http import HTTPFileSystem
  804. register_implementation("http", HTTPFileSystem, clobber=True)
  805. register_implementation("https", HTTPFileSystem, clobber=True)