|
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """DNS Zones."""
- import contextlib
- import io
- import os
- import struct
- from typing import (
- Any,
- Callable,
- Iterable,
- Iterator,
- List,
- MutableMapping,
- Optional,
- Set,
- Tuple,
- Union,
- )
- import dns.exception
- import dns.grange
- import dns.immutable
- import dns.name
- import dns.node
- import dns.rdata
- import dns.rdataclass
- import dns.rdataset
- import dns.rdatatype
- import dns.rdtypes.ANY.SOA
- import dns.rdtypes.ANY.ZONEMD
- import dns.rrset
- import dns.tokenizer
- import dns.transaction
- import dns.ttl
- import dns.zonefile
- from dns.zonetypes import DigestHashAlgorithm, DigestScheme, _digest_hashers
- class BadZone(dns.exception.DNSException):
- """The DNS zone is malformed."""
- class NoSOA(BadZone):
- """The DNS zone has no SOA RR at its origin."""
- class NoNS(BadZone):
- """The DNS zone has no NS RRset at its origin."""
- class UnknownOrigin(BadZone):
- """The DNS zone's origin is unknown."""
- class UnsupportedDigestScheme(dns.exception.DNSException):
- """The zone digest's scheme is unsupported."""
- class UnsupportedDigestHashAlgorithm(dns.exception.DNSException):
- """The zone digest's origin is unsupported."""
- class NoDigest(dns.exception.DNSException):
- """The DNS zone has no ZONEMD RRset at its origin."""
- class DigestVerificationFailure(dns.exception.DNSException):
- """The ZONEMD digest failed to verify."""
- def _validate_name(
- name: dns.name.Name,
- origin: Optional[dns.name.Name],
- relativize: bool,
- ) -> dns.name.Name:
- # This name validation code is shared by Zone and Version
- if origin is None:
- # This should probably never happen as other code (e.g.
- # _rr_line) will notice the lack of an origin before us, but
- # we check just in case!
- raise KeyError("no zone origin is defined")
- if name.is_absolute():
- if not name.is_subdomain(origin):
- raise KeyError("name parameter must be a subdomain of the zone origin")
- if relativize:
- name = name.relativize(origin)
- else:
- # We have a relative name. Make sure that the derelativized name is
- # not too long.
- try:
- abs_name = name.derelativize(origin)
- except dns.name.NameTooLong:
- # We map dns.name.NameTooLong to KeyError to be consistent with
- # the other exceptions above.
- raise KeyError("relative name too long for zone")
- if not relativize:
- # We have a relative name in a non-relative zone, so use the
- # derelativized name.
- name = abs_name
- return name
- class Zone(dns.transaction.TransactionManager):
- """A DNS zone.
- A ``Zone`` is a mapping from names to nodes. The zone object may be
- treated like a Python dictionary, e.g. ``zone[name]`` will retrieve
- the node associated with that name. The *name* may be a
- ``dns.name.Name object``, or it may be a string. In either case,
- if the name is relative it is treated as relative to the origin of
- the zone.
- """
- node_factory: Callable[[], dns.node.Node] = dns.node.Node
- map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict
- writable_version_factory: Optional[Callable[[], "WritableVersion"]] = None
- immutable_version_factory: Optional[Callable[[], "ImmutableVersion"]] = None
- __slots__ = ["rdclass", "origin", "nodes", "relativize"]
- def __init__(
- self,
- origin: Optional[Union[dns.name.Name, str]],
- rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
- relativize: bool = True,
- ):
- """Initialize a zone object.
- *origin* is the origin of the zone. It may be a ``dns.name.Name``,
- a ``str``, or ``None``. If ``None``, then the zone's origin will
- be set by the first ``$ORIGIN`` line in a zone file.
- *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
- *relativize*, a ``bool``, determine's whether domain names are
- relativized to the zone's origin. The default is ``True``.
- """
- if origin is not None:
- if isinstance(origin, str):
- origin = dns.name.from_text(origin)
- elif not isinstance(origin, dns.name.Name):
- raise ValueError("origin parameter must be convertible to a DNS name")
- if not origin.is_absolute():
- raise ValueError("origin parameter must be an absolute name")
- self.origin = origin
- self.rdclass = rdclass
- self.nodes: MutableMapping[dns.name.Name, dns.node.Node] = self.map_factory()
- self.relativize = relativize
- def __eq__(self, other):
- """Two zones are equal if they have the same origin, class, and
- nodes.
- Returns a ``bool``.
- """
- if not isinstance(other, Zone):
- return False
- if (
- self.rdclass != other.rdclass
- or self.origin != other.origin
- or self.nodes != other.nodes
- ):
- return False
- return True
- def __ne__(self, other):
- """Are two zones not equal?
- Returns a ``bool``.
- """
- return not self.__eq__(other)
- def _validate_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
- # Note that any changes in this method should have corresponding changes
- # made in the Version _validate_name() method.
- if isinstance(name, str):
- name = dns.name.from_text(name, None)
- elif not isinstance(name, dns.name.Name):
- raise KeyError("name parameter must be convertible to a DNS name")
- return _validate_name(name, self.origin, self.relativize)
- def __getitem__(self, key):
- key = self._validate_name(key)
- return self.nodes[key]
- def __setitem__(self, key, value):
- key = self._validate_name(key)
- self.nodes[key] = value
- def __delitem__(self, key):
- key = self._validate_name(key)
- del self.nodes[key]
- def __iter__(self):
- return self.nodes.__iter__()
- def keys(self):
- return self.nodes.keys()
- def values(self):
- return self.nodes.values()
- def items(self):
- return self.nodes.items()
- def get(self, key):
- key = self._validate_name(key)
- return self.nodes.get(key)
- def __contains__(self, key):
- key = self._validate_name(key)
- return key in self.nodes
- def find_node(
- self, name: Union[dns.name.Name, str], create: bool = False
- ) -> dns.node.Node:
- """Find a node in the zone, possibly creating it.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Raises ``KeyError`` if the name is not known and create was
- not specified, or if the name was not a subdomain of the origin.
- Returns a ``dns.node.Node``.
- """
- name = self._validate_name(name)
- node = self.nodes.get(name)
- if node is None:
- if not create:
- raise KeyError
- node = self.node_factory()
- self.nodes[name] = node
- return node
- def get_node(
- self, name: Union[dns.name.Name, str], create: bool = False
- ) -> Optional[dns.node.Node]:
- """Get a node in the zone, possibly creating it.
- This method is like ``find_node()``, except it returns None instead
- of raising an exception if the node does not exist and creation
- has not been requested.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Returns a ``dns.node.Node`` or ``None``.
- """
- try:
- node = self.find_node(name, create)
- except KeyError:
- node = None
- return node
- def delete_node(self, name: Union[dns.name.Name, str]) -> None:
- """Delete the specified node if it exists.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- It is not an error if the node does not exist.
- """
- name = self._validate_name(name)
- if name in self.nodes:
- del self.nodes[name]
- def find_rdataset(
- self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- create: bool = False,
- ) -> dns.rdataset.Rdataset:
- """Look for an rdataset with the specified name and type in the zone,
- and return an rdataset encapsulating it.
- The rdataset returned is not a copy; changes to it will change
- the zone.
- KeyError is raised if the name or type are not found.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdatatype.RdataType`` or ``str`` the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Raises ``KeyError`` if the name is not known and create was
- not specified, or if the name was not a subdomain of the origin.
- Returns a ``dns.rdataset.Rdataset``.
- """
- name = self._validate_name(name)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- node = self.find_node(name, create)
- return node.find_rdataset(self.rdclass, rdtype, covers, create)
- def get_rdataset(
- self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
- """Look for an rdataset with the specified name and type in the zone.
- This method is like ``find_rdataset()``, except it returns None instead
- of raising an exception if the rdataset does not exist and creation
- has not been requested.
- The rdataset returned is not a copy; changes to it will change
- the zone.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Raises ``KeyError`` if the name is not known and create was
- not specified, or if the name was not a subdomain of the origin.
- Returns a ``dns.rdataset.Rdataset`` or ``None``.
- """
- try:
- rdataset = self.find_rdataset(name, rdtype, covers, create)
- except KeyError:
- rdataset = None
- return rdataset
- def delete_rdataset(
- self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> None:
- """Delete the rdataset matching *rdtype* and *covers*, if it
- exists at the node specified by *name*.
- It is not an error if the node does not exist, or if there is no matching
- rdataset at the node.
- If the node has no rdatasets after the deletion, it will itself be deleted.
- *name*: the name of the node to find. The value may be a ``dns.name.Name`` or a
- ``str``. If absolute, the name must be a subdomain of the zone's origin. If
- ``zone.relativize`` is ``True``, then the name will be relativized.
- *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdatatype.RdataType`` or ``str`` or ``None``, the covered
- type. Usually this value is ``dns.rdatatype.NONE``, but if the rdtype is
- ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``, then the covers value will be
- the rdata type the SIG/RRSIG covers. The library treats the SIG and RRSIG types
- as if they were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA). This
- makes RRSIGs much easier to work with than if RRSIGs covering different rdata
- types were aggregated into a single RRSIG rdataset.
- """
- name = self._validate_name(name)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- node = self.get_node(name)
- if node is not None:
- node.delete_rdataset(self.rdclass, rdtype, covers)
- if len(node) == 0:
- self.delete_node(name)
- def replace_rdataset(
- self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
- ) -> None:
- """Replace an rdataset at name.
- It is not an error if there is no rdataset matching I{replacement}.
- Ownership of the *replacement* object is transferred to the zone;
- in other words, this method does not store a copy of *replacement*
- at the node, it stores *replacement* itself.
- If the node does not exist, it is created.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *replacement*, a ``dns.rdataset.Rdataset``, the replacement rdataset.
- """
- if replacement.rdclass != self.rdclass:
- raise ValueError("replacement.rdclass != zone.rdclass")
- node = self.find_node(name, True)
- node.replace_rdataset(replacement)
- def find_rrset(
- self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> dns.rrset.RRset:
- """Look for an rdataset with the specified name and type in the zone,
- and return an RRset encapsulating it.
- This method is less efficient than the similar
- ``find_rdataset()`` because it creates an RRset instead of
- returning the matching rdataset. It may be more convenient
- for some uses since it returns an object which binds the owner
- name to the rdataset.
- This method may not be used to create new nodes or rdatasets;
- use ``find_rdataset`` instead.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Raises ``KeyError`` if the name is not known and create was
- not specified, or if the name was not a subdomain of the origin.
- Returns a ``dns.rrset.RRset`` or ``None``.
- """
- vname = self._validate_name(name)
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- rdataset = self.nodes[vname].find_rdataset(self.rdclass, rdtype, covers)
- rrset = dns.rrset.RRset(vname, self.rdclass, rdtype, covers)
- rrset.update(rdataset)
- return rrset
- def get_rrset(
- self,
- name: Union[dns.name.Name, str],
- rdtype: Union[dns.rdatatype.RdataType, str],
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> Optional[dns.rrset.RRset]:
- """Look for an rdataset with the specified name and type in the zone,
- and return an RRset encapsulating it.
- This method is less efficient than the similar ``get_rdataset()``
- because it creates an RRset instead of returning the matching
- rdataset. It may be more convenient for some uses since it
- returns an object which binds the owner name to the rdataset.
- This method may not be used to create new nodes or rdatasets;
- use ``get_rdataset()`` instead.
- *name*: the name of the node to find.
- The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
- name must be a subdomain of the zone's origin. If ``zone.relativize``
- is ``True``, then the name will be relativized.
- *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- *create*, a ``bool``. If true, the node will be created if it does
- not exist.
- Returns a ``dns.rrset.RRset`` or ``None``.
- """
- try:
- rrset = self.find_rrset(name, rdtype, covers)
- except KeyError:
- rrset = None
- return rrset
- def iterate_rdatasets(
- self,
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
- """Return a generator which yields (name, rdataset) tuples for
- all rdatasets in the zone which have the specified *rdtype*
- and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
- then all rdatasets will be matched.
- *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- """
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- for name, node in self.items():
- for rds in node:
- if rdtype == dns.rdatatype.ANY or (
- rds.rdtype == rdtype and rds.covers == covers
- ):
- yield (name, rds)
- def iterate_rdatas(
- self,
- rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
- covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
- ) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
- """Return a generator which yields (name, ttl, rdata) tuples for
- all rdatas in the zone which have the specified *rdtype*
- and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
- then all rdatas will be matched.
- *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
- *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
- Usually this value is ``dns.rdatatype.NONE``, but if the
- rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
- then the covers value will be the rdata type the SIG/RRSIG
- covers. The library treats the SIG and RRSIG types as if they
- were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
- This makes RRSIGs much easier to work with than if RRSIGs
- covering different rdata types were aggregated into a single
- RRSIG rdataset.
- """
- rdtype = dns.rdatatype.RdataType.make(rdtype)
- covers = dns.rdatatype.RdataType.make(covers)
- for name, node in self.items():
- for rds in node:
- if rdtype == dns.rdatatype.ANY or (
- rds.rdtype == rdtype and rds.covers == covers
- ):
- for rdata in rds:
- yield (name, rds.ttl, rdata)
- def to_file(
- self,
- f: Any,
- sorted: bool = True,
- relativize: bool = True,
- nl: Optional[str] = None,
- want_comments: bool = False,
- want_origin: bool = False,
- ) -> None:
- """Write a zone to a file.
- *f*, a file or `str`. If *f* is a string, it is treated
- as the name of a file to open.
- *sorted*, a ``bool``. If True, the default, then the file
- will be written with the names sorted in DNSSEC order from
- least to greatest. Otherwise the names will be written in
- whatever order they happen to have in the zone's dictionary.
- *relativize*, a ``bool``. If True, the default, then domain
- names in the output will be relativized to the zone's origin
- if possible.
- *nl*, a ``str`` or None. The end of line string. If not
- ``None``, the output will use the platform's native
- end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
- *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
- as part of writing the file. If ``False``, the default, do not
- emit them.
- *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
- the start of the file. If ``False``, the default, do not emit
- one.
- """
- if isinstance(f, str):
- cm: contextlib.AbstractContextManager = open(f, "wb")
- else:
- cm = contextlib.nullcontext(f)
- with cm as f:
- # must be in this way, f.encoding may contain None, or even
- # attribute may not be there
- file_enc = getattr(f, "encoding", None)
- if file_enc is None:
- file_enc = "utf-8"
- if nl is None:
- # binary mode, '\n' is not enough
- nl_b = os.linesep.encode(file_enc)
- nl = "\n"
- elif isinstance(nl, str):
- nl_b = nl.encode(file_enc)
- else:
- nl_b = nl
- nl = nl.decode()
- if want_origin:
- assert self.origin is not None
- l = "$ORIGIN " + self.origin.to_text()
- l_b = l.encode(file_enc)
- try:
- f.write(l_b)
- f.write(nl_b)
- except TypeError: # textual mode
- f.write(l)
- f.write(nl)
- if sorted:
- names = list(self.keys())
- names.sort()
- else:
- names = self.keys()
- for n in names:
- l = self[n].to_text(
- n,
- origin=self.origin,
- relativize=relativize,
- want_comments=want_comments,
- )
- l_b = l.encode(file_enc)
- try:
- f.write(l_b)
- f.write(nl_b)
- except TypeError: # textual mode
- f.write(l)
- f.write(nl)
- def to_text(
- self,
- sorted: bool = True,
- relativize: bool = True,
- nl: Optional[str] = None,
- want_comments: bool = False,
- want_origin: bool = False,
- ) -> str:
- """Return a zone's text as though it were written to a file.
- *sorted*, a ``bool``. If True, the default, then the file
- will be written with the names sorted in DNSSEC order from
- least to greatest. Otherwise the names will be written in
- whatever order they happen to have in the zone's dictionary.
- *relativize*, a ``bool``. If True, the default, then domain
- names in the output will be relativized to the zone's origin
- if possible.
- *nl*, a ``str`` or None. The end of line string. If not
- ``None``, the output will use the platform's native
- end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
- *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
- as part of writing the file. If ``False``, the default, do not
- emit them.
- *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
- the start of the output. If ``False``, the default, do not emit
- one.
- Returns a ``str``.
- """
- temp_buffer = io.StringIO()
- self.to_file(temp_buffer, sorted, relativize, nl, want_comments, want_origin)
- return_value = temp_buffer.getvalue()
- temp_buffer.close()
- return return_value
- def check_origin(self) -> None:
- """Do some simple checking of the zone's origin.
- Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
- Raises ``dns.zone.NoNS`` if there is no NS RRset.
- Raises ``KeyError`` if there is no origin node.
- """
- if self.relativize:
- name = dns.name.empty
- else:
- assert self.origin is not None
- name = self.origin
- if self.get_rdataset(name, dns.rdatatype.SOA) is None:
- raise NoSOA
- if self.get_rdataset(name, dns.rdatatype.NS) is None:
- raise NoNS
- def get_soa(
- self, txn: Optional[dns.transaction.Transaction] = None
- ) -> dns.rdtypes.ANY.SOA.SOA:
- """Get the zone SOA rdata.
- Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
- Returns a ``dns.rdtypes.ANY.SOA.SOA`` Rdata.
- """
- if self.relativize:
- origin_name = dns.name.empty
- else:
- if self.origin is None:
- # get_soa() has been called very early, and there must not be
- # an SOA if there is no origin.
- raise NoSOA
- origin_name = self.origin
- soa: Optional[dns.rdataset.Rdataset]
- if txn:
- soa = txn.get(origin_name, dns.rdatatype.SOA)
- else:
- soa = self.get_rdataset(origin_name, dns.rdatatype.SOA)
- if soa is None:
- raise NoSOA
- return soa[0]
- def _compute_digest(
- self,
- hash_algorithm: DigestHashAlgorithm,
- scheme: DigestScheme = DigestScheme.SIMPLE,
- ) -> bytes:
- hashinfo = _digest_hashers.get(hash_algorithm)
- if not hashinfo:
- raise UnsupportedDigestHashAlgorithm
- if scheme != DigestScheme.SIMPLE:
- raise UnsupportedDigestScheme
- if self.relativize:
- origin_name = dns.name.empty
- else:
- assert self.origin is not None
- origin_name = self.origin
- hasher = hashinfo()
- for name, node in sorted(self.items()):
- rrnamebuf = name.to_digestable(self.origin)
- for rdataset in sorted(node, key=lambda rds: (rds.rdtype, rds.covers)):
- if name == origin_name and dns.rdatatype.ZONEMD in (
- rdataset.rdtype,
- rdataset.covers,
- ):
- continue
- rrfixed = struct.pack(
- "!HHI", rdataset.rdtype, rdataset.rdclass, rdataset.ttl
- )
- rdatas = [rdata.to_digestable(self.origin) for rdata in rdataset]
- for rdata in sorted(rdatas):
- rrlen = struct.pack("!H", len(rdata))
- hasher.update(rrnamebuf + rrfixed + rrlen + rdata)
- return hasher.digest()
- def compute_digest(
- self,
- hash_algorithm: DigestHashAlgorithm,
- scheme: DigestScheme = DigestScheme.SIMPLE,
- ) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
- serial = self.get_soa().serial
- digest = self._compute_digest(hash_algorithm, scheme)
- return dns.rdtypes.ANY.ZONEMD.ZONEMD(
- self.rdclass, dns.rdatatype.ZONEMD, serial, scheme, hash_algorithm, digest
- )
- def verify_digest(
- self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD] = None
- ) -> None:
- digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]]
- if zonemd:
- digests = [zonemd]
- else:
- assert self.origin is not None
- rds = self.get_rdataset(self.origin, dns.rdatatype.ZONEMD)
- if rds is None:
- raise NoDigest
- digests = rds
- for digest in digests:
- try:
- computed = self._compute_digest(digest.hash_algorithm, digest.scheme)
- if computed == digest.digest:
- return
- except Exception:
- pass
- raise DigestVerificationFailure
- # TransactionManager methods
- def reader(self) -> "Transaction":
- return Transaction(self, False, Version(self, 1, self.nodes, self.origin))
- def writer(self, replacement: bool = False) -> "Transaction":
- txn = Transaction(self, replacement)
- txn._setup_version()
- return txn
- def origin_information(
- self,
- ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]:
- effective: Optional[dns.name.Name]
- if self.relativize:
- effective = dns.name.empty
- else:
- effective = self.origin
- return (self.origin, self.relativize, effective)
- def get_class(self):
- return self.rdclass
- # Transaction methods
- def _end_read(self, txn):
- pass
- def _end_write(self, txn):
- pass
- def _commit_version(self, _, version, origin):
- self.nodes = version.nodes
- if self.origin is None:
- self.origin = origin
- def _get_next_version_id(self):
- # Versions are ephemeral and all have id 1
- return 1
- # These classes used to be in dns.versioned, but have moved here so we can use
- # the copy-on-write transaction mechanism for both kinds of zones. In a
- # regular zone, the version only exists during the transaction, and the nodes
- # are regular dns.node.Nodes.
- # A node with a version id.
- class VersionedNode(dns.node.Node): # lgtm[py/missing-equals]
- __slots__ = ["id"]
- def __init__(self):
- super().__init__()
- # A proper id will get set by the Version
- self.id = 0
- @dns.immutable.immutable
- class ImmutableVersionedNode(VersionedNode):
- def __init__(self, node):
- super().__init__()
- self.id = node.id
- self.rdatasets = tuple(
- [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
- )
- def find_rdataset(
- self,
- rdclass: dns.rdataclass.RdataClass,
- rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- create: bool = False,
- ) -> dns.rdataset.Rdataset:
- if create:
- raise TypeError("immutable")
- return super().find_rdataset(rdclass, rdtype, covers, False)
- def get_rdataset(
- self,
- rdclass: dns.rdataclass.RdataClass,
- rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- create: bool = False,
- ) -> Optional[dns.rdataset.Rdataset]:
- if create:
- raise TypeError("immutable")
- return super().get_rdataset(rdclass, rdtype, covers, False)
- def delete_rdataset(
- self,
- rdclass: dns.rdataclass.RdataClass,
- rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
- ) -> None:
- raise TypeError("immutable")
- def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
- raise TypeError("immutable")
- def is_immutable(self) -> bool:
- return True
- class Version:
- def __init__(
- self,
- zone: Zone,
- id: int,
- nodes: Optional[MutableMapping[dns.name.Name, dns.node.Node]] = None,
- origin: Optional[dns.name.Name] = None,
- ):
- self.zone = zone
- self.id = id
- if nodes is not None:
- self.nodes = nodes
- else:
- self.nodes = zone.map_factory()
- self.origin = origin
- def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
- return _validate_name(name, self.origin, self.zone.relativize)
- def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
- name = self._validate_name(name)
- return self.nodes.get(name)
- def get_rdataset(
- self,
- name: dns.name.Name,
- rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType,
- ) -> Optional[dns.rdataset.Rdataset]:
- node = self.get_node(name)
- if node is None:
- return None
- return node.get_rdataset(self.zone.rdclass, rdtype, covers)
- def keys(self):
- return self.nodes.keys()
- def items(self):
- return self.nodes.items()
- class WritableVersion(Version):
- def __init__(self, zone: Zone, replacement: bool = False):
- # The zone._versions_lock must be held by our caller in a versioned
- # zone.
- id = zone._get_next_version_id()
- super().__init__(zone, id)
- if not replacement:
- # We copy the map, because that gives us a simple and thread-safe
- # way of doing versions, and we have a garbage collector to help
- # us. We only make new node objects if we actually change the
- # node.
- self.nodes.update(zone.nodes)
- # We have to copy the zone origin as it may be None in the first
- # version, and we don't want to mutate the zone until we commit.
- self.origin = zone.origin
- self.changed: Set[dns.name.Name] = set()
- def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node:
- name = self._validate_name(name)
- node = self.nodes.get(name)
- if node is None or name not in self.changed:
- new_node = self.zone.node_factory()
- if hasattr(new_node, "id"):
- # We keep doing this for backwards compatibility, as earlier
- # code used new_node.id != self.id for the "do we need to CoW?"
- # test. Now we use the changed set as this works with both
- # regular zones and versioned zones.
- #
- # We ignore the mypy error as this is safe but it doesn't see it.
- new_node.id = self.id # type: ignore
- if node is not None:
- # moo! copy on write!
- new_node.rdatasets.extend(node.rdatasets)
- self.nodes[name] = new_node
- self.changed.add(name)
- return new_node
- else:
- return node
- def delete_node(self, name: dns.name.Name) -> None:
- name = self._validate_name(name)
- if name in self.nodes:
- del self.nodes[name]
- self.changed.add(name)
- def put_rdataset(
- self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
- ) -> None:
- node = self._maybe_cow(name)
- node.replace_rdataset(rdataset)
- def delete_rdataset(
- self,
- name: dns.name.Name,
- rdtype: dns.rdatatype.RdataType,
- covers: dns.rdatatype.RdataType,
- ) -> None:
- node = self._maybe_cow(name)
- node.delete_rdataset(self.zone.rdclass, rdtype, covers)
- if len(node) == 0:
- del self.nodes[name]
- @dns.immutable.immutable
- class ImmutableVersion(Version):
- def __init__(self, version: WritableVersion):
- # We tell super() that it's a replacement as we don't want it
- # to copy the nodes, as we're about to do that with an
- # immutable Dict.
- super().__init__(version.zone, True)
- # set the right id!
- self.id = version.id
- # keep the origin
- self.origin = version.origin
- # Make changed nodes immutable
- for name in version.changed:
- node = version.nodes.get(name)
- # it might not exist if we deleted it in the version
- if node:
- version.nodes[name] = ImmutableVersionedNode(node)
- # We're changing the type of the nodes dictionary here on purpose, so
- # we ignore the mypy error.
- self.nodes = dns.immutable.Dict(
- version.nodes, True, self.zone.map_factory
- ) # type: ignore
- class Transaction(dns.transaction.Transaction):
- def __init__(self, zone, replacement, version=None, make_immutable=False):
- read_only = version is not None
- super().__init__(zone, replacement, read_only)
- self.version = version
- self.make_immutable = make_immutable
- @property
- def zone(self):
- return self.manager
- def _setup_version(self):
- assert self.version is None
- factory = self.manager.writable_version_factory
- if factory is None:
- factory = WritableVersion
- self.version = factory(self.zone, self.replacement)
- def _get_rdataset(self, name, rdtype, covers):
- return self.version.get_rdataset(name, rdtype, covers)
- def _put_rdataset(self, name, rdataset):
- assert not self.read_only
- self.version.put_rdataset(name, rdataset)
- def _delete_name(self, name):
- assert not self.read_only
- self.version.delete_node(name)
- def _delete_rdataset(self, name, rdtype, covers):
- assert not self.read_only
- self.version.delete_rdataset(name, rdtype, covers)
- def _name_exists(self, name):
- return self.version.get_node(name) is not None
- def _changed(self):
- if self.read_only:
- return False
- else:
- return len(self.version.changed) > 0
- def _end_transaction(self, commit):
- if self.read_only:
- self.zone._end_read(self)
- elif commit and len(self.version.changed) > 0:
- if self.make_immutable:
- factory = self.manager.immutable_version_factory
- if factory is None:
- factory = ImmutableVersion
- version = factory(self.version)
- else:
- version = self.version
- self.zone._commit_version(self, version, self.version.origin)
- else:
- # rollback
- self.zone._end_write(self)
- def _set_origin(self, origin):
- if self.version.origin is None:
- self.version.origin = origin
- def _iterate_rdatasets(self):
- for name, node in self.version.items():
- for rdataset in node:
- yield (name, rdataset)
- def _iterate_names(self):
- return self.version.keys()
- def _get_node(self, name):
- return self.version.get_node(name)
- def _origin_information(self):
- (absolute, relativize, effective) = self.manager.origin_information()
- if absolute is None and self.version.origin is not None:
- # No origin has been committed yet, but we've learned one as part of
- # this txn. Use it.
- absolute = self.version.origin
- if relativize:
- effective = dns.name.empty
- else:
- effective = absolute
- return (absolute, relativize, effective)
- def _from_text(
- text: Any,
- origin: Optional[Union[dns.name.Name, str]] = None,
- rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
- relativize: bool = True,
- zone_factory: Any = Zone,
- filename: Optional[str] = None,
- allow_include: bool = False,
- check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
- ) -> Zone:
- # See the comments for the public APIs from_text() and from_file() for
- # details.
- # 'text' can also be a file, but we don't publish that fact
- # since it's an implementation detail. The official file
- # interface is from_file().
- if filename is None:
- filename = "<string>"
- zone = zone_factory(origin, rdclass, relativize=relativize)
- with zone.writer(True) as txn:
- tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
- reader = dns.zonefile.Reader(
- tok,
- rdclass,
- txn,
- allow_include=allow_include,
- allow_directives=allow_directives,
- )
- try:
- reader.read()
- except dns.zonefile.UnknownOrigin:
- # for backwards compatibility
- raise dns.zone.UnknownOrigin
- # Now that we're done reading, do some basic checking of the zone.
- if check_origin:
- zone.check_origin()
- return zone
- def from_text(
- text: str,
- origin: Optional[Union[dns.name.Name, str]] = None,
- rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
- relativize: bool = True,
- zone_factory: Any = Zone,
- filename: Optional[str] = None,
- allow_include: bool = False,
- check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
- ) -> Zone:
- """Build a zone object from a zone file format string.
- *text*, a ``str``, the zone file format input.
- *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
- of the zone; if not specified, the first ``$ORIGIN`` statement in the
- zone file will determine the origin of the zone.
- *rdclass*, a ``dns.rdataclass.RdataClass``, the zone's rdata class; the default is
- class IN.
- *relativize*, a ``bool``, determine's whether domain names are
- relativized to the zone's origin. The default is ``True``.
- *zone_factory*, the zone factory to use or ``None``. If ``None``, then
- ``dns.zone.Zone`` will be used. The value may be any class or callable
- that returns a subclass of ``dns.zone.Zone``.
- *filename*, a ``str`` or ``None``, the filename to emit when
- describing where an error occurred; the default is ``'<string>'``.
- *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
- directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
- will raise a ``SyntaxError`` exception.
- *check_origin*, a ``bool``. If ``True``, the default, then sanity
- checks of the origin node will be made by calling the zone's
- ``check_origin()`` method.
- *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
- encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
- is used.
- *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
- then directives are permitted, and the *allow_include* parameter controls whether
- ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
- processing is done and any directive-like text will be treated as a regular owner
- name. If a non-empty iterable, then only the listed directives (including the
- ``$``) are allowed.
- Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
- Raises ``dns.zone.NoNS`` if there is no NS RRset.
- Raises ``KeyError`` if there is no origin node.
- Returns a subclass of ``dns.zone.Zone``.
- """
- return _from_text(
- text,
- origin,
- rdclass,
- relativize,
- zone_factory,
- filename,
- allow_include,
- check_origin,
- idna_codec,
- allow_directives,
- )
- def from_file(
- f: Any,
- origin: Optional[Union[dns.name.Name, str]] = None,
- rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
- relativize: bool = True,
- zone_factory: Any = Zone,
- filename: Optional[str] = None,
- allow_include: bool = True,
- check_origin: bool = True,
- idna_codec: Optional[dns.name.IDNACodec] = None,
- allow_directives: Union[bool, Iterable[str]] = True,
- ) -> Zone:
- """Read a zone file and build a zone object.
- *f*, a file or ``str``. If *f* is a string, it is treated
- as the name of a file to open.
- *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
- of the zone; if not specified, the first ``$ORIGIN`` statement in the
- zone file will determine the origin of the zone.
- *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
- *relativize*, a ``bool``, determine's whether domain names are
- relativized to the zone's origin. The default is ``True``.
- *zone_factory*, the zone factory to use or ``None``. If ``None``, then
- ``dns.zone.Zone`` will be used. The value may be any class or callable
- that returns a subclass of ``dns.zone.Zone``.
- *filename*, a ``str`` or ``None``, the filename to emit when
- describing where an error occurred; the default is ``'<string>'``.
- *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
- directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
- will raise a ``SyntaxError`` exception.
- *check_origin*, a ``bool``. If ``True``, the default, then sanity
- checks of the origin node will be made by calling the zone's
- ``check_origin()`` method.
- *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
- encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
- is used.
- *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
- then directives are permitted, and the *allow_include* parameter controls whether
- ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
- processing is done and any directive-like text will be treated as a regular owner
- name. If a non-empty iterable, then only the listed directives (including the
- ``$``) are allowed.
- Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
- Raises ``dns.zone.NoNS`` if there is no NS RRset.
- Raises ``KeyError`` if there is no origin node.
- Returns a subclass of ``dns.zone.Zone``.
- """
- if isinstance(f, str):
- if filename is None:
- filename = f
- cm: contextlib.AbstractContextManager = open(f)
- else:
- cm = contextlib.nullcontext(f)
- with cm as f:
- return _from_text(
- f,
- origin,
- rdclass,
- relativize,
- zone_factory,
- filename,
- allow_include,
- check_origin,
- idna_codec,
- allow_directives,
- )
- assert False # make mypy happy lgtm[py/unreachable-statement]
- def from_xfr(
- xfr: Any,
- zone_factory: Any = Zone,
- relativize: bool = True,
- check_origin: bool = True,
- ) -> Zone:
- """Convert the output of a zone transfer generator into a zone object.
- *xfr*, a generator of ``dns.message.Message`` objects, typically
- ``dns.query.xfr()``.
- *relativize*, a ``bool``, determine's whether domain names are
- relativized to the zone's origin. The default is ``True``.
- It is essential that the relativize setting matches the one specified
- to the generator.
- *check_origin*, a ``bool``. If ``True``, the default, then sanity
- checks of the origin node will be made by calling the zone's
- ``check_origin()`` method.
- Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
- Raises ``dns.zone.NoNS`` if there is no NS RRset.
- Raises ``KeyError`` if there is no origin node.
- Raises ``ValueError`` if no messages are yielded by the generator.
- Returns a subclass of ``dns.zone.Zone``.
- """
- z = None
- for r in xfr:
- if z is None:
- if relativize:
- origin = r.origin
- else:
- origin = r.answer[0].name
- rdclass = r.answer[0].rdclass
- z = zone_factory(origin, rdclass, relativize=relativize)
- for rrset in r.answer:
- znode = z.nodes.get(rrset.name)
- if not znode:
- znode = z.node_factory()
- z.nodes[rrset.name] = znode
- zrds = znode.find_rdataset(rrset.rdclass, rrset.rdtype, rrset.covers, True)
- zrds.update_ttl(rrset.ttl)
- for rd in rrset:
- zrds.add(rd)
- if z is None:
- raise ValueError("empty transfer")
- if check_origin:
- z.check_origin()
- return z
|