_asyncio_backend.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
  2. """asyncio library query support"""
  3. import asyncio
  4. import socket
  5. import sys
  6. import dns._asyncbackend
  7. import dns._features
  8. import dns.exception
  9. import dns.inet
  10. _is_win32 = sys.platform == "win32"
  11. def _get_running_loop():
  12. try:
  13. return asyncio.get_running_loop()
  14. except AttributeError: # pragma: no cover
  15. return asyncio.get_event_loop()
  16. class _DatagramProtocol:
  17. def __init__(self):
  18. self.transport = None
  19. self.recvfrom = None
  20. def connection_made(self, transport):
  21. self.transport = transport
  22. def datagram_received(self, data, addr):
  23. if self.recvfrom and not self.recvfrom.done():
  24. self.recvfrom.set_result((data, addr))
  25. def error_received(self, exc): # pragma: no cover
  26. if self.recvfrom and not self.recvfrom.done():
  27. self.recvfrom.set_exception(exc)
  28. def connection_lost(self, exc):
  29. if self.recvfrom and not self.recvfrom.done():
  30. if exc is None:
  31. # EOF we triggered. Is there a better way to do this?
  32. try:
  33. raise EOFError("EOF")
  34. except EOFError as e:
  35. self.recvfrom.set_exception(e)
  36. else:
  37. self.recvfrom.set_exception(exc)
  38. def close(self):
  39. self.transport.close()
  40. async def _maybe_wait_for(awaitable, timeout):
  41. if timeout is not None:
  42. try:
  43. return await asyncio.wait_for(awaitable, timeout)
  44. except asyncio.TimeoutError:
  45. raise dns.exception.Timeout(timeout=timeout)
  46. else:
  47. return await awaitable
  48. class DatagramSocket(dns._asyncbackend.DatagramSocket):
  49. def __init__(self, family, transport, protocol):
  50. super().__init__(family, socket.SOCK_DGRAM)
  51. self.transport = transport
  52. self.protocol = protocol
  53. async def sendto(self, what, destination, timeout): # pragma: no cover
  54. # no timeout for asyncio sendto
  55. self.transport.sendto(what, destination)
  56. return len(what)
  57. async def recvfrom(self, size, timeout):
  58. # ignore size as there's no way I know to tell protocol about it
  59. done = _get_running_loop().create_future()
  60. try:
  61. assert self.protocol.recvfrom is None
  62. self.protocol.recvfrom = done
  63. await _maybe_wait_for(done, timeout)
  64. return done.result()
  65. finally:
  66. self.protocol.recvfrom = None
  67. async def close(self):
  68. self.protocol.close()
  69. async def getpeername(self):
  70. return self.transport.get_extra_info("peername")
  71. async def getsockname(self):
  72. return self.transport.get_extra_info("sockname")
  73. async def getpeercert(self, timeout):
  74. raise NotImplementedError
  75. class StreamSocket(dns._asyncbackend.StreamSocket):
  76. def __init__(self, af, reader, writer):
  77. super().__init__(af, socket.SOCK_STREAM)
  78. self.reader = reader
  79. self.writer = writer
  80. async def sendall(self, what, timeout):
  81. self.writer.write(what)
  82. return await _maybe_wait_for(self.writer.drain(), timeout)
  83. async def recv(self, size, timeout):
  84. return await _maybe_wait_for(self.reader.read(size), timeout)
  85. async def close(self):
  86. self.writer.close()
  87. async def getpeername(self):
  88. return self.writer.get_extra_info("peername")
  89. async def getsockname(self):
  90. return self.writer.get_extra_info("sockname")
  91. async def getpeercert(self, timeout):
  92. return self.writer.get_extra_info("peercert")
  93. if dns._features.have("doh"):
  94. import anyio
  95. import httpcore
  96. import httpcore._backends.anyio
  97. import httpx
  98. _CoreAsyncNetworkBackend = httpcore.AsyncNetworkBackend
  99. _CoreAnyIOStream = httpcore._backends.anyio.AnyIOStream
  100. from dns.query import _compute_times, _expiration_for_this_attempt, _remaining
  101. class _NetworkBackend(_CoreAsyncNetworkBackend):
  102. def __init__(self, resolver, local_port, bootstrap_address, family):
  103. super().__init__()
  104. self._local_port = local_port
  105. self._resolver = resolver
  106. self._bootstrap_address = bootstrap_address
  107. self._family = family
  108. if local_port != 0:
  109. raise NotImplementedError(
  110. "the asyncio transport for HTTPX cannot set the local port"
  111. )
  112. async def connect_tcp(
  113. self, host, port, timeout, local_address, socket_options=None
  114. ): # pylint: disable=signature-differs
  115. addresses = []
  116. _, expiration = _compute_times(timeout)
  117. if dns.inet.is_address(host):
  118. addresses.append(host)
  119. elif self._bootstrap_address is not None:
  120. addresses.append(self._bootstrap_address)
  121. else:
  122. timeout = _remaining(expiration)
  123. family = self._family
  124. if local_address:
  125. family = dns.inet.af_for_address(local_address)
  126. answers = await self._resolver.resolve_name(
  127. host, family=family, lifetime=timeout
  128. )
  129. addresses = answers.addresses()
  130. for address in addresses:
  131. try:
  132. attempt_expiration = _expiration_for_this_attempt(2.0, expiration)
  133. timeout = _remaining(attempt_expiration)
  134. with anyio.fail_after(timeout):
  135. stream = await anyio.connect_tcp(
  136. remote_host=address,
  137. remote_port=port,
  138. local_host=local_address,
  139. )
  140. return _CoreAnyIOStream(stream)
  141. except Exception:
  142. pass
  143. raise httpcore.ConnectError
  144. async def connect_unix_socket(
  145. self, path, timeout, socket_options=None
  146. ): # pylint: disable=signature-differs
  147. raise NotImplementedError
  148. async def sleep(self, seconds): # pylint: disable=signature-differs
  149. await anyio.sleep(seconds)
  150. class _HTTPTransport(httpx.AsyncHTTPTransport):
  151. def __init__(
  152. self,
  153. *args,
  154. local_port=0,
  155. bootstrap_address=None,
  156. resolver=None,
  157. family=socket.AF_UNSPEC,
  158. **kwargs,
  159. ):
  160. if resolver is None and bootstrap_address is None:
  161. # pylint: disable=import-outside-toplevel,redefined-outer-name
  162. import dns.asyncresolver
  163. resolver = dns.asyncresolver.Resolver()
  164. super().__init__(*args, **kwargs)
  165. self._pool._network_backend = _NetworkBackend(
  166. resolver, local_port, bootstrap_address, family
  167. )
  168. else:
  169. _HTTPTransport = dns._asyncbackend.NullTransport # type: ignore
  170. class Backend(dns._asyncbackend.Backend):
  171. def name(self):
  172. return "asyncio"
  173. async def make_socket(
  174. self,
  175. af,
  176. socktype,
  177. proto=0,
  178. source=None,
  179. destination=None,
  180. timeout=None,
  181. ssl_context=None,
  182. server_hostname=None,
  183. ):
  184. loop = _get_running_loop()
  185. if socktype == socket.SOCK_DGRAM:
  186. if _is_win32 and source is None:
  187. # Win32 wants explicit binding before recvfrom(). This is the
  188. # proper fix for [#637].
  189. source = (dns.inet.any_for_af(af), 0)
  190. transport, protocol = await loop.create_datagram_endpoint(
  191. _DatagramProtocol,
  192. source,
  193. family=af,
  194. proto=proto,
  195. remote_addr=destination,
  196. )
  197. return DatagramSocket(af, transport, protocol)
  198. elif socktype == socket.SOCK_STREAM:
  199. if destination is None:
  200. # This shouldn't happen, but we check to make code analysis software
  201. # happier.
  202. raise ValueError("destination required for stream sockets")
  203. (r, w) = await _maybe_wait_for(
  204. asyncio.open_connection(
  205. destination[0],
  206. destination[1],
  207. ssl=ssl_context,
  208. family=af,
  209. proto=proto,
  210. local_addr=source,
  211. server_hostname=server_hostname,
  212. ),
  213. timeout,
  214. )
  215. return StreamSocket(af, r, w)
  216. raise NotImplementedError(
  217. "unsupported socket " + f"type {socktype}"
  218. ) # pragma: no cover
  219. async def sleep(self, interval):
  220. await asyncio.sleep(interval)
  221. def datagram_connection_required(self):
  222. return False
  223. def get_transport_class(self):
  224. return _HTTPTransport
  225. async def wait_for(self, awaitable, timeout):
  226. return await _maybe_wait_for(awaitable, timeout)