123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572 |
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2009-2017 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """EDNS Options"""
- import binascii
- import math
- import socket
- import struct
- from typing import Any, Dict, Optional, Union
- import dns.enum
- import dns.inet
- import dns.rdata
- import dns.wire
- class OptionType(dns.enum.IntEnum):
- #: NSID
- NSID = 3
- #: DAU
- DAU = 5
- #: DHU
- DHU = 6
- #: N3U
- N3U = 7
- #: ECS (client-subnet)
- ECS = 8
- #: EXPIRE
- EXPIRE = 9
- #: COOKIE
- COOKIE = 10
- #: KEEPALIVE
- KEEPALIVE = 11
- #: PADDING
- PADDING = 12
- #: CHAIN
- CHAIN = 13
- #: EDE (extended-dns-error)
- EDE = 15
- #: REPORTCHANNEL
- REPORTCHANNEL = 18
- @classmethod
- def _maximum(cls):
- return 65535
- class Option:
- """Base class for all EDNS option types."""
- def __init__(self, otype: Union[OptionType, str]):
- """Initialize an option.
- *otype*, a ``dns.edns.OptionType``, is the option type.
- """
- self.otype = OptionType.make(otype)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
- """Convert an option to wire format.
- Returns a ``bytes`` or ``None``.
- """
- raise NotImplementedError # pragma: no cover
- def to_text(self) -> str:
- raise NotImplementedError # pragma: no cover
- @classmethod
- def from_wire_parser(cls, otype: OptionType, parser: "dns.wire.Parser") -> "Option":
- """Build an EDNS option object from wire format.
- *otype*, a ``dns.edns.OptionType``, is the option type.
- *parser*, a ``dns.wire.Parser``, the parser, which should be
- restructed to the option length.
- Returns a ``dns.edns.Option``.
- """
- raise NotImplementedError # pragma: no cover
- def _cmp(self, other):
- """Compare an EDNS option with another option of the same type.
- Returns < 0 if < *other*, 0 if == *other*, and > 0 if > *other*.
- """
- wire = self.to_wire()
- owire = other.to_wire()
- if wire == owire:
- return 0
- if wire > owire:
- return 1
- return -1
- def __eq__(self, other):
- if not isinstance(other, Option):
- return False
- if self.otype != other.otype:
- return False
- return self._cmp(other) == 0
- def __ne__(self, other):
- if not isinstance(other, Option):
- return True
- if self.otype != other.otype:
- return True
- return self._cmp(other) != 0
- def __lt__(self, other):
- if not isinstance(other, Option) or self.otype != other.otype:
- return NotImplemented
- return self._cmp(other) < 0
- def __le__(self, other):
- if not isinstance(other, Option) or self.otype != other.otype:
- return NotImplemented
- return self._cmp(other) <= 0
- def __ge__(self, other):
- if not isinstance(other, Option) or self.otype != other.otype:
- return NotImplemented
- return self._cmp(other) >= 0
- def __gt__(self, other):
- if not isinstance(other, Option) or self.otype != other.otype:
- return NotImplemented
- return self._cmp(other) > 0
- def __str__(self):
- return self.to_text()
- class GenericOption(Option): # lgtm[py/missing-equals]
- """Generic Option Class
- This class is used for EDNS option types for which we have no better
- implementation.
- """
- def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]):
- super().__init__(otype)
- self.data = dns.rdata.Rdata._as_bytes(data, True)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
- if file:
- file.write(self.data)
- return None
- else:
- return self.data
- def to_text(self) -> str:
- return "Generic %d" % self.otype
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
- ) -> Option:
- return cls(otype, parser.get_remaining())
- class ECSOption(Option): # lgtm[py/missing-equals]
- """EDNS Client Subnet (ECS, RFC7871)"""
- def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0):
- """*address*, a ``str``, is the client address information.
- *srclen*, an ``int``, the source prefix length, which is the
- leftmost number of bits of the address to be used for the
- lookup. The default is 24 for IPv4 and 56 for IPv6.
- *scopelen*, an ``int``, the scope prefix length. This value
- must be 0 in queries, and should be set in responses.
- """
- super().__init__(OptionType.ECS)
- af = dns.inet.af_for_address(address)
- if af == socket.AF_INET6:
- self.family = 2
- if srclen is None:
- srclen = 56
- address = dns.rdata.Rdata._as_ipv6_address(address)
- srclen = dns.rdata.Rdata._as_int(srclen, 0, 128)
- scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 128)
- elif af == socket.AF_INET:
- self.family = 1
- if srclen is None:
- srclen = 24
- address = dns.rdata.Rdata._as_ipv4_address(address)
- srclen = dns.rdata.Rdata._as_int(srclen, 0, 32)
- scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 32)
- else: # pragma: no cover (this will never happen)
- raise ValueError("Bad address family")
- assert srclen is not None
- self.address = address
- self.srclen = srclen
- self.scopelen = scopelen
- addrdata = dns.inet.inet_pton(af, address)
- nbytes = int(math.ceil(srclen / 8.0))
- # Truncate to srclen and pad to the end of the last octet needed
- # See RFC section 6
- self.addrdata = addrdata[:nbytes]
- nbits = srclen % 8
- if nbits != 0:
- last = struct.pack("B", ord(self.addrdata[-1:]) & (0xFF << (8 - nbits)))
- self.addrdata = self.addrdata[:-1] + last
- def to_text(self) -> str:
- return f"ECS {self.address}/{self.srclen} scope/{self.scopelen}"
- @staticmethod
- def from_text(text: str) -> Option:
- """Convert a string into a `dns.edns.ECSOption`
- *text*, a `str`, the text form of the option.
- Returns a `dns.edns.ECSOption`.
- Examples:
- >>> import dns.edns
- >>>
- >>> # basic example
- >>> dns.edns.ECSOption.from_text('1.2.3.4/24')
- >>>
- >>> # also understands scope
- >>> dns.edns.ECSOption.from_text('1.2.3.4/24/32')
- >>>
- >>> # IPv6
- >>> dns.edns.ECSOption.from_text('2001:4b98::1/64/64')
- >>>
- >>> # it understands results from `dns.edns.ECSOption.to_text()`
- >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24/32')
- """
- optional_prefix = "ECS"
- tokens = text.split()
- ecs_text = None
- if len(tokens) == 1:
- ecs_text = tokens[0]
- elif len(tokens) == 2:
- if tokens[0] != optional_prefix:
- raise ValueError(f'could not parse ECS from "{text}"')
- ecs_text = tokens[1]
- else:
- raise ValueError(f'could not parse ECS from "{text}"')
- n_slashes = ecs_text.count("/")
- if n_slashes == 1:
- address, tsrclen = ecs_text.split("/")
- tscope = "0"
- elif n_slashes == 2:
- address, tsrclen, tscope = ecs_text.split("/")
- else:
- raise ValueError(f'could not parse ECS from "{text}"')
- try:
- scope = int(tscope)
- except ValueError:
- raise ValueError("invalid scope " + f'"{tscope}": scope must be an integer')
- try:
- srclen = int(tsrclen)
- except ValueError:
- raise ValueError(
- "invalid srclen " + f'"{tsrclen}": srclen must be an integer'
- )
- return ECSOption(address, srclen, scope)
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
- value = (
- struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata
- )
- if file:
- file.write(value)
- return None
- else:
- return value
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
- ) -> Option:
- family, src, scope = parser.get_struct("!HBB")
- addrlen = int(math.ceil(src / 8.0))
- prefix = parser.get_bytes(addrlen)
- if family == 1:
- pad = 4 - addrlen
- addr = dns.ipv4.inet_ntoa(prefix + b"\x00" * pad)
- elif family == 2:
- pad = 16 - addrlen
- addr = dns.ipv6.inet_ntoa(prefix + b"\x00" * pad)
- else:
- raise ValueError("unsupported family")
- return cls(addr, src, scope)
- class EDECode(dns.enum.IntEnum):
- OTHER = 0
- UNSUPPORTED_DNSKEY_ALGORITHM = 1
- UNSUPPORTED_DS_DIGEST_TYPE = 2
- STALE_ANSWER = 3
- FORGED_ANSWER = 4
- DNSSEC_INDETERMINATE = 5
- DNSSEC_BOGUS = 6
- SIGNATURE_EXPIRED = 7
- SIGNATURE_NOT_YET_VALID = 8
- DNSKEY_MISSING = 9
- RRSIGS_MISSING = 10
- NO_ZONE_KEY_BIT_SET = 11
- NSEC_MISSING = 12
- CACHED_ERROR = 13
- NOT_READY = 14
- BLOCKED = 15
- CENSORED = 16
- FILTERED = 17
- PROHIBITED = 18
- STALE_NXDOMAIN_ANSWER = 19
- NOT_AUTHORITATIVE = 20
- NOT_SUPPORTED = 21
- NO_REACHABLE_AUTHORITY = 22
- NETWORK_ERROR = 23
- INVALID_DATA = 24
- @classmethod
- def _maximum(cls):
- return 65535
- class EDEOption(Option): # lgtm[py/missing-equals]
- """Extended DNS Error (EDE, RFC8914)"""
- _preserve_case = {"DNSKEY", "DS", "DNSSEC", "RRSIGs", "NSEC", "NXDOMAIN"}
- def __init__(self, code: Union[EDECode, str], text: Optional[str] = None):
- """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the
- extended error.
- *text*, a ``str`` or ``None``, specifying additional information about
- the error.
- """
- super().__init__(OptionType.EDE)
- self.code = EDECode.make(code)
- if text is not None and not isinstance(text, str):
- raise ValueError("text must be string or None")
- self.text = text
- def to_text(self) -> str:
- output = f"EDE {self.code}"
- if self.code in EDECode:
- desc = EDECode.to_text(self.code)
- desc = " ".join(
- word if word in self._preserve_case else word.title()
- for word in desc.split("_")
- )
- output += f" ({desc})"
- if self.text is not None:
- output += f": {self.text}"
- return output
- def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
- value = struct.pack("!H", self.code)
- if self.text is not None:
- value += self.text.encode("utf8")
- if file:
- file.write(value)
- return None
- else:
- return value
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
- ) -> Option:
- code = EDECode.make(parser.get_uint16())
- text = parser.get_remaining()
- if text:
- if text[-1] == 0: # text MAY be null-terminated
- text = text[:-1]
- btext = text.decode("utf8")
- else:
- btext = None
- return cls(code, btext)
- class NSIDOption(Option):
- def __init__(self, nsid: bytes):
- super().__init__(OptionType.NSID)
- self.nsid = nsid
- def to_wire(self, file: Any = None) -> Optional[bytes]:
- if file:
- file.write(self.nsid)
- return None
- else:
- return self.nsid
- def to_text(self) -> str:
- if all(c >= 0x20 and c <= 0x7E for c in self.nsid):
- # All ASCII printable, so it's probably a string.
- value = self.nsid.decode()
- else:
- value = binascii.hexlify(self.nsid).decode()
- return f"NSID {value}"
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
- ) -> Option:
- return cls(parser.get_remaining())
- class CookieOption(Option):
- def __init__(self, client: bytes, server: bytes):
- super().__init__(dns.edns.OptionType.COOKIE)
- self.client = client
- self.server = server
- if len(client) != 8:
- raise ValueError("client cookie must be 8 bytes")
- if len(server) != 0 and (len(server) < 8 or len(server) > 32):
- raise ValueError("server cookie must be empty or between 8 and 32 bytes")
- def to_wire(self, file: Any = None) -> Optional[bytes]:
- if file:
- file.write(self.client)
- if len(self.server) > 0:
- file.write(self.server)
- return None
- else:
- return self.client + self.server
- def to_text(self) -> str:
- client = binascii.hexlify(self.client).decode()
- if len(self.server) > 0:
- server = binascii.hexlify(self.server).decode()
- else:
- server = ""
- return f"COOKIE {client}{server}"
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
- ) -> Option:
- return cls(parser.get_bytes(8), parser.get_remaining())
- class ReportChannelOption(Option):
- # RFC 9567
- def __init__(self, agent_domain: dns.name.Name):
- super().__init__(OptionType.REPORTCHANNEL)
- self.agent_domain = agent_domain
- def to_wire(self, file: Any = None) -> Optional[bytes]:
- return self.agent_domain.to_wire(file)
- def to_text(self) -> str:
- return "REPORTCHANNEL " + self.agent_domain.to_text()
- @classmethod
- def from_wire_parser(
- cls, otype: Union[OptionType, str], parser: dns.wire.Parser
- ) -> Option:
- return cls(parser.get_name())
- _type_to_class: Dict[OptionType, Any] = {
- OptionType.ECS: ECSOption,
- OptionType.EDE: EDEOption,
- OptionType.NSID: NSIDOption,
- OptionType.COOKIE: CookieOption,
- OptionType.REPORTCHANNEL: ReportChannelOption,
- }
- def get_option_class(otype: OptionType) -> Any:
- """Return the class for the specified option type.
- The GenericOption class is used if a more specific class is not
- known.
- """
- cls = _type_to_class.get(otype)
- if cls is None:
- cls = GenericOption
- return cls
- def option_from_wire_parser(
- otype: Union[OptionType, str], parser: "dns.wire.Parser"
- ) -> Option:
- """Build an EDNS option object from wire format.
- *otype*, an ``int``, is the option type.
- *parser*, a ``dns.wire.Parser``, the parser, which should be
- restricted to the option length.
- Returns an instance of a subclass of ``dns.edns.Option``.
- """
- otype = OptionType.make(otype)
- cls = get_option_class(otype)
- return cls.from_wire_parser(otype, parser)
- def option_from_wire(
- otype: Union[OptionType, str], wire: bytes, current: int, olen: int
- ) -> Option:
- """Build an EDNS option object from wire format.
- *otype*, an ``int``, is the option type.
- *wire*, a ``bytes``, is the wire-format message.
- *current*, an ``int``, is the offset in *wire* of the beginning
- of the rdata.
- *olen*, an ``int``, is the length of the wire-format option data
- Returns an instance of a subclass of ``dns.edns.Option``.
- """
- parser = dns.wire.Parser(wire, current)
- with parser.restrict_to(olen):
- return option_from_wire_parser(otype, parser)
- def register_type(implementation: Any, otype: OptionType) -> None:
- """Register the implementation of an option type.
- *implementation*, a ``class``, is a subclass of ``dns.edns.Option``.
- *otype*, an ``int``, is the option type.
- """
- _type_to_class[otype] = implementation
- ### BEGIN generated OptionType constants
- NSID = OptionType.NSID
- DAU = OptionType.DAU
- DHU = OptionType.DHU
- N3U = OptionType.N3U
- ECS = OptionType.ECS
- EXPIRE = OptionType.EXPIRE
- COOKIE = OptionType.COOKIE
- KEEPALIVE = OptionType.KEEPALIVE
- PADDING = OptionType.PADDING
- CHAIN = OptionType.CHAIN
- EDE = OptionType.EDE
- REPORTCHANNEL = OptionType.REPORTCHANNEL
- ### END generated OptionType constants
|