123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2001-2017 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """Help for building DNS wire format messages"""
- import contextlib
- import io
- import random
- import struct
- import time
- import dns.exception
- import dns.tsig
- QUESTION = 0
- ANSWER = 1
- AUTHORITY = 2
- ADDITIONAL = 3
- @contextlib.contextmanager
- def prefixed_length(output, length_length):
- output.write(b"\00" * length_length)
- start = output.tell()
- yield
- end = output.tell()
- length = end - start
- if length > 0:
- try:
- output.seek(start - length_length)
- try:
- output.write(length.to_bytes(length_length, "big"))
- except OverflowError:
- raise dns.exception.FormError
- finally:
- output.seek(end)
- class Renderer:
- """Helper class for building DNS wire-format messages.
- Most applications can use the higher-level L{dns.message.Message}
- class and its to_wire() method to generate wire-format messages.
- This class is for those applications which need finer control
- over the generation of messages.
- Typical use::
- r = dns.renderer.Renderer(id=1, flags=0x80, max_size=512)
- r.add_question(qname, qtype, qclass)
- r.add_rrset(dns.renderer.ANSWER, rrset_1)
- r.add_rrset(dns.renderer.ANSWER, rrset_2)
- r.add_rrset(dns.renderer.AUTHORITY, ns_rrset)
- r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_1)
- r.add_rrset(dns.renderer.ADDITIONAL, ad_rrset_2)
- r.add_edns(0, 0, 4096)
- r.write_header()
- r.add_tsig(keyname, secret, 300, 1, 0, '', request_mac)
- wire = r.get_wire()
- If padding is going to be used, then the OPT record MUST be
- written after everything else in the additional section except for
- the TSIG (if any).
- output, an io.BytesIO, where rendering is written
- id: the message id
- flags: the message flags
- max_size: the maximum size of the message
- origin: the origin to use when rendering relative names
- compress: the compression table
- section: an int, the section currently being rendered
- counts: list of the number of RRs in each section
- mac: the MAC of the rendered message (if TSIG was used)
- """
- def __init__(self, id=None, flags=0, max_size=65535, origin=None):
- """Initialize a new renderer."""
- self.output = io.BytesIO()
- if id is None:
- self.id = random.randint(0, 65535)
- else:
- self.id = id
- self.flags = flags
- self.max_size = max_size
- self.origin = origin
- self.compress = {}
- self.section = QUESTION
- self.counts = [0, 0, 0, 0]
- self.output.write(b"\x00" * 12)
- self.mac = ""
- self.reserved = 0
- self.was_padded = False
- def _rollback(self, where):
- """Truncate the output buffer at offset *where*, and remove any
- compression table entries that pointed beyond the truncation
- point.
- """
- self.output.seek(where)
- self.output.truncate()
- keys_to_delete = []
- for k, v in self.compress.items():
- if v >= where:
- keys_to_delete.append(k)
- for k in keys_to_delete:
- del self.compress[k]
- def _set_section(self, section):
- """Set the renderer's current section.
- Sections must be rendered order: QUESTION, ANSWER, AUTHORITY,
- ADDITIONAL. Sections may be empty.
- Raises dns.exception.FormError if an attempt was made to set
- a section value less than the current section.
- """
- if self.section != section:
- if self.section > section:
- raise dns.exception.FormError
- self.section = section
- @contextlib.contextmanager
- def _track_size(self):
- start = self.output.tell()
- yield start
- if self.output.tell() > self.max_size:
- self._rollback(start)
- raise dns.exception.TooBig
- @contextlib.contextmanager
- def _temporarily_seek_to(self, where):
- current = self.output.tell()
- try:
- self.output.seek(where)
- yield
- finally:
- self.output.seek(current)
- def add_question(self, qname, rdtype, rdclass=dns.rdataclass.IN):
- """Add a question to the message."""
- self._set_section(QUESTION)
- with self._track_size():
- qname.to_wire(self.output, self.compress, self.origin)
- self.output.write(struct.pack("!HH", rdtype, rdclass))
- self.counts[QUESTION] += 1
- def add_rrset(self, section, rrset, **kw):
- """Add the rrset to the specified section.
- Any keyword arguments are passed on to the rdataset's to_wire()
- routine.
- """
- self._set_section(section)
- with self._track_size():
- n = rrset.to_wire(self.output, self.compress, self.origin, **kw)
- self.counts[section] += n
- def add_rdataset(self, section, name, rdataset, **kw):
- """Add the rdataset to the specified section, using the specified
- name as the owner name.
- Any keyword arguments are passed on to the rdataset's to_wire()
- routine.
- """
- self._set_section(section)
- with self._track_size():
- n = rdataset.to_wire(name, self.output, self.compress, self.origin, **kw)
- self.counts[section] += n
- def add_opt(self, opt, pad=0, opt_size=0, tsig_size=0):
- """Add *opt* to the additional section, applying padding if desired. The
- padding will take the specified precomputed OPT size and TSIG size into
- account.
- Note that we don't have reliable way of knowing how big a GSS-TSIG digest
- might be, so we we might not get an even multiple of the pad in that case."""
- if pad:
- ttl = opt.ttl
- assert opt_size >= 11
- opt_rdata = opt[0]
- size_without_padding = self.output.tell() + opt_size + tsig_size
- remainder = size_without_padding % pad
- if remainder:
- pad = b"\x00" * (pad - remainder)
- else:
- pad = b""
- options = list(opt_rdata.options)
- options.append(dns.edns.GenericOption(dns.edns.OptionType.PADDING, pad))
- opt = dns.message.Message._make_opt(ttl, opt_rdata.rdclass, options)
- self.was_padded = True
- self.add_rrset(ADDITIONAL, opt)
- def add_edns(self, edns, ednsflags, payload, options=None):
- """Add an EDNS OPT record to the message."""
- # make sure the EDNS version in ednsflags agrees with edns
- ednsflags &= 0xFF00FFFF
- ednsflags |= edns << 16
- opt = dns.message.Message._make_opt(ednsflags, payload, options)
- self.add_opt(opt)
- def add_tsig(
- self,
- keyname,
- secret,
- fudge,
- id,
- tsig_error,
- other_data,
- request_mac,
- algorithm=dns.tsig.default_algorithm,
- ):
- """Add a TSIG signature to the message."""
- s = self.output.getvalue()
- if isinstance(secret, dns.tsig.Key):
- key = secret
- else:
- key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig(
- keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
- )
- (tsig, _) = dns.tsig.sign(s, key, tsig[0], int(time.time()), request_mac)
- self._write_tsig(tsig, keyname)
- def add_multi_tsig(
- self,
- ctx,
- keyname,
- secret,
- fudge,
- id,
- tsig_error,
- other_data,
- request_mac,
- algorithm=dns.tsig.default_algorithm,
- ):
- """Add a TSIG signature to the message. Unlike add_tsig(), this can be
- used for a series of consecutive DNS envelopes, e.g. for a zone
- transfer over TCP [RFC2845, 4.4].
- For the first message in the sequence, give ctx=None. For each
- subsequent message, give the ctx that was returned from the
- add_multi_tsig() call for the previous message."""
- s = self.output.getvalue()
- if isinstance(secret, dns.tsig.Key):
- key = secret
- else:
- key = dns.tsig.Key(keyname, secret, algorithm)
- tsig = dns.message.Message._make_tsig(
- keyname, algorithm, 0, fudge, b"", id, tsig_error, other_data
- )
- (tsig, ctx) = dns.tsig.sign(
- s, key, tsig[0], int(time.time()), request_mac, ctx, True
- )
- self._write_tsig(tsig, keyname)
- return ctx
- def _write_tsig(self, tsig, keyname):
- if self.was_padded:
- compress = None
- else:
- compress = self.compress
- self._set_section(ADDITIONAL)
- with self._track_size():
- keyname.to_wire(self.output, compress, self.origin)
- self.output.write(
- struct.pack("!HHI", dns.rdatatype.TSIG, dns.rdataclass.ANY, 0)
- )
- with prefixed_length(self.output, 2):
- tsig.to_wire(self.output)
- self.counts[ADDITIONAL] += 1
- with self._temporarily_seek_to(10):
- self.output.write(struct.pack("!H", self.counts[ADDITIONAL]))
- def write_header(self):
- """Write the DNS message header.
- Writing the DNS message header is done after all sections
- have been rendered, but before the optional TSIG signature
- is added.
- """
- with self._temporarily_seek_to(0):
- self.output.write(
- struct.pack(
- "!HHHHHH",
- self.id,
- self.flags,
- self.counts[0],
- self.counts[1],
- self.counts[2],
- self.counts[3],
- )
- )
- def get_wire(self):
- """Return the wire format message."""
- return self.output.getvalue()
- def reserve(self, size: int) -> None:
- """Reserve *size* bytes."""
- if size < 0:
- raise ValueError("reserved amount must be non-negative")
- if size > self.max_size:
- raise ValueError("cannot reserve more than the maximum size")
- self.reserved += size
- self.max_size -= size
- def release_reserved(self) -> None:
- """Release the reserved bytes."""
- self.max_size += self.reserved
- self.reserved = 0
|