# connector.py
  1. import asyncio
  2. import functools
  3. import random
  4. import socket
  5. import sys
  6. import traceback
  7. import warnings
  8. from collections import OrderedDict, defaultdict, deque
  9. from contextlib import suppress
  10. from http import HTTPStatus
  11. from itertools import chain, cycle, islice
  12. from time import monotonic
  13. from types import TracebackType
  14. from typing import (
  15. TYPE_CHECKING,
  16. Any,
  17. Awaitable,
  18. Callable,
  19. DefaultDict,
  20. Deque,
  21. Dict,
  22. Iterator,
  23. List,
  24. Literal,
  25. Optional,
  26. Sequence,
  27. Set,
  28. Tuple,
  29. Type,
  30. Union,
  31. cast,
  32. )
  33. import aiohappyeyeballs
  34. from . import hdrs, helpers
  35. from .abc import AbstractResolver, ResolveResult
  36. from .client_exceptions import (
  37. ClientConnectionError,
  38. ClientConnectorCertificateError,
  39. ClientConnectorDNSError,
  40. ClientConnectorError,
  41. ClientConnectorSSLError,
  42. ClientHttpProxyError,
  43. ClientProxyConnectionError,
  44. ServerFingerprintMismatch,
  45. UnixClientConnectorError,
  46. cert_errors,
  47. ssl_errors,
  48. )
  49. from .client_proto import ResponseHandler
  50. from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params
  51. from .helpers import (
  52. ceil_timeout,
  53. is_ip_address,
  54. noop,
  55. sentinel,
  56. set_exception,
  57. set_result,
  58. )
  59. from .resolver import DefaultResolver
# ssl is optional: on builds of Python compiled without OpenSSL the import
# fails and we degrade gracefully (SSLContext becomes a plain `object`
# placeholder so annotations still resolve).  Under TYPE_CHECKING the import
# is unconditional so type checkers always see the real SSLContext.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl

        SSLContext = ssl.SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]

# Scheme sets used by connectors to validate request URLs.
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})
HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# True on interpreter versions that still need the "cleanup closed" workaround.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)
# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1

__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")

# Imports used only in annotations; guarded to avoid import cycles at runtime.
if TYPE_CHECKING:
    from .client import ClientTimeout
    from .client_reqrep import ConnectionKey
    from .tracing import Trace
  87. class _DeprecationWaiter:
  88. __slots__ = ("_awaitable", "_awaited")
  89. def __init__(self, awaitable: Awaitable[Any]) -> None:
  90. self._awaitable = awaitable
  91. self._awaited = False
  92. def __await__(self) -> Any:
  93. self._awaited = True
  94. return self._awaitable.__await__()
  95. def __del__(self) -> None:
  96. if not self._awaited:
  97. warnings.warn(
  98. "Connector.close() is a coroutine, "
  99. "please use await connector.close()",
  100. DeprecationWarning,
  101. )
