|
- # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
- # Copyright (C) 2003-2017 Nominum, Inc.
- #
- # Permission to use, copy, modify, and distribute this software and its
- # documentation for any purpose with or without fee is hereby granted,
- # provided that the above copyright notice and this permission notice
- # appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
- # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
- # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- """Asynchronous DNS stub resolver."""
- import socket
- import time
- from typing import Any, Dict, List, Optional, Union
- import dns._ddr
- import dns.asyncbackend
- import dns.asyncquery
- import dns.exception
- import dns.name
- import dns.query
- import dns.rdataclass
- import dns.rdatatype
- import dns.resolver # lgtm[py/import-and-import-from]
- # import some resolver symbols for brevity
- from dns.resolver import NXDOMAIN, NoAnswer, NoRootSOA, NotAbsolute
# Short module-level aliases for dns.asyncquery.udp and dns.asyncquery.tcp,
# kept (per the original note) for indentation purposes in code below.
_udp = dns.asyncquery.udp
_tcp = dns.asyncquery.tcp
class Resolver(dns.resolver.BaseResolver):
    """Asynchronous DNS stub resolver."""

    async def resolve(
        self,
        qname: Union[dns.name.Name, str],
        rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
        rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
        tcp: bool = False,
        source: Optional[str] = None,
        raise_on_no_answer: bool = True,
        source_port: int = 0,
        lifetime: Optional[float] = None,
        search: Optional[bool] = None,
        backend: Optional[dns.asyncbackend.Backend] = None,
    ) -> dns.resolver.Answer:
        """Query nameservers asynchronously to find the answer to the question.

        *backend*, a ``dns.asyncbackend.Backend``, or ``None``.  If ``None``,
        the default, then dnspython will use the default backend.

        See :py:func:`dns.resolver.Resolver.resolve()` for the
        documentation of the other parameters, exceptions, and return
        type of this method.
        """
        state = dns.resolver._Resolution(
            self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search
        )
        backend = backend or dns.asyncbackend.get_default_backend()
        began_at = time.time()
        while True:
            request, answer = state.next_request()
            # Compare against None explicitly: an answer object defines
            # __len__, so a plain truth test would treat an empty answer as
            # "no answer" even though it must still be returned.
            if answer is not None:
                # The resolution already has an answer (cache hit).
                return answer
            assert request is not None  # needed for type checking
            finished = False
            while not finished:
                nameserver, use_tcp, backoff = state.next_nameserver()
                if backoff:
                    await backend.sleep(backoff)
                timeout = self._compute_timeout(began_at, lifetime, state.errors)
                try:
                    response = await nameserver.async_query(
                        request,
                        timeout=timeout,
                        source=source,
                        source_port=source_port,
                        max_size=use_tcp,
                        backend=backend,
                    )
                except Exception as error:
                    # Report the failure to the resolution state; it decides
                    # whether this request is finished.
                    _, finished = state.query_result(None, error)
                else:
                    answer, finished = state.query_result(response, None)
                    # Same explicit None test as above: zero-length answers
                    # are still answers.
                    if answer is not None:
                        return answer

    async def resolve_address(
        self, ipaddr: str, *args: Any, **kwargs: Any
    ) -> dns.resolver.Answer:
        """Use an asynchronous resolver to run a reverse query for PTR
        records.

        This utilizes the resolve() method to perform a PTR lookup on the
        specified IP address.

        *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get
        the PTR record for.

        All other arguments that can be passed to the resolve() function
        except for rdtype and rdclass are also supported by this
        function; any rdtype or rdclass supplied by keyword is overridden.
        """
        # Build a separate kwargs dict so that rdtype and rdclass can never be
        # passed twice (which type checkers rightly warn about).
        forwarded: Dict[str, Any] = {
            **kwargs,
            "rdtype": dns.rdatatype.PTR,
            "rdclass": dns.rdataclass.IN,
        }
        return await self.resolve(
            dns.reversename.from_address(ipaddr), *args, **forwarded
        )

    async def resolve_name(
        self,
        name: Union[dns.name.Name, str],
        family: int = socket.AF_UNSPEC,
        **kwargs: Any,
    ) -> dns.resolver.HostAnswers:
        """Use an asynchronous resolver to query for address records.

        This utilizes the resolve() method to perform A and/or AAAA lookups on
        the specified name.

        *name*, a ``dns.name.Name`` or ``str``, the name to resolve.

        *family*, an ``int``, the address family.  If socket.AF_UNSPEC
        (the default), both A and AAAA records will be retrieved.

        All other arguments that can be passed to the resolve() function
        except for rdtype and rdclass are also supported by this
        function.

        Raises ``NotImplementedError`` for any other address family, and
        ``NoAnswer`` when the combined AF_UNSPEC result is empty.
        """
        # Work on a copy without rdtype and with rdclass forced to IN, so
        # neither can be supplied twice to resolve().
        options: Dict[str, Any] = {
            key: value for key, value in kwargs.items() if key != "rdtype"
        }
        options["rdclass"] = dns.rdataclass.IN

        if family == socket.AF_INET:
            v4 = await self.resolve(name, dns.rdatatype.A, **options)
            return dns.resolver.HostAnswers.make(v4=v4)
        if family == socket.AF_INET6:
            v6 = await self.resolve(name, dns.rdatatype.AAAA, **options)
            return dns.resolver.HostAnswers.make(v6=v6)
        if family != socket.AF_UNSPEC:
            raise NotImplementedError(f"unknown address family {family}")

        # AF_UNSPEC: query AAAA then A, sharing a single lifetime budget.
        raise_on_no_answer = options.pop("raise_on_no_answer", True)
        budget = options.pop("lifetime", None)
        began_at = time.time()

        async def lookup(target: Union[dns.name.Name, str], rdtype: Any) -> Any:
            # Each lookup gets whatever remains of the shared lifetime and
            # never raises on an empty answer; emptiness is judged below.
            return await self.resolve(
                target,
                rdtype,
                raise_on_no_answer=False,
                lifetime=self._compute_timeout(began_at, budget),
                **options,
            )

        v6 = await lookup(name, dns.rdatatype.AAAA)
        # Reuse the name the AAAA query actually used so the A query asks for
        # the same name.  (This matters when search lists are active by
        # default and a server says NXDOMAIN when it means NOERROR/no data.)
        v4 = await lookup(v6.qname, dns.rdatatype.A)
        answers = dns.resolver.HostAnswers.make(
            v6=v6, v4=v4, add_empty=not raise_on_no_answer
        )
        if not answers:
            raise NoAnswer(response=v6.response)
        return answers

    # pylint: disable=redefined-outer-name

    async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
        """Determine the canonical name of *name*.

        The canonical name is the name the resolver uses for queries
        after all CNAME and DNAME renamings have been applied.

        *name*, a ``dns.name.Name`` or ``str``, the query name.

        This method can raise any exception that ``resolve()`` can
        raise, other than ``dns.resolver.NoAnswer`` and
        ``dns.resolver.NXDOMAIN``.

        Returns a ``dns.name.Name``.
        """
        try:
            return (await self.resolve(name, raise_on_no_answer=False)).canonical_name
        except dns.resolver.NXDOMAIN as nxdomain:
            # The NXDOMAIN exception carries the canonical name as well.
            return nxdomain.canonical_name

    async def try_ddr(self, lifetime: float = 5.0) -> None:
        """Try to update the resolver's nameservers using Discovery of Designated
        Resolvers (DDR).  If successful, the resolver will subsequently use
        DNS-over-HTTPS or DNS-over-TLS for future queries.

        *lifetime*, a float, is the maximum time to spend attempting DDR.  The
        default is 5 seconds.

        If the SVCB query is successful and results in a non-empty list of
        nameservers, then the resolver's nameservers are set to the returned
        servers in priority order.

        The current implementation does not use any address hints from the SVCB
        record, nor does it resolve addresses for the SVCB target name; rather it
        assumes that the bootstrap nameserver will always be one of the addresses
        and uses it.  A future revision to the code may offer fuller support.  The
        code verifies that the bootstrap nameserver is in the Subject Alternative
        Name field of the TLS certificate.

        Any exception raised during the attempt is swallowed; in that case the
        resolver's nameservers are left as they were.
        """
        try:
            deadline = time.time() + lifetime
            svcb_answer = await self.resolve(
                dns._ddr._local_resolver_name, "svcb", lifetime=lifetime
            )
            # Whatever is left of the budget goes to nameserver discovery.
            remaining = dns.query._remaining(deadline)
            discovered = await dns._ddr._get_nameservers_async(svcb_answer, remaining)
            if len(discovered) > 0:
                self.nameservers = discovered
        except Exception:
            # DDR is best-effort: on any failure keep the current nameservers.
            pass
