text.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from __future__ import annotations
  2. import codecs
  3. from collections.abc import Callable, Mapping
  4. from dataclasses import InitVar, dataclass, field
  5. from typing import Any
  6. from ..abc import (
  7. AnyByteReceiveStream,
  8. AnyByteSendStream,
  9. AnyByteStream,
  10. ObjectReceiveStream,
  11. ObjectSendStream,
  12. ObjectStream,
  13. )
  14. @dataclass(eq=False)
  15. class TextReceiveStream(ObjectReceiveStream[str]):
  16. """
  17. Stream wrapper that decodes bytes to strings using the given encoding.
  18. Decoding is done using :class:`~codecs.IncrementalDecoder` which returns any
  19. completely received unicode characters as soon as they come in.
  20. :param transport_stream: any bytes-based receive stream
  21. :param encoding: character encoding to use for decoding bytes to strings (defaults
  22. to ``utf-8``)
  23. :param errors: handling scheme for decoding errors (defaults to ``strict``; see the
  24. `codecs module documentation`_ for a comprehensive list of options)
  25. .. _codecs module documentation:
  26. https://docs.python.org/3/library/codecs.html#codec-objects
  27. """
  28. transport_stream: AnyByteReceiveStream
  29. encoding: InitVar[str] = "utf-8"
  30. errors: InitVar[str] = "strict"
  31. _decoder: codecs.IncrementalDecoder = field(init=False)
  32. def __post_init__(self, encoding: str, errors: str) -> None:
  33. decoder_class = codecs.getincrementaldecoder(encoding)
  34. self._decoder = decoder_class(errors=errors)
  35. async def receive(self) -> str:
  36. while True:
  37. chunk = await self.transport_stream.receive()
  38. decoded = self._decoder.decode(chunk)
  39. if decoded:
  40. return decoded
  41. async def aclose(self) -> None:
  42. await self.transport_stream.aclose()
  43. self._decoder.reset()
  44. @property
  45. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  46. return self.transport_stream.extra_attributes
  47. @dataclass(eq=False)
  48. class TextSendStream(ObjectSendStream[str]):
  49. """
  50. Sends strings to the wrapped stream as bytes using the given encoding.
  51. :param AnyByteSendStream transport_stream: any bytes-based send stream
  52. :param str encoding: character encoding to use for encoding strings to bytes
  53. (defaults to ``utf-8``)
  54. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  55. the `codecs module documentation`_ for a comprehensive list of options)
  56. .. _codecs module documentation:
  57. https://docs.python.org/3/library/codecs.html#codec-objects
  58. """
  59. transport_stream: AnyByteSendStream
  60. encoding: InitVar[str] = "utf-8"
  61. errors: str = "strict"
  62. _encoder: Callable[..., tuple[bytes, int]] = field(init=False)
  63. def __post_init__(self, encoding: str) -> None:
  64. self._encoder = codecs.getencoder(encoding)
  65. async def send(self, item: str) -> None:
  66. encoded = self._encoder(item, self.errors)[0]
  67. await self.transport_stream.send(encoded)
  68. async def aclose(self) -> None:
  69. await self.transport_stream.aclose()
  70. @property
  71. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  72. return self.transport_stream.extra_attributes
  73. @dataclass(eq=False)
  74. class TextStream(ObjectStream[str]):
  75. """
  76. A bidirectional stream that decodes bytes to strings on receive and encodes strings
  77. to bytes on send.
  78. Extra attributes will be provided from both streams, with the receive stream
  79. providing the values in case of a conflict.
  80. :param AnyByteStream transport_stream: any bytes-based stream
  81. :param str encoding: character encoding to use for encoding/decoding strings to/from
  82. bytes (defaults to ``utf-8``)
  83. :param str errors: handling scheme for encoding errors (defaults to ``strict``; see
  84. the `codecs module documentation`_ for a comprehensive list of options)
  85. .. _codecs module documentation:
  86. https://docs.python.org/3/library/codecs.html#codec-objects
  87. """
  88. transport_stream: AnyByteStream
  89. encoding: InitVar[str] = "utf-8"
  90. errors: InitVar[str] = "strict"
  91. _receive_stream: TextReceiveStream = field(init=False)
  92. _send_stream: TextSendStream = field(init=False)
  93. def __post_init__(self, encoding: str, errors: str) -> None:
  94. self._receive_stream = TextReceiveStream(
  95. self.transport_stream, encoding=encoding, errors=errors
  96. )
  97. self._send_stream = TextSendStream(
  98. self.transport_stream, encoding=encoding, errors=errors
  99. )
  100. async def receive(self) -> str:
  101. return await self._receive_stream.receive()
  102. async def send(self, item: str) -> None:
  103. await self._send_stream.send(item)
  104. async def send_eof(self) -> None:
  105. await self.transport_stream.send_eof()
  106. async def aclose(self) -> None:
  107. await self._send_stream.aclose()
  108. await self._receive_stream.aclose()
  109. @property
  110. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  111. return {
  112. **self._send_stream.extra_attributes,
  113. **self._receive_stream.extra_attributes,
  114. }