12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744 |
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """DNS Zones."""
- import re
- import sys
- from typing import Any, Iterable, List, Optional, Set, Tuple, Union
- import dns.exception
- import dns.grange
- import dns.name
- import dns.node
- import dns.rdata
- import dns.rdataclass
- import dns.rdatatype
- import dns.rdtypes.ANY.SOA
- import dns.rrset
- import dns.tokenizer
- import dns.transaction
- import dns.ttl
# Raised by the zone-file reader when a relative name has to be resolved but
# no origin is currently known.
# NOTE(review): the docstring below presumably doubles as the default message
# of the exception (dns.exception.DNSException behaviour) -- confirm before
# rewording it.
class UnknownOrigin(dns.exception.DNSException):
    """Unknown origin"""
# Raised when an rdataset is added to a node where it would make a CNAME
# coexist with regular data.
# NOTE(review): the docstring below presumably doubles as the default message
# of the exception (dns.exception.DNSException behaviour) -- confirm before
# rewording it.
class CNAMEAndOtherData(dns.exception.DNSException):
    """A node has a CNAME and other data"""
def _check_cname_and_other_data(txn, name, rdataset):
    """Reject an rdataset that would mix CNAME and regular data at *name*.

    *txn* is the transaction the rdataset is about to be put into, *name* is
    the owner name, and *rdataset* is the candidate rdataset.

    Raises ``CNAMEAndOtherData`` when the existing node is a CNAME node and
    the rdataset is regular data, or the other way around.  Returns ``None``
    in every other case.
    """
    kinds = dns.node.NodeKind
    incoming_kind = kinds.classify_rdataset(rdataset)
    existing_node = txn.get_node(name)
    if existing_node is None:
        # Nothing is stored at this name yet, so nothing can conflict.
        return
    existing_kind = existing_node.classify()
    if existing_kind == kinds.CNAME and incoming_kind == kinds.REGULAR:
        raise CNAMEAndOtherData("rdataset type is not compatible with a CNAME node")
    if existing_kind == kinds.REGULAR and incoming_kind == kinds.CNAME:
        raise CNAMEAndOtherData(
            "CNAME rdataset is not compatible with a regular data node"
        )
    # Any remaining combination involves a neutral node or a neutral
    # rdataset, so the addition is allowed.