# Module-wide default asynchronous resolver.  It stays None until
# reset_default_resolver() assigns a Resolver instance to it.
default_resolver = None
def get_default_resolver() -> Resolver:
    """Get the default asynchronous resolver, initializing it if necessary.

    When the module-level ``default_resolver`` is still ``None``,
    ``reset_default_resolver()`` is called first to create it.

    Returns the module-level ``Resolver`` instance.
    """
    uninitialized = default_resolver is None
    if uninitialized:
        reset_default_resolver()
    # Re-read the global: reset_default_resolver() rebinds it.
    current = default_resolver
    assert current is not None
    return current
def reset_default_resolver() -> None:
    """Re-initialize default asynchronous resolver.

    Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX
    systems) will be re-read immediately.
    """
    global default_resolver
    replacement = Resolver()
    default_resolver = replacement
async def resolve(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
    backend: Optional[dns.asyncbackend.Backend] = None,
) -> dns.resolver.Answer:
    """Query nameservers asynchronously to find the answer to the question.

    This is a convenience function that uses the default resolver
    object to make the query; every argument is forwarded unchanged, in
    order.

    See :py:func:`dns.asyncresolver.Resolver.resolve` for more
    information on the parameters.
    """
    default = get_default_resolver()
    forwarded = (
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
        backend,
    )
    return await default.resolve(*forwarded)
