12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708 |
- """
- Support for streaming http requests in emscripten.
- A few caveats -
- If your browser (or Node.js) has WebAssembly JavaScript Promise Integration enabled
- https://github.com/WebAssembly/js-promise-integration/blob/main/proposals/js-promise-integration/Overview.md
- *and* you launch pyodide using `pyodide.runPythonAsync`, this will fetch data using the
- JavaScript asynchronous fetch api (wrapped via `pyodide.ffi.run_sync`). In this case
- timeouts and streaming should just work.
- Otherwise, it uses a combination of XMLHttpRequest and a web-worker for streaming.
- This approach has several caveats:
- Firstly, you can't do streaming http in the main UI thread, because atomics.wait isn't allowed.
- Streaming only works if you're running pyodide in a web worker.
- Secondly, this uses an extra web worker and SharedArrayBuffer to do the asynchronous fetch
- operation, so it requires that you have crossOriginIsolation enabled, by serving over https
- (or from localhost) with the two headers below set:
- Cross-Origin-Opener-Policy: same-origin
- Cross-Origin-Embedder-Policy: require-corp
- You can tell if cross origin isolation is successfully enabled by looking at the global crossOriginIsolated variable in
- the JavaScript console. If it isn't, streaming requests will fall back to XMLHttpRequest, i.e. getting the whole
- response into a buffer and then returning it. A warning is shown in the JavaScript console in this case.
- Finally, the webworker which does the streaming fetch is created on initial import, but will only be started once
- control is returned to javascript. Call `await wait_for_streaming_ready()` to wait for streaming fetch.
- NB: in this code, there are a lot of JavaScript objects. They are named js_*
- to make it clear what type of object they are.
- """
- from __future__ import annotations
- import io
- import json
- from email.parser import Parser
- from importlib.resources import files
- from typing import TYPE_CHECKING, Any
- import js # type: ignore[import-not-found]
- from pyodide.ffi import ( # type: ignore[import-not-found]
- JsArray,
- JsException,
- JsProxy,
- to_js,
- )
- if TYPE_CHECKING:
- from typing_extensions import Buffer
- from .request import EmscriptenRequest
- from .response import EmscriptenResponse
# There are some headers that trigger unintended CORS preflight requests.
# See also https://github.com/koenvo/pyodide-http/issues/22
# Entries are lower-case; compare against ``name.lower()``.
HEADERS_TO_IGNORE = ("user-agent",)

# Status values found in slot 0 of the shared Int32Array used by the
# streaming code in this module. A positive value in that slot is treated as
# the number of body bytes available; the negative values below are signals.
# ERROR_TIMEOUT doubles as the "nothing received yet" sentinel that this
# module stores before waiting on the slot.
SUCCESS_HEADER = -1
SUCCESS_EOF = -2
ERROR_TIMEOUT = -3
ERROR_EXCEPTION = -4

# JavaScript source of the streaming fetch worker, read from the package data
# file that sits next to this module.
_STREAMING_WORKER_CODE = (
    files(__package__)
    .joinpath("emscripten_fetch_worker.js")
    .read_text(encoding="utf-8")
)
class _RequestError(Exception):
    """
    Base error for failures while sending a request from emscripten.

    :param message:
        Human readable description of the failure (may be ``None``)
    :param request:
        The request that was being handled, if known
    :param response:
        The response that was being handled, if one exists yet
    """

    def __init__(
        self,
        message: str | None = None,
        *,
        request: EmscriptenRequest | None = None,
        response: EmscriptenResponse | None = None,
    ) -> None:
        # The message is also forwarded to Exception so that str(err) and
        # err.args carry it.
        super().__init__(message)
        self.message = message
        self.request = request
        self.response = response
class _StreamingError(_RequestError):
    """Request error raised by the worker-based streaming code in this module."""
class _TimeoutError(_RequestError):
    """Request error raised when a request or a stream read times out."""
