request.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. from __future__ import annotations
  2. import io
  3. import typing
  4. from base64 import b64encode
  5. from enum import Enum
  6. from ..exceptions import UnrewindableBodyError
  7. from .util import to_bytes
  8. if typing.TYPE_CHECKING:
  9. from typing import Final
# Sentinel header value. Pass it as a value within ``headers`` to skip
# emitting an HTTP header that would otherwise be added automatically.
# The only headers that support being skipped are ``Accept-Encoding``,
# ``Host``, and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-cased names of the headers that may be skipped with ``SKIP_HEADER``.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])
# Default 'Accept-Encoding' value. It starts as 'gzip,deflate' and is
# extended below for each optional package that can be imported.
ACCEPT_ENCODING = "gzip,deflate"
try:
    # Either Brotli package is sufficient: try ``brotlicffi`` first and
    # fall back to ``brotli``. The imported module itself is not used here.
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    # Neither Brotli package could be imported; 'br' is not advertised.
    pass
else:
    ACCEPT_ENCODING += ",br"
try:
    import zstandard as _unused_module_zstd  # noqa: F401
except ImportError:
    # ``zstandard`` could not be imported; 'zstd' is not advertised.
    pass
else:
    ACCEPT_ENCODING += ",zstd"
class _TYPE_FAILEDTELL(Enum):
    # Single-member enum: gives the sentinel below its own type so that it
    # can be named in type annotations.
    token = 0


# Sentinel recorded as the body position when ``body.tell()`` raised
# ``OSError`` (see ``set_file_position``). ``rewind_body`` raises
# ``UnrewindableBodyError`` when asked to rewind to it.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: an integer offset or the failed-tell sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
# When sending a request with one of these methods we aren't expecting
# a body, so there is no need to set an explicit 'Content-Length: 0'.
# This is expressed in the negative (instead of tracking the methods
# which 'should' have a body) because unknown methods should be
# treated as if they were 'POST', which *does* expect a body.
# ``body_to_chunks`` consults this set (after upper-casing the method)
# when it is called with ``body=None``.
_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
  42. def make_headers(
  43. keep_alive: bool | None = None,
  44. accept_encoding: bool | list[str] | str | None = None,
  45. user_agent: str | None = None,
  46. basic_auth: str | None = None,
  47. proxy_basic_auth: str | None = None,
  48. disable_cache: bool | None = None,
  49. ) -> dict[str, str]:
  50. """
  51. Shortcuts for generating request headers.
  52. :param keep_alive:
  53. If ``True``, adds 'connection: keep-alive' header.
  54. :param accept_encoding:
  55. Can be a boolean, list, or string.
  56. ``True`` translates to 'gzip,deflate'. If the dependencies for
  57. Brotli (either the ``brotli`` or ``brotlicffi`` package) and/or Zstandard
  58. (the ``zstandard`` package) algorithms are installed, then their encodings are
  59. included in the string ('br' and 'zstd', respectively).
  60. List will get joined by comma.
  61. String will be used as provided.
  62. :param user_agent:
  63. String representing the user-agent you want, such as
  64. "python-urllib3/0.6"
  65. :param basic_auth:
  66. Colon-separated username:password string for 'authorization: basic ...'
  67. auth header.
  68. :param proxy_basic_auth:
  69. Colon-separated username:password string for 'proxy-authorization: basic ...'
  70. auth header.
  71. :param disable_cache:
  72. If ``True``, adds 'cache-control: no-cache' header.
  73. Example:
  74. .. code-block:: python
  75. import urllib3
  76. print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
  77. # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
  78. print(urllib3.util.make_headers(accept_encoding=True))
  79. # {'accept-encoding': 'gzip,deflate'}
  80. """
  81. headers: dict[str, str] = {}
  82. if accept_encoding:
  83. if isinstance(accept_encoding, str):
  84. pass
  85. elif isinstance(accept_encoding, list):
  86. accept_encoding = ",".join(accept_encoding)
  87. else:
  88. accept_encoding = ACCEPT_ENCODING
  89. headers["accept-encoding"] = accept_encoding
  90. if user_agent:
  91. headers["user-agent"] = user_agent
  92. if keep_alive:
  93. headers["connection"] = "keep-alive"
  94. if basic_auth:
  95. headers["authorization"] = (
  96. f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}"
  97. )
  98. if proxy_basic_auth:
  99. headers["proxy-authorization"] = (
  100. f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}"
  101. )
  102. if disable_cache:
  103. headers["cache-control"] = "no-cache"
  104. return headers
  105. def set_file_position(
  106. body: typing.Any, pos: _TYPE_BODY_POSITION | None
  107. ) -> _TYPE_BODY_POSITION | None:
  108. """
  109. If a position is provided, move file to that point.
  110. Otherwise, we'll attempt to record a position for future use.
  111. """
  112. if pos is not None:
  113. rewind_body(body, pos)
  114. elif getattr(body, "tell", None) is not None:
  115. try:
  116. pos = body.tell()
  117. except OSError:
  118. # This differentiates from None, allowing us to catch
  119. # a failed `tell()` later when trying to rewind the body.
  120. pos = _FAILEDTELL
  121. return pos
  122. def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
  123. """
  124. Attempt to rewind body to a certain position.
  125. Primarily used for request redirects and retries.
  126. :param body:
  127. File-like object that supports seek.
  128. :param int pos:
  129. Position to seek to in file.
  130. """
  131. body_seek = getattr(body, "seek", None)
  132. if body_seek is not None and isinstance(body_pos, int):
  133. try:
  134. body_seek(body_pos)
  135. except OSError as e:
  136. raise UnrewindableBodyError(
  137. "An error occurred when rewinding request body for redirect/retry."
  138. ) from e
  139. elif body_pos is _FAILEDTELL:
  140. raise UnrewindableBodyError(
  141. "Unable to record file position for rewinding "
  142. "request body during a redirect/retry."
  143. )
  144. else:
  145. raise ValueError(
  146. f"body_pos must be of type integer, instead it was {type(body_pos)}."
  147. )
