zone.py 51 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434
  1. # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
  2. # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
  3. #
  4. # Permission to use, copy, modify, and distribute this software and its
  5. # documentation for any purpose with or without fee is hereby granted,
  6. # provided that the above copyright notice and this permission notice
  7. # appear in all copies.
  8. #
  9. # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
  10. # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
  11. # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
  12. # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  13. # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  14. # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
  15. # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. """DNS Zones."""
  17. import contextlib
  18. import io
  19. import os
  20. import struct
  21. from typing import (
  22. Any,
  23. Callable,
  24. Iterable,
  25. Iterator,
  26. List,
  27. MutableMapping,
  28. Optional,
  29. Set,
  30. Tuple,
  31. Union,
  32. )
  33. import dns.exception
  34. import dns.grange
  35. import dns.immutable
  36. import dns.name
  37. import dns.node
  38. import dns.rdata
  39. import dns.rdataclass
  40. import dns.rdataset
  41. import dns.rdatatype
  42. import dns.rdtypes.ANY.SOA
  43. import dns.rdtypes.ANY.ZONEMD
  44. import dns.rrset
  45. import dns.tokenizer
  46. import dns.transaction
  47. import dns.ttl
  48. import dns.zonefile
  49. from dns.zonetypes import DigestHashAlgorithm, DigestScheme, _digest_hashers
  50. class BadZone(dns.exception.DNSException):
  51. """The DNS zone is malformed."""
  52. class NoSOA(BadZone):
  53. """The DNS zone has no SOA RR at its origin."""
  54. class NoNS(BadZone):
  55. """The DNS zone has no NS RRset at its origin."""
  56. class UnknownOrigin(BadZone):
  57. """The DNS zone's origin is unknown."""
  58. class UnsupportedDigestScheme(dns.exception.DNSException):
  59. """The zone digest's scheme is unsupported."""
  60. class UnsupportedDigestHashAlgorithm(dns.exception.DNSException):
  61. """The zone digest's origin is unsupported."""
  62. class NoDigest(dns.exception.DNSException):
  63. """The DNS zone has no ZONEMD RRset at its origin."""
  64. class DigestVerificationFailure(dns.exception.DNSException):
  65. """The ZONEMD digest failed to verify."""
  66. def _validate_name(
  67. name: dns.name.Name,
  68. origin: Optional[dns.name.Name],
  69. relativize: bool,
  70. ) -> dns.name.Name:
  71. # This name validation code is shared by Zone and Version
  72. if origin is None:
  73. # This should probably never happen as other code (e.g.
  74. # _rr_line) will notice the lack of an origin before us, but
  75. # we check just in case!
  76. raise KeyError("no zone origin is defined")
  77. if name.is_absolute():
  78. if not name.is_subdomain(origin):
  79. raise KeyError("name parameter must be a subdomain of the zone origin")
  80. if relativize:
  81. name = name.relativize(origin)
  82. else:
  83. # We have a relative name. Make sure that the derelativized name is
  84. # not too long.
  85. try:
  86. abs_name = name.derelativize(origin)
  87. except dns.name.NameTooLong:
  88. # We map dns.name.NameTooLong to KeyError to be consistent with
  89. # the other exceptions above.
  90. raise KeyError("relative name too long for zone")
  91. if not relativize:
  92. # We have a relative name in a non-relative zone, so use the
  93. # derelativized name.
  94. name = abs_name
  95. return name
  96. class Zone(dns.transaction.TransactionManager):
  97. """A DNS zone.
  98. A ``Zone`` is a mapping from names to nodes. The zone object may be
  99. treated like a Python dictionary, e.g. ``zone[name]`` will retrieve
  100. the node associated with that name. The *name* may be a
  101. ``dns.name.Name object``, or it may be a string. In either case,
  102. if the name is relative it is treated as relative to the origin of
  103. the zone.
  104. """
  105. node_factory: Callable[[], dns.node.Node] = dns.node.Node
  106. map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict
  107. writable_version_factory: Optional[Callable[[], "WritableVersion"]] = None
  108. immutable_version_factory: Optional[Callable[[], "ImmutableVersion"]] = None
  109. __slots__ = ["rdclass", "origin", "nodes", "relativize"]
  110. def __init__(
  111. self,
  112. origin: Optional[Union[dns.name.Name, str]],
  113. rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
  114. relativize: bool = True,
  115. ):
  116. """Initialize a zone object.
  117. *origin* is the origin of the zone. It may be a ``dns.name.Name``,
  118. a ``str``, or ``None``. If ``None``, then the zone's origin will
  119. be set by the first ``$ORIGIN`` line in a zone file.
  120. *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
  121. *relativize*, a ``bool``, determine's whether domain names are
  122. relativized to the zone's origin. The default is ``True``.
  123. """
  124. if origin is not None:
  125. if isinstance(origin, str):
  126. origin = dns.name.from_text(origin)
  127. elif not isinstance(origin, dns.name.Name):
  128. raise ValueError("origin parameter must be convertible to a DNS name")
  129. if not origin.is_absolute():
  130. raise ValueError("origin parameter must be an absolute name")
  131. self.origin = origin
  132. self.rdclass = rdclass
  133. self.nodes: MutableMapping[dns.name.Name, dns.node.Node] = self.map_factory()
  134. self.relativize = relativize
  135. def __eq__(self, other):
  136. """Two zones are equal if they have the same origin, class, and
  137. nodes.
  138. Returns a ``bool``.
  139. """
  140. if not isinstance(other, Zone):
  141. return False
  142. if (
  143. self.rdclass != other.rdclass
  144. or self.origin != other.origin
  145. or self.nodes != other.nodes
  146. ):
  147. return False
  148. return True
  149. def __ne__(self, other):
  150. """Are two zones not equal?
  151. Returns a ``bool``.
  152. """
  153. return not self.__eq__(other)
  154. def _validate_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
  155. # Note that any changes in this method should have corresponding changes
  156. # made in the Version _validate_name() method.
  157. if isinstance(name, str):
  158. name = dns.name.from_text(name, None)
  159. elif not isinstance(name, dns.name.Name):
  160. raise KeyError("name parameter must be convertible to a DNS name")
  161. return _validate_name(name, self.origin, self.relativize)
  162. def __getitem__(self, key):
  163. key = self._validate_name(key)
  164. return self.nodes[key]
  165. def __setitem__(self, key, value):
  166. key = self._validate_name(key)
  167. self.nodes[key] = value
  168. def __delitem__(self, key):
  169. key = self._validate_name(key)
  170. del self.nodes[key]
  171. def __iter__(self):
  172. return self.nodes.__iter__()
  173. def keys(self):
  174. return self.nodes.keys()
  175. def values(self):
  176. return self.nodes.values()
  177. def items(self):
  178. return self.nodes.items()
  179. def get(self, key):
  180. key = self._validate_name(key)
  181. return self.nodes.get(key)
  182. def __contains__(self, key):
  183. key = self._validate_name(key)
  184. return key in self.nodes
  185. def find_node(
  186. self, name: Union[dns.name.Name, str], create: bool = False
  187. ) -> dns.node.Node:
  188. """Find a node in the zone, possibly creating it.
  189. *name*: the name of the node to find.
  190. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  191. name must be a subdomain of the zone's origin. If ``zone.relativize``
  192. is ``True``, then the name will be relativized.
  193. *create*, a ``bool``. If true, the node will be created if it does
  194. not exist.
  195. Raises ``KeyError`` if the name is not known and create was
  196. not specified, or if the name was not a subdomain of the origin.
  197. Returns a ``dns.node.Node``.
  198. """
  199. name = self._validate_name(name)
  200. node = self.nodes.get(name)
  201. if node is None:
  202. if not create:
  203. raise KeyError
  204. node = self.node_factory()
  205. self.nodes[name] = node
  206. return node
  207. def get_node(
  208. self, name: Union[dns.name.Name, str], create: bool = False
  209. ) -> Optional[dns.node.Node]:
  210. """Get a node in the zone, possibly creating it.
  211. This method is like ``find_node()``, except it returns None instead
  212. of raising an exception if the node does not exist and creation
  213. has not been requested.
  214. *name*: the name of the node to find.
  215. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  216. name must be a subdomain of the zone's origin. If ``zone.relativize``
  217. is ``True``, then the name will be relativized.
  218. *create*, a ``bool``. If true, the node will be created if it does
  219. not exist.
  220. Returns a ``dns.node.Node`` or ``None``.
  221. """
  222. try:
  223. node = self.find_node(name, create)
  224. except KeyError:
  225. node = None
  226. return node
  227. def delete_node(self, name: Union[dns.name.Name, str]) -> None:
  228. """Delete the specified node if it exists.
  229. *name*: the name of the node to find.
  230. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  231. name must be a subdomain of the zone's origin. If ``zone.relativize``
  232. is ``True``, then the name will be relativized.
  233. It is not an error if the node does not exist.
  234. """
  235. name = self._validate_name(name)
  236. if name in self.nodes:
  237. del self.nodes[name]
  238. def find_rdataset(
  239. self,
  240. name: Union[dns.name.Name, str],
  241. rdtype: Union[dns.rdatatype.RdataType, str],
  242. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  243. create: bool = False,
  244. ) -> dns.rdataset.Rdataset:
  245. """Look for an rdataset with the specified name and type in the zone,
  246. and return an rdataset encapsulating it.
  247. The rdataset returned is not a copy; changes to it will change
  248. the zone.
  249. KeyError is raised if the name or type are not found.
  250. *name*: the name of the node to find.
  251. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  252. name must be a subdomain of the zone's origin. If ``zone.relativize``
  253. is ``True``, then the name will be relativized.
  254. *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
  255. *covers*, a ``dns.rdatatype.RdataType`` or ``str`` the covered type.
  256. Usually this value is ``dns.rdatatype.NONE``, but if the
  257. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  258. then the covers value will be the rdata type the SIG/RRSIG
  259. covers. The library treats the SIG and RRSIG types as if they
  260. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  261. This makes RRSIGs much easier to work with than if RRSIGs
  262. covering different rdata types were aggregated into a single
  263. RRSIG rdataset.
  264. *create*, a ``bool``. If true, the node will be created if it does
  265. not exist.
  266. Raises ``KeyError`` if the name is not known and create was
  267. not specified, or if the name was not a subdomain of the origin.
  268. Returns a ``dns.rdataset.Rdataset``.
  269. """
  270. name = self._validate_name(name)
  271. rdtype = dns.rdatatype.RdataType.make(rdtype)
  272. covers = dns.rdatatype.RdataType.make(covers)
  273. node = self.find_node(name, create)
  274. return node.find_rdataset(self.rdclass, rdtype, covers, create)
  275. def get_rdataset(
  276. self,
  277. name: Union[dns.name.Name, str],
  278. rdtype: Union[dns.rdatatype.RdataType, str],
  279. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  280. create: bool = False,
  281. ) -> Optional[dns.rdataset.Rdataset]:
  282. """Look for an rdataset with the specified name and type in the zone.
  283. This method is like ``find_rdataset()``, except it returns None instead
  284. of raising an exception if the rdataset does not exist and creation
  285. has not been requested.
  286. The rdataset returned is not a copy; changes to it will change
  287. the zone.
  288. *name*: the name of the node to find.
  289. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  290. name must be a subdomain of the zone's origin. If ``zone.relativize``
  291. is ``True``, then the name will be relativized.
  292. *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
  293. *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
  294. Usually this value is ``dns.rdatatype.NONE``, but if the
  295. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  296. then the covers value will be the rdata type the SIG/RRSIG
  297. covers. The library treats the SIG and RRSIG types as if they
  298. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  299. This makes RRSIGs much easier to work with than if RRSIGs
  300. covering different rdata types were aggregated into a single
  301. RRSIG rdataset.
  302. *create*, a ``bool``. If true, the node will be created if it does
  303. not exist.
  304. Raises ``KeyError`` if the name is not known and create was
  305. not specified, or if the name was not a subdomain of the origin.
  306. Returns a ``dns.rdataset.Rdataset`` or ``None``.
  307. """
  308. try:
  309. rdataset = self.find_rdataset(name, rdtype, covers, create)
  310. except KeyError:
  311. rdataset = None
  312. return rdataset
  313. def delete_rdataset(
  314. self,
  315. name: Union[dns.name.Name, str],
  316. rdtype: Union[dns.rdatatype.RdataType, str],
  317. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  318. ) -> None:
  319. """Delete the rdataset matching *rdtype* and *covers*, if it
  320. exists at the node specified by *name*.
  321. It is not an error if the node does not exist, or if there is no matching
  322. rdataset at the node.
  323. If the node has no rdatasets after the deletion, it will itself be deleted.
  324. *name*: the name of the node to find. The value may be a ``dns.name.Name`` or a
  325. ``str``. If absolute, the name must be a subdomain of the zone's origin. If
  326. ``zone.relativize`` is ``True``, then the name will be relativized.
  327. *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
  328. *covers*, a ``dns.rdatatype.RdataType`` or ``str`` or ``None``, the covered
  329. type. Usually this value is ``dns.rdatatype.NONE``, but if the rdtype is
  330. ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``, then the covers value will be
  331. the rdata type the SIG/RRSIG covers. The library treats the SIG and RRSIG types
  332. as if they were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA). This
  333. makes RRSIGs much easier to work with than if RRSIGs covering different rdata
  334. types were aggregated into a single RRSIG rdataset.
  335. """
  336. name = self._validate_name(name)
  337. rdtype = dns.rdatatype.RdataType.make(rdtype)
  338. covers = dns.rdatatype.RdataType.make(covers)
  339. node = self.get_node(name)
  340. if node is not None:
  341. node.delete_rdataset(self.rdclass, rdtype, covers)
  342. if len(node) == 0:
  343. self.delete_node(name)
  344. def replace_rdataset(
  345. self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
  346. ) -> None:
  347. """Replace an rdataset at name.
  348. It is not an error if there is no rdataset matching I{replacement}.
  349. Ownership of the *replacement* object is transferred to the zone;
  350. in other words, this method does not store a copy of *replacement*
  351. at the node, it stores *replacement* itself.
  352. If the node does not exist, it is created.
  353. *name*: the name of the node to find.
  354. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  355. name must be a subdomain of the zone's origin. If ``zone.relativize``
  356. is ``True``, then the name will be relativized.
  357. *replacement*, a ``dns.rdataset.Rdataset``, the replacement rdataset.
  358. """
  359. if replacement.rdclass != self.rdclass:
  360. raise ValueError("replacement.rdclass != zone.rdclass")
  361. node = self.find_node(name, True)
  362. node.replace_rdataset(replacement)
  363. def find_rrset(
  364. self,
  365. name: Union[dns.name.Name, str],
  366. rdtype: Union[dns.rdatatype.RdataType, str],
  367. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  368. ) -> dns.rrset.RRset:
  369. """Look for an rdataset with the specified name and type in the zone,
  370. and return an RRset encapsulating it.
  371. This method is less efficient than the similar
  372. ``find_rdataset()`` because it creates an RRset instead of
  373. returning the matching rdataset. It may be more convenient
  374. for some uses since it returns an object which binds the owner
  375. name to the rdataset.
  376. This method may not be used to create new nodes or rdatasets;
  377. use ``find_rdataset`` instead.
  378. *name*: the name of the node to find.
  379. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  380. name must be a subdomain of the zone's origin. If ``zone.relativize``
  381. is ``True``, then the name will be relativized.
  382. *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
  383. *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
  384. Usually this value is ``dns.rdatatype.NONE``, but if the
  385. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  386. then the covers value will be the rdata type the SIG/RRSIG
  387. covers. The library treats the SIG and RRSIG types as if they
  388. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  389. This makes RRSIGs much easier to work with than if RRSIGs
  390. covering different rdata types were aggregated into a single
  391. RRSIG rdataset.
  392. *create*, a ``bool``. If true, the node will be created if it does
  393. not exist.
  394. Raises ``KeyError`` if the name is not known and create was
  395. not specified, or if the name was not a subdomain of the origin.
  396. Returns a ``dns.rrset.RRset`` or ``None``.
  397. """
  398. vname = self._validate_name(name)
  399. rdtype = dns.rdatatype.RdataType.make(rdtype)
  400. covers = dns.rdatatype.RdataType.make(covers)
  401. rdataset = self.nodes[vname].find_rdataset(self.rdclass, rdtype, covers)
  402. rrset = dns.rrset.RRset(vname, self.rdclass, rdtype, covers)
  403. rrset.update(rdataset)
  404. return rrset
  405. def get_rrset(
  406. self,
  407. name: Union[dns.name.Name, str],
  408. rdtype: Union[dns.rdatatype.RdataType, str],
  409. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  410. ) -> Optional[dns.rrset.RRset]:
  411. """Look for an rdataset with the specified name and type in the zone,
  412. and return an RRset encapsulating it.
  413. This method is less efficient than the similar ``get_rdataset()``
  414. because it creates an RRset instead of returning the matching
  415. rdataset. It may be more convenient for some uses since it
  416. returns an object which binds the owner name to the rdataset.
  417. This method may not be used to create new nodes or rdatasets;
  418. use ``get_rdataset()`` instead.
  419. *name*: the name of the node to find.
  420. The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
  421. name must be a subdomain of the zone's origin. If ``zone.relativize``
  422. is ``True``, then the name will be relativized.
  423. *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
  424. *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
  425. Usually this value is ``dns.rdatatype.NONE``, but if the
  426. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  427. then the covers value will be the rdata type the SIG/RRSIG
  428. covers. The library treats the SIG and RRSIG types as if they
  429. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  430. This makes RRSIGs much easier to work with than if RRSIGs
  431. covering different rdata types were aggregated into a single
  432. RRSIG rdataset.
  433. *create*, a ``bool``. If true, the node will be created if it does
  434. not exist.
  435. Returns a ``dns.rrset.RRset`` or ``None``.
  436. """
  437. try:
  438. rrset = self.find_rrset(name, rdtype, covers)
  439. except KeyError:
  440. rrset = None
  441. return rrset
  442. def iterate_rdatasets(
  443. self,
  444. rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
  445. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  446. ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
  447. """Return a generator which yields (name, rdataset) tuples for
  448. all rdatasets in the zone which have the specified *rdtype*
  449. and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
  450. then all rdatasets will be matched.
  451. *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
  452. *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
  453. Usually this value is ``dns.rdatatype.NONE``, but if the
  454. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  455. then the covers value will be the rdata type the SIG/RRSIG
  456. covers. The library treats the SIG and RRSIG types as if they
  457. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  458. This makes RRSIGs much easier to work with than if RRSIGs
  459. covering different rdata types were aggregated into a single
  460. RRSIG rdataset.
  461. """
  462. rdtype = dns.rdatatype.RdataType.make(rdtype)
  463. covers = dns.rdatatype.RdataType.make(covers)
  464. for name, node in self.items():
  465. for rds in node:
  466. if rdtype == dns.rdatatype.ANY or (
  467. rds.rdtype == rdtype and rds.covers == covers
  468. ):
  469. yield (name, rds)
  470. def iterate_rdatas(
  471. self,
  472. rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
  473. covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
  474. ) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
  475. """Return a generator which yields (name, ttl, rdata) tuples for
  476. all rdatas in the zone which have the specified *rdtype*
  477. and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
  478. then all rdatas will be matched.
  479. *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
  480. *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
  481. Usually this value is ``dns.rdatatype.NONE``, but if the
  482. rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
  483. then the covers value will be the rdata type the SIG/RRSIG
  484. covers. The library treats the SIG and RRSIG types as if they
  485. were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
  486. This makes RRSIGs much easier to work with than if RRSIGs
  487. covering different rdata types were aggregated into a single
  488. RRSIG rdataset.
  489. """
  490. rdtype = dns.rdatatype.RdataType.make(rdtype)
  491. covers = dns.rdatatype.RdataType.make(covers)
  492. for name, node in self.items():
  493. for rds in node:
  494. if rdtype == dns.rdatatype.ANY or (
  495. rds.rdtype == rdtype and rds.covers == covers
  496. ):
  497. for rdata in rds:
  498. yield (name, rds.ttl, rdata)
  499. def to_file(
  500. self,
  501. f: Any,
  502. sorted: bool = True,
  503. relativize: bool = True,
  504. nl: Optional[str] = None,
  505. want_comments: bool = False,
  506. want_origin: bool = False,
  507. ) -> None:
  508. """Write a zone to a file.
  509. *f*, a file or `str`. If *f* is a string, it is treated
  510. as the name of a file to open.
  511. *sorted*, a ``bool``. If True, the default, then the file
  512. will be written with the names sorted in DNSSEC order from
  513. least to greatest. Otherwise the names will be written in
  514. whatever order they happen to have in the zone's dictionary.
  515. *relativize*, a ``bool``. If True, the default, then domain
  516. names in the output will be relativized to the zone's origin
  517. if possible.
  518. *nl*, a ``str`` or None. The end of line string. If not
  519. ``None``, the output will use the platform's native
  520. end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
  521. *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
  522. as part of writing the file. If ``False``, the default, do not
  523. emit them.
  524. *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
  525. the start of the file. If ``False``, the default, do not emit
  526. one.
  527. """
  528. if isinstance(f, str):
  529. cm: contextlib.AbstractContextManager = open(f, "wb")
  530. else:
  531. cm = contextlib.nullcontext(f)
  532. with cm as f:
  533. # must be in this way, f.encoding may contain None, or even
  534. # attribute may not be there
  535. file_enc = getattr(f, "encoding", None)
  536. if file_enc is None:
  537. file_enc = "utf-8"
  538. if nl is None:
  539. # binary mode, '\n' is not enough
  540. nl_b = os.linesep.encode(file_enc)
  541. nl = "\n"
  542. elif isinstance(nl, str):
  543. nl_b = nl.encode(file_enc)
  544. else:
  545. nl_b = nl
  546. nl = nl.decode()
  547. if want_origin:
  548. assert self.origin is not None
  549. l = "$ORIGIN " + self.origin.to_text()
  550. l_b = l.encode(file_enc)
  551. try:
  552. f.write(l_b)
  553. f.write(nl_b)
  554. except TypeError: # textual mode
  555. f.write(l)
  556. f.write(nl)
  557. if sorted:
  558. names = list(self.keys())
  559. names.sort()
  560. else:
  561. names = self.keys()
  562. for n in names:
  563. l = self[n].to_text(
  564. n,
  565. origin=self.origin,
  566. relativize=relativize,
  567. want_comments=want_comments,
  568. )
  569. l_b = l.encode(file_enc)
  570. try:
  571. f.write(l_b)
  572. f.write(nl_b)
  573. except TypeError: # textual mode
  574. f.write(l)
  575. f.write(nl)
  576. def to_text(
  577. self,
  578. sorted: bool = True,
  579. relativize: bool = True,
  580. nl: Optional[str] = None,
  581. want_comments: bool = False,
  582. want_origin: bool = False,
  583. ) -> str:
  584. """Return a zone's text as though it were written to a file.
  585. *sorted*, a ``bool``. If True, the default, then the file
  586. will be written with the names sorted in DNSSEC order from
  587. least to greatest. Otherwise the names will be written in
  588. whatever order they happen to have in the zone's dictionary.
  589. *relativize*, a ``bool``. If True, the default, then domain
  590. names in the output will be relativized to the zone's origin
  591. if possible.
  592. *nl*, a ``str`` or None. The end of line string. If not
  593. ``None``, the output will use the platform's native
  594. end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
  595. *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
  596. as part of writing the file. If ``False``, the default, do not
  597. emit them.
  598. *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
  599. the start of the output. If ``False``, the default, do not emit
  600. one.
  601. Returns a ``str``.
  602. """
  603. temp_buffer = io.StringIO()
  604. self.to_file(temp_buffer, sorted, relativize, nl, want_comments, want_origin)
  605. return_value = temp_buffer.getvalue()
  606. temp_buffer.close()
  607. return return_value
  608. def check_origin(self) -> None:
  609. """Do some simple checking of the zone's origin.
  610. Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
  611. Raises ``dns.zone.NoNS`` if there is no NS RRset.
  612. Raises ``KeyError`` if there is no origin node.
  613. """
  614. if self.relativize:
  615. name = dns.name.empty
  616. else:
  617. assert self.origin is not None
  618. name = self.origin
  619. if self.get_rdataset(name, dns.rdatatype.SOA) is None:
  620. raise NoSOA
  621. if self.get_rdataset(name, dns.rdatatype.NS) is None:
  622. raise NoNS
  623. def get_soa(
  624. self, txn: Optional[dns.transaction.Transaction] = None
  625. ) -> dns.rdtypes.ANY.SOA.SOA:
  626. """Get the zone SOA rdata.
  627. Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
  628. Returns a ``dns.rdtypes.ANY.SOA.SOA`` Rdata.
  629. """
  630. if self.relativize:
  631. origin_name = dns.name.empty
  632. else:
  633. if self.origin is None:
  634. # get_soa() has been called very early, and there must not be
  635. # an SOA if there is no origin.
  636. raise NoSOA
  637. origin_name = self.origin
  638. soa: Optional[dns.rdataset.Rdataset]
  639. if txn:
  640. soa = txn.get(origin_name, dns.rdatatype.SOA)
  641. else:
  642. soa = self.get_rdataset(origin_name, dns.rdatatype.SOA)
  643. if soa is None:
  644. raise NoSOA
  645. return soa[0]
  646. def _compute_digest(
  647. self,
  648. hash_algorithm: DigestHashAlgorithm,
  649. scheme: DigestScheme = DigestScheme.SIMPLE,
  650. ) -> bytes:
  651. hashinfo = _digest_hashers.get(hash_algorithm)
  652. if not hashinfo:
  653. raise UnsupportedDigestHashAlgorithm
  654. if scheme != DigestScheme.SIMPLE:
  655. raise UnsupportedDigestScheme
  656. if self.relativize:
  657. origin_name = dns.name.empty
  658. else:
  659. assert self.origin is not None
  660. origin_name = self.origin
  661. hasher = hashinfo()
  662. for name, node in sorted(self.items()):
  663. rrnamebuf = name.to_digestable(self.origin)
  664. for rdataset in sorted(node, key=lambda rds: (rds.rdtype, rds.covers)):
  665. if name == origin_name and dns.rdatatype.ZONEMD in (
  666. rdataset.rdtype,
  667. rdataset.covers,
  668. ):
  669. continue
  670. rrfixed = struct.pack(
  671. "!HHI", rdataset.rdtype, rdataset.rdclass, rdataset.ttl
  672. )
  673. rdatas = [rdata.to_digestable(self.origin) for rdata in rdataset]
  674. for rdata in sorted(rdatas):
  675. rrlen = struct.pack("!H", len(rdata))
  676. hasher.update(rrnamebuf + rrfixed + rrlen + rdata)
  677. return hasher.digest()
  678. def compute_digest(
  679. self,
  680. hash_algorithm: DigestHashAlgorithm,
  681. scheme: DigestScheme = DigestScheme.SIMPLE,
  682. ) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
  683. serial = self.get_soa().serial
  684. digest = self._compute_digest(hash_algorithm, scheme)
  685. return dns.rdtypes.ANY.ZONEMD.ZONEMD(
  686. self.rdclass, dns.rdatatype.ZONEMD, serial, scheme, hash_algorithm, digest
  687. )
  688. def verify_digest(
  689. self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD] = None
  690. ) -> None:
  691. digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]]
  692. if zonemd:
  693. digests = [zonemd]
  694. else:
  695. assert self.origin is not None
  696. rds = self.get_rdataset(self.origin, dns.rdatatype.ZONEMD)
  697. if rds is None:
  698. raise NoDigest
  699. digests = rds
  700. for digest in digests:
  701. try:
  702. computed = self._compute_digest(digest.hash_algorithm, digest.scheme)
  703. if computed == digest.digest:
  704. return
  705. except Exception:
  706. pass
  707. raise DigestVerificationFailure
  708. # TransactionManager methods
  709. def reader(self) -> "Transaction":
  710. return Transaction(self, False, Version(self, 1, self.nodes, self.origin))
  711. def writer(self, replacement: bool = False) -> "Transaction":
  712. txn = Transaction(self, replacement)
  713. txn._setup_version()
  714. return txn
  715. def origin_information(
  716. self,
  717. ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]:
  718. effective: Optional[dns.name.Name]
  719. if self.relativize:
  720. effective = dns.name.empty
  721. else:
  722. effective = self.origin
  723. return (self.origin, self.relativize, effective)
  724. def get_class(self):
  725. return self.rdclass
  726. # Transaction methods
  727. def _end_read(self, txn):
  728. pass
  729. def _end_write(self, txn):
  730. pass
  731. def _commit_version(self, _, version, origin):
  732. self.nodes = version.nodes
  733. if self.origin is None:
  734. self.origin = origin
  735. def _get_next_version_id(self):
  736. # Versions are ephemeral and all have id 1
  737. return 1
  738. # These classes used to be in dns.versioned, but have moved here so we can use
  739. # the copy-on-write transaction mechanism for both kinds of zones. In a
  740. # regular zone, the version only exists during the transaction, and the nodes
  741. # are regular dns.node.Nodes.
  742. # A node with a version id.
  743. class VersionedNode(dns.node.Node): # lgtm[py/missing-equals]
  744. __slots__ = ["id"]
  745. def __init__(self):
  746. super().__init__()
  747. # A proper id will get set by the Version
  748. self.id = 0
  749. @dns.immutable.immutable
  750. class ImmutableVersionedNode(VersionedNode):
  751. def __init__(self, node):
  752. super().__init__()
  753. self.id = node.id
  754. self.rdatasets = tuple(
  755. [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
  756. )
  757. def find_rdataset(
  758. self,
  759. rdclass: dns.rdataclass.RdataClass,
  760. rdtype: dns.rdatatype.RdataType,
  761. covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
  762. create: bool = False,
  763. ) -> dns.rdataset.Rdataset:
  764. if create:
  765. raise TypeError("immutable")
  766. return super().find_rdataset(rdclass, rdtype, covers, False)
  767. def get_rdataset(
  768. self,
  769. rdclass: dns.rdataclass.RdataClass,
  770. rdtype: dns.rdatatype.RdataType,
  771. covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
  772. create: bool = False,
  773. ) -> Optional[dns.rdataset.Rdataset]:
  774. if create:
  775. raise TypeError("immutable")
  776. return super().get_rdataset(rdclass, rdtype, covers, False)
  777. def delete_rdataset(
  778. self,
  779. rdclass: dns.rdataclass.RdataClass,
  780. rdtype: dns.rdatatype.RdataType,
  781. covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
  782. ) -> None:
  783. raise TypeError("immutable")
  784. def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
  785. raise TypeError("immutable")
  786. def is_immutable(self) -> bool:
  787. return True
  788. class Version:
  789. def __init__(
  790. self,
  791. zone: Zone,
  792. id: int,
  793. nodes: Optional[MutableMapping[dns.name.Name, dns.node.Node]] = None,
  794. origin: Optional[dns.name.Name] = None,
  795. ):
  796. self.zone = zone
  797. self.id = id
  798. if nodes is not None:
  799. self.nodes = nodes
  800. else:
  801. self.nodes = zone.map_factory()
  802. self.origin = origin
  803. def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
  804. return _validate_name(name, self.origin, self.zone.relativize)
  805. def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
  806. name = self._validate_name(name)
  807. return self.nodes.get(name)
  808. def get_rdataset(
  809. self,
  810. name: dns.name.Name,
  811. rdtype: dns.rdatatype.RdataType,
  812. covers: dns.rdatatype.RdataType,
  813. ) -> Optional[dns.rdataset.Rdataset]:
  814. node = self.get_node(name)
  815. if node is None:
  816. return None
  817. return node.get_rdataset(self.zone.rdclass, rdtype, covers)
  818. def keys(self):
  819. return self.nodes.keys()
  820. def items(self):
  821. return self.nodes.items()
  822. class WritableVersion(Version):
  823. def __init__(self, zone: Zone, replacement: bool = False):
  824. # The zone._versions_lock must be held by our caller in a versioned
  825. # zone.
  826. id = zone._get_next_version_id()
  827. super().__init__(zone, id)
  828. if not replacement:
  829. # We copy the map, because that gives us a simple and thread-safe
  830. # way of doing versions, and we have a garbage collector to help
  831. # us. We only make new node objects if we actually change the
  832. # node.
  833. self.nodes.update(zone.nodes)
  834. # We have to copy the zone origin as it may be None in the first
  835. # version, and we don't want to mutate the zone until we commit.
  836. self.origin = zone.origin
  837. self.changed: Set[dns.name.Name] = set()
  838. def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node:
  839. name = self._validate_name(name)
  840. node = self.nodes.get(name)
  841. if node is None or name not in self.changed:
  842. new_node = self.zone.node_factory()
  843. if hasattr(new_node, "id"):
  844. # We keep doing this for backwards compatibility, as earlier
  845. # code used new_node.id != self.id for the "do we need to CoW?"
  846. # test. Now we use the changed set as this works with both
  847. # regular zones and versioned zones.
  848. #
  849. # We ignore the mypy error as this is safe but it doesn't see it.
  850. new_node.id = self.id # type: ignore
  851. if node is not None:
  852. # moo! copy on write!
  853. new_node.rdatasets.extend(node.rdatasets)
  854. self.nodes[name] = new_node
  855. self.changed.add(name)
  856. return new_node
  857. else:
  858. return node
  859. def delete_node(self, name: dns.name.Name) -> None:
  860. name = self._validate_name(name)
  861. if name in self.nodes:
  862. del self.nodes[name]
  863. self.changed.add(name)
  864. def put_rdataset(
  865. self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
  866. ) -> None:
  867. node = self._maybe_cow(name)
  868. node.replace_rdataset(rdataset)
  869. def delete_rdataset(
  870. self,
  871. name: dns.name.Name,
  872. rdtype: dns.rdatatype.RdataType,
  873. covers: dns.rdatatype.RdataType,
  874. ) -> None:
  875. node = self._maybe_cow(name)
  876. node.delete_rdataset(self.zone.rdclass, rdtype, covers)
  877. if len(node) == 0:
  878. del self.nodes[name]
  879. @dns.immutable.immutable
  880. class ImmutableVersion(Version):
  881. def __init__(self, version: WritableVersion):
  882. # We tell super() that it's a replacement as we don't want it
  883. # to copy the nodes, as we're about to do that with an
  884. # immutable Dict.
  885. super().__init__(version.zone, True)
  886. # set the right id!
  887. self.id = version.id
  888. # keep the origin
  889. self.origin = version.origin
  890. # Make changed nodes immutable
  891. for name in version.changed:
  892. node = version.nodes.get(name)
  893. # it might not exist if we deleted it in the version
  894. if node:
  895. version.nodes[name] = ImmutableVersionedNode(node)
  896. # We're changing the type of the nodes dictionary here on purpose, so
  897. # we ignore the mypy error.
  898. self.nodes = dns.immutable.Dict(
  899. version.nodes, True, self.zone.map_factory
  900. ) # type: ignore
  901. class Transaction(dns.transaction.Transaction):
  902. def __init__(self, zone, replacement, version=None, make_immutable=False):
  903. read_only = version is not None
  904. super().__init__(zone, replacement, read_only)
  905. self.version = version
  906. self.make_immutable = make_immutable
  907. @property
  908. def zone(self):
  909. return self.manager
  910. def _setup_version(self):
  911. assert self.version is None
  912. factory = self.manager.writable_version_factory
  913. if factory is None:
  914. factory = WritableVersion
  915. self.version = factory(self.zone, self.replacement)
  916. def _get_rdataset(self, name, rdtype, covers):
  917. return self.version.get_rdataset(name, rdtype, covers)
  918. def _put_rdataset(self, name, rdataset):
  919. assert not self.read_only
  920. self.version.put_rdataset(name, rdataset)
  921. def _delete_name(self, name):
  922. assert not self.read_only
  923. self.version.delete_node(name)
  924. def _delete_rdataset(self, name, rdtype, covers):
  925. assert not self.read_only
  926. self.version.delete_rdataset(name, rdtype, covers)
  927. def _name_exists(self, name):
  928. return self.version.get_node(name) is not None
  929. def _changed(self):
  930. if self.read_only:
  931. return False
  932. else:
  933. return len(self.version.changed) > 0
  934. def _end_transaction(self, commit):
  935. if self.read_only:
  936. self.zone._end_read(self)
  937. elif commit and len(self.version.changed) > 0:
  938. if self.make_immutable:
  939. factory = self.manager.immutable_version_factory
  940. if factory is None:
  941. factory = ImmutableVersion
  942. version = factory(self.version)
  943. else:
  944. version = self.version
  945. self.zone._commit_version(self, version, self.version.origin)
  946. else:
  947. # rollback
  948. self.zone._end_write(self)
  949. def _set_origin(self, origin):
  950. if self.version.origin is None:
  951. self.version.origin = origin
  952. def _iterate_rdatasets(self):
  953. for name, node in self.version.items():
  954. for rdataset in node:
  955. yield (name, rdataset)
  956. def _iterate_names(self):
  957. return self.version.keys()
  958. def _get_node(self, name):
  959. return self.version.get_node(name)
  960. def _origin_information(self):
  961. (absolute, relativize, effective) = self.manager.origin_information()
  962. if absolute is None and self.version.origin is not None:
  963. # No origin has been committed yet, but we've learned one as part of
  964. # this txn. Use it.
  965. absolute = self.version.origin
  966. if relativize:
  967. effective = dns.name.empty
  968. else:
  969. effective = absolute
  970. return (absolute, relativize, effective)
  971. def _from_text(
  972. text: Any,
  973. origin: Optional[Union[dns.name.Name, str]] = None,
  974. rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
  975. relativize: bool = True,
  976. zone_factory: Any = Zone,
  977. filename: Optional[str] = None,
  978. allow_include: bool = False,
  979. check_origin: bool = True,
  980. idna_codec: Optional[dns.name.IDNACodec] = None,
  981. allow_directives: Union[bool, Iterable[str]] = True,
  982. ) -> Zone:
  983. # See the comments for the public APIs from_text() and from_file() for
  984. # details.
  985. # 'text' can also be a file, but we don't publish that fact
  986. # since it's an implementation detail. The official file
  987. # interface is from_file().
  988. if filename is None:
  989. filename = "<string>"
  990. zone = zone_factory(origin, rdclass, relativize=relativize)
  991. with zone.writer(True) as txn:
  992. tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
  993. reader = dns.zonefile.Reader(
  994. tok,
  995. rdclass,
  996. txn,
  997. allow_include=allow_include,
  998. allow_directives=allow_directives,
  999. )
  1000. try:
  1001. reader.read()
  1002. except dns.zonefile.UnknownOrigin:
  1003. # for backwards compatibility
  1004. raise dns.zone.UnknownOrigin
  1005. # Now that we're done reading, do some basic checking of the zone.
  1006. if check_origin:
  1007. zone.check_origin()
  1008. return zone
  1009. def from_text(
  1010. text: str,
  1011. origin: Optional[Union[dns.name.Name, str]] = None,
  1012. rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
  1013. relativize: bool = True,
  1014. zone_factory: Any = Zone,
  1015. filename: Optional[str] = None,
  1016. allow_include: bool = False,
  1017. check_origin: bool = True,
  1018. idna_codec: Optional[dns.name.IDNACodec] = None,
  1019. allow_directives: Union[bool, Iterable[str]] = True,
  1020. ) -> Zone:
  1021. """Build a zone object from a zone file format string.
  1022. *text*, a ``str``, the zone file format input.
  1023. *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
  1024. of the zone; if not specified, the first ``$ORIGIN`` statement in the
  1025. zone file will determine the origin of the zone.
  1026. *rdclass*, a ``dns.rdataclass.RdataClass``, the zone's rdata class; the default is
  1027. class IN.
  1028. *relativize*, a ``bool``, determine's whether domain names are
  1029. relativized to the zone's origin. The default is ``True``.
  1030. *zone_factory*, the zone factory to use or ``None``. If ``None``, then
  1031. ``dns.zone.Zone`` will be used. The value may be any class or callable
  1032. that returns a subclass of ``dns.zone.Zone``.
  1033. *filename*, a ``str`` or ``None``, the filename to emit when
  1034. describing where an error occurred; the default is ``'<string>'``.
  1035. *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
  1036. directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
  1037. will raise a ``SyntaxError`` exception.
  1038. *check_origin*, a ``bool``. If ``True``, the default, then sanity
  1039. checks of the origin node will be made by calling the zone's
  1040. ``check_origin()`` method.
  1041. *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
  1042. encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
  1043. is used.
  1044. *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
  1045. then directives are permitted, and the *allow_include* parameter controls whether
  1046. ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
  1047. processing is done and any directive-like text will be treated as a regular owner
  1048. name. If a non-empty iterable, then only the listed directives (including the
  1049. ``$``) are allowed.
  1050. Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
  1051. Raises ``dns.zone.NoNS`` if there is no NS RRset.
  1052. Raises ``KeyError`` if there is no origin node.
  1053. Returns a subclass of ``dns.zone.Zone``.
  1054. """
  1055. return _from_text(
  1056. text,
  1057. origin,
  1058. rdclass,
  1059. relativize,
  1060. zone_factory,
  1061. filename,
  1062. allow_include,
  1063. check_origin,
  1064. idna_codec,
  1065. allow_directives,
  1066. )
  1067. def from_file(
  1068. f: Any,
  1069. origin: Optional[Union[dns.name.Name, str]] = None,
  1070. rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
  1071. relativize: bool = True,
  1072. zone_factory: Any = Zone,
  1073. filename: Optional[str] = None,
  1074. allow_include: bool = True,
  1075. check_origin: bool = True,
  1076. idna_codec: Optional[dns.name.IDNACodec] = None,
  1077. allow_directives: Union[bool, Iterable[str]] = True,
  1078. ) -> Zone:
  1079. """Read a zone file and build a zone object.
  1080. *f*, a file or ``str``. If *f* is a string, it is treated
  1081. as the name of a file to open.
  1082. *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
  1083. of the zone; if not specified, the first ``$ORIGIN`` statement in the
  1084. zone file will determine the origin of the zone.
  1085. *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
  1086. *relativize*, a ``bool``, determine's whether domain names are
  1087. relativized to the zone's origin. The default is ``True``.
  1088. *zone_factory*, the zone factory to use or ``None``. If ``None``, then
  1089. ``dns.zone.Zone`` will be used. The value may be any class or callable
  1090. that returns a subclass of ``dns.zone.Zone``.
  1091. *filename*, a ``str`` or ``None``, the filename to emit when
  1092. describing where an error occurred; the default is ``'<string>'``.
  1093. *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
  1094. directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
  1095. will raise a ``SyntaxError`` exception.
  1096. *check_origin*, a ``bool``. If ``True``, the default, then sanity
  1097. checks of the origin node will be made by calling the zone's
  1098. ``check_origin()`` method.
  1099. *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
  1100. encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
  1101. is used.
  1102. *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
  1103. then directives are permitted, and the *allow_include* parameter controls whether
  1104. ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
  1105. processing is done and any directive-like text will be treated as a regular owner
  1106. name. If a non-empty iterable, then only the listed directives (including the
  1107. ``$``) are allowed.
  1108. Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
  1109. Raises ``dns.zone.NoNS`` if there is no NS RRset.
  1110. Raises ``KeyError`` if there is no origin node.
  1111. Returns a subclass of ``dns.zone.Zone``.
  1112. """
  1113. if isinstance(f, str):
  1114. if filename is None:
  1115. filename = f
  1116. cm: contextlib.AbstractContextManager = open(f)
  1117. else:
  1118. cm = contextlib.nullcontext(f)
  1119. with cm as f:
  1120. return _from_text(
  1121. f,
  1122. origin,
  1123. rdclass,
  1124. relativize,
  1125. zone_factory,
  1126. filename,
  1127. allow_include,
  1128. check_origin,
  1129. idna_codec,
  1130. allow_directives,
  1131. )
  1132. assert False # make mypy happy lgtm[py/unreachable-statement]
  1133. def from_xfr(
  1134. xfr: Any,
  1135. zone_factory: Any = Zone,
  1136. relativize: bool = True,
  1137. check_origin: bool = True,
  1138. ) -> Zone:
  1139. """Convert the output of a zone transfer generator into a zone object.
  1140. *xfr*, a generator of ``dns.message.Message`` objects, typically
  1141. ``dns.query.xfr()``.
  1142. *relativize*, a ``bool``, determine's whether domain names are
  1143. relativized to the zone's origin. The default is ``True``.
  1144. It is essential that the relativize setting matches the one specified
  1145. to the generator.
  1146. *check_origin*, a ``bool``. If ``True``, the default, then sanity
  1147. checks of the origin node will be made by calling the zone's
  1148. ``check_origin()`` method.
  1149. Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
  1150. Raises ``dns.zone.NoNS`` if there is no NS RRset.
  1151. Raises ``KeyError`` if there is no origin node.
  1152. Raises ``ValueError`` if no messages are yielded by the generator.
  1153. Returns a subclass of ``dns.zone.Zone``.
  1154. """
  1155. z = None
  1156. for r in xfr:
  1157. if z is None:
  1158. if relativize:
  1159. origin = r.origin
  1160. else:
  1161. origin = r.answer[0].name
  1162. rdclass = r.answer[0].rdclass
  1163. z = zone_factory(origin, rdclass, relativize=relativize)
  1164. for rrset in r.answer:
  1165. znode = z.nodes.get(rrset.name)
  1166. if not znode:
  1167. znode = z.node_factory()
  1168. z.nodes[rrset.name] = znode
  1169. zrds = znode.find_rdataset(rrset.rdclass, rrset.rdtype, rrset.covers, True)
  1170. zrds.update_ttl(rrset.ttl)
  1171. for rd in rrset:
  1172. zrds.add(rd)
  1173. if z is None:
  1174. raise ValueError("empty transfer")
  1175. if check_origin:
  1176. z.check_origin()
  1177. return z