# Snapshot of the Reader's per-file parsing state.  One of these tuples is
# pushed when an $INCLUDE directive is entered and popped again when the
# included file reaches EOF.
SavedStateType = Tuple[
    dns.tokenizer.Tokenizer,  # tok
    Optional[dns.name.Name],  # current_origin
    Optional[dns.name.Name],  # last_name
    Optional[Any],  # current_file
    int,  # last_ttl
    bool,  # last_ttl_known
    int,  # default_ttl
    bool,  # default_ttl_known
]
- def _upper_dollarize(s):
- s = s.upper()
- if not s.startswith("$"):
- s = "$" + s
- return s
class Reader:
    """Read a DNS zone file into a transaction."""

    # Patterns recognising the ``${offset[,width[,base]]}`` modifier of a
    # $GENERATE lhs/rhs.  The first group captures everything between (and
    # including) the braces so that it can be substituted later.  They are
    # compiled once here rather than on every _parse_modify() call.
    _MODIFIER_OFFSET_WIDTH_BASE = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+),(.)}).*$")
    _MODIFIER_OFFSET = re.compile(r"^.*\$({(\+|-?)(\d+)}).*$")
    _MODIFIER_OFFSET_WIDTH = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+)}).*$")

    def __init__(
        self,
        tok: dns.tokenizer.Tokenizer,
        rdclass: dns.rdataclass.RdataClass,
        txn: dns.transaction.Transaction,
        allow_include: bool = False,
        allow_directives: Union[bool, Iterable[str]] = True,
        force_name: Optional[dns.name.Name] = None,
        force_ttl: Optional[int] = None,
        force_rdclass: Optional[dns.rdataclass.RdataClass] = None,
        force_rdtype: Optional[dns.rdatatype.RdataType] = None,
        default_ttl: Optional[int] = None,
    ):
        """Initialize the reader.

        *tok* is the tokenizer the input is read from.

        *rdclass* is the zone's class; an RR whose class is read from the
        input must match it.

        *txn* is the transaction that receives the records.  The zone origin
        and the relativization setting are taken from its manager.

        *allow_include* adds ``$INCLUDE`` to the allowed directives, but only
        when *allow_directives* is ``True``.

        *allow_directives* is ``True`` (allow ``$GENERATE``, ``$ORIGIN`` and
        ``$TTL``), ``False`` (allow none), or an iterable of directive names
        (with or without the leading ``$``, in any case) listing exactly the
        directives to allow.

        *force_name*, *force_ttl*, *force_rdclass* and *force_rdtype*, when
        not ``None``, are used for every RR instead of reading the
        corresponding field from the input.

        *default_ttl* is the TTL used when an RR does not specify one; it is
        replaced by *force_ttl* when that is given.
        """
        self.tok = tok
        (self.zone_origin, self.relativize, _) = txn.manager.origin_information()
        self.current_origin = self.zone_origin
        self.last_ttl = 0
        self.last_ttl_known = False
        if force_ttl is not None:
            default_ttl = force_ttl
        if default_ttl is None:
            self.default_ttl = 0
            self.default_ttl_known = False
        else:
            self.default_ttl = default_ttl
            self.default_ttl_known = True
        self.last_name = self.current_origin
        self.zone_rdclass = rdclass
        self.txn = txn
        self.saved_state: List[SavedStateType] = []
        self.current_file: Optional[Any] = None
        self.allowed_directives: Set[str]
        if allow_directives is True:
            self.allowed_directives = {"$GENERATE", "$ORIGIN", "$TTL"}
            if allow_include:
                self.allowed_directives.add("$INCLUDE")
        elif allow_directives is False:
            # allow_include was ignored in earlier releases if allow_directives was
            # False, so we continue that.
            self.allowed_directives = set()
        else:
            # Note that if directives are explicitly specified, then allow_include
            # is ignored.
            self.allowed_directives = {_upper_dollarize(d) for d in allow_directives}
        self.force_name = force_name
        self.force_ttl = force_ttl
        self.force_rdclass = force_rdclass
        self.force_rdtype = force_rdtype
        self.txn.check_put_rdataset(_check_cname_and_other_data)

    def _eat_line(self):
        """Consume tokens up to and including the next EOL or EOF."""
        while not self.tok.get().is_eol_or_eof():
            pass

    def _get_identifier(self):
        """Return the next token, raising ``SyntaxError`` unless it is an
        identifier."""
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        return token

    def _parse_rdata(self, rdclass, rdtype, source):
        """Build an rdata of the given class and type from *source* (the
        tokenizer or a text string), using the current origin and the zone's
        relativization settings.

        ``dns.exception.SyntaxError`` propagates unchanged.  Any other
        exception is converted into a ``dns.exception.SyntaxError``.
        """
        try:
            return dns.rdata.from_text(
                rdclass,
                rdtype,
                source,
                self.current_origin,
                self.relativize,
                self.zone_origin,
            )
        except dns.exception.SyntaxError:
            # Already the right type; let it through untouched.
            raise
        except Exception as e:
            # All exceptions that occur in the processing of rdata
            # are treated as syntax errors.  This is not strictly
            # correct, but it is correct almost all of the time.
            # We convert them to syntax errors so that we can emit
            # helpful filename:line info.
            raise dns.exception.SyntaxError(
                f"caught exception {str(type(e))}: {str(e)}"
            )

    def _rr_line(self):
        """Process one line from a DNS zone file.

        Fields are read in the order name, TTL, class, TTL (again, to support
        ``<class> <ttl> <type>``), type, rdata; any field that has a forced
        value is not read from the input.  The resulting RR is added to the
        transaction.

        Raises ``UnknownOrigin`` if a name must be read and there is no
        current origin, and ``dns.exception.SyntaxError`` on malformed input
        or when no TTL can be determined.
        """
        # Name
        if self.force_name is not None:
            name = self.force_name
        else:
            if self.current_origin is None:
                raise UnknownOrigin
            token = self.tok.get(want_leading=True)
            if not token.is_whitespace():
                self.last_name = self.tok.as_name(token, self.current_origin)
            else:
                token = self.tok.get()
                if token.is_eol_or_eof():
                    # treat leading WS followed by EOL/EOF as if they were EOL/EOF.
                    return
                self.tok.unget(token)
            name = self.last_name
            if not name.is_subdomain(self.zone_origin):
                # Out-of-zone data is silently skipped.
                self._eat_line()
                return
            if self.relativize:
                name = name.relativize(self.zone_origin)

        # TTL
        if self.force_ttl is not None:
            ttl = self.force_ttl
            self.last_ttl = ttl
            self.last_ttl_known = True
        else:
            token = self._get_identifier()
            ttl = None
            try:
                ttl = dns.ttl.from_text(token.value)
                self.last_ttl = ttl
                self.last_ttl_known = True
            except dns.ttl.BadTTL:
                # Not a TTL; let the next field have a look at it.
                self.tok.unget(token)

        # Class
        if self.force_rdclass is not None:
            rdclass = self.force_rdclass
        else:
            token = self._get_identifier()
            try:
                rdclass = dns.rdataclass.from_text(token.value)
            except dns.exception.SyntaxError:
                raise
            except Exception:
                # Not a class; assume the zone's class and re-read the token
                # as the next field.
                rdclass = self.zone_rdclass
                self.tok.unget(token)
            if rdclass != self.zone_rdclass:
                raise dns.exception.SyntaxError("RR class is not zone's class")

        if ttl is None:
            # support for <class> <ttl> <type> syntax
            token = self._get_identifier()
            try:
                ttl = dns.ttl.from_text(token.value)
                self.last_ttl = ttl
                self.last_ttl_known = True
            except dns.ttl.BadTTL:
                # No explicit TTL at all: prefer the default TTL, then the
                # TTL of the previous RR.  ttl stays None if neither is known.
                if self.default_ttl_known:
                    ttl = self.default_ttl
                elif self.last_ttl_known:
                    ttl = self.last_ttl
                self.tok.unget(token)

        # Type
        if self.force_rdtype is not None:
            rdtype = self.force_rdtype
        else:
            token = self._get_identifier()
            try:
                rdtype = dns.rdatatype.from_text(token.value)
            except Exception:
                raise dns.exception.SyntaxError(f"unknown rdatatype '{token.value}'")

        rd = self._parse_rdata(rdclass, rdtype, self.tok)

        if not self.default_ttl_known and rdtype == dns.rdatatype.SOA:
            # The pre-RFC2308 and pre-BIND9 behavior inherits the zone default
            # TTL from the SOA minttl if no $TTL statement is present before the
            # SOA is parsed.
            self.default_ttl = rd.minimum
            self.default_ttl_known = True
            if ttl is None:
                # if we didn't have a TTL on the SOA, set it!
                ttl = rd.minimum

        # TTL check.  We had to wait until now to do this as the SOA RR's
        # own TTL can be inferred from its minimum.
        if ttl is None:
            raise dns.exception.SyntaxError("Missing default TTL value")

        self.txn.add(name, ttl, rd)

    def _parse_modify(self, side: str) -> Tuple[str, str, int, int, str]:
        """Parse the ``${offset[,width[,base]]}`` modifier in *side*, the lhs
        or rhs of a $GENERATE statement.

        Returns ``(mod, sign, offset, width, base)`` where *mod* is the text
        of the modifier including its braces (to be replaced later), *sign*
        is ``"+"`` or ``"-"``, and *offset* and *width* are integers.  When
        *side* has no modifier the result is ``("", "+", 0, 0, "d")``.

        Raises ``dns.exception.SyntaxError`` if the sign or the base is not
        one of the accepted values.
        """
        # Defaults used for every component the modifier does not specify.
        mod = ""
        sign = "+"
        offset = "0"
        width = "0"
        base = "d"

        full = self._MODIFIER_OFFSET_WIDTH_BASE.match(side)
        if full:
            mod, sign, offset, width, base = full.groups()
        else:
            offset_only = self._MODIFIER_OFFSET.match(side)
            if offset_only:
                mod, sign, offset = offset_only.groups()
            else:
                offset_width = self._MODIFIER_OFFSET_WIDTH.match(side)
                if offset_width:
                    mod, sign, offset, width = offset_width.groups()
        if sign == "":
            # An omitted sign means a positive offset.
            sign = "+"

        ioffset = int(offset)
        iwidth = int(width)

        if sign not in ["+", "-"]:
            raise dns.exception.SyntaxError(f"invalid offset sign {sign}")
        if base not in ["d", "o", "x", "X", "n", "N"]:
            raise dns.exception.SyntaxError(f"invalid type {base}")

        return mod, sign, ioffset, iwidth, base

    def _generate_line(self):
        # range lhs [ttl] [class] type rhs [ comment ]
        """Process one line containing the GENERATE statement from a DNS
        zone file.

        One RR is added to the transaction for each value in the (inclusive)
        range.  Raises ``UnknownOrigin`` if there is no current origin and
        ``dns.exception.SyntaxError`` on malformed input.
        """
        if self.current_origin is None:
            raise UnknownOrigin

        token = self.tok.get()
        # Range (required)
        try:
            start, stop, step = dns.grange.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except Exception:
            raise dns.exception.SyntaxError

        # lhs (required)
        try:
            lhs = token.value
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except Exception:
            raise dns.exception.SyntaxError

        # TTL
        try:
            ttl = dns.ttl.from_text(token.value)
            self.last_ttl = ttl
            self.last_ttl_known = True
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.ttl.BadTTL:
            # No TTL on the line: fall back to the default TTL, then to the
            # TTL of the previous RR.
            if not (self.last_ttl_known or self.default_ttl_known):
                raise dns.exception.SyntaxError("Missing default TTL value")
            if self.default_ttl_known:
                ttl = self.default_ttl
            elif self.last_ttl_known:
                ttl = self.last_ttl

        # Class
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            # Not a class, so the token is the type; assume the zone's class.
            rdclass = self.zone_rdclass
        if rdclass != self.zone_rdclass:
            raise dns.exception.SyntaxError("RR class is not zone's class")

        # Type
        try:
            rdtype = dns.rdatatype.from_text(token.value)
        except Exception:
            raise dns.exception.SyntaxError(f"unknown rdatatype '{token.value}'")

        # rhs (required).  This is checked outside the try block above so
        # that a missing rhs is not misreported as an unknown rdatatype.
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError("missing or invalid rhs in $GENERATE")
        rhs = token.value

        def _calculate_index(counter: int, offset_sign: str, offset: int) -> int:
            """Calculate the index from the counter and offset."""
            if offset_sign == "-":
                offset *= -1
            return counter + offset

        def _format_index(index: int, base: str, width: int) -> str:
            """Format the index with the given base, and zero-fill it
            to the given width."""
            if base in ["d", "o", "x", "X"]:
                return format(index, base).zfill(width)

            # base can only be n or N here: the zero-filled hex digits are
            # reversed, joined with dots and cut to *width* characters.
            hexa = _format_index(index, "x", width)
            nibbles = ".".join(hexa[::-1])[:width]
            if base == "N":
                nibbles = nibbles.upper()
            return nibbles

        lmod, lsign, loffset, lwidth, lbase = self._parse_modify(lhs)
        rmod, rsign, roffset, rwidth, rbase = self._parse_modify(rhs)
        for i in range(start, stop + 1, step):
            # +1 because bind is inclusive and python is exclusive

            lindex = _calculate_index(i, lsign, loffset)
            rindex = _calculate_index(i, rsign, roffset)

            lzfindex = _format_index(lindex, lbase, lwidth)
            rzfindex = _format_index(rindex, rbase, rwidth)

            name = lhs.replace(f"${lmod}", lzfindex)
            rdata = rhs.replace(f"${rmod}", rzfindex)

            self.last_name = dns.name.from_text(
                name, self.current_origin, self.tok.idna_codec
            )
            name = self.last_name
            if not name.is_subdomain(self.zone_origin):
                # Out-of-zone name: drop the rest of the statement.
                self._eat_line()
                return
            if self.relativize:
                name = name.relativize(self.zone_origin)

            rd = self._parse_rdata(rdclass, rdtype, rdata)

            self.txn.add(name, ttl, rd)

    def _close_include_files(self) -> None:
        """Close the current $INCLUDE file and every $INCLUDE file recorded
        in the saved state.  This does nothing after a successful read, since
        each included file is closed when its EOF is reached."""
        if self.current_file is not None:
            self.current_file.close()
        for state in self.saved_state:
            # Index 3 of a saved state tuple is its current_file.
            included = state[3]
            if included is not None:
                included.close()

    def read(self) -> None:
        """Read a DNS zone file and add its records to the transaction.

        Directives are processed only if at least one directive is allowed;
        a directive that is not in the allowed set is an error.

        Raises ``dns.exception.SyntaxError``, with the message prefixed by
        ``filename:line``, on malformed input.  Other exceptions propagate
        unchanged.  Files opened for ``$INCLUDE`` are closed before this
        method returns or raises.
        """
        try:
            while True:
                token = self.tok.get(True, True)
                if token.is_eof():
                    if self.current_file is not None:
                        self.current_file.close()
                    if self.saved_state:
                        # End of an included file: resume the including one.
                        (
                            self.tok,
                            self.current_origin,
                            self.last_name,
                            self.current_file,
                            self.last_ttl,
                            self.last_ttl_known,
                            self.default_ttl,
                            self.default_ttl_known,
                        ) = self.saved_state.pop()
                        continue
                    break
                elif token.is_eol():
                    continue
                elif token.is_comment():
                    self.tok.get_eol()
                    continue
                elif token.value.startswith("$") and self.allowed_directives:
                    # Note that we only run directive processing code if at least
                    # one directive is allowed in order to be backwards compatible.
                    # startswith() rather than value[0] so that an empty token
                    # value falls through to _rr_line() and is reported as a
                    # syntax error instead of raising IndexError.
                    c = token.value.upper()
                    if c not in self.allowed_directives:
                        raise dns.exception.SyntaxError(
                            f"zone file directive '{c}' is not allowed"
                        )
                    if c == "$TTL":
                        token = self.tok.get()
                        if not token.is_identifier():
                            raise dns.exception.SyntaxError("bad $TTL")
                        self.default_ttl = dns.ttl.from_text(token.value)
                        self.default_ttl_known = True
                        self.tok.get_eol()
                    elif c == "$ORIGIN":
                        self.current_origin = self.tok.get_name()
                        self.tok.get_eol()
                        if self.zone_origin is None:
                            self.zone_origin = self.current_origin
                        self.txn._set_origin(self.current_origin)
                    elif c == "$INCLUDE":
                        token = self.tok.get()
                        filename = token.value
                        token = self.tok.get()
                        new_origin: Optional[dns.name.Name]
                        if token.is_identifier():
                            new_origin = dns.name.from_text(
                                token.value, self.current_origin, self.tok.idna_codec
                            )
                            self.tok.get_eol()
                        elif not token.is_eol_or_eof():
                            raise dns.exception.SyntaxError("bad origin in $INCLUDE")
                        else:
                            new_origin = self.current_origin
                        # Remember where we were so that parsing can resume
                        # when the included file reaches EOF.
                        self.saved_state.append(
                            (
                                self.tok,
                                self.current_origin,
                                self.last_name,
                                self.current_file,
                                self.last_ttl,
                                self.last_ttl_known,
                                self.default_ttl,
                                self.default_ttl_known,
                            )
                        )
                        self.current_file = open(filename)
                        self.tok = dns.tokenizer.Tokenizer(self.current_file, filename)
                        self.current_origin = new_origin
                    elif c == "$GENERATE":
                        self._generate_line()
                    else:
                        raise dns.exception.SyntaxError(
                            f"Unknown zone file directive '{c}'"
                        )
                    continue
                self.tok.unget(token)
                self._rr_line()
        except dns.exception.SyntaxError as detail:
            # Re-raise with the position in the file that was being read
            # when the error occurred, keeping the original traceback.
            (filename, line_number) = self.tok.where()
            ex = dns.exception.SyntaxError(f"{filename}:{line_number}: {detail}")
            raise ex.with_traceback(detail.__traceback__) from None
        finally:
            # If parsing was aborted inside an $INCLUDE, the included files
            # are still open; make sure they do not leak.
            self._close_include_files()
