tracing.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. from types import SimpleNamespace
  2. from typing import TYPE_CHECKING, Awaitable, Mapping, Optional, Protocol, Type, TypeVar
  3. import attr
  4. from aiosignal import Signal
  5. from multidict import CIMultiDict
  6. from yarl import URL
  7. from .client_reqrep import ClientResponse
  8. if TYPE_CHECKING:
  9. from .client import ClientSession
  10. _ParamT_contra = TypeVar("_ParamT_contra", contravariant=True)
  11. class _SignalCallback(Protocol[_ParamT_contra]):
  12. def __call__(
  13. self,
  14. __client_session: ClientSession,
  15. __trace_config_ctx: SimpleNamespace,
  16. __params: _ParamT_contra,
  17. ) -> Awaitable[None]: ...
  18. __all__ = (
  19. "TraceConfig",
  20. "TraceRequestStartParams",
  21. "TraceRequestEndParams",
  22. "TraceRequestExceptionParams",
  23. "TraceConnectionQueuedStartParams",
  24. "TraceConnectionQueuedEndParams",
  25. "TraceConnectionCreateStartParams",
  26. "TraceConnectionCreateEndParams",
  27. "TraceConnectionReuseconnParams",
  28. "TraceDnsResolveHostStartParams",
  29. "TraceDnsResolveHostEndParams",
  30. "TraceDnsCacheHitParams",
  31. "TraceDnsCacheMissParams",
  32. "TraceRequestRedirectParams",
  33. "TraceRequestChunkSentParams",
  34. "TraceResponseChunkReceivedParams",
  35. "TraceRequestHeadersSentParams",
  36. )
  37. class TraceConfig:
  38. """First-class used to trace requests launched via ClientSession objects."""
  39. def __init__(
  40. self, trace_config_ctx_factory: Type[SimpleNamespace] = SimpleNamespace
  41. ) -> None:
  42. self._on_request_start: Signal[_SignalCallback[TraceRequestStartParams]] = (
  43. Signal(self)
  44. )
  45. self._on_request_chunk_sent: Signal[
  46. _SignalCallback[TraceRequestChunkSentParams]
  47. ] = Signal(self)
  48. self._on_response_chunk_received: Signal[
  49. _SignalCallback[TraceResponseChunkReceivedParams]
  50. ] = Signal(self)
  51. self._on_request_end: Signal[_SignalCallback[TraceRequestEndParams]] = Signal(
  52. self
  53. )
  54. self._on_request_exception: Signal[
  55. _SignalCallback[TraceRequestExceptionParams]
  56. ] = Signal(self)
  57. self._on_request_redirect: Signal[
  58. _SignalCallback[TraceRequestRedirectParams]
  59. ] = Signal(self)
  60. self._on_connection_queued_start: Signal[
  61. _SignalCallback[TraceConnectionQueuedStartParams]
  62. ] = Signal(self)
  63. self._on_connection_queued_end: Signal[
  64. _SignalCallback[TraceConnectionQueuedEndParams]
  65. ] = Signal(self)
  66. self._on_connection_create_start: Signal[
  67. _SignalCallback[TraceConnectionCreateStartParams]
  68. ] = Signal(self)
  69. self._on_connection_create_end: Signal[
  70. _SignalCallback[TraceConnectionCreateEndParams]
  71. ] = Signal(self)
  72. self._on_connection_reuseconn: Signal[
  73. _SignalCallback[TraceConnectionReuseconnParams]
  74. ] = Signal(self)
  75. self._on_dns_resolvehost_start: Signal[
  76. _SignalCallback[TraceDnsResolveHostStartParams]
  77. ] = Signal(self)
  78. self._on_dns_resolvehost_end: Signal[
  79. _SignalCallback[TraceDnsResolveHostEndParams]
  80. ] = Signal(self)
  81. self._on_dns_cache_hit: Signal[_SignalCallback[TraceDnsCacheHitParams]] = (
  82. Signal(self)
  83. )
  84. self._on_dns_cache_miss: Signal[_SignalCallback[TraceDnsCacheMissParams]] = (
  85. Signal(self)
  86. )
  87. self._on_request_headers_sent: Signal[
  88. _SignalCallback[TraceRequestHeadersSentParams]
  89. ] = Signal(self)
  90. self._trace_config_ctx_factory = trace_config_ctx_factory
  91. def trace_config_ctx(
  92. self, trace_request_ctx: Optional[Mapping[str, str]] = None
  93. ) -> SimpleNamespace:
  94. """Return a new trace_config_ctx instance"""
  95. return self._trace_config_ctx_factory(trace_request_ctx=trace_request_ctx)
  96. def freeze(self) -> None:
  97. self._on_request_start.freeze()
  98. self._on_request_chunk_sent.freeze()
  99. self._on_response_chunk_received.freeze()
  100. self._on_request_end.freeze()
  101. self._on_request_exception.freeze()
  102. self._on_request_redirect.freeze()
  103. self._on_connection_queued_start.freeze()
  104. self._on_connection_queued_end.freeze()
  105. self._on_connection_create_start.freeze()
  106. self._on_connection_create_end.freeze()
  107. self._on_connection_reuseconn.freeze()
  108. self._on_dns_resolvehost_start.freeze()
  109. self._on_dns_resolvehost_end.freeze()
  110. self._on_dns_cache_hit.freeze()
  111. self._on_dns_cache_miss.freeze()
  112. self._on_request_headers_sent.freeze()
  113. @property
  114. def on_request_start(self) -> "Signal[_SignalCallback[TraceRequestStartParams]]":
  115. return self._on_request_start
  116. @property
  117. def on_request_chunk_sent(
  118. self,
  119. ) -> "Signal[_SignalCallback[TraceRequestChunkSentParams]]":
  120. return self._on_request_chunk_sent
  121. @property
  122. def on_response_chunk_received(
  123. self,
  124. ) -> "Signal[_SignalCallback[TraceResponseChunkReceivedParams]]":
  125. return self._on_response_chunk_received
  126. @property
  127. def on_request_end(self) -> "Signal[_SignalCallback[TraceRequestEndParams]]":
  128. return self._on_request_end
  129. @property
  130. def on_request_exception(
  131. self,
  132. ) -> "Signal[_SignalCallback[TraceRequestExceptionParams]]":
  133. return self._on_request_exception
  134. @property
  135. def on_request_redirect(
  136. self,
  137. ) -> "Signal[_SignalCallback[TraceRequestRedirectParams]]":
  138. return self._on_request_redirect
  139. @property
  140. def on_connection_queued_start(
  141. self,
  142. ) -> "Signal[_SignalCallback[TraceConnectionQueuedStartParams]]":
  143. return self._on_connection_queued_start
  144. @property
  145. def on_connection_queued_end(
  146. self,
  147. ) -> "Signal[_SignalCallback[TraceConnectionQueuedEndParams]]":
  148. return self._on_connection_queued_end
  149. @property
  150. def on_connection_create_start(
  151. self,
  152. ) -> "Signal[_SignalCallback[TraceConnectionCreateStartParams]]":
  153. return self._on_connection_create_start
  154. @property
  155. def on_connection_create_end(
  156. self,
  157. ) -> "Signal[_SignalCallback[TraceConnectionCreateEndParams]]":
  158. return self._on_connection_create_end
  159. @property
  160. def on_connection_reuseconn(
  161. self,
  162. ) -> "Signal[_SignalCallback[TraceConnectionReuseconnParams]]":
  163. return self._on_connection_reuseconn
  164. @property
  165. def on_dns_resolvehost_start(
  166. self,
  167. ) -> "Signal[_SignalCallback[TraceDnsResolveHostStartParams]]":
  168. return self._on_dns_resolvehost_start
  169. @property
  170. def on_dns_resolvehost_end(
  171. self,
  172. ) -> "Signal[_SignalCallback[TraceDnsResolveHostEndParams]]":
  173. return self._on_dns_resolvehost_end
  174. @property
  175. def on_dns_cache_hit(self) -> "Signal[_SignalCallback[TraceDnsCacheHitParams]]":
  176. return self._on_dns_cache_hit
  177. @property
  178. def on_dns_cache_miss(self) -> "Signal[_SignalCallback[TraceDnsCacheMissParams]]":
  179. return self._on_dns_cache_miss
  180. @property
  181. def on_request_headers_sent(
  182. self,
  183. ) -> "Signal[_SignalCallback[TraceRequestHeadersSentParams]]":
  184. return self._on_request_headers_sent
  185. @attr.s(auto_attribs=True, frozen=True, slots=True)
  186. class TraceRequestStartParams:
  187. """Parameters sent by the `on_request_start` signal"""
  188. method: str
  189. url: URL
  190. headers: "CIMultiDict[str]"
  191. @attr.s(auto_attribs=True, frozen=True, slots=True)
  192. class TraceRequestChunkSentParams:
  193. """Parameters sent by the `on_request_chunk_sent` signal"""
  194. method: str
  195. url: URL
  196. chunk: bytes
  197. @attr.s(auto_attribs=True, frozen=True, slots=True)
  198. class TraceResponseChunkReceivedParams:
  199. """Parameters sent by the `on_response_chunk_received` signal"""
  200. method: str
  201. url: URL
  202. chunk: bytes
  203. @attr.s(auto_attribs=True, frozen=True, slots=True)
  204. class TraceRequestEndParams:
  205. """Parameters sent by the `on_request_end` signal"""
  206. method: str
  207. url: URL
  208. headers: "CIMultiDict[str]"
  209. response: ClientResponse
  210. @attr.s(auto_attribs=True, frozen=True, slots=True)
  211. class TraceRequestExceptionParams:
  212. """Parameters sent by the `on_request_exception` signal"""
  213. method: str
  214. url: URL
  215. headers: "CIMultiDict[str]"
  216. exception: BaseException
  217. @attr.s(auto_attribs=True, frozen=True, slots=True)
  218. class TraceRequestRedirectParams:
  219. """Parameters sent by the `on_request_redirect` signal"""
  220. method: str
  221. url: URL
  222. headers: "CIMultiDict[str]"
  223. response: ClientResponse
  224. @attr.s(auto_attribs=True, frozen=True, slots=True)
  225. class TraceConnectionQueuedStartParams:
  226. """Parameters sent by the `on_connection_queued_start` signal"""
  227. @attr.s(auto_attribs=True, frozen=True, slots=True)
  228. class TraceConnectionQueuedEndParams:
  229. """Parameters sent by the `on_connection_queued_end` signal"""
  230. @attr.s(auto_attribs=True, frozen=True, slots=True)
  231. class TraceConnectionCreateStartParams:
  232. """Parameters sent by the `on_connection_create_start` signal"""
  233. @attr.s(auto_attribs=True, frozen=True, slots=True)
  234. class TraceConnectionCreateEndParams:
  235. """Parameters sent by the `on_connection_create_end` signal"""
  236. @attr.s(auto_attribs=True, frozen=True, slots=True)
  237. class TraceConnectionReuseconnParams:
  238. """Parameters sent by the `on_connection_reuseconn` signal"""
  239. @attr.s(auto_attribs=True, frozen=True, slots=True)
  240. class TraceDnsResolveHostStartParams:
  241. """Parameters sent by the `on_dns_resolvehost_start` signal"""
  242. host: str
  243. @attr.s(auto_attribs=True, frozen=True, slots=True)
  244. class TraceDnsResolveHostEndParams:
  245. """Parameters sent by the `on_dns_resolvehost_end` signal"""
  246. host: str
  247. @attr.s(auto_attribs=True, frozen=True, slots=True)
  248. class TraceDnsCacheHitParams:
  249. """Parameters sent by the `on_dns_cache_hit` signal"""
  250. host: str
  251. @attr.s(auto_attribs=True, frozen=True, slots=True)
  252. class TraceDnsCacheMissParams:
  253. """Parameters sent by the `on_dns_cache_miss` signal"""
  254. host: str
  255. @attr.s(auto_attribs=True, frozen=True, slots=True)
  256. class TraceRequestHeadersSentParams:
  257. """Parameters sent by the `on_request_headers_sent` signal"""
  258. method: str
  259. url: URL
  260. headers: "CIMultiDict[str]"
  261. class Trace:
  262. """Internal dependency holder class.
  263. Used to keep together the main dependencies used
  264. at the moment of send a signal.
  265. """
  266. def __init__(
  267. self,
  268. session: "ClientSession",
  269. trace_config: TraceConfig,
  270. trace_config_ctx: SimpleNamespace,
  271. ) -> None:
  272. self._trace_config = trace_config
  273. self._trace_config_ctx = trace_config_ctx
  274. self._session = session
  275. async def send_request_start(
  276. self, method: str, url: URL, headers: "CIMultiDict[str]"
  277. ) -> None:
  278. return await self._trace_config.on_request_start.send(
  279. self._session,
  280. self._trace_config_ctx,
  281. TraceRequestStartParams(method, url, headers),
  282. )
  283. async def send_request_chunk_sent(
  284. self, method: str, url: URL, chunk: bytes
  285. ) -> None:
  286. return await self._trace_config.on_request_chunk_sent.send(
  287. self._session,
  288. self._trace_config_ctx,
  289. TraceRequestChunkSentParams(method, url, chunk),
  290. )
  291. async def send_response_chunk_received(
  292. self, method: str, url: URL, chunk: bytes
  293. ) -> None:
  294. return await self._trace_config.on_response_chunk_received.send(
  295. self._session,
  296. self._trace_config_ctx,
  297. TraceResponseChunkReceivedParams(method, url, chunk),
  298. )
  299. async def send_request_end(
  300. self,
  301. method: str,
  302. url: URL,
  303. headers: "CIMultiDict[str]",
  304. response: ClientResponse,
  305. ) -> None:
  306. return await self._trace_config.on_request_end.send(
  307. self._session,
  308. self._trace_config_ctx,
  309. TraceRequestEndParams(method, url, headers, response),
  310. )
  311. async def send_request_exception(
  312. self,
  313. method: str,
  314. url: URL,
  315. headers: "CIMultiDict[str]",
  316. exception: BaseException,
  317. ) -> None:
  318. return await self._trace_config.on_request_exception.send(
  319. self._session,
  320. self._trace_config_ctx,
  321. TraceRequestExceptionParams(method, url, headers, exception),
  322. )
  323. async def send_request_redirect(
  324. self,
  325. method: str,
  326. url: URL,
  327. headers: "CIMultiDict[str]",
  328. response: ClientResponse,
  329. ) -> None:
  330. return await self._trace_config._on_request_redirect.send(
  331. self._session,
  332. self._trace_config_ctx,
  333. TraceRequestRedirectParams(method, url, headers, response),
  334. )
  335. async def send_connection_queued_start(self) -> None:
  336. return await self._trace_config.on_connection_queued_start.send(
  337. self._session, self._trace_config_ctx, TraceConnectionQueuedStartParams()
  338. )
  339. async def send_connection_queued_end(self) -> None:
  340. return await self._trace_config.on_connection_queued_end.send(
  341. self._session, self._trace_config_ctx, TraceConnectionQueuedEndParams()
  342. )
  343. async def send_connection_create_start(self) -> None:
  344. return await self._trace_config.on_connection_create_start.send(
  345. self._session, self._trace_config_ctx, TraceConnectionCreateStartParams()
  346. )
  347. async def send_connection_create_end(self) -> None:
  348. return await self._trace_config.on_connection_create_end.send(
  349. self._session, self._trace_config_ctx, TraceConnectionCreateEndParams()
  350. )
  351. async def send_connection_reuseconn(self) -> None:
  352. return await self._trace_config.on_connection_reuseconn.send(
  353. self._session, self._trace_config_ctx, TraceConnectionReuseconnParams()
  354. )
  355. async def send_dns_resolvehost_start(self, host: str) -> None:
  356. return await self._trace_config.on_dns_resolvehost_start.send(
  357. self._session, self._trace_config_ctx, TraceDnsResolveHostStartParams(host)
  358. )
  359. async def send_dns_resolvehost_end(self, host: str) -> None:
  360. return await self._trace_config.on_dns_resolvehost_end.send(
  361. self._session, self._trace_config_ctx, TraceDnsResolveHostEndParams(host)
  362. )
  363. async def send_dns_cache_hit(self, host: str) -> None:
  364. return await self._trace_config.on_dns_cache_hit.send(
  365. self._session, self._trace_config_ctx, TraceDnsCacheHitParams(host)
  366. )
  367. async def send_dns_cache_miss(self, host: str) -> None:
  368. return await self._trace_config.on_dns_cache_miss.send(
  369. self._session, self._trace_config_ctx, TraceDnsCacheMissParams(host)
  370. )
  371. async def send_request_headers(
  372. self, method: str, url: URL, headers: "CIMultiDict[str]"
  373. ) -> None:
  374. return await self._trace_config._on_request_headers_sent.send(
  375. self._session,
  376. self._trace_config_ctx,
  377. TraceRequestHeadersSentParams(method, url, headers),
  378. )