class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by ``body_to_chunks``: the body chunks and their length."""

    # Iterable of chunks to send, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Recommended 'Content-Length' value, or ``None`` when the length is
    # not known or no explicit header is recommended.
    content_length: int | None
  151. def body_to_chunks(
  152. body: typing.Any | None, method: str, blocksize: int
  153. ) -> ChunksAndContentLength:
  154. """Takes the HTTP request method, body, and blocksize and
  155. transforms them into an iterable of chunks to pass to
  156. socket.sendall() and an optional 'Content-Length' header.
  157. A 'Content-Length' of 'None' indicates the length of the body
  158. can't be determined so should use 'Transfer-Encoding: chunked'
  159. for framing instead.
  160. """
  161. chunks: typing.Iterable[bytes] | None
  162. content_length: int | None
  163. # No body, we need to make a recommendation on 'Content-Length'
  164. # based on whether that request method is expected to have
  165. # a body or not.
  166. if body is None:
  167. chunks = None
  168. if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
  169. content_length = 0
  170. else:
  171. content_length = None
  172. # Bytes or strings become bytes
  173. elif isinstance(body, (str, bytes)):
  174. chunks = (to_bytes(body),)
  175. content_length = len(chunks[0])
  176. # File-like object, TODO: use seek() and tell() for length?
  177. elif hasattr(body, "read"):
  178. def chunk_readable() -> typing.Iterable[bytes]:
  179. nonlocal body, blocksize
  180. encode = isinstance(body, io.TextIOBase)
  181. while True:
  182. datablock = body.read(blocksize)
  183. if not datablock:
  184. break
  185. if encode:
  186. datablock = datablock.encode("utf-8")
  187. yield datablock
  188. chunks = chunk_readable()
  189. content_length = None
  190. # Otherwise we need to start checking via duck-typing.
  191. else:
  192. try:
  193. # Check if the body implements the buffer API.
  194. mv = memoryview(body)
  195. except TypeError:
  196. try:
  197. # Check if the body is an iterable
  198. chunks = iter(body)
  199. content_length = None
  200. except TypeError:
  201. raise TypeError(
  202. f"'body' must be a bytes-like object, file-like "
  203. f"object, or iterable. Instead was {body!r}"
  204. ) from None
  205. else:
  206. # Since it implements the buffer API can be passed directly to socket.sendall()
  207. chunks = (body,)
  208. content_length = mv.nbytes
  209. return ChunksAndContentLength(chunks=chunks, content_length=content_length)