class RRsetsReaderTransaction(dns.transaction.Transaction):
    """Writable transaction that collects rdatasets in a dictionary keyed by
    ``(name, rdtype, covers)`` and, on commit, hands them to its manager as a
    list of RRsets."""

    def __init__(self, manager, replacement, read_only):
        assert not read_only
        super().__init__(manager, replacement, read_only)
        # Maps (name, rdtype, covers) to the rdataset stored under that key.
        self.rdatasets = {}

    def _get_rdataset(self, name, rdtype, covers):
        """Return the rdataset stored for the key, or ``None``."""
        key = (name, rdtype, covers)
        return self.rdatasets.get(key)

    def _get_node(self, name):
        """Return a new node holding every rdataset owned by *name*, or
        ``None`` if there are none."""
        owned = [
            rdataset
            for (owner, _, _), rdataset in self.rdatasets.items()
            if name == owner
        ]
        if not owned:
            return None
        node = dns.node.Node()
        node.rdatasets = owned
        return node

    def _put_rdataset(self, name, rdataset):
        """Store *rdataset* under *name*, replacing any rdataset with the
        same type and covers."""
        key = (name, rdataset.rdtype, rdataset.covers)
        self.rdatasets[key] = rdataset

    def _delete_name(self, name):
        """Remove every rdataset owned by *name*."""
        # Collect the keys first; the dictionary must not change size while
        # it is being iterated.
        doomed = [key for key in self.rdatasets if key[0] == name]
        for key in doomed:
            del self.rdatasets[key]

    def _delete_rdataset(self, name, rdtype, covers):
        """Remove the rdataset stored for the key; a missing key is ignored."""
        self.rdatasets.pop((name, rdtype, covers), None)

    def _name_exists(self, name):
        """Return ``True`` if any stored rdataset is owned by *name*."""
        return any(owner == name for owner, _, _ in self.rdatasets)

    def _changed(self):
        """Return ``True`` if at least one rdataset is stored."""
        return bool(self.rdatasets)

    def _end_transaction(self, commit):
        """On commit, if anything is stored, convert each rdataset into an
        RRset and pass the list to the manager."""
        if not (commit and self._changed()):
            return
        collected = []
        for (owner, _, _), rdataset in self.rdatasets.items():
            rrset = dns.rrset.RRset(
                owner, rdataset.rdclass, rdataset.rdtype, rdataset.covers
            )
            rrset.update(rdataset)
            collected.append(rrset)
        self.manager.set_rrsets(collected)

    def _set_origin(self, origin):
        """Ignore the origin; this transaction does nothing with it."""
        pass

    def _iterate_rdatasets(self):
        raise NotImplementedError  # pragma: no cover

    def _iterate_names(self):
        raise NotImplementedError  # pragma: no cover
