import re
from typing import AnyStr, cast, List, overload, Sequence, Tuple, TYPE_CHECKING, Union

from ._abnf import field_name, field_value
from ._util import bytesify, LocalProtocolError, validate

if TYPE_CHECKING:
    from ._events import Request

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal  # type: ignore

# Facts
# -----
#
# Headers are:
#   keys: case-insensitive ascii
#   values: mixture of ascii and raw bytes
#
# "Historically, HTTP has allowed field content with text in the ISO-8859-1
# charset [ISO-8859-1], supporting other charsets only through use of
# [RFC2047] encoding. In practice, most HTTP header field values use only a
# subset of the US-ASCII charset [USASCII]. Newly defined header fields SHOULD
# limit their field values to US-ASCII octets. A recipient SHOULD treat other
# octets in field content (obs-text) as opaque data."
# And it deprecates all non-ascii values
#
# Leading/trailing whitespace in header names is forbidden
#
# Values get leading/trailing whitespace stripped
#
# Content-Disposition actually needs to contain unicode semantically; to
# accomplish this it has a terrifically weird way of encoding the filename
# itself as ascii (and even this still has lots of cross-browser
# incompatibilities)
#
# Order is important:
# "a proxy MUST NOT change the order of these field values when forwarding a
# message"
# (and there are several headers where the order indicates a preference)
#
# Multiple occurrences of the same header:
# "A sender MUST NOT generate multiple header fields with the same field name
# in a message unless either the entire field value for that header field is
# defined as a comma-separated list [or the header is Set-Cookie which gets a
# special exception]" - RFC 7230. (cookies are in RFC 6265)
#
# So every header aside from Set-Cookie can be merged by b", ".join if it
# occurs repeatedly. But, of course, they can't necessarily be split by
# .split(b","), because of quoting.
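#
# For example (an illustrative sketch; Accept is defined as a
# comma-separated list, so merging is legal):
#
#   Accept: text/html
#   Accept: application/json
#
# can be merged into the single equivalent field
#
#   Accept: text/html, application/json
#
# but a value like 'foo; param="a,b"' would be mangled by a naive
# .split(b",").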
#
# Given all this mess (case insensitive, duplicates allowed, order is
# important, ...), there doesn't appear to be any standard way to handle
# headers in Python -- they're almost like dicts, but... actually just
# aren't. For now we punt and just use a super simple representation: headers
# are a list of pairs
#
#   [(name1, value1), (name2, value2), ...]
#
# where all entries are bytestrings, names are lowercase and have no
# leading/trailing whitespace, and values are bytestrings with no
# leading/trailing whitespace. Searching and updating are done via naive O(n)
# methods.
#
# Maybe a dict-of-lists would be better?

_content_length_re = re.compile(rb"[0-9]+")
_field_name_re = re.compile(field_name.encode("ascii"))
_field_value_re = re.compile(field_value.encode("ascii"))
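
# A quick sanity sketch of what these regexes accept (illustrative only; the
# specific strings here are made up for the example):
#
#     assert _field_name_re.fullmatch(b"Content-Type")
#     assert _field_name_re.fullmatch(b" Content-Type") is None  # no whitespace
#     assert _field_value_re.fullmatch(b"text/html; charset=utf-8")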


class Headers(Sequence[Tuple[bytes, bytes]]):
    """
    A list-like interface that allows iterating over headers as byte-pairs
    of (lowercased-name, value).

    Internally we actually store the representation as three-tuples,
    including both the raw original casing, in order to preserve casing
    over-the-wire, and the lowercased name, for case-insensitive comparisons.

    r = Request(
        method="GET",
        target="/",
        headers=[("Host", "example.org"), ("Connection", "keep-alive")],
        http_version="1.1",
    )
    assert r.headers == [
        (b"host", b"example.org"),
        (b"connection", b"keep-alive")
    ]
    assert r.headers.raw_items() == [
        (b"Host", b"example.org"),
        (b"Connection", b"keep-alive")
    ]
    """

    __slots__ = "_full_items"

    def __init__(self, full_items: List[Tuple[bytes, bytes, bytes]]) -> None:
        self._full_items = full_items

    def __bool__(self) -> bool:
        return bool(self._full_items)

    def __eq__(self, other: object) -> bool:
        return list(self) == list(other)  # type: ignore

    def __len__(self) -> int:
        return len(self._full_items)

    def __repr__(self) -> str:
        return "<Headers(%s)>" % repr(list(self))

    def __getitem__(self, idx: int) -> Tuple[bytes, bytes]:  # type: ignore[override]
        _, name, value = self._full_items[idx]
        return (name, value)

    def raw_items(self) -> List[Tuple[bytes, bytes]]:
        return [(raw_name, value) for raw_name, _, value in self._full_items]


HeaderTypes = Union[
    List[Tuple[bytes, bytes]],
    List[Tuple[bytes, str]],
    List[Tuple[str, bytes]],
    List[Tuple[str, str]],
]


@overload
def normalize_and_validate(headers: Headers, _parsed: Literal[True]) -> Headers:
    ...


@overload
def normalize_and_validate(headers: HeaderTypes, _parsed: Literal[False]) -> Headers:
    ...


@overload
def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    ...


def normalize_and_validate(
    headers: Union[Headers, HeaderTypes], _parsed: bool = False
) -> Headers:
    new_headers = []
    seen_content_length = None
    saw_transfer_encoding = False
    for name, value in headers:
        # For headers coming out of the parser, we can safely skip some steps,
        # because it always returns bytes and has already run these regexes
        # over the data:
        if not _parsed:
            name = bytesify(name)
            value = bytesify(value)
            validate(_field_name_re, name, "Illegal header name {!r}", name)
            validate(_field_value_re, value, "Illegal header value {!r}", value)
        assert isinstance(name, bytes)
        assert isinstance(value, bytes)

        raw_name = name
        name = name.lower()
        if name == b"content-length":
            lengths = {length.strip() for length in value.split(b",")}
            if len(lengths) != 1:
                raise LocalProtocolError("conflicting Content-Length headers")
            value = lengths.pop()
            validate(_content_length_re, value, "bad Content-Length")
            if seen_content_length is None:
                seen_content_length = value
                new_headers.append((raw_name, name, value))
            elif seen_content_length != value:
                raise LocalProtocolError("conflicting Content-Length headers")
        elif name == b"transfer-encoding":
            # "A server that receives a request message with a transfer coding
            # it does not understand SHOULD respond with 501 (Not
            # Implemented)."
            # https://tools.ietf.org/html/rfc7230#section-3.3.1
            if saw_transfer_encoding:
                raise LocalProtocolError(
                    "multiple Transfer-Encoding headers", error_status_hint=501
                )
            # "All transfer-coding names are case-insensitive"
            # -- https://tools.ietf.org/html/rfc7230#section-4
            value = value.lower()
            if value != b"chunked":
                raise LocalProtocolError(
                    "Only Transfer-Encoding: chunked is supported",
                    error_status_hint=501,
                )
            saw_transfer_encoding = True
            new_headers.append((raw_name, name, value))
        else:
            new_headers.append((raw_name, name, value))
    return Headers(new_headers)
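
# A minimal usage sketch (illustrative only; the header values are made up
# for the example). Names come back lowercased for matching, while raw_items()
# preserves the original casing:
#
#     headers = normalize_and_validate(
#         [("Host", "example.org"), ("Content-Length", "10")]
#     )
#     assert list(headers) == [(b"host", b"example.org"), (b"content-length", b"10")]
#     assert headers.raw_items()[0] == (b"Host", b"example.org")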


def get_comma_header(headers: Headers, name: bytes) -> List[bytes]:
    # Should only be used for headers whose value is a list of
    # comma-separated, case-insensitive values.
    #
    # The header name `name` is expected to be lower-case bytes.
    #
    # Connection: meets these criteria (including case insensitivity).
    #
    # Content-Length: technically is just a single value (1*DIGIT), but the
    # standard makes reference to implementations that do multiple values, and
    # using this doesn't hurt. Ditto, case insensitivity doesn't matter either
    # way.
    #
    # Transfer-Encoding: is more complex (allows for quoted strings), so
    # splitting on , is actually wrong. For example, this is legal:
    #
    #   Transfer-Encoding: foo; options="1,2", chunked
    #
    # and should be parsed as
    #
    #   foo; options="1,2"
    #   chunked
    #
    # but this naive function will parse it as
    #
    #   foo; options="1
    #   2"
    #   chunked
    #
    # However, this is okay because the only thing we are going to do with
    # any Transfer-Encoding is reject ones that aren't just "chunked", so
    # both of these will be treated the same anyway.
    #
    # Expect: the only legal value is the literal string
    # "100-continue". Splitting on commas is harmless. Case insensitive.
    #
    out: List[bytes] = []
    for _, found_name, found_raw_value in headers._full_items:
        if found_name == name:
            found_raw_value = found_raw_value.lower()
            for found_split_value in found_raw_value.split(b","):
                found_split_value = found_split_value.strip()
                if found_split_value:
                    out.append(found_split_value)
    return out
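
# For example (an illustrative sketch; the header value is made up for the
# example). Note the values come back lowercased, because this function
# lowercases before splitting:
#
#     headers = normalize_and_validate([("Connection", "Keep-Alive, Upgrade")])
#     assert get_comma_header(headers, b"connection") == [b"keep-alive", b"upgrade"]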


def set_comma_header(headers: Headers, name: bytes, new_values: List[bytes]) -> Headers:
    # The header name `name` is expected to be lower-case bytes.
    #
    # Note that when we store the header we use title casing for the header
    # names, in order to match the conventional HTTP header style.
    #
    # Simply calling `.title()` is a blunt approach, but it's correct
    # here given the cases where we're using `set_comma_header`...
    #
    # Connection, Content-Length, Transfer-Encoding.
    new_headers: List[Tuple[bytes, bytes]] = []
    for found_raw_name, found_name, found_raw_value in headers._full_items:
        if found_name != name:
            new_headers.append((found_raw_name, found_raw_value))
    for new_value in new_values:
        new_headers.append((name.title(), new_value))
    return normalize_and_validate(new_headers)
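
# Illustrative sketch: replace any existing Connection values with a single
# "close" (the starting headers are made up for the example). The new entry
# is appended at the end with title-cased raw name:
#
#     headers = normalize_and_validate([("Connection", "keep-alive")])
#     headers = set_comma_header(headers, b"connection", [b"close"])
#     assert headers.raw_items() == [(b"Connection", b"close")]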


def has_expect_100_continue(request: "Request") -> bool:
    # https://tools.ietf.org/html/rfc7231#section-5.1.1
    # "A server that receives a 100-continue expectation in an HTTP/1.0 request
    # MUST ignore that expectation."
    if request.http_version < b"1.1":
        return False
    expect = get_comma_header(request.headers, b"expect")
    return b"100-continue" in expect
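
# Illustrative sketch, reusing the Request construction style from the
# Headers docstring above (the target and header values are assumptions for
# the example):
#
#     r = Request(
#         method="POST",
#         target="/upload",
#         headers=[("Host", "example.org"), ("Expect", "100-continue"),
#                  ("Content-Length", "10")],
#         http_version="1.1",
#     )
#     assert has_expect_100_continue(r)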