async def resolve_address(
    ipaddr: str, *args: Any, **kwargs: Any
) -> dns.resolver.Answer:
    """Use the default resolver to run a reverse query for PTR records.

    All arguments are forwarded unchanged to the default resolver's
    ``resolve_address()`` method.

    See :py:func:`dns.asyncresolver.Resolver.resolve_address` for more
    information on the parameters.
    """
    default = get_default_resolver()
    return await default.resolve_address(ipaddr, *args, **kwargs)
async def resolve_name(
    name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
) -> dns.resolver.HostAnswers:
    """Use the default resolver to asynchronously query for address records.

    *name*, *family*, and any keyword arguments are forwarded unchanged to
    the default resolver's ``resolve_name()`` method.

    See :py:func:`dns.asyncresolver.Resolver.resolve_name` for more
    information on the parameters.
    """
    default = get_default_resolver()
    return await default.resolve_name(name, family, **kwargs)
async def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
    """Determine the canonical name of *name* using the default resolver.

    See :py:func:`dns.asyncresolver.Resolver.canonical_name` for more
    information on the parameters and possible exceptions.
    """
    default = get_default_resolver()
    return await default.canonical_name(name)
async def try_ddr(timeout: float = 5.0) -> None:
    """Try to update the default resolver's nameservers using Discovery of
    Designated Resolvers (DDR).  If successful, the resolver will subsequently
    use DNS-over-HTTPS or DNS-over-TLS for future queries.

    *timeout*, a float, is forwarded as the *lifetime* argument of the
    default resolver's ``try_ddr()`` method.

    See :py:func:`dns.asyncresolver.Resolver.try_ddr` for more information.
    """
    default = get_default_resolver()
    return await default.try_ddr(timeout)