class RRSetsReaderManager(dns.transaction.TransactionManager):
    """Transaction manager that only supports replacement writers and keeps
    the list of RRsets handed to it by ``set_rrsets()``."""

    def __init__(
        self, origin=dns.name.root, relativize=False, rdclass=dns.rdataclass.IN
    ):
        self.origin = origin
        self.relativize = relativize
        self.rdclass = rdclass
        # Filled in by set_rrsets().
        self.rrsets = []

    def reader(self):  # pragma: no cover
        raise NotImplementedError

    def writer(self, replacement=False):
        """Return a new replacement write transaction; *replacement* must be
        ``True``."""
        assert replacement is True
        return RRsetsReaderTransaction(self, True, False)

    def get_class(self):
        """Return the class given to the constructor."""
        return self.rdclass

    def origin_information(self):
        """Return ``(origin, relativize, effective)``, where *effective* is
        the empty name when relativizing and the origin otherwise."""
        effective = dns.name.empty if self.relativize else self.origin
        return (self.origin, self.relativize, effective)

    def set_rrsets(self, rrsets):
        """Record *rrsets* as the manager's result."""
        self.rrsets = rrsets
def read_rrsets(
    text: Any,
    name: Optional[Union[dns.name.Name, str]] = None,
    ttl: Optional[Union[int, str]] = None,
    rdclass: Optional[Union[dns.rdataclass.RdataClass, str]] = dns.rdataclass.IN,
    default_rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
    default_ttl: Optional[Union[int, str]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    origin: Optional[Union[dns.name.Name, str]] = dns.name.root,
    relativize: bool = False,
) -> List[dns.rrset.RRset]:
    """Read one or more rrsets from the specified text, possibly subject
    to restrictions.

    *text*, a file object or a string, is the input to process.

    *name*, a string, ``dns.name.Name``, or ``None``, is the owner name of
    the rrset.  If not ``None``, then the owner name is "forced", and the
    input must not specify an owner name.  If ``None``, then any owner names
    are allowed and must be present in the input.

    *ttl*, an ``int``, string, or ``None``.  If not ``None``, the TTL is
    forced to be the specified value and the input must not specify a TTL.
    If ``None``, then a TTL may be specified in the input.  If it is not
    specified, then the *default_ttl* will be used.

    *rdclass*, a ``dns.rdataclass.RdataClass``, string, or ``None``.  If
    not ``None``, then the class is forced to the specified value, and the
    input must not specify a class.  If ``None``, then the input may specify
    a class that matches *default_rdclass*.  Note that it is not possible to
    return rrsets with differing classes; specifying ``None`` for the class
    simply allows the user to optionally type a class as that may be convenient
    when cutting and pasting.

    *default_rdclass*, a ``dns.rdataclass.RdataClass`` or string.  The class
    of the returned rrsets.

    *rdtype*, a ``dns.rdatatype.RdataType``, string, or ``None``.  If not
    ``None``, then the type is forced to the specified value, and the
    input must not specify a type.  If ``None``, then a type must be present
    for each RR.

    *default_ttl*, an ``int``, string, or ``None``.  If not ``None``, then if
    the TTL is not forced and is not specified, then this value will be used.
    If ``None``, then if the TTL is not forced an error will occur if the TTL
    is not specified.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.  Note that codecs only apply to the owner name; dnspython does
    not do IDNA for names in rdata, as there is no IDNA zonefile format.

    *origin*, a string, ``dns.name.Name``, or ``None``, is the origin for any
    relative names in the input, and also the origin to relativize to if
    *relativize* is ``True``.

    *relativize*, a bool.  If ``True``, names are relativized to the *origin*;
    if ``False`` then any relative names in the input are made absolute by
    appending the *origin*.

    Returns the list of ``dns.rrset.RRset`` objects read from the input.
    """
    # Convert textual arguments to their object forms.  The origin goes
    # first because a textual *name* is interpreted relative to it.
    if isinstance(origin, str):
        origin = dns.name.from_text(origin, dns.name.root, idna_codec)
    if isinstance(name, str):
        name = dns.name.from_text(name, origin, idna_codec)
    if isinstance(ttl, str):
        ttl = dns.ttl.from_text(ttl)
    if isinstance(default_ttl, str):
        default_ttl = dns.ttl.from_text(default_ttl)
    if rdclass is not None:
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
    default_rdclass = dns.rdataclass.RdataClass.make(default_rdclass)
    if rdtype is not None:
        rdtype = dns.rdatatype.RdataType.make(rdtype)
    manager = RRSetsReaderManager(origin, relativize, default_rdclass)
    with manager.writer(True) as txn:
        tok = dns.tokenizer.Tokenizer(text, "<input>", idna_codec=idna_codec)
        reader = Reader(
            tok,
            default_rdclass,
            txn,
            allow_directives=False,
            force_name=name,
            force_ttl=ttl,
            force_rdclass=rdclass,
            force_rdtype=rdtype,
            default_ttl=default_ttl,
        )
        reader.read()
    # The rrsets are handed to the manager when the transaction is committed
    # on leaving the with block, so they are only available from here on.
    return manager.rrsets
|