def _obj_from_dict(dict_val: dict[str, Any]) -> JsProxy:
    """
    Convert a Python dict into a JavaScript value via ``to_js``.

    Dicts are converted with ``Object.fromEntries``.

    :param dict_val:
        Mapping to convert
    :return: Proxy for the converted JavaScript value
    """
    entries_to_object = js.Object.fromEntries
    return to_js(dict_val, dict_converter=entries_to_object)
class _ReadStream(io.RawIOBase):
    """
    Read-only raw stream over a response body that is delivered through a
    shared buffer by the streaming web worker.

    Slot 0 of ``int_buffer`` carries a status: a positive value is the number
    of body bytes currently available in ``byte_buffer``, ``ERROR_EXCEPTION``
    signals a failure (slot 1 then holds the length of the error text in
    ``byte_buffer``), and any other value is treated as end of stream.

    :param int_buffer:
        JavaScript Int32Array view of the shared buffer (status / length slots)
    :param byte_buffer:
        JavaScript Uint8Array view of the shared buffer (payload bytes)
    :param timeout:
        Timeout in seconds for each wait on the worker; values <= 0 mean
        no timeout is passed to ``Atomics.wait``
    :param worker:
        The JavaScript worker performing the fetch
    :param connection_id:
        Identifier sent to the worker in ``getMore`` / ``close`` messages
    :param request:
        The request this stream belongs to (attached to raised errors)
    """

    def __init__(
        self,
        int_buffer: JsArray,
        byte_buffer: JsArray,
        timeout: float,
        worker: JsProxy,
        connection_id: int,
        request: EmscriptenRequest,
    ):
        self.int_buffer = int_buffer
        self.byte_buffer = byte_buffer
        # position and remaining length of unread data inside byte_buffer
        self.read_pos = 0
        self.read_len = 0
        self.connection_id = connection_id
        self.worker = worker
        # Atomics.wait takes milliseconds
        self.timeout = int(1000 * timeout) if timeout > 0 else None
        # True while the worker still has to be told to close the connection
        self.is_live = True
        self._is_closed = False
        self.request: EmscriptenRequest | None = request

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Release the buffers and, if still live, tell the worker to close."""
        if self.is_closed():
            return
        self.read_len = 0
        self.read_pos = 0
        self.int_buffer = None
        self.byte_buffer = None
        self._is_closed = True
        self.request = None
        if self.is_live:
            self.worker.postMessage(_obj_from_dict({"close": self.connection_id}))
            self.is_live = False
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def readinto(self, byte_obj: Buffer) -> int:
        """
        Copy up to ``len(memoryview(byte_obj))`` bytes into ``byte_obj``.

        :param byte_obj:
            Writable buffer to fill
        :raises _StreamingError:
            If the stream has no buffer (it was closed) or the worker
            reported an exception
        :raises _TimeoutError:
            If the worker did not answer within the timeout
        :return: Number of bytes written, 0 at end of stream
        """
        if self.int_buffer is None:
            raise _StreamingError(
                "No buffer for stream in _ReadStream.readinto",
                request=self.request,
                response=None,
            )
        if self.read_len == 0:
            # wait for the worker to send something
            js.Atomics.store(self.int_buffer, 0, ERROR_TIMEOUT)
            self.worker.postMessage(_obj_from_dict({"getMore": self.connection_id}))
            if (
                js.Atomics.wait(self.int_buffer, 0, ERROR_TIMEOUT, self.timeout)
                == "timed-out"
            ):
                # carry the same context as the other errors raised here
                raise _TimeoutError(
                    "Timeout reading from streaming request",
                    request=self.request,
                    response=None,
                )
            data_len = self.int_buffer[0]
            if data_len > 0:
                self.read_len = data_len
                self.read_pos = 0
            elif data_len == ERROR_EXCEPTION:
                string_len = self.int_buffer[1]
                # decode the error string
                js_decoder = js.TextDecoder.new()
                json_str = js_decoder.decode(self.byte_buffer.slice(0, string_len))
                raise _StreamingError(
                    f"Exception thrown in fetch: {json_str}",
                    request=self.request,
                    response=None,
                )
            else:
                # EOF, free the buffers and return zero
                # and free the request
                # (is_live is cleared first so close() sends no message)
                self.is_live = False
                self.close()
                return 0
        # copy from the Uint8Array view into the caller's Python buffer
        ret_length = min(self.read_len, len(memoryview(byte_obj)))
        subarray = self.byte_buffer.subarray(
            self.read_pos, self.read_pos + ret_length
        ).to_py()
        memoryview(byte_obj)[0:ret_length] = subarray
        self.read_len -= ret_length
        self.read_pos += ret_length
        return ret_length
class _StreamingFetcher:
    """
    Sends requests through a dedicated web worker and exposes the response
    body as a :class:`_ReadStream` backed by a SharedArrayBuffer.

    The worker is created from ``_STREAMING_WORKER_CODE`` on construction.
    ``streaming_ready`` becomes True once the worker posts a message, and
    ``js_worker_ready_promise`` resolves at that point.
    """

    def __init__(self) -> None:
        # make web-worker and data buffer on startup
        self.streaming_ready = False

        js_data_blob = js.Blob.new(
            to_js([_STREAMING_WORKER_CODE], create_pyproxies=False),
            _obj_from_dict({"type": "application/javascript"}),
        )

        def promise_resolver(js_resolve_fn: JsProxy, js_reject_fn: JsProxy) -> None:
            def onMsg(e: JsProxy) -> None:
                self.streaming_ready = True
                js_resolve_fn(e)

            def onErr(e: JsProxy) -> None:
                js_reject_fn(e)  # Defensive: never happens in ci

            # self.js_worker is assigned below, before this resolver runs
            self.js_worker.onmessage = onMsg
            self.js_worker.onerror = onErr

        js_data_url = js.URL.createObjectURL(js_data_blob)
        self.js_worker = js.globalThis.Worker.new(js_data_url)
        self.js_worker_ready_promise = js.globalThis.Promise.new(promise_resolver)

    def send(self, request: EmscriptenRequest) -> EmscriptenResponse:
        """
        Start ``request`` in the worker and block until headers arrive.

        :param request:
            Request to send
        :raises _TimeoutError:
            If no answer arrived within ``request.timeout``
        :raises _StreamingError:
            If the worker reported an exception or an unknown status
        :return: Response whose body is a :class:`_ReadStream`
        """
        # Header names are compared case-insensitively, as in send_request;
        # HEADERS_TO_IGNORE holds lower-case names.
        headers = {
            k: v
            for k, v in request.headers.items()
            if k.lower() not in HEADERS_TO_IGNORE
        }

        body = request.body
        fetch_data = {"headers": headers, "body": to_js(body), "method": request.method}
        # start the request off in the worker
        timeout = int(1000 * request.timeout) if request.timeout > 0 else None
        # 1 MiB shared buffer: two int32 header slots (status, length)
        # followed by the payload bytes, hence the byte view offset of 8
        js_shared_buffer = js.SharedArrayBuffer.new(1048576)
        js_int_buffer = js.Int32Array.new(js_shared_buffer)
        js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8)

        js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT)
        js.Atomics.notify(js_int_buffer, 0)
        js_absolute_url = js.URL.new(request.url, js.location).href
        self.js_worker.postMessage(
            _obj_from_dict(
                {
                    "buffer": js_shared_buffer,
                    "url": js_absolute_url,
                    "fetchParams": fetch_data,
                }
            )
        )
        # wait for the worker to send something
        js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout)
        # read the status slot once so every branch sees the same value
        status = js_int_buffer[0]
        if status == ERROR_TIMEOUT:
            # slot still holds the sentinel stored above: nothing arrived
            raise _TimeoutError(
                "Timeout connecting to streaming request",
                request=request,
                response=None,
            )
        elif status == SUCCESS_HEADER:
            # got response
            # header length is in second int of intBuffer
            string_len = js_int_buffer[1]
            # decode the rest to a JSON string
            js_decoder = js.TextDecoder.new()
            # this does a copy (the slice) because decode can't work on shared array
            # for some silly reason
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            # get it as an object
            response_obj = json.loads(json_str)
            return EmscriptenResponse(
                request=request,
                status_code=response_obj["status"],
                headers=response_obj["headers"],
                body=_ReadStream(
                    js_int_buffer,
                    js_byte_buffer,
                    request.timeout,
                    self.js_worker,
                    response_obj["connectionID"],
                    request,
                ),
            )
        elif status == ERROR_EXCEPTION:
            string_len = js_int_buffer[1]
            # decode the error string
            js_decoder = js.TextDecoder.new()
            json_str = js_decoder.decode(js_byte_buffer.slice(0, string_len))
            raise _StreamingError(
                f"Exception thrown in fetch: {json_str}", request=request, response=None
            )
        else:
            raise _StreamingError(
                f"Unknown status from worker in fetch: {status}",
                request=request,
                response=None,
            )
class _JSPIReadStream(io.RawIOBase):
    """
    A read stream that uses pyodide.ffi.run_sync to read from a JavaScript fetch
    response. This requires support for WebAssembly JavaScript Promise Integration
    in the containing browser, and for pyodide to be launched via runPythonAsync.

    :param js_read_stream:
        The JavaScript stream reader

    :param timeout:
        Timeout in seconds

    :param request:
        The request we're handling

    :param response:
        The response this stream relates to

    :param js_abort_controller:
        A JavaScript AbortController object, used for timeouts
    """

    def __init__(
        self,
        js_read_stream: Any,
        timeout: float,
        request: EmscriptenRequest,
        response: EmscriptenResponse,
        js_abort_controller: Any,  # JavaScript AbortController for timeouts
    ):
        self.js_read_stream = js_read_stream
        self.timeout = timeout
        self._is_closed = False
        self._is_done = False
        self.request: EmscriptenRequest | None = request
        self.response: EmscriptenResponse | None = response
        # most recently fetched chunk and how much of it was already consumed
        self.current_buffer = None
        self.current_buffer_pos = 0
        self.js_abort_controller = js_abort_controller

    def __del__(self) -> None:
        self.close()

    # this is compatible with _base_connection
    def is_closed(self) -> bool:
        return self._is_closed

    # for compatibility with RawIOBase
    @property
    def closed(self) -> bool:
        return self.is_closed()

    def close(self) -> None:
        """Cancel the JavaScript reader (if any) and drop all references."""
        if self.is_closed():
            return
        # drop any partially consumed chunk so it can be garbage collected
        self.current_buffer = None
        self.current_buffer_pos = 0
        if self.js_read_stream is not None:
            self.js_read_stream.cancel()
            self.js_read_stream = None
        self._is_closed = True
        self._is_done = True
        self.request = None
        self.response = None
        super().close()

    def readable(self) -> bool:
        return True

    def writable(self) -> bool:
        return False

    def seekable(self) -> bool:
        return False

    def _get_next_buffer(self) -> bool:
        """
        Synchronously await the next chunk from the JavaScript reader.

        :return: True if a new chunk was stored in ``current_buffer``,
            False if the reader reported that it is done.
        """
        result_js = _run_sync_with_timeout(
            self.js_read_stream.read(),
            self.timeout,
            self.js_abort_controller,
            request=self.request,
            response=self.response,
        )
        if result_js.done:
            self._is_done = True
            return False
        else:
            self.current_buffer = result_js.value.to_py()
            self.current_buffer_pos = 0
            return True

    def readinto(self, byte_obj: Buffer) -> int:
        """
        Copy bytes from the current chunk into ``byte_obj``, fetching a new
        chunk first if none is buffered.

        :param byte_obj:
            Writable buffer to fill
        :return: Number of bytes written; 0 at end of stream or once the
            stream has been closed
        """
        if self._is_closed:
            # The reader is gone once the stream is closed (which also
            # happens automatically at end of stream); report EOF instead of
            # failing on the missing reader.
            return 0
        if self.current_buffer is None:
            if not self._get_next_buffer() or self.current_buffer is None:
                self.close()
                return 0
        ret_length = min(
            len(byte_obj), len(self.current_buffer) - self.current_buffer_pos
        )
        byte_obj[0:ret_length] = self.current_buffer[
            self.current_buffer_pos : self.current_buffer_pos + ret_length
        ]
        self.current_buffer_pos += ret_length
        if self.current_buffer_pos == len(self.current_buffer):
            # chunk fully consumed; the next call fetches a new one
            self.current_buffer = None
        return ret_length
def is_in_browser_main_thread() -> bool:
    """
    Check if we are in a worker or not.

    :return: True when both ``window`` and ``self`` exist on the JavaScript
        global scope and compare equal.
    """
    if not hasattr(js, "window"):
        return False
    if not hasattr(js, "self"):
        return False
    return js.self == js.window
def is_cross_origin_isolated() -> bool:
    """
    Report the JavaScript global ``crossOriginIsolated`` flag.

    :return: False if the global does not exist, otherwise its value.
    """
    if not hasattr(js, "crossOriginIsolated"):
        return False
    return js.crossOriginIsolated
def is_in_node() -> bool:
    """
    Detect Node.js by checking that ``process.release.name`` is ``"node"``.

    :return: True only if every attribute on that path exists and the name
        matches.
    """
    if not hasattr(js, "process"):
        return False
    if not hasattr(js.process, "release"):
        return False
    if not hasattr(js.process.release, "name"):
        return False
    return js.process.release.name == "node"
def is_worker_available() -> bool:
    """
    Check that both ``Worker`` and ``Blob`` exist on the JavaScript global scope.

    :return: True if both are present.
    """
    if not hasattr(js, "Worker"):
        return False
    return hasattr(js, "Blob")
# Module-wide streaming fetcher. It is only created when Worker/Blob exist,
# the page is cross-origin isolated, Python is not running on the browser
# main thread, and we are not in Node.js; otherwise it stays None.
_fetcher: _StreamingFetcher | None = None

if (
    is_worker_available()
    and is_cross_origin_isolated()
    and not is_in_browser_main_thread()
    and not is_in_node()
):
    _fetcher = _StreamingFetcher()
# Error message used when running under Node.js without usable JSPI.
# Each fragment carries only its leading space so the joined text has no
# doubled spaces.
NODE_JSPI_ERROR = (
    "urllib3 only works in Node.js with pyodide.runPythonAsync"
    " and requires the flag --experimental-wasm-stack-switching in"
    " versions of node <24."
)
def send_streaming_request(request: EmscriptenRequest) -> EmscriptenResponse | None:
    """
    Send ``request`` with a streamed response body, if that is possible.

    :param request:
        Request to send
    :raises _RequestError:
        If running in Node.js without usable JSPI
    :return: The streaming response, or None when streaming is unavailable
        (a console warning is shown in that case)
    """
    # Preferred path: JSPI wraps the asynchronous fetch api directly.
    if has_jspi():
        return send_jspi_request(request, True)
    if is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )

    # Worker based streaming needs a fetcher whose worker has started.
    if not (_fetcher and streaming_ready()):
        _show_streaming_warning()
        return None
    return _fetcher.send(request)
# Set once the timeout warning has been written to the console.
_SHOWN_TIMEOUT_WARNING = False


def _show_timeout_warning() -> None:
    """Warn once in the JavaScript console that timeouts are unavailable."""
    global _SHOWN_TIMEOUT_WARNING
    if _SHOWN_TIMEOUT_WARNING:
        return
    _SHOWN_TIMEOUT_WARNING = True
    js.console.warn("Warning: Timeout is not available on main browser thread")
# Set once the streaming warning has been written to the console.
_SHOWN_STREAMING_WARNING = False


def _show_streaming_warning() -> None:
    """
    Warn once in the JavaScript console that streaming is unavailable,
    listing every reason that currently applies on its own line.
    """
    global _SHOWN_STREAMING_WARNING
    if not _SHOWN_STREAMING_WARNING:
        _SHOWN_STREAMING_WARNING = True
        message = "Can't stream HTTP requests because: \n"
        if not is_cross_origin_isolated():
            message += " Page is not cross-origin isolated\n"
        if is_in_browser_main_thread():
            message += " Python is running in main browser thread\n"
        if not is_worker_available():
            message += " Worker or Blob classes are not available in this environment.\n"  # Defensive: this is always False in browsers that we test in
        # streaming_ready() is None when there is no fetcher at all; only an
        # explicit False means the worker exists but has not started yet.
        if streaming_ready() is False:
            message += (
                " Streaming fetch worker isn't ready. If you want to be sure"
                " that streaming fetch\n"
                "is working, you need to call: "
                "'await urllib3.contrib.emscripten.fetch.wait_for_streaming_ready()'"
            )
        js.console.warn(message)
def send_request(request: EmscriptenRequest) -> EmscriptenResponse:
    """
    Send ``request`` and return a response with the whole body in memory.

    Uses JSPI + fetch when available, otherwise a synchronous XMLHttpRequest.

    :param request:
        Request to send
    :raises _TimeoutError:
        If the JavaScript side raised a ``TimeoutError``
    :raises _RequestError:
        If running in Node.js without usable JSPI, or for any other
        JavaScript exception raised while sending
    :return: The response object
    """
    if has_jspi():
        return send_jspi_request(request, False)
    elif is_in_node():
        raise _RequestError(
            message=NODE_JSPI_ERROR,
            request=request,
            response=None,
        )
    try:
        js_xhr = js.XMLHttpRequest.new()
        in_main_thread = is_in_browser_main_thread()

        if not in_main_thread:
            js_xhr.responseType = "arraybuffer"
            if request.timeout:
                js_xhr.timeout = int(request.timeout * 1000)
        else:
            # On the main thread the body is read back as text; a single-byte
            # charset is forced here so it can be re-encoded to bytes below.
            js_xhr.overrideMimeType("text/plain; charset=ISO-8859-15")
            if request.timeout:
                # timeout isn't available on the main thread - show a warning in console
                # if it is set
                _show_timeout_warning()

        # third argument False: synchronous request
        js_xhr.open(request.method, request.url, False)
        for name, value in request.headers.items():
            if name.lower() not in HEADERS_TO_IGNORE:
                js_xhr.setRequestHeader(name, value)

        js_xhr.send(to_js(request.body))

        headers = dict(Parser().parsestr(js_xhr.getAllResponseHeaders()))

        if not in_main_thread:
            body = js_xhr.response.to_py().tobytes()
        else:
            body = js_xhr.response.encode("ISO-8859-15")
        return EmscriptenResponse(
            status_code=js_xhr.status, headers=headers, body=body, request=request
        )
    except JsException as err:
        if err.name == "TimeoutError":
            raise _TimeoutError(err.message, request=request) from err
        # NetworkError and every other JavaScript failure are reported as a
        # general request error.
        raise _RequestError(err.message, request=request) from err
def send_jspi_request(
    request: EmscriptenRequest, streaming: bool
) -> EmscriptenResponse:
    """
    Send a request using WebAssembly JavaScript Promise Integration
    to wrap the asynchronous JavaScript fetch api (experimental).

    :param request:
        Request to send

    :param streaming:
        Whether to stream the response

    :raises _TimeoutError: If the request times out
    :raises _RequestError: If the request raises a JavaScript exception

    :return: The response object
    :rtype: EmscriptenResponse
    """
    timeout = request.timeout
    js_abort_controller = js.AbortController.new()
    # Header names are compared case-insensitively, as in send_request;
    # HEADERS_TO_IGNORE holds lower-case names.
    request_headers = {
        k: v for k, v in request.headers.items() if k.lower() not in HEADERS_TO_IGNORE
    }
    fetch_data = {
        "headers": request_headers,
        "body": to_js(request.body),
        "method": request.method,
        "signal": js_abort_controller.signal,
    }
    # Call JavaScript fetch (async api, returns a promise)
    fetcher_promise_js = js.fetch(request.url, _obj_from_dict(fetch_data))
    # Now suspend WebAssembly until we resolve that promise
    # or time out.
    response_js = _run_sync_with_timeout(
        fetcher_promise_js,
        timeout,
        js_abort_controller,
        request=request,
        response=None,
    )
    # Walk the JavaScript iterator of [name, value] header entries by hand.
    response_headers = {}
    header_iter = response_js.headers.entries()
    while True:
        iter_value_js = header_iter.next()
        if getattr(iter_value_js, "done", False):
            break
        response_headers[str(iter_value_js.value[0])] = str(iter_value_js.value[1])
    status_code = response_js.status
    body: bytes | io.RawIOBase = b""

    # The response is created with an empty body first because the streaming
    # reader needs a reference to it; the real body is attached below.
    response = EmscriptenResponse(
        status_code=status_code, headers=response_headers, body=b"", request=request
    )
    if streaming:
        # get via inputstream
        if response_js.body is not None:
            # get a reader from the fetch response
            body_stream_js = response_js.body.getReader()
            body = _JSPIReadStream(
                body_stream_js, timeout, request, response, js_abort_controller
            )
    else:
        # get directly via arraybuffer
        # n.b. this is another async JavaScript call.
        body = _run_sync_with_timeout(
            response_js.arrayBuffer(),
            timeout,
            js_abort_controller,
            request=request,
            response=response,
        ).to_py()
    response.body = body
    return response
def _run_sync_with_timeout(
    promise: Any,
    timeout: float,
    js_abort_controller: Any,
    request: EmscriptenRequest | None,
    response: EmscriptenResponse | None,
) -> Any:
    """
    Block on a JavaScript promise, aborting through the AbortController
    once the timeout elapses.

    :param promise:
        Javascript promise to await

    :param timeout:
        Timeout in seconds; no timer is armed unless it is > 0

    :param js_abort_controller:
        A JavaScript AbortController object, used on timeout

    :param request:
        The request being handled

    :param response:
        The response being handled (if it exists yet)

    :raises _TimeoutError: If the request times out

    :raises _RequestError: If the request raises a JavaScript exception

    :return: The result of awaiting the promise.
    """
    # Arm a JavaScript timer that aborts the controller when time is up.
    timer_id = (
        js.setTimeout(
            js_abort_controller.abort.bind(js_abort_controller), int(timeout * 1000)
        )
        if timeout > 0
        else None
    )
    try:
        from pyodide.ffi import run_sync

        # run_sync suspends python (via WebAssembly JavaScript Promise
        # Integration) until the JavaScript promise settles.
        return run_sync(promise)
    except JsException as err:
        if err.name != "AbortError":
            raise _RequestError(message=err.message, request=request, response=response)
        raise _TimeoutError(
            message="Request timed out", request=request, response=response
        )
    finally:
        # always disarm the timer, whether we returned or raised
        if timer_id is not None:
            js.clearTimeout(timer_id)
def has_jspi() -> bool:
    """
    Tell whether JSPI (``pyodide.ffi.run_sync``) can be used right now.

    This requires both browser support and also WebAssembly
    to be in the correct state - i.e. that the javascript
    call into python was async not sync.

    :return: True if jspi can be used; False if the pyodide helpers cannot
        be imported or report that sync running is not possible.
    :rtype: bool
    """
    try:
        from pyodide.ffi import can_run_sync, run_sync  # noqa: F401

        jspi_usable = bool(can_run_sync())
    except ImportError:
        jspi_usable = False
    return jspi_usable
def streaming_ready() -> bool | None:
    """
    Report whether the streaming fetch worker has started.

    :return: The fetcher's ``streaming_ready`` flag, or None to signify that
        there is no fetcher.
    """
    return _fetcher.streaming_ready if _fetcher else None
async def wait_for_streaming_ready() -> bool:
    """
    Wait until the streaming fetch worker has started.

    :return: True after the fetcher's ready promise has been awaited,
        False immediately if there is no fetcher.
    """
    if not _fetcher:
        return False
    await _fetcher.js_worker_ready_promise
    return True
|