async def zone_for_name(
    name: Union[dns.name.Name, str],
    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
    tcp: bool = False,
    resolver: Optional[Resolver] = None,
    backend: Optional[dns.asyncbackend.Backend] = None,
) -> dns.name.Name:
    """Find the name of the zone which contains the specified name.

    *name*, a ``dns.name.Name`` or ``str``.  A string is converted with
    ``dns.name.from_text()`` using the root as origin.

    *rdclass*, *tcp*, and *backend* are passed to each SOA query.

    *resolver*, the resolver to query with; the default resolver is used
    when ``None``.

    Starting at *name*, an SOA query is made for the name and then for each
    successive parent until the owner name of the returned SOA rrset equals
    the name queried.

    Raises ``NotAbsolute`` if *name* is not absolute, and ``NoRootSOA`` if
    the search runs out of parents without finding a zone.

    Returns a ``dns.name.Name``.
    """
    if isinstance(name, str):
        name = dns.name.from_text(name, dns.name.root)
    if resolver is None:
        resolver = get_default_resolver()
    if not name.is_absolute():
        raise NotAbsolute(name)

    candidate = name
    while True:
        try:
            soa = await resolver.resolve(
                candidate, dns.rdatatype.SOA, rdclass, tcp, backend=backend
            )
        except (NXDOMAIN, NoAnswer):
            # No SOA at this name; try the parent.
            pass
        else:
            assert soa.rrset is not None
            if soa.rrset.name == candidate:
                return candidate
            # Otherwise we were CNAMEd or DNAMEd and need to look higher.
        try:
            candidate = candidate.parent()
        except dns.name.NoParent:  # pragma: no cover
            raise NoRootSOA
async def make_resolver_at(
    where: Union[dns.name.Name, str],
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> Resolver:
    """Make a stub resolver using the specified destination as the full resolver.

    *where*, a ``dns.name.Name`` or ``str``, the domain name or IP address of
    the full resolver.

    *port*, an ``int``, the port to use.  If not specified, the default is 53.

    *family*, an ``int``, the address family to use.  This parameter is used
    if *where* is not an address; it is passed to ``resolve_name()``.  The
    default is ``socket.AF_UNSPEC``.

    *resolver*, a ``dns.asyncresolver.Resolver`` or ``None``, the resolver to
    use for resolution of hostnames.  If not specified, the default resolver
    will be used.

    Returns a ``dns.asyncresolver.Resolver`` created with ``configure=False``
    whose nameservers are Do53 nameservers for *where* (or for every address
    *where* resolves to), or raises an exception.
    """
    if resolver is None:
        resolver = get_default_resolver()

    servers: List[Union[str, dns.nameserver.Nameserver]]
    if isinstance(where, str) and dns.inet.is_address(where):
        # A literal address needs no lookup.
        servers = [dns.nameserver.Do53Nameserver(where, port)]
    else:
        found = await resolver.resolve_name(where, family)
        servers = [
            dns.nameserver.Do53Nameserver(address, port)
            for address in found.addresses()
        ]

    made = dns.asyncresolver.Resolver(configure=False)
    made.nameservers = servers
    return made
async def resolve_at(
    where: Union[dns.name.Name, str],
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    tcp: bool = False,
    source: Optional[str] = None,
    raise_on_no_answer: bool = True,
    source_port: int = 0,
    lifetime: Optional[float] = None,
    search: Optional[bool] = None,
    backend: Optional[dns.asyncbackend.Backend] = None,
    port: int = 53,
    family: int = socket.AF_UNSPEC,
    resolver: Optional[Resolver] = None,
) -> dns.resolver.Answer:
    """Query nameservers to find the answer to the question.

    This is a convenience function that calls
    ``dns.asyncresolver.make_resolver_at()`` to make a resolver, and then uses
    it to resolve the query.

    See ``dns.asyncresolver.Resolver.resolve`` for more information on the
    resolution parameters, and ``dns.asyncresolver.make_resolver_at`` for
    information about the resolver parameters *where*, *port*, *family*, and
    *resolver*.

    If making more than one query, it is more efficient to call
    ``dns.asyncresolver.make_resolver_at()`` and then use that resolver for
    the queries instead of calling ``resolve_at()`` multiple times.
    """
    target = await make_resolver_at(where, port, family, resolver)
    query_args = (
        qname,
        rdtype,
        rdclass,
        tcp,
        source,
        raise_on_no_answer,
        source_port,
        lifetime,
        search,
        backend,
    )
    return await target.resolve(*query_args)
|