web_ws.py 22 KB


  1. import asyncio
  2. import base64
  3. import binascii
  4. import hashlib
  5. import json
  6. import sys
  7. from typing import Any, Final, Iterable, Optional, Tuple, Union, cast
  8. import attr
  9. from multidict import CIMultiDict
  10. from . import hdrs
  11. from ._websocket.reader import WebSocketDataQueue
  12. from ._websocket.writer import DEFAULT_LIMIT
  13. from .abc import AbstractStreamWriter
  14. from .client_exceptions import WSMessageTypeError
  15. from .helpers import calculate_timeout_when, set_exception, set_result
  16. from .http import (
  17. WS_CLOSED_MESSAGE,
  18. WS_CLOSING_MESSAGE,
  19. WS_KEY,
  20. WebSocketError,
  21. WebSocketReader,
  22. WebSocketWriter,
  23. WSCloseCode,
  24. WSMessage,
  25. WSMsgType as WSMsgType,
  26. ws_ext_gen,
  27. ws_ext_parse,
  28. )
  29. from .http_websocket import _INTERNAL_RECEIVE_TYPES
  30. from .log import ws_logger
  31. from .streams import EofStream
  32. from .typedefs import JSONDecoder, JSONEncoder
  33. from .web_exceptions import HTTPBadRequest, HTTPException
  34. from .web_request import BaseRequest
  35. from .web_response import StreamResponse
  36. if sys.version_info >= (3, 11):
  37. import asyncio as async_timeout
  38. else:
  39. import async_timeout
  40. __all__ = (
  41. "WebSocketResponse",
  42. "WebSocketReady",
  43. "WSMsgType",
  44. )
  45. THRESHOLD_CONNLOST_ACCESS: Final[int] = 5
  46. @attr.s(auto_attribs=True, frozen=True, slots=True)
  47. class WebSocketReady:
  48. ok: bool
  49. protocol: Optional[str]
  50. def __bool__(self) -> bool:
  51. return self.ok
  52. class WebSocketResponse(StreamResponse):
  53. _length_check: bool = False
  54. _ws_protocol: Optional[str] = None
  55. _writer: Optional[WebSocketWriter] = None
  56. _reader: Optional[WebSocketDataQueue] = None
  57. _closed: bool = False
  58. _closing: bool = False
  59. _conn_lost: int = 0
  60. _close_code: Optional[int] = None
  61. _loop: Optional[asyncio.AbstractEventLoop] = None
  62. _waiting: bool = False
  63. _close_wait: Optional[asyncio.Future[None]] = None
  64. _exception: Optional[BaseException] = None
  65. _heartbeat_when: float = 0.0
  66. _heartbeat_cb: Optional[asyncio.TimerHandle] = None
  67. _pong_response_cb: Optional[asyncio.TimerHandle] = None
  68. _ping_task: Optional[asyncio.Task[None]] = None
  69. def __init__(
  70. self,
  71. *,
  72. timeout: float = 10.0,
  73. receive_timeout: Optional[float] = None,
  74. autoclose: bool = True,
  75. autoping: bool = True,
  76. heartbeat: Optional[float] = None,
  77. protocols: Iterable[str] = (),
  78. compress: bool = True,
  79. max_msg_size: int = 4 * 1024 * 1024,
  80. writer_limit: int = DEFAULT_LIMIT,
  81. ) -> None:
  82. super().__init__(status=101)
  83. self._protocols = protocols
  84. self._timeout = timeout
  85. self._receive_timeout = receive_timeout
  86. self._autoclose = autoclose
  87. self._autoping = autoping
  88. self._heartbeat = heartbeat
  89. if heartbeat is not None:
  90. self._pong_heartbeat = heartbeat / 2.0
  91. self._compress: Union[bool, int] = compress
  92. self._max_msg_size = max_msg_size
  93. self._writer_limit = writer_limit
  94. def _cancel_heartbeat(self) -> None:
  95. self._cancel_pong_response_cb()
  96. if self._heartbeat_cb is not None:
  97. self._heartbeat_cb.cancel()
  98. self._heartbeat_cb = None
  99. if self._ping_task is not None:
  100. self._ping_task.cancel()
  101. self._ping_task = None
  102. def _cancel_pong_response_cb(self) -> None:
  103. if self._pong_response_cb is not None:
  104. self._pong_response_cb.cancel()
  105. self._pong_response_cb = None
  106. def _reset_heartbeat(self) -> None:
  107. if self._heartbeat is None:
  108. return
  109. self._cancel_pong_response_cb()
  110. req = self._req
  111. timeout_ceil_threshold = (
  112. req._protocol._timeout_ceil_threshold if req is not None else 5
  113. )
  114. loop = self._loop
  115. assert loop is not None
  116. now = loop.time()
  117. when = calculate_timeout_when(now, self._heartbeat, timeout_ceil_threshold)
  118. self._heartbeat_when = when
  119. if self._heartbeat_cb is None:
  120. # We do not cancel the previous heartbeat_cb here because
  121. # it generates a significant amount of TimerHandle churn
  122. # which causes asyncio to rebuild the heap frequently.
  123. # Instead _send_heartbeat() will reschedule the next
  124. # heartbeat if it fires too early.
  125. self._heartbeat_cb = loop.call_at(when, self._send_heartbeat)
  126. def _send_heartbeat(self) -> None:
  127. self._heartbeat_cb = None
  128. loop = self._loop
  129. assert loop is not None and self._writer is not None
  130. now = loop.time()
  131. if now < self._heartbeat_when:
  132. # Heartbeat fired too early, reschedule
  133. self._heartbeat_cb = loop.call_at(
  134. self._heartbeat_when, self._send_heartbeat
  135. )
  136. return
  137. req = self._req
  138. timeout_ceil_threshold = (
  139. req._protocol._timeout_ceil_threshold if req is not None else 5
  140. )
  141. when = calculate_timeout_when(now, self._pong_heartbeat, timeout_ceil_threshold)
  142. self._cancel_pong_response_cb()
  143. self._pong_response_cb = loop.call_at(when, self._pong_not_received)
  144. coro = self._writer.send_frame(b"", WSMsgType.PING)
  145. if sys.version_info >= (3, 12):
  146. # Optimization for Python 3.12, try to send the ping
  147. # immediately to avoid having to schedule
  148. # the task on the event loop.
  149. ping_task = asyncio.Task(coro, loop=loop, eager_start=True)
  150. else:
  151. ping_task = loop.create_task(coro)
  152. if not ping_task.done():
  153. self._ping_task = ping_task
  154. ping_task.add_done_callback(self._ping_task_done)
  155. else:
  156. self._ping_task_done(ping_task)
  157. def _ping_task_done(self, task: "asyncio.Task[None]") -> None:
  158. """Callback for when the ping task completes."""
  159. if not task.cancelled() and (exc := task.exception()):
  160. self._handle_ping_pong_exception(exc)
  161. self._ping_task = None
  162. def _pong_not_received(self) -> None:
  163. if self._req is not None and self._req.transport is not None:
  164. self._handle_ping_pong_exception(
  165. asyncio.TimeoutError(
  166. f"No PONG received after {self._pong_heartbeat} seconds"
  167. )
  168. )
  169. def _handle_ping_pong_exception(self, exc: BaseException) -> None:
  170. """Handle exceptions raised during ping/pong processing."""
  171. if self._closed:
  172. return
  173. self._set_closed()
  174. self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
  175. self._exception = exc
  176. if self._waiting and not self._closing and self._reader is not None:
  177. self._reader.feed_data(WSMessage(WSMsgType.ERROR, exc, None), 0)
  178. def _set_closed(self) -> None:
  179. """Set the connection to closed.
  180. Cancel any heartbeat timers and set the closed flag.
  181. """
  182. self._closed = True
  183. self._cancel_heartbeat()
  184. async def prepare(self, request: BaseRequest) -> AbstractStreamWriter:
  185. # make pre-check to don't hide it by do_handshake() exceptions
  186. if self._payload_writer is not None:
  187. return self._payload_writer
  188. protocol, writer = self._pre_start(request)
  189. payload_writer = await super().prepare(request)
  190. assert payload_writer is not None
  191. self._post_start(request, protocol, writer)
  192. await payload_writer.drain()
  193. return payload_writer
  194. def _handshake(
  195. self, request: BaseRequest
  196. ) -> Tuple["CIMultiDict[str]", Optional[str], int, bool]:
  197. headers = request.headers
  198. if "websocket" != headers.get(hdrs.UPGRADE, "").lower().strip():
  199. raise HTTPBadRequest(
  200. text=(
  201. "No WebSocket UPGRADE hdr: {}\n Can "
  202. '"Upgrade" only to "WebSocket".'
  203. ).format(headers.get(hdrs.UPGRADE))
  204. )
  205. if "upgrade" not in headers.get(hdrs.CONNECTION, "").lower():
  206. raise HTTPBadRequest(
  207. text="No CONNECTION upgrade hdr: {}".format(
  208. headers.get(hdrs.CONNECTION)
  209. )
  210. )
  211. # find common sub-protocol between client and server
  212. protocol: Optional[str] = None
  213. if hdrs.SEC_WEBSOCKET_PROTOCOL in headers:
  214. req_protocols = [
  215. str(proto.strip())
  216. for proto in headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
  217. ]
  218. for proto in req_protocols:
  219. if proto in self._protocols:
  220. protocol = proto
  221. break
  222. else:
  223. # No overlap found: Return no protocol as per spec
  224. ws_logger.warning(
  225. "%s: Client protocols %r don’t overlap server-known ones %r",
  226. request.remote,
  227. req_protocols,
  228. self._protocols,
  229. )
  230. # check supported version
  231. version = headers.get(hdrs.SEC_WEBSOCKET_VERSION, "")
  232. if version not in ("13", "8", "7"):
  233. raise HTTPBadRequest(text=f"Unsupported version: {version}")
  234. # check client handshake for validity
  235. key = headers.get(hdrs.SEC_WEBSOCKET_KEY)
  236. try:
  237. if not key or len(base64.b64decode(key)) != 16:
  238. raise HTTPBadRequest(text=f"Handshake error: {key!r}")
  239. except binascii.Error:
  240. raise HTTPBadRequest(text=f"Handshake error: {key!r}") from None
  241. accept_val = base64.b64encode(
  242. hashlib.sha1(key.encode() + WS_KEY).digest()
  243. ).decode()
  244. response_headers = CIMultiDict(
  245. {
  246. hdrs.UPGRADE: "websocket",
  247. hdrs.CONNECTION: "upgrade",
  248. hdrs.SEC_WEBSOCKET_ACCEPT: accept_val,
  249. }
  250. )
  251. notakeover = False
  252. compress = 0
  253. if self._compress:
  254. extensions = headers.get(hdrs.SEC_WEBSOCKET_EXTENSIONS)
  255. # Server side always get return with no exception.
  256. # If something happened, just drop compress extension
  257. compress, notakeover = ws_ext_parse(extensions, isserver=True)
  258. if compress:
  259. enabledext = ws_ext_gen(
  260. compress=compress, isserver=True, server_notakeover=notakeover
  261. )
  262. response_headers[hdrs.SEC_WEBSOCKET_EXTENSIONS] = enabledext
  263. if protocol:
  264. response_headers[hdrs.SEC_WEBSOCKET_PROTOCOL] = protocol
  265. return (
  266. response_headers,
  267. protocol,
  268. compress,
  269. notakeover,
  270. )
  271. def _pre_start(self, request: BaseRequest) -> Tuple[Optional[str], WebSocketWriter]:
  272. self._loop = request._loop
  273. headers, protocol, compress, notakeover = self._handshake(request)
  274. self.set_status(101)
  275. self.headers.update(headers)
  276. self.force_close()
  277. self._compress = compress
  278. transport = request._protocol.transport
  279. assert transport is not None
  280. writer = WebSocketWriter(
  281. request._protocol,
  282. transport,
  283. compress=compress,
  284. notakeover=notakeover,
  285. limit=self._writer_limit,
  286. )
  287. return protocol, writer
  288. def _post_start(
  289. self, request: BaseRequest, protocol: Optional[str], writer: WebSocketWriter
  290. ) -> None:
  291. self._ws_protocol = protocol
  292. self._writer = writer
  293. self._reset_heartbeat()
  294. loop = self._loop
  295. assert loop is not None
  296. self._reader = WebSocketDataQueue(request._protocol, 2**16, loop=loop)
  297. request.protocol.set_parser(
  298. WebSocketReader(
  299. self._reader, self._max_msg_size, compress=bool(self._compress)
  300. )
  301. )
  302. # disable HTTP keepalive for WebSocket
  303. request.protocol.keep_alive(False)
  304. def can_prepare(self, request: BaseRequest) -> WebSocketReady:
  305. if self._writer is not None:
  306. raise RuntimeError("Already started")
  307. try:
  308. _, protocol, _, _ = self._handshake(request)
  309. except HTTPException:
  310. return WebSocketReady(False, None)
  311. else:
  312. return WebSocketReady(True, protocol)
  313. @property
  314. def closed(self) -> bool:
  315. return self._closed
  316. @property
  317. def close_code(self) -> Optional[int]:
  318. return self._close_code
  319. @property
  320. def ws_protocol(self) -> Optional[str]:
  321. return self._ws_protocol
  322. @property
  323. def compress(self) -> Union[int, bool]:
  324. return self._compress
  325. def get_extra_info(self, name: str, default: Any = None) -> Any:
  326. """Get optional transport information.
  327. If no value associated with ``name`` is found, ``default`` is returned.
  328. """
  329. writer = self._writer
  330. if writer is None:
  331. return default
  332. transport = writer.transport
  333. if transport is None:
  334. return default
  335. return transport.get_extra_info(name, default)
  336. def exception(self) -> Optional[BaseException]:
  337. return self._exception
  338. async def ping(self, message: bytes = b"") -> None:
  339. if self._writer is None:
  340. raise RuntimeError("Call .prepare() first")
  341. await self._writer.send_frame(message, WSMsgType.PING)
  342. async def pong(self, message: bytes = b"") -> None:
  343. # unsolicited pong
  344. if self._writer is None:
  345. raise RuntimeError("Call .prepare() first")
  346. await self._writer.send_frame(message, WSMsgType.PONG)
  347. async def send_frame(
  348. self, message: bytes, opcode: WSMsgType, compress: Optional[int] = None
  349. ) -> None:
  350. """Send a frame over the websocket."""
  351. if self._writer is None:
  352. raise RuntimeError("Call .prepare() first")
  353. await self._writer.send_frame(message, opcode, compress)
  354. async def send_str(self, data: str, compress: Optional[int] = None) -> None:
  355. if self._writer is None:
  356. raise RuntimeError("Call .prepare() first")
  357. if not isinstance(data, str):
  358. raise TypeError("data argument must be str (%r)" % type(data))
  359. await self._writer.send_frame(
  360. data.encode("utf-8"), WSMsgType.TEXT, compress=compress
  361. )
  362. async def send_bytes(self, data: bytes, compress: Optional[int] = None) -> None:
  363. if self._writer is None:
  364. raise RuntimeError("Call .prepare() first")
  365. if not isinstance(data, (bytes, bytearray, memoryview)):
  366. raise TypeError("data argument must be byte-ish (%r)" % type(data))
  367. await self._writer.send_frame(data, WSMsgType.BINARY, compress=compress)
  368. async def send_json(
  369. self,
  370. data: Any,
  371. compress: Optional[int] = None,
  372. *,
  373. dumps: JSONEncoder = json.dumps,
  374. ) -> None:
  375. await self.send_str(dumps(data), compress=compress)
  376. async def write_eof(self) -> None: # type: ignore[override]
  377. if self._eof_sent:
  378. return
  379. if self._payload_writer is None:
  380. raise RuntimeError("Response has not been started")
  381. await self.close()
  382. self._eof_sent = True
  383. async def close(
  384. self, *, code: int = WSCloseCode.OK, message: bytes = b"", drain: bool = True
  385. ) -> bool:
  386. """Close websocket connection."""
  387. if self._writer is None:
  388. raise RuntimeError("Call .prepare() first")
  389. if self._closed:
  390. return False
  391. self._set_closed()
  392. try:
  393. await self._writer.close(code, message)
  394. writer = self._payload_writer
  395. assert writer is not None
  396. if drain:
  397. await writer.drain()
  398. except (asyncio.CancelledError, asyncio.TimeoutError):
  399. self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
  400. raise
  401. except Exception as exc:
  402. self._exception = exc
  403. self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
  404. return True
  405. reader = self._reader
  406. assert reader is not None
  407. # we need to break `receive()` cycle before we can call
  408. # `reader.read()` as `close()` may be called from different task
  409. if self._waiting:
  410. assert self._loop is not None
  411. assert self._close_wait is None
  412. self._close_wait = self._loop.create_future()
  413. reader.feed_data(WS_CLOSING_MESSAGE, 0)
  414. await self._close_wait
  415. if self._closing:
  416. self._close_transport()
  417. return True
  418. try:
  419. async with async_timeout.timeout(self._timeout):
  420. while True:
  421. msg = await reader.read()
  422. if msg.type is WSMsgType.CLOSE:
  423. self._set_code_close_transport(msg.data)
  424. return True
  425. except asyncio.CancelledError:
  426. self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
  427. raise
  428. except Exception as exc:
  429. self._exception = exc
  430. self._set_code_close_transport(WSCloseCode.ABNORMAL_CLOSURE)
  431. return True
  432. def _set_closing(self, code: WSCloseCode) -> None:
  433. """Set the close code and mark the connection as closing."""
  434. self._closing = True
  435. self._close_code = code
  436. self._cancel_heartbeat()
  437. def _set_code_close_transport(self, code: WSCloseCode) -> None:
  438. """Set the close code and close the transport."""
  439. self._close_code = code
  440. self._close_transport()
  441. def _close_transport(self) -> None:
  442. """Close the transport."""
  443. if self._req is not None and self._req.transport is not None:
  444. self._req.transport.close()
  445. async def receive(self, timeout: Optional[float] = None) -> WSMessage:
  446. if self._reader is None:
  447. raise RuntimeError("Call .prepare() first")
  448. receive_timeout = timeout or self._receive_timeout
  449. while True:
  450. if self._waiting:
  451. raise RuntimeError("Concurrent call to receive() is not allowed")
  452. if self._closed:
  453. self._conn_lost += 1
  454. if self._conn_lost >= THRESHOLD_CONNLOST_ACCESS:
  455. raise RuntimeError("WebSocket connection is closed.")
  456. return WS_CLOSED_MESSAGE
  457. elif self._closing:
  458. return WS_CLOSING_MESSAGE
  459. try:
  460. self._waiting = True
  461. try:
  462. if receive_timeout:
  463. # Entering the context manager and creating
  464. # Timeout() object can take almost 50% of the
  465. # run time in this loop so we avoid it if
  466. # there is no read timeout.
  467. async with async_timeout.timeout(receive_timeout):
  468. msg = await self._reader.read()
  469. else:
  470. msg = await self._reader.read()
  471. self._reset_heartbeat()
  472. finally:
  473. self._waiting = False
  474. if self._close_wait:
  475. set_result(self._close_wait, None)
  476. except asyncio.TimeoutError:
  477. raise
  478. except EofStream:
  479. self._close_code = WSCloseCode.OK
  480. await self.close()
  481. return WSMessage(WSMsgType.CLOSED, None, None)
  482. except WebSocketError as exc:
  483. self._close_code = exc.code
  484. await self.close(code=exc.code)
  485. return WSMessage(WSMsgType.ERROR, exc, None)
  486. except Exception as exc:
  487. self._exception = exc
  488. self._set_closing(WSCloseCode.ABNORMAL_CLOSURE)
  489. await self.close()
  490. return WSMessage(WSMsgType.ERROR, exc, None)
  491. if msg.type not in _INTERNAL_RECEIVE_TYPES:
  492. # If its not a close/closing/ping/pong message
  493. # we can return it immediately
  494. return msg
  495. if msg.type is WSMsgType.CLOSE:
  496. self._set_closing(msg.data)
  497. # Could be closed while awaiting reader.
  498. if not self._closed and self._autoclose:
  499. # The client is likely going to close the
  500. # connection out from under us so we do not
  501. # want to drain any pending writes as it will
  502. # likely result writing to a broken pipe.
  503. await self.close(drain=False)
  504. elif msg.type is WSMsgType.CLOSING:
  505. self._set_closing(WSCloseCode.OK)
  506. elif msg.type is WSMsgType.PING and self._autoping:
  507. await self.pong(msg.data)
  508. continue
  509. elif msg.type is WSMsgType.PONG and self._autoping:
  510. continue
  511. return msg
  512. async def receive_str(self, *, timeout: Optional[float] = None) -> str:
  513. msg = await self.receive(timeout)
  514. if msg.type is not WSMsgType.TEXT:
  515. raise WSMessageTypeError(
  516. f"Received message {msg.type}:{msg.data!r} is not WSMsgType.TEXT"
  517. )
  518. return cast(str, msg.data)
  519. async def receive_bytes(self, *, timeout: Optional[float] = None) -> bytes:
  520. msg = await self.receive(timeout)
  521. if msg.type is not WSMsgType.BINARY:
  522. raise WSMessageTypeError(
  523. f"Received message {msg.type}:{msg.data!r} is not WSMsgType.BINARY"
  524. )
  525. return cast(bytes, msg.data)
  526. async def receive_json(
  527. self, *, loads: JSONDecoder = json.loads, timeout: Optional[float] = None
  528. ) -> Any:
  529. data = await self.receive_str(timeout=timeout)
  530. return loads(data)
  531. async def write(self, data: bytes) -> None:
  532. raise RuntimeError("Cannot call .write() for websocket")
  533. def __aiter__(self) -> "WebSocketResponse":
  534. return self
  535. async def __anext__(self) -> WSMessage:
  536. msg = await self.receive()
  537. if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
  538. raise StopAsyncIteration
  539. return msg
  540. def _cancel(self, exc: BaseException) -> None:
  541. # web_protocol calls this from connection_lost
  542. # or when the server is shutting down.
  543. self._closing = True
  544. self._cancel_heartbeat()
  545. if self._reader is not None:
  546. set_exception(self._reader, exc)