_decoders.py 12 KB


  1. """
  2. Handlers for Content-Encoding.
  3. See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
  4. """
  5. from __future__ import annotations
  6. import codecs
  7. import io
  8. import typing
  9. import zlib
  10. from ._exceptions import DecodingError
  11. # Brotli support is optional
  12. try:
  13. # The C bindings in `brotli` are recommended for CPython.
  14. import brotli
  15. except ImportError: # pragma: no cover
  16. try:
  17. # The CFFI bindings in `brotlicffi` are recommended for PyPy
  18. # and other environments.
  19. import brotlicffi as brotli
  20. except ImportError:
  21. brotli = None
  22. # Zstandard support is optional
  23. try:
  24. import zstandard
  25. except ImportError: # pragma: no cover
  26. zstandard = None # type: ignore
  27. class ContentDecoder:
  28. def decode(self, data: bytes) -> bytes:
  29. raise NotImplementedError() # pragma: no cover
  30. def flush(self) -> bytes:
  31. raise NotImplementedError() # pragma: no cover
  32. class IdentityDecoder(ContentDecoder):
  33. """
  34. Handle unencoded data.
  35. """
  36. def decode(self, data: bytes) -> bytes:
  37. return data
  38. def flush(self) -> bytes:
  39. return b""
  40. class DeflateDecoder(ContentDecoder):
  41. """
  42. Handle 'deflate' decoding.
  43. See: https://stackoverflow.com/questions/1838699
  44. """
  45. def __init__(self) -> None:
  46. self.first_attempt = True
  47. self.decompressor = zlib.decompressobj()
  48. def decode(self, data: bytes) -> bytes:
  49. was_first_attempt = self.first_attempt
  50. self.first_attempt = False
  51. try:
  52. return self.decompressor.decompress(data)
  53. except zlib.error as exc:
  54. if was_first_attempt:
  55. self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
  56. return self.decode(data)
  57. raise DecodingError(str(exc)) from exc
  58. def flush(self) -> bytes:
  59. try:
  60. return self.decompressor.flush()
  61. except zlib.error as exc: # pragma: no cover
  62. raise DecodingError(str(exc)) from exc
  63. class GZipDecoder(ContentDecoder):
  64. """
  65. Handle 'gzip' decoding.
  66. See: https://stackoverflow.com/questions/1838699
  67. """
  68. def __init__(self) -> None:
  69. self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
  70. def decode(self, data: bytes) -> bytes:
  71. try:
  72. return self.decompressor.decompress(data)
  73. except zlib.error as exc:
  74. raise DecodingError(str(exc)) from exc
  75. def flush(self) -> bytes:
  76. try:
  77. return self.decompressor.flush()
  78. except zlib.error as exc: # pragma: no cover
  79. raise DecodingError(str(exc)) from exc
  80. class BrotliDecoder(ContentDecoder):
  81. """
  82. Handle 'brotli' decoding.
  83. Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
  84. or `pip install brotli`. See https://github.com/google/brotli
  85. Supports both 'brotlipy' and 'Brotli' packages since they share an import
  86. name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
  87. """
  88. def __init__(self) -> None:
  89. if brotli is None: # pragma: no cover
  90. raise ImportError(
  91. "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
  92. "packages have been installed. "
  93. "Make sure to install httpx using `pip install httpx[brotli]`."
  94. ) from None
  95. self.decompressor = brotli.Decompressor()
  96. self.seen_data = False
  97. self._decompress: typing.Callable[[bytes], bytes]
  98. if hasattr(self.decompressor, "decompress"):
  99. # The 'brotlicffi' package.
  100. self._decompress = self.decompressor.decompress # pragma: no cover
  101. else:
  102. # The 'brotli' package.
  103. self._decompress = self.decompressor.process # pragma: no cover
  104. def decode(self, data: bytes) -> bytes:
  105. if not data:
  106. return b""
  107. self.seen_data = True
  108. try:
  109. return self._decompress(data)
  110. except brotli.error as exc:
  111. raise DecodingError(str(exc)) from exc
  112. def flush(self) -> bytes:
  113. if not self.seen_data:
  114. return b""
  115. try:
  116. if hasattr(self.decompressor, "finish"):
  117. # Only available in the 'brotlicffi' package.
  118. # As the decompressor decompresses eagerly, this
  119. # will never actually emit any data. However, it will potentially throw
  120. # errors if a truncated or damaged data stream has been used.
  121. self.decompressor.finish() # pragma: no cover
  122. return b""
  123. except brotli.error as exc: # pragma: no cover
  124. raise DecodingError(str(exc)) from exc
  125. class ZStandardDecoder(ContentDecoder):
  126. """
  127. Handle 'zstd' RFC 8878 decoding.
  128. Requires `pip install zstandard`.
  129. Can be installed as a dependency of httpx using `pip install httpx[zstd]`.
  130. """
  131. # inspired by the ZstdDecoder implementation in urllib3
  132. def __init__(self) -> None:
  133. if zstandard is None: # pragma: no cover
  134. raise ImportError(
  135. "Using 'ZStandardDecoder', ..."
  136. "Make sure to install httpx using `pip install httpx[zstd]`."
  137. ) from None
  138. self.decompressor = zstandard.ZstdDecompressor().decompressobj()
  139. self.seen_data = False
  140. def decode(self, data: bytes) -> bytes:
  141. assert zstandard is not None
  142. self.seen_data = True
  143. output = io.BytesIO()
  144. try:
  145. output.write(self.decompressor.decompress(data))
  146. while self.decompressor.eof and self.decompressor.unused_data:
  147. unused_data = self.decompressor.unused_data
  148. self.decompressor = zstandard.ZstdDecompressor().decompressobj()
  149. output.write(self.decompressor.decompress(unused_data))
  150. except zstandard.ZstdError as exc:
  151. raise DecodingError(str(exc)) from exc
  152. return output.getvalue()
  153. def flush(self) -> bytes:
  154. if not self.seen_data:
  155. return b""
  156. ret = self.decompressor.flush() # note: this is a no-op
  157. if not self.decompressor.eof:
  158. raise DecodingError("Zstandard data is incomplete") # pragma: no cover
  159. return bytes(ret)
  160. class MultiDecoder(ContentDecoder):
  161. """
  162. Handle the case where multiple encodings have been applied.
  163. """
  164. def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
  165. """
  166. 'children' should be a sequence of decoders in the order in which
  167. each was applied.
  168. """
  169. # Note that we reverse the order for decoding.
  170. self.children = list(reversed(children))
  171. def decode(self, data: bytes) -> bytes:
  172. for child in self.children:
  173. data = child.decode(data)
  174. return data
  175. def flush(self) -> bytes:
  176. data = b""
  177. for child in self.children:
  178. data = child.decode(data) + child.flush()
  179. return data
  180. class ByteChunker:
  181. """
  182. Handles returning byte content in fixed-size chunks.
  183. """
  184. def __init__(self, chunk_size: int | None = None) -> None:
  185. self._buffer = io.BytesIO()
  186. self._chunk_size = chunk_size
  187. def decode(self, content: bytes) -> list[bytes]:
  188. if self._chunk_size is None:
  189. return [content] if content else []
  190. self._buffer.write(content)
  191. if self._buffer.tell() >= self._chunk_size:
  192. value = self._buffer.getvalue()
  193. chunks = [
  194. value[i : i + self._chunk_size]
  195. for i in range(0, len(value), self._chunk_size)
  196. ]
  197. if len(chunks[-1]) == self._chunk_size:
  198. self._buffer.seek(0)
  199. self._buffer.truncate()
  200. return chunks
  201. else:
  202. self._buffer.seek(0)
  203. self._buffer.write(chunks[-1])
  204. self._buffer.truncate()
  205. return chunks[:-1]
  206. else:
  207. return []
  208. def flush(self) -> list[bytes]:
  209. value = self._buffer.getvalue()
  210. self._buffer.seek(0)
  211. self._buffer.truncate()
  212. return [value] if value else []
  213. class TextChunker:
  214. """
  215. Handles returning text content in fixed-size chunks.
  216. """
  217. def __init__(self, chunk_size: int | None = None) -> None:
  218. self._buffer = io.StringIO()
  219. self._chunk_size = chunk_size
  220. def decode(self, content: str) -> list[str]:
  221. if self._chunk_size is None:
  222. return [content] if content else []
  223. self._buffer.write(content)
  224. if self._buffer.tell() >= self._chunk_size:
  225. value = self._buffer.getvalue()
  226. chunks = [
  227. value[i : i + self._chunk_size]
  228. for i in range(0, len(value), self._chunk_size)
  229. ]
  230. if len(chunks[-1]) == self._chunk_size:
  231. self._buffer.seek(0)
  232. self._buffer.truncate()
  233. return chunks
  234. else:
  235. self._buffer.seek(0)
  236. self._buffer.write(chunks[-1])
  237. self._buffer.truncate()
  238. return chunks[:-1]
  239. else:
  240. return []
  241. def flush(self) -> list[str]:
  242. value = self._buffer.getvalue()
  243. self._buffer.seek(0)
  244. self._buffer.truncate()
  245. return [value] if value else []
  246. class TextDecoder:
  247. """
  248. Handles incrementally decoding bytes into text
  249. """
  250. def __init__(self, encoding: str = "utf-8") -> None:
  251. self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
  252. def decode(self, data: bytes) -> str:
  253. return self.decoder.decode(data)
  254. def flush(self) -> str:
  255. return self.decoder.decode(b"", True)
  256. class LineDecoder:
  257. """
  258. Handles incrementally reading lines from text.
  259. Has the same behaviour as the stdllib splitlines,
  260. but handling the input iteratively.
  261. """
  262. def __init__(self) -> None:
  263. self.buffer: list[str] = []
  264. self.trailing_cr: bool = False
  265. def decode(self, text: str) -> list[str]:
  266. # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
  267. NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
  268. # We always push a trailing `\r` into the next decode iteration.
  269. if self.trailing_cr:
  270. text = "\r" + text
  271. self.trailing_cr = False
  272. if text.endswith("\r"):
  273. self.trailing_cr = True
  274. text = text[:-1]
  275. if not text:
  276. # NOTE: the edge case input of empty text doesn't occur in practice,
  277. # because other httpx internals filter out this value
  278. return [] # pragma: no cover
  279. trailing_newline = text[-1] in NEWLINE_CHARS
  280. lines = text.splitlines()
  281. if len(lines) == 1 and not trailing_newline:
  282. # No new lines, buffer the input and continue.
  283. self.buffer.append(lines[0])
  284. return []
  285. if self.buffer:
  286. # Include any existing buffer in the first portion of the
  287. # splitlines result.
  288. lines = ["".join(self.buffer) + lines[0]] + lines[1:]
  289. self.buffer = []
  290. if not trailing_newline:
  291. # If the last segment of splitlines is not newline terminated,
  292. # then drop it from our output and start a new buffer.
  293. self.buffer = [lines.pop()]
  294. return lines
  295. def flush(self) -> list[str]:
  296. if not self.buffer and not self.trailing_cr:
  297. return []
  298. lines = ["".join(self.buffer)]
  299. self.buffer = []
  300. self.trailing_cr = False
  301. return lines
  302. SUPPORTED_DECODERS = {
  303. "identity": IdentityDecoder,
  304. "gzip": GZipDecoder,
  305. "deflate": DeflateDecoder,
  306. "br": BrotliDecoder,
  307. "zstd": ZStandardDecoder,
  308. }
  309. if brotli is None:
  310. SUPPORTED_DECODERS.pop("br") # pragma: no cover
  311. if zstandard is None:
  312. SUPPORTED_DECODERS.pop("zstd") # pragma: no cover