import re
from typing import List, Optional, Union

__all__ = ["ReceiveBuffer"]

# Operations we want to support:
# - find next \r\n or \r\n\r\n (\n or \n\n are also acceptable),
#   or wait until there is one
# - read at-most-N bytes
# Goals:
# - on average, do this fast
# - worst case, do this in O(n) where n is the number of bytes processed
# Plan:
# - store bytearray, offset, how far we've searched for a separator token
# - use the how-far-we've-searched data to avoid rescanning
# - while doing a stream of uninterrupted processing, advance offset instead
#   of constantly copying
# WARNING:
# - I haven't benchmarked or profiled any of this yet.
#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n), thanks to some excellent work by Antoine
# Martin:
#
#     https://bugs.python.org/issue19087
#
# This means that if we only supported 3.4+, we could get rid of the code here
# involving self._start and self.compress, because it's doing exactly the same
# thing that bytearray now does internally.
#
# BUT unfortunately, we still support 2.7, and reading short segments out of a
# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
# delete this code. Yet:
#
#     https://pythonclock.org/
#
# (Two things to double-check first though: make sure PyPy also has the
# optimization, and benchmark to make sure it's a win, since we do have a
# slightly clever thing where we delay calling compress() until we've
# processed a whole event, which could in theory be slightly more efficient
# than the internal bytearray support.)
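#
# A minimal usage sketch (illustrative only, not part of the module): feed
# raw bytes in with `+=`, then call the maybe_extract_* methods, which
# return None until enough data has accumulated. `sock` here stands for a
# hypothetical socket:
#
#     buf = ReceiveBuffer()
#     buf += sock.recv(4096)
#     lines = buf.maybe_extract_lines()  # None until a blank line arrives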

blank_line_regex = re.compile(b"\n\r?\n", re.MULTILINE)


class ReceiveBuffer:
    def __init__(self) -> None:
        self._data = bytearray()
        # How far into self._data we've already searched (and failed) for a
        # line ending / blank line, so repeated calls don't rescan the same
        # prefix from scratch.
        self._next_line_search = 0
        self._multiple_lines_search = 0

    def __iadd__(self, byteslike: Union[bytes, bytearray]) -> "ReceiveBuffer":
        self._data += byteslike
        return self

    def __bool__(self) -> bool:
        return bool(len(self))

    def __len__(self) -> int:
        return len(self._data)

    # for @property unprocessed_data
    def __bytes__(self) -> bytes:
        return bytes(self._data)

    def _extract(self, count: int) -> bytearray:
        # Extract an initial slice of the data buffer and return it, then
        # reset the search offsets, since the remaining data has shifted.
        out = self._data[:count]
        del self._data[:count]

        self._next_line_search = 0
        self._multiple_lines_search = 0

        return out

    def maybe_extract_at_most(self, count: int) -> Optional[bytearray]:
        """
        Extract at most ``count`` bytes from the buffer, or return None if
        the buffer is empty.
        """
        out = self._data[:count]
        if not out:
            return None

        return self._extract(count)
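
    # Example (sketch): with b"abcdef" buffered, maybe_extract_at_most(4)
    # returns bytearray(b"abcd") and leaves b"ef" in place; on an empty
    # buffer it returns None rather than an empty bytearray.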

    def maybe_extract_next_line(self) -> Optional[bytearray]:
        """
        Extract the first line, if it is completed in the buffer.
        """
        # Only search in buffer space that we've not already looked at. The
        # - 1 re-checks the last byte we saw, in case a b"\r" was left
        # dangling at the previous search boundary.
        search_start_index = max(0, self._next_line_search - 1)
        partial_idx = self._data.find(b"\r\n", search_start_index)

        if partial_idx == -1:
            self._next_line_search = len(self._data)
            return None

        # + 2 to include the b"\r\n" terminator in the returned line
        idx = partial_idx + 2

        return self._extract(idx)
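
    # Example (sketch): after buf += b"HTTP/1.1 200 OK" this returns None
    # and records how far it searched; once the terminating b"\r\n" arrives,
    # it returns the complete line without rescanning the earlier prefix.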

    def maybe_extract_lines(self) -> Optional[List[bytearray]]:
        """
        Extract everything up to the first blank line, and return a list of lines.
        """
        # Handle the case where we have an immediate empty line.
        if self._data[:1] == b"\n":
            self._extract(1)
            return []

        if self._data[:2] == b"\r\n":
            self._extract(2)
            return []

        # Only search in buffer space that we've not already looked at.
        match = blank_line_regex.search(self._data, self._multiple_lines_search)
        if match is None:
            # Back up two bytes so that a separator spanning the boundary is
            # still found on the next call.
            self._multiple_lines_search = max(0, len(self._data) - 2)
            return None

        # Truncate the buffer and return it.
        idx = match.span(0)[-1]
        out = self._extract(idx)
        lines = out.split(b"\n")

        # Strip trailing b"\r" in place; these are bytearrays, so the del
        # mutates each line directly.
        for line in lines:
            if line.endswith(b"\r"):
                del line[-1]

        assert lines[-2] == lines[-1] == b""

        del lines[-2:]

        return lines
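
    # Example (sketch): with b"Host: a\r\nFoo: b\r\n\r\nrest" buffered, this
    # returns [b"Host: a", b"Foo: b"] (as bytearrays, with CRLFs stripped)
    # and leaves b"rest" in the buffer; before the blank line has arrived,
    # it returns None.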

    # In theory we should wait until `\r\n` before starting to validate
    # incoming data. However, it is useful to detect (very) invalid data
    # early, since it might not contain `\r\n` at all (in which case only a
    # timeout would get rid of it).
    # This is not a 100% effective detection, but more of a cheap sanity
    # check allowing for early abort in some useful cases.
    # It is especially useful when a peer gets HTTPS mixed up and sends us a
    # TLS stream where we were expecting plain HTTP: all versions of TLS so
    # far start the handshake with a 0x16 message type code.
    def is_next_line_obviously_invalid_request_line(self) -> bool:
        try:
            # An HTTP request line must not contain non-printable characters
            # and must not start with a space, so any first byte below 0x21
            # (the lowest printable ASCII character, b"!") is obviously
            # invalid.
            return self._data[0] < 0x21
        except IndexError:
            # Empty buffer: we can't say anything yet.
            return False
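

# A minimal self-check sketch (illustrative only, not part of the library
# API): it exercises the methods above using only this module. The request
# bytes are made up for the demo.
if __name__ == "__main__":
    buf = ReceiveBuffer()
    buf += b"\x16\x03\x01\x02\x00"  # TLS handshakes start with 0x16
    assert buf.is_next_line_obviously_invalid_request_line()

    buf = ReceiveBuffer()
    buf += b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
    assert not buf.is_next_line_obviously_invalid_request_line()
    assert buf.maybe_extract_next_line() == b"GET / HTTP/1.1\r\n"
    assert buf.maybe_extract_lines() == [b"Host: example.com"]
    assert not buf  # everything was consumed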