  1. """
  2. Support for streaming http requests in emscripten.
  3. A few caveats -
  4. If your browser (or Node.js) has WebAssembly JavaScript Promise Integration enabled
  5. https://github.com/WebAssembly/js-promise-integration/blob/main/proposals/js-promise-integration/Overview.md
  6. *and* you launch pyodide using `pyodide.runPythonAsync`, this will fetch data using the
  7. JavaScript asynchronous fetch api (wrapped via `pyodide.ffi.call_sync`). In this case
  8. timeouts and streaming should just work.
  9. Otherwise, it uses a combination of XMLHttpRequest and a web-worker for streaming.
  10. This approach has several caveats:
  11. Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
  12. Streaming only works if you're running pyodide in a web worker.
  13. Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
  14. operation, so it requires that you have crossOriginIsolation enabled, by serving over https
  15. (or from localhost) with the two headers below set:
  16. Cross-Origin-Opener-Policy: same-origin
  17. Cross-Origin-Embedder-Policy: require-corp
  18. You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
  19. JavaScript console. If it isn't, streaming requests will fallback to XMLHttpRequest, i.e. getting the whole
  20. request into a buffer and then returning it. it shows a warning in the JavaScript console in this case.
  21. Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
  22. control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
  23. NB: in this code, there are a lot of JavaScript objects. They are named js_*
  24. to make it clear what type of object they are.
  25. """
from __future__ import annotations

import io
import json
from email.parser import Parser
from importlib.resources import files
from typing import TYPE_CHECKING, Any

import js  # type: ignore[import-not-found]
from pyodide.ffi import (  # type: ignore[import-not-found]
    JsArray,
    JsException,
    JsProxy,
    to_js,
)

if TYPE_CHECKING:
    from typing_extensions import Buffer

    from .request import EmscriptenRequest
    from .response import EmscriptenResponse
  43. """
  44. There are some headers that trigger unintended CORS preflight requests.
  45. See also https://github.com/koenvo/pyodide-http/issues/22
  46. """
  47. HEADERS_TO_IGNORE = ("user-agent",)
  48. SUCCESS_HEADER = -1
  49. SUCCESS_EOF = -2
  50. ERROR_TIMEOUT = -3
  51. ERROR_EXCEPTION = -4
  52. _STREAMING_WORKER_CODE = (
  53. files(__package__)
  54. .joinpath("emscripten_fetch_worker.js")
  55. .read_text(encoding="utf-8")
  56. )


class _RequestError(Exception):
    def __init__(
        self,
        message: str | None = None,
        *,
        request: EmscriptenRequest | None = None,
        response: EmscriptenResponse | None = None,
    ):
        self.request = request
        self.response = response
        self.message = message
        super().__init__(self.message)


class _StreamingError(_RequestError):
    pass


class _TimeoutError(_RequestError):
    pass


def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    return to_js(dict_val, dict_converter=js.Object.fromEntries)


class _ReadStream(io.RawIOBase):
    def __init__(
        self,
        int_buffer: JsArray,
        byte_buffer: JsArray,
        timeout: float,
        worker: JsProxy,
        connection_id: int,
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        self.read_pos = 0
        self.read_len = 0
        self.connection_id = connection_id
        self.worker = worker
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        self.is_live = True
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.int_buffer = None
        self.byte_buffer = None
        self._is_closed = True
        self.request = None
        if self.is_live:
            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
            self.is_live = False
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        if not self.int_buffer:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # wait for the worker to send something
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                raise _TimeoutError
            data_len = self.int_buffer[0]
            if data_len > 0:
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                string_len = self.int_buffer[1]
                # decode the error string
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # EOF: free the buffers and the request, and return zero
                self.is_live = False
                self.close()
                return 0
        # copy from the shared byte buffer into the caller's buffer
        ret_length = min(self.read_len, len(memoryview(byte_obj)))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        memoryview(byte_obj)[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length


class _StreamingFetcher:
    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            to_js([_STREAMING_WORKER_CODE], create_pyproxies=False),
            _obj_from_dict({"type": "application/javascript"}),
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        headers = {
            k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
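        # How this module uses the shared buffer (see also _ReadStream above): the
        # first 8 bytes are treated as two int32 control words - index 0 holds the
        # status / data length used with Atomics.wait, index 1 the length of any JSON
        # header or error text - and payload bytes live in the Uint8Array view that
        # starts at byte offset 8.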
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )

        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        if js_int_buffer[0] == ERROR_TIMEOUT:
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif js_int_buffer[0] == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't operate on a
            # SharedArrayBuffer-backed view
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif js_int_buffer[0] == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {js_int_buffer[0]}",
                request=request,
                response=None,
            )


class _JSPIReadStream(io.RawIOBase):
    """
    A read stream that uses pyodide.ffi.run_sync to read from a JavaScript fetch
    response. This requires support for WebAssembly JavaScript Promise Integration
    in the containing browser, and for pyodide to be launched via runPythonAsync.

    :param js_read_stream:
        The JavaScript stream reader

    :param timeout:
        Timeout in seconds

    :param request:
        The request we're handling

    :param response:
        The response this stream relates to

    :param js_abort_controller:
        A JavaScript AbortController object, used for timeouts
    """

    def __init__(
        self,
        js_read_stream: Any,
        timeout: float,
        request: EmscriptenRequest,
        response: EmscriptenResponse,
        js_abort_controller: Any,  # JavaScript AbortController for timeouts
    ):
        self.js_read_stream = js_read_stream
        self.timeout = timeout
        self._is_closed = False
        self._is_done = False
        self.request: EmscriptenRequest | None = request
        self.response: EmscriptenResponse | None = response
        self.current_buffer = None
        self.current_buffer_pos = 0
        self.js_abort_controller = js_abort_controller

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.js_read_stream.cancel()
        self.js_read_stream = None
        self._is_closed = True
        self._is_done = True
        self.request = None
        self.response = None
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def _get_next_buffer(self) -> bool:
        result_js = _run_sync_with_timeout(
            self.js_read_stream.read(),
            self.timeout,
            self.js_abort_controller,
            request=self.request,
            response=self.response,
        )
        if result_js.done:
            self._is_done = True
            return False
        else:
            self.current_buffer = result_js.value.to_py()
            self.current_buffer_pos = 0
            return True

    def readinto(self, byte_obj: Buffer) -> int:
        if self.current_buffer is None:
            if not self._get_next_buffer() or self.current_buffer is None:
                self.close()
                return 0
        ret_length = min(
            len(byte_obj), len(self.current_buffer) - self.current_buffer_pos
        )
        byte_obj[0:ret_length] = self.current_buffer[
            self.current_buffer_pos : self.current_buffer_pos + ret_length
        ]
        self.current_buffer_pos += ret_length
        if self.current_buffer_pos == len(self.current_buffer):
            self.current_buffer = None
        return ret_length


# check if we are in a worker or not
def is_in_browser_main_thread() -> bool:
    return hasattr(js, "window") and hasattr(js, "self") and js.self == js.window


def is_cross_origin_isolated() -> bool:
    return hasattr(js, "crossOriginIsolated") and js.crossOriginIsolated


def is_in_node() -> bool:
    return (
        hasattr(js, "process")
        and hasattr(js.process, "release")
        and hasattr(js.process.release, "name")
        and js.process.release.name == "node"
    )


def is_worker_available() -> bool:
    return hasattr(js, "Worker") and hasattr(js, "Blob")


_fetcher: _StreamingFetcher | None = None

if is_worker_available() and (
    (is_cross_origin_isolated() and not is_in_browser_main_thread())
    and (not is_in_node())
):
    _fetcher = _StreamingFetcher()
else:
    _fetcher = None

NODE_JSPI_ERROR = (
    "urllib3 only works in Node.js with pyodide.runPythonAsync"
    " and requires the flag --experimental-wasm-stack-switching in "
    " versions of node <24."
)


def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    if has_jspi():
        return send_jspi_request(request, True)
    elif is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )

    if _fetcher and streaming_ready():
        return _fetcher.send(request)
    else:
        _show_streaming_warning()
        return None


_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    global _SHOWN_TIMEOUT_WARNING
    if not _SHOWN_TIMEOUT_WARNING:
        _SHOWN_TIMEOUT_WARNING = True
        message = "Warning: Timeout is not available on main browser thread"
        js.console.warn(message)


_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    global _SHOWN_STREAMING_WARNING
    if not _SHOWN_STREAMING_WARNING:
        _SHOWN_STREAMING_WARNING = True
        message = "Can't stream HTTP requests because: \n"
        if not is_cross_origin_isolated():
            message += " Page is not cross-origin isolated\n"
        if is_in_browser_main_thread():
            message += " Python is running in main browser thread\n"
        if not is_worker_available():
            message += " Worker or Blob classes are not available in this environment."  # Defensive: this is always False in browsers that we test in
        if streaming_ready() is False:
            message += """ Streaming fetch worker isn't ready. If you want to be sure that streaming fetch
is working, you need to call `await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()`"""
        from js import console

        console.warn(message)


def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    if has_jspi():
        return send_jspi_request(request, False)
    elif is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    try:
        js_xhr = js.XMLHttpRequest.new()

        if not is_in_browser_main_thread():
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
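            # Note: a synchronous XMLHttpRequest on the main browser thread can't use
            # responseType="arraybuffer", so the response charset is overridden above
            # and the returned text is re-encoded back to bytes further down.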
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()

        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not is_in_browser_main_thread():
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request)
        elif err.name == "NetworkError":
            raise _RequestError(err.message, request=request)
        else:
            # general http error
            raise _RequestError(err.message, request=request)


def send_jspi_request(
    request: EmscriptenRequest, streaming: bool
) -> EmscriptenResponse:
    """
    Send a request using WebAssembly JavaScript Promise Integration
    to wrap the asynchronous JavaScript fetch api (experimental).

    :param request:
        Request to send

    :param streaming:
        Whether to stream the response

    :return: The response object
    :rtype: EmscriptenResponse
    """
    timeout = request.timeout
    js_abort_controller = js.AbortController.new()
    headers = {k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE}
    req_body = request.body
    fetch_data = {
        "headers": headers,
        "body": to_js(req_body),
        "method": request.method,
        "signal": js_abort_controller.signal,
    }
    # Call JavaScript fetch (async api, returns a promise)
    fetcher_promise_js = js.fetch(request.url, _obj_from_dict(fetch_data))
    # Now suspend WebAssembly until we resolve that promise
    # or time out.
    response_js = _run_sync_with_timeout(
        fetcher_promise_js,
        timeout,
        js_abort_controller,
        request=request,
        response=None,
    )
    headers = {}
    header_iter = response_js.headers.entries()
    while True:
        iter_value_js = header_iter.next()
        if getattr(iter_value_js, "done", False):
            break
        else:
            headers[str(iter_value_js.value[0])] = str(iter_value_js.value[1])
    status_code = response_js.status
    body: bytes | io.RawIOBase = b""

    response = EmscriptenResponse(
        status_code=status_code, headers=headers, body=b"", request=request
    )
    if streaming:
        # get via inputstream
        if response_js.body is not None:
            # get a reader from the fetch response
            body_stream_js = response_js.body.getReader()
            body = _JSPIReadStream(
                body_stream_js, timeout, request, response, js_abort_controller
            )
    else:
        # get directly via arraybuffer
        # n.b. this is another async JavaScript call.
        body = _run_sync_with_timeout(
            response_js.arrayBuffer(),
            timeout,
            js_abort_controller,
            request=request,
            response=response,
        ).to_py()
    response.body = body
    return response


def _run_sync_with_timeout(
    promise: Any,
    timeout: float,
    js_abort_controller: Any,
    request: EmscriptenRequest | None,
    response: EmscriptenResponse | None,
) -> Any:
    """
    Await a JavaScript promise synchronously with a timeout which is implemented
    via the AbortController

    :param promise:
        Javascript promise to await

    :param timeout:
        Timeout in seconds

    :param js_abort_controller:
        A JavaScript AbortController object, used on timeout

    :param request:
        The request being handled

    :param response:
        The response being handled (if it exists yet)

    :raises _TimeoutError: If the request times out
    :raises _RequestError: If the request raises a JavaScript exception

    :return: The result of awaiting the promise.
    """
    timer_id = None
    if timeout > 0:
        timer_id = js.setTimeout(
            js_abort_controller.abort.bind(js_abort_controller), int(timeout * 1000)
        )
    try:
        from pyodide.ffi import run_sync

        # run_sync here uses WebAssembly JavaScript Promise Integration to
        # suspend python until the JavaScript promise resolves.
        return run_sync(promise)
    except JsException as err:
        if err.name == "AbortError":
            raise _TimeoutError(
                message="Request timed out", request=request, response=response
            )
        else:
            raise _RequestError(message=err.message, request=request, response=response)
    finally:
        if timer_id is not None:
            js.clearTimeout(timer_id)


def has_jspi() -> bool:
    """
    Return true if jspi can be used.

    This requires both browser support and also WebAssembly
    to be in the correct state - i.e. that the JavaScript
    call into Python was async, not sync.

    :return: True if jspi can be used.
    :rtype: bool
    """
    try:
        from pyodide.ffi import can_run_sync, run_sync  # noqa: F401

        return bool(can_run_sync())
    except ImportError:
        return False


def streaming_ready() -> bool | None:
    if _fetcher:
        return _fetcher.streaming_ready
    else:
        return None  # no fetcher, return None to signify that streaming isn't available


async def wait_for_streaming_ready() -> bool:
    if _fetcher:
        await _fetcher.js_worker_ready_promise
        return True
    else:
        return False