web_app.py 19 KB


  1. import asyncio
  2. import logging
  3. import warnings
  4. from functools import lru_cache, partial, update_wrapper
  5. from typing import (
  6. TYPE_CHECKING,
  7. Any,
  8. AsyncIterator,
  9. Awaitable,
  10. Callable,
  11. Dict,
  12. Iterable,
  13. Iterator,
  14. List,
  15. Mapping,
  16. MutableMapping,
  17. Optional,
  18. Sequence,
  19. Tuple,
  20. Type,
  21. TypeVar,
  22. Union,
  23. cast,
  24. overload,
  25. )
  26. from aiosignal import Signal
  27. from frozenlist import FrozenList
  28. from . import hdrs
  29. from .abc import (
  30. AbstractAccessLogger,
  31. AbstractMatchInfo,
  32. AbstractRouter,
  33. AbstractStreamWriter,
  34. )
  35. from .helpers import DEBUG, AppKey
  36. from .http_parser import RawRequestMessage
  37. from .log import web_logger
  38. from .streams import StreamReader
  39. from .typedefs import Handler, Middleware
  40. from .web_exceptions import NotAppKeyWarning
  41. from .web_log import AccessLogger
  42. from .web_middlewares import _fix_request_current_app
  43. from .web_protocol import RequestHandler
  44. from .web_request import Request
  45. from .web_response import StreamResponse
  46. from .web_routedef import AbstractRouteDef
  47. from .web_server import Server
  48. from .web_urldispatcher import (
  49. AbstractResource,
  50. AbstractRoute,
  51. Domain,
  52. MaskDomain,
  53. MatchedSubAppResource,
  54. PrefixedSubAppResource,
  55. SystemRoute,
  56. UrlDispatcher,
  57. )
  58. __all__ = ("Application", "CleanupError")
  59. if TYPE_CHECKING:
  60. _AppSignal = Signal[Callable[["Application"], Awaitable[None]]]
  61. _RespPrepareSignal = Signal[Callable[[Request, StreamResponse], Awaitable[None]]]
  62. _Middlewares = FrozenList[Middleware]
  63. _MiddlewaresHandlers = Optional[Sequence[Tuple[Middleware, bool]]]
  64. _Subapps = List["Application"]
  65. else:
  66. # No type checker mode, skip types
  67. _AppSignal = Signal
  68. _RespPrepareSignal = Signal
  69. _Middlewares = FrozenList
  70. _MiddlewaresHandlers = Optional[Sequence]
  71. _Subapps = List
  72. _T = TypeVar("_T")
  73. _U = TypeVar("_U")
  74. _Resource = TypeVar("_Resource", bound=AbstractResource)
  75. def _build_middlewares(
  76. handler: Handler, apps: Tuple["Application", ...]
  77. ) -> Callable[[Request], Awaitable[StreamResponse]]:
  78. """Apply middlewares to handler."""
  79. for app in apps[::-1]:
  80. for m, _ in app._middlewares_handlers: # type: ignore[union-attr]
  81. handler = update_wrapper(partial(m, handler=handler), handler) # type: ignore[misc]
  82. return handler
  83. _cached_build_middleware = lru_cache(maxsize=1024)(_build_middlewares)
  84. class Application(MutableMapping[Union[str, AppKey[Any]], Any]):
  85. ATTRS = frozenset(
  86. [
  87. "logger",
  88. "_debug",
  89. "_router",
  90. "_loop",
  91. "_handler_args",
  92. "_middlewares",
  93. "_middlewares_handlers",
  94. "_has_legacy_middlewares",
  95. "_run_middlewares",
  96. "_state",
  97. "_frozen",
  98. "_pre_frozen",
  99. "_subapps",
  100. "_on_response_prepare",
  101. "_on_startup",
  102. "_on_shutdown",
  103. "_on_cleanup",
  104. "_client_max_size",
  105. "_cleanup_ctx",
  106. ]
  107. )
  108. def __init__(
  109. self,
  110. *,
  111. logger: logging.Logger = web_logger,
  112. router: Optional[UrlDispatcher] = None,
  113. middlewares: Iterable[Middleware] = (),
  114. handler_args: Optional[Mapping[str, Any]] = None,
  115. client_max_size: int = 1024**2,
  116. loop: Optional[asyncio.AbstractEventLoop] = None,
  117. debug: Any = ..., # mypy doesn't support ellipsis
  118. ) -> None:
  119. if router is None:
  120. router = UrlDispatcher()
  121. else:
  122. warnings.warn(
  123. "router argument is deprecated", DeprecationWarning, stacklevel=2
  124. )
  125. assert isinstance(router, AbstractRouter), router
  126. if loop is not None:
  127. warnings.warn(
  128. "loop argument is deprecated", DeprecationWarning, stacklevel=2
  129. )
  130. if debug is not ...:
  131. warnings.warn(
  132. "debug argument is deprecated", DeprecationWarning, stacklevel=2
  133. )
  134. self._debug = debug
  135. self._router: UrlDispatcher = router
  136. self._loop = loop
  137. self._handler_args = handler_args
  138. self.logger = logger
  139. self._middlewares: _Middlewares = FrozenList(middlewares)
  140. # initialized on freezing
  141. self._middlewares_handlers: _MiddlewaresHandlers = None
  142. # initialized on freezing
  143. self._run_middlewares: Optional[bool] = None
  144. self._has_legacy_middlewares: bool = True
  145. self._state: Dict[Union[AppKey[Any], str], object] = {}
  146. self._frozen = False
  147. self._pre_frozen = False
  148. self._subapps: _Subapps = []
  149. self._on_response_prepare: _RespPrepareSignal = Signal(self)
  150. self._on_startup: _AppSignal = Signal(self)
  151. self._on_shutdown: _AppSignal = Signal(self)
  152. self._on_cleanup: _AppSignal = Signal(self)
  153. self._cleanup_ctx = CleanupContext()
  154. self._on_startup.append(self._cleanup_ctx._on_startup)
  155. self._on_cleanup.append(self._cleanup_ctx._on_cleanup)
  156. self._client_max_size = client_max_size
  157. def __init_subclass__(cls: Type["Application"]) -> None:
  158. warnings.warn(
  159. "Inheritance class {} from web.Application "
  160. "is discouraged".format(cls.__name__),
  161. DeprecationWarning,
  162. stacklevel=3,
  163. )
  164. if DEBUG: # pragma: no cover
  165. def __setattr__(self, name: str, val: Any) -> None:
  166. if name not in self.ATTRS:
  167. warnings.warn(
  168. "Setting custom web.Application.{} attribute "
  169. "is discouraged".format(name),
  170. DeprecationWarning,
  171. stacklevel=2,
  172. )
  173. super().__setattr__(name, val)
  174. # MutableMapping API
  175. def __eq__(self, other: object) -> bool:
  176. return self is other
  177. @overload # type: ignore[override]
  178. def __getitem__(self, key: AppKey[_T]) -> _T: ...
  179. @overload
  180. def __getitem__(self, key: str) -> Any: ...
  181. def __getitem__(self, key: Union[str, AppKey[_T]]) -> Any:
  182. return self._state[key]
  183. def _check_frozen(self) -> None:
  184. if self._frozen:
  185. warnings.warn(
  186. "Changing state of started or joined application is deprecated",
  187. DeprecationWarning,
  188. stacklevel=3,
  189. )
  190. @overload # type: ignore[override]
  191. def __setitem__(self, key: AppKey[_T], value: _T) -> None: ...
  192. @overload
  193. def __setitem__(self, key: str, value: Any) -> None: ...
  194. def __setitem__(self, key: Union[str, AppKey[_T]], value: Any) -> None:
  195. self._check_frozen()
  196. if not isinstance(key, AppKey):
  197. warnings.warn(
  198. "It is recommended to use web.AppKey instances for keys.\n"
  199. + "https://docs.aiohttp.org/en/stable/web_advanced.html"
  200. + "#application-s-config",
  201. category=NotAppKeyWarning,
  202. stacklevel=2,
  203. )
  204. self._state[key] = value
  205. def __delitem__(self, key: Union[str, AppKey[_T]]) -> None:
  206. self._check_frozen()
  207. del self._state[key]
  208. def __len__(self) -> int:
  209. return len(self._state)
  210. def __iter__(self) -> Iterator[Union[str, AppKey[Any]]]:
  211. return iter(self._state)
  212. def __hash__(self) -> int:
  213. return id(self)
  214. @overload # type: ignore[override]
  215. def get(self, key: AppKey[_T], default: None = ...) -> Optional[_T]: ...
  216. @overload
  217. def get(self, key: AppKey[_T], default: _U) -> Union[_T, _U]: ...
  218. @overload
  219. def get(self, key: str, default: Any = ...) -> Any: ...
  220. def get(self, key: Union[str, AppKey[_T]], default: Any = None) -> Any:
  221. return self._state.get(key, default)
  222. ########
  223. @property
  224. def loop(self) -> asyncio.AbstractEventLoop:
  225. # Technically the loop can be None
  226. # but we mask it by explicit type cast
  227. # to provide more convenient type annotation
  228. warnings.warn("loop property is deprecated", DeprecationWarning, stacklevel=2)
  229. return cast(asyncio.AbstractEventLoop, self._loop)
  230. def _set_loop(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
  231. if loop is None:
  232. loop = asyncio.get_event_loop()
  233. if self._loop is not None and self._loop is not loop:
  234. raise RuntimeError(
  235. "web.Application instance initialized with different loop"
  236. )
  237. self._loop = loop
  238. # set loop debug
  239. if self._debug is ...:
  240. self._debug = loop.get_debug()
  241. # set loop to sub applications
  242. for subapp in self._subapps:
  243. subapp._set_loop(loop)
  244. @property
  245. def pre_frozen(self) -> bool:
  246. return self._pre_frozen
  247. def pre_freeze(self) -> None:
  248. if self._pre_frozen:
  249. return
  250. self._pre_frozen = True
  251. self._middlewares.freeze()
  252. self._router.freeze()
  253. self._on_response_prepare.freeze()
  254. self._cleanup_ctx.freeze()
  255. self._on_startup.freeze()
  256. self._on_shutdown.freeze()
  257. self._on_cleanup.freeze()
  258. self._middlewares_handlers = tuple(self._prepare_middleware())
  259. self._has_legacy_middlewares = any(
  260. not new_style for _, new_style in self._middlewares_handlers
  261. )
  262. # If current app and any subapp do not have middlewares avoid run all
  263. # of the code footprint that it implies, which have a middleware
  264. # hardcoded per app that sets up the current_app attribute. If no
  265. # middlewares are configured the handler will receive the proper
  266. # current_app without needing all of this code.
  267. self._run_middlewares = True if self.middlewares else False
  268. for subapp in self._subapps:
  269. subapp.pre_freeze()
  270. self._run_middlewares = self._run_middlewares or subapp._run_middlewares
  271. @property
  272. def frozen(self) -> bool:
  273. return self._frozen
  274. def freeze(self) -> None:
  275. if self._frozen:
  276. return
  277. self.pre_freeze()
  278. self._frozen = True
  279. for subapp in self._subapps:
  280. subapp.freeze()
  281. @property
  282. def debug(self) -> bool:
  283. warnings.warn("debug property is deprecated", DeprecationWarning, stacklevel=2)
  284. return self._debug # type: ignore[no-any-return]
  285. def _reg_subapp_signals(self, subapp: "Application") -> None:
  286. def reg_handler(signame: str) -> None:
  287. subsig = getattr(subapp, signame)
  288. async def handler(app: "Application") -> None:
  289. await subsig.send(subapp)
  290. appsig = getattr(self, signame)
  291. appsig.append(handler)
  292. reg_handler("on_startup")
  293. reg_handler("on_shutdown")
  294. reg_handler("on_cleanup")
  295. def add_subapp(self, prefix: str, subapp: "Application") -> PrefixedSubAppResource:
  296. if not isinstance(prefix, str):
  297. raise TypeError("Prefix must be str")
  298. prefix = prefix.rstrip("/")
  299. if not prefix:
  300. raise ValueError("Prefix cannot be empty")
  301. factory = partial(PrefixedSubAppResource, prefix, subapp)
  302. return self._add_subapp(factory, subapp)
  303. def _add_subapp(
  304. self, resource_factory: Callable[[], _Resource], subapp: "Application"
  305. ) -> _Resource:
  306. if self.frozen:
  307. raise RuntimeError("Cannot add sub application to frozen application")
  308. if subapp.frozen:
  309. raise RuntimeError("Cannot add frozen application")
  310. resource = resource_factory()
  311. self.router.register_resource(resource)
  312. self._reg_subapp_signals(subapp)
  313. self._subapps.append(subapp)
  314. subapp.pre_freeze()
  315. if self._loop is not None:
  316. subapp._set_loop(self._loop)
  317. return resource
  318. def add_domain(self, domain: str, subapp: "Application") -> MatchedSubAppResource:
  319. if not isinstance(domain, str):
  320. raise TypeError("Domain must be str")
  321. elif "*" in domain:
  322. rule: Domain = MaskDomain(domain)
  323. else:
  324. rule = Domain(domain)
  325. factory = partial(MatchedSubAppResource, rule, subapp)
  326. return self._add_subapp(factory, subapp)
  327. def add_routes(self, routes: Iterable[AbstractRouteDef]) -> List[AbstractRoute]:
  328. return self.router.add_routes(routes)
  329. @property
  330. def on_response_prepare(self) -> _RespPrepareSignal:
  331. return self._on_response_prepare
  332. @property
  333. def on_startup(self) -> _AppSignal:
  334. return self._on_startup
  335. @property
  336. def on_shutdown(self) -> _AppSignal:
  337. return self._on_shutdown
  338. @property
  339. def on_cleanup(self) -> _AppSignal:
  340. return self._on_cleanup
  341. @property
  342. def cleanup_ctx(self) -> "CleanupContext":
  343. return self._cleanup_ctx
  344. @property
  345. def router(self) -> UrlDispatcher:
  346. return self._router
  347. @property
  348. def middlewares(self) -> _Middlewares:
  349. return self._middlewares
  350. def _make_handler(
  351. self,
  352. *,
  353. loop: Optional[asyncio.AbstractEventLoop] = None,
  354. access_log_class: Type[AbstractAccessLogger] = AccessLogger,
  355. **kwargs: Any,
  356. ) -> Server:
  357. if not issubclass(access_log_class, AbstractAccessLogger):
  358. raise TypeError(
  359. "access_log_class must be subclass of "
  360. "aiohttp.abc.AbstractAccessLogger, got {}".format(access_log_class)
  361. )
  362. self._set_loop(loop)
  363. self.freeze()
  364. kwargs["debug"] = self._debug
  365. kwargs["access_log_class"] = access_log_class
  366. if self._handler_args:
  367. for k, v in self._handler_args.items():
  368. kwargs[k] = v
  369. return Server(
  370. self._handle, # type: ignore[arg-type]
  371. request_factory=self._make_request,
  372. loop=self._loop,
  373. **kwargs,
  374. )
  375. def make_handler(
  376. self,
  377. *,
  378. loop: Optional[asyncio.AbstractEventLoop] = None,
  379. access_log_class: Type[AbstractAccessLogger] = AccessLogger,
  380. **kwargs: Any,
  381. ) -> Server:
  382. warnings.warn(
  383. "Application.make_handler(...) is deprecated, use AppRunner API instead",
  384. DeprecationWarning,
  385. stacklevel=2,
  386. )
  387. return self._make_handler(
  388. loop=loop, access_log_class=access_log_class, **kwargs
  389. )
  390. async def startup(self) -> None:
  391. """Causes on_startup signal
  392. Should be called in the event loop along with the request handler.
  393. """
  394. await self.on_startup.send(self)
  395. async def shutdown(self) -> None:
  396. """Causes on_shutdown signal
  397. Should be called before cleanup()
  398. """
  399. await self.on_shutdown.send(self)
  400. async def cleanup(self) -> None:
  401. """Causes on_cleanup signal
  402. Should be called after shutdown()
  403. """
  404. if self.on_cleanup.frozen:
  405. await self.on_cleanup.send(self)
  406. else:
  407. # If an exception occurs in startup, ensure cleanup contexts are completed.
  408. await self._cleanup_ctx._on_cleanup(self)
  409. def _make_request(
  410. self,
  411. message: RawRequestMessage,
  412. payload: StreamReader,
  413. protocol: RequestHandler,
  414. writer: AbstractStreamWriter,
  415. task: "asyncio.Task[None]",
  416. _cls: Type[Request] = Request,
  417. ) -> Request:
  418. if TYPE_CHECKING:
  419. assert self._loop is not None
  420. return _cls(
  421. message,
  422. payload,
  423. protocol,
  424. writer,
  425. task,
  426. self._loop,
  427. client_max_size=self._client_max_size,
  428. )
  429. def _prepare_middleware(self) -> Iterator[Tuple[Middleware, bool]]:
  430. for m in reversed(self._middlewares):
  431. if getattr(m, "__middleware_version__", None) == 1:
  432. yield m, True
  433. else:
  434. warnings.warn(
  435. f'old-style middleware "{m!r}" deprecated, see #2252',
  436. DeprecationWarning,
  437. stacklevel=2,
  438. )
  439. yield m, False
  440. yield _fix_request_current_app(self), True
  441. async def _handle(self, request: Request) -> StreamResponse:
  442. loop = asyncio.get_event_loop()
  443. debug = loop.get_debug()
  444. match_info = await self._router.resolve(request)
  445. if debug: # pragma: no cover
  446. if not isinstance(match_info, AbstractMatchInfo):
  447. raise TypeError(
  448. "match_info should be AbstractMatchInfo "
  449. "instance, not {!r}".format(match_info)
  450. )
  451. match_info.add_app(self)
  452. match_info.freeze()
  453. request._match_info = match_info
  454. if request.headers.get(hdrs.EXPECT):
  455. resp = await match_info.expect_handler(request)
  456. await request.writer.drain()
  457. if resp is not None:
  458. return resp
  459. handler = match_info.handler
  460. if self._run_middlewares:
  461. # If its a SystemRoute, don't cache building the middlewares since
  462. # they are constructed for every MatchInfoError as a new handler
  463. # is made each time.
  464. if not self._has_legacy_middlewares and not isinstance(
  465. match_info.route, SystemRoute
  466. ):
  467. handler = _cached_build_middleware(handler, match_info.apps)
  468. else:
  469. for app in match_info.apps[::-1]:
  470. for m, new_style in app._middlewares_handlers: # type: ignore[union-attr]
  471. if new_style:
  472. handler = update_wrapper(
  473. partial(m, handler=handler), handler # type: ignore[misc]
  474. )
  475. else:
  476. handler = await m(app, handler) # type: ignore[arg-type,assignment]
  477. return await handler(request)
  478. def __call__(self) -> "Application":
  479. """gunicorn compatibility"""
  480. return self
  481. def __repr__(self) -> str:
  482. return f"<Application 0x{id(self):x}>"
  483. def __bool__(self) -> bool:
  484. return True
  485. class CleanupError(RuntimeError):
  486. @property
  487. def exceptions(self) -> List[BaseException]:
  488. return cast(List[BaseException], self.args[1])
  489. if TYPE_CHECKING:
  490. _CleanupContextBase = FrozenList[Callable[[Application], AsyncIterator[None]]]
  491. else:
  492. _CleanupContextBase = FrozenList
  493. class CleanupContext(_CleanupContextBase):
  494. def __init__(self) -> None:
  495. super().__init__()
  496. self._exits: List[AsyncIterator[None]] = []
  497. async def _on_startup(self, app: Application) -> None:
  498. for cb in self:
  499. it = cb(app).__aiter__()
  500. await it.__anext__()
  501. self._exits.append(it)
  502. async def _on_cleanup(self, app: Application) -> None:
  503. errors = []
  504. for it in reversed(self._exits):
  505. try:
  506. await it.__anext__()
  507. except StopAsyncIteration:
  508. pass
  509. except (Exception, asyncio.CancelledError) as exc:
  510. errors.append(exc)
  511. else:
  512. errors.append(RuntimeError(f"{it!r} has more than one 'yield'"))
  513. if errors:
  514. if len(errors) == 1:
  515. raise errors[0]
  516. else:
  517. raise CleanupError("Multiple errors on cleanup stage", errors)