class Connection:
    """A single protocol checked out of a :class:`BaseConnector`.

    Wraps a :class:`ResponseHandler` together with the pool key so that
    ``release()``/``close()`` can hand it back to (or discard it from)
    the owning connector.  Warns on garbage collection if never released.
    """

    # Filled with a stack trace in debug mode; used in the unclosed-connection
    # exception-handler context below.
    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # _warnings is bound as a default so the module is still reachable
        # during interpreter shutdown.
        if self._protocol is not None:
            kwargs = {"source": self}
            _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
            if self._loop.is_closed():
                return

            # Force-close: an un-released connection may be in an unknown state.
            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        # None once the connection has been released or closed.
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run once, when this connection is released."""
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap the list out first so callbacks registered during notification
        # are kept for the next release cycle; failures are suppressed.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Hard-close: return the protocol to the connector for disposal."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Soft-release: return the protocol to the pool for possible reuse."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        return self._protocol is None or not self._protocol.is_connected()
  169. class _TransportPlaceholder:
  170. """placeholder for BaseConnector.connect function"""
  171. __slots__ = ()
  172. def close(self) -> None:
  173. """Close the placeholder transport."""
class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    # Schemes this connector is willing to handle; subclasses may extend.
    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # keepalive_timeout and force_close are mutually exclusive; the
        # sentinel default distinguishes "not passed" from an explicit None.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        # On interpreters with the CPython fix, the extra cleanup pass is a
        # no-op, so explicitly requesting it only earns a warning.
        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        # _warnings bound as a default so the module survives shutdown GC.
        if self._closed:
            return

        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            # Rebuild the pool keeping only connections that are still open
            # and within the keep-alive window; close (and optionally queue
            # for abort) everything else.
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # Re-arm ourselves via a weak reference so the timer alone does
            # not keep the connector alive.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self) -> Awaitable[None]:
        """Close all opened transports."""
        self._close()
        # Wrapped in _DeprecationWaiter so legacy non-awaiting callers still
        # work but get warned.
        return _DeprecationWaiter(noop())

    def _close(self) -> None:
        if self._closed:
            return

        self._closed = True

        try:
            if self._loop.is_closed():
                return

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()

            for proto in self._acquired:
                proto.close()

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

        finally:
            # Always clear the bookkeeping and wake pending waiters, even if
            # closing a protocol raised.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    return conn

            # Reserve a slot with a placeholder so concurrent connect() calls
            # see this pending connection in the limit accounting.
            placeholder = cast(ResponseHandler, _TransportPlaceholder())
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that
                # the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it is connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        # A slot just opened up; wake one queued waiter if any.
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        # Return to the pool only if neither side requested closing.
        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        # Subclasses (TCPConnector, UnixConnector, ...) implement the actual
        # transport establishment.
        raise NotImplementedError()
  604. class _DNSCacheTable:
  605. def __init__(self, ttl: Optional[float] = None) -> None:
  606. self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
  607. self._timestamps: Dict[Tuple[str, int], float] = {}
  608. self._ttl = ttl
  609. def __contains__(self, host: object) -> bool:
  610. return host in self._addrs_rr
  611. def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
  612. self._addrs_rr[key] = (cycle(addrs), len(addrs))
  613. if self._ttl is not None:
  614. self._timestamps[key] = monotonic()
  615. def remove(self, key: Tuple[str, int]) -> None:
  616. self._addrs_rr.pop(key, None)
  617. if self._ttl is not None:
  618. self._timestamps.pop(key, None)
  619. def clear(self) -> None:
  620. self._addrs_rr.clear()
  621. self._timestamps.clear()
  622. def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
  623. loop, length = self._addrs_rr[key]
  624. addrs = list(islice(loop, length))
  625. # Consume one more element to shift internal state of `cycle`
  626. next(loop)
  627. return addrs
  628. def expired(self, key: Tuple[str, int]) -> bool:
  629. if self._ttl is None:
  630. return False
  631. return self._timestamps[key] + self._ttl < monotonic()
  632. def _make_ssl_context(verified: bool) -> SSLContext:
  633. """Create SSL context.
  634. This method is not async-friendly and should be called from a thread
  635. because it will load certificates from disk and do other blocking I/O.
  636. """
  637. if ssl is None:
  638. # No ssl support
  639. return None
  640. if verified:
  641. sslcontext = ssl.create_default_context()
  642. else:
  643. sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
  644. sslcontext.options |= ssl.OP_NO_SSLv2
  645. sslcontext.options |= ssl.OP_NO_SSLv3
  646. sslcontext.check_hostname = False
  647. sslcontext.verify_mode = ssl.CERT_NONE
  648. sslcontext.options |= ssl.OP_NO_COMPRESSION
  649. sslcontext.set_default_verify_paths()
  650. sslcontext.set_alpn_protocols(("http/1.1",))
  651. return sslcontext
# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)
class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check ssl certifications.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
        Disabled by default.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
        as defined in RFC 8305. To disable
        the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})

    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
    ):
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the legacy (verify_ssl/ssl_context/fingerprint) and modern
        # (ssl=) parameters into a single value: bool | SSLContext | Fingerprint.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        if resolver is None:
            resolver = DefaultResolver(loop=self._loop)
        self._resolver = resolver

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # Waiters per (host, port) key while a single in-flight DNS lookup
        # resolves on behalf of everyone.
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # In-flight, cancellation-shielded resolve tasks (see _resolve_host).
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()

    def close(self) -> Awaitable[None]:
        """Close all ongoing DNS calls."""
        for fut in chain.from_iterable(self._throttle_dns_futures.values()):
            fut.cancel()

        for t in self._resolve_host_tasks:
            t.cancel()

        return super().close()

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

    def clear_dns_cache(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        """Remove specified host/port or clear all dns local cache."""
        if host is not None and port is not None:
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            raise ValueError("either both host and port or none of them are allowed")
        else:
            self._cached_hosts.clear()

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses."""
        if is_ip_address(host):
            # Literal IP: no lookup needed, synthesize a single result.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            # Caching disabled: resolve directly every time.
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Track the task so close() can cancel it; auto-remove on finish.
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:
            # We were cancelled but the shielded lookup keeps running for the
            # other waiters; swallow its eventual outcome so it never surfaces
            # as an un-retrieved exception.
            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create connection.

        Has same keyword arguments as BaseEventLoop.create_connection.
        """
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

    def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
        """Logic to get the correct SSL context

        0. if req.ssl is false, return None

        1. if ssl_context is specified in req, use it
        2. if _ssl_context is specified in self, use it
        3. otherwise:
            1. if verify_ssl is not specified in req, use self.ssl_context
               (will generate a default context according to self.verify_ssl)
            2. if verify_ssl is True in req, generate a default SSL context
            3. if verify_ssl is False in req, generate a SSL context that
               won't verify
        """
        if not req.is_ssl():
            return None

        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")
        sslcontext = req.ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        sslcontext = self._ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        return _SSL_CONTEXT_VERIFIED

    def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
        """Return the certificate fingerprint to pin, request value first."""
        ret = req.ssl
        if isinstance(ret, Fingerprint):
            return ret
        ret = self._ssl
        if isinstance(ret, Fingerprint):
            return ret
        return None

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[aiohappyeyeballs.AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Connect via happy-eyeballs and map failures to client errors."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                )
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A bare timeout (errno is None) must propagate as-is so the
            # caller can distinguish it from connection failures.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Wrap an already-connected socket, mapping failures like above."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
        """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.

        It is necessary for TLS-in-TLS so that it is possible to
        send HTTPS queries through HTTPS proxies.

        This doesn't affect regular HTTP requests, though.
        """
        if not req.is_ssl():
            return

        proxy_url = req.proxy
        assert proxy_url is not None
        if proxy_url.scheme != "https":
            return

        self._check_loop_for_start_tls()

    def _check_loop_for_start_tls(self) -> None:
        # Probe for loop.start_tls(); absent on very old asyncio event loops.
        try:
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc

    def _loop_supports_start_tls(self) -> bool:
        """Return True when the running loop exposes ``start_tls()``."""
        try:
            self._check_loop_for_start_tls()
        except RuntimeError:
            return False
        else:
            return True

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        if req.request_info.url.scheme != "https":
            return

        # Transports that were patched to support TLS upgrade advertise it
        # via this private flag; no warning needed then.
        asyncio_supports_tls_in_tls = getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # Why `4`? At least 3 of the calls in the stack originate
            # from the methods in this class.
            stacklevel=3,
        )

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS."""
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total,
                    )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()
            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

    def _convert_hosts_to_addr_infos(
        self, hosts: List[ResolveResult]
    ) -> List[aiohappyeyeballs.AddrInfoType]:
        """Converts the list of hosts to a list of addr_infos.

        The list of hosts is the result of a DNS lookup. The list of
        addr_infos is the result of a call to `socket.getaddrinfo()`.
        """
        addr_infos: List[aiohappyeyeballs.AddrInfoType] = []
        for hinfo in hosts:
            host = hinfo["host"]
            is_ipv6 = ":" in host
            family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
            if self._family and self._family != family:
                # Respect an explicitly configured address family.
                continue
            addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
            addr_infos.append(
                (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
            )
        return addr_infos

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the host and connect, retrying over remaining addresses."""
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # All candidate addresses were exhausted; re-raise the last error.
            assert last_exc is not None
            raise last_exc

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through the configured proxy (tunnelling for HTTPS)."""
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()

        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                # Plain HTTP through a proxy: the proxy itself sees the
                # request, so the credential goes on the main request.
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto
  1305. class UnixConnector(BaseConnector):
  1306. """Unix socket connector.
  1307. path - Unix socket path.
  1308. keepalive_timeout - (optional) Keep-alive timeout.
  1309. force_close - Set to True to force close and do reconnect
  1310. after each request (and between redirects).
  1311. limit - The total number of simultaneous connections.
  1312. limit_per_host - Number of simultaneous connections to one host.
  1313. loop - Optional event loop.
  1314. """
  1315. allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})
  1316. def __init__(
  1317. self,
  1318. path: str,
  1319. force_close: bool = False,
  1320. keepalive_timeout: Union[object, float, None] = sentinel,
  1321. limit: int = 100,
  1322. limit_per_host: int = 0,
  1323. loop: Optional[asyncio.AbstractEventLoop] = None,
  1324. ) -> None:
  1325. super().__init__(
  1326. force_close=force_close,
  1327. keepalive_timeout=keepalive_timeout,
  1328. limit=limit,
  1329. limit_per_host=limit_per_host,
  1330. loop=loop,
  1331. )
  1332. self._path = path
  1333. @property
  1334. def path(self) -> str:
  1335. """Path to unix socket."""
  1336. return self._path
  1337. async def _create_connection(
  1338. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  1339. ) -> ResponseHandler:
  1340. try:
  1341. async with ceil_timeout(
  1342. timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
  1343. ):
  1344. _, proto = await self._loop.create_unix_connection(
  1345. self._factory, self._path
  1346. )
  1347. except OSError as exc:
  1348. if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
  1349. raise
  1350. raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc
  1351. return proto
  1352. class NamedPipeConnector(BaseConnector):
  1353. """Named pipe connector.
  1354. Only supported by the proactor event loop.
  1355. See also: https://docs.python.org/3/library/asyncio-eventloop.html
  1356. path - Windows named pipe path.
  1357. keepalive_timeout - (optional) Keep-alive timeout.
  1358. force_close - Set to True to force close and do reconnect
  1359. after each request (and between redirects).
  1360. limit - The total number of simultaneous connections.
  1361. limit_per_host - Number of simultaneous connections to one host.
  1362. loop - Optional event loop.
  1363. """
  1364. allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})
  1365. def __init__(
  1366. self,
  1367. path: str,
  1368. force_close: bool = False,
  1369. keepalive_timeout: Union[object, float, None] = sentinel,
  1370. limit: int = 100,
  1371. limit_per_host: int = 0,
  1372. loop: Optional[asyncio.AbstractEventLoop] = None,
  1373. ) -> None:
  1374. super().__init__(
  1375. force_close=force_close,
  1376. keepalive_timeout=keepalive_timeout,
  1377. limit=limit,
  1378. limit_per_host=limit_per_host,
  1379. loop=loop,
  1380. )
  1381. if not isinstance(
  1382. self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined]
  1383. ):
  1384. raise RuntimeError(
  1385. "Named Pipes only available in proactor loop under windows"
  1386. )
  1387. self._path = path
  1388. @property
  1389. def path(self) -> str:
  1390. """Path to the named pipe."""
  1391. return self._path
  1392. async def _create_connection(
  1393. self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
  1394. ) -> ResponseHandler:
  1395. try:
  1396. async with ceil_timeout(
  1397. timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
  1398. ):
  1399. _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined]
  1400. self._factory, self._path
  1401. )
  1402. # the drain is required so that the connection_made is called
  1403. # and transport is set otherwise it is not set before the
  1404. # `assert conn.transport is not None`
  1405. # in client.py's _request method
  1406. await asyncio.sleep(0)
  1407. # other option is to manually set transport like
  1408. # `proto.transport = trans`
  1409. except OSError as exc:
  1410. if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
  1411. raise
  1412. raise ClientConnectorError(req.connection_key, exc) from exc
  1413. return cast(ResponseHandler, proto)