padding.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import annotations
  5. import abc
  6. import typing
  7. from cryptography import utils
  8. from cryptography.exceptions import AlreadyFinalized
  9. from cryptography.hazmat.bindings._rust import (
  10. PKCS7PaddingContext,
  11. PKCS7UnpaddingContext,
  12. check_ansix923_padding,
  13. )
  14. class PaddingContext(metaclass=abc.ABCMeta):
  15. @abc.abstractmethod
  16. def update(self, data: bytes) -> bytes:
  17. """
  18. Pads the provided bytes and returns any available data as bytes.
  19. """
  20. @abc.abstractmethod
  21. def finalize(self) -> bytes:
  22. """
  23. Finalize the padding, returns bytes.
  24. """
  25. def _byte_padding_check(block_size: int) -> None:
  26. if not (0 <= block_size <= 2040):
  27. raise ValueError("block_size must be in range(0, 2041).")
  28. if block_size % 8 != 0:
  29. raise ValueError("block_size must be a multiple of 8.")
  30. def _byte_padding_update(
  31. buffer_: bytes | None, data: bytes, block_size: int
  32. ) -> tuple[bytes, bytes]:
  33. if buffer_ is None:
  34. raise AlreadyFinalized("Context was already finalized.")
  35. utils._check_byteslike("data", data)
  36. buffer_ += bytes(data)
  37. finished_blocks = len(buffer_) // (block_size // 8)
  38. result = buffer_[: finished_blocks * (block_size // 8)]
  39. buffer_ = buffer_[finished_blocks * (block_size // 8) :]
  40. return buffer_, result
  41. def _byte_padding_pad(
  42. buffer_: bytes | None,
  43. block_size: int,
  44. paddingfn: typing.Callable[[int], bytes],
  45. ) -> bytes:
  46. if buffer_ is None:
  47. raise AlreadyFinalized("Context was already finalized.")
  48. pad_size = block_size // 8 - len(buffer_)
  49. return buffer_ + paddingfn(pad_size)
  50. def _byte_unpadding_update(
  51. buffer_: bytes | None, data: bytes, block_size: int
  52. ) -> tuple[bytes, bytes]:
  53. if buffer_ is None:
  54. raise AlreadyFinalized("Context was already finalized.")
  55. utils._check_byteslike("data", data)
  56. buffer_ += bytes(data)
  57. finished_blocks = max(len(buffer_) // (block_size // 8) - 1, 0)
  58. result = buffer_[: finished_blocks * (block_size // 8)]
  59. buffer_ = buffer_[finished_blocks * (block_size // 8) :]
  60. return buffer_, result
  61. def _byte_unpadding_check(
  62. buffer_: bytes | None,
  63. block_size: int,
  64. checkfn: typing.Callable[[bytes], int],
  65. ) -> bytes:
  66. if buffer_ is None:
  67. raise AlreadyFinalized("Context was already finalized.")
  68. if len(buffer_) != block_size // 8:
  69. raise ValueError("Invalid padding bytes.")
  70. valid = checkfn(buffer_)
  71. if not valid:
  72. raise ValueError("Invalid padding bytes.")
  73. pad_size = buffer_[-1]
  74. return buffer_[:-pad_size]
  75. class PKCS7:
  76. def __init__(self, block_size: int):
  77. _byte_padding_check(block_size)
  78. self.block_size = block_size
  79. def padder(self) -> PaddingContext:
  80. return PKCS7PaddingContext(self.block_size)
  81. def unpadder(self) -> PaddingContext:
  82. return PKCS7UnpaddingContext(self.block_size)
  83. PaddingContext.register(PKCS7PaddingContext)
  84. PaddingContext.register(PKCS7UnpaddingContext)
  85. class ANSIX923:
  86. def __init__(self, block_size: int):
  87. _byte_padding_check(block_size)
  88. self.block_size = block_size
  89. def padder(self) -> PaddingContext:
  90. return _ANSIX923PaddingContext(self.block_size)
  91. def unpadder(self) -> PaddingContext:
  92. return _ANSIX923UnpaddingContext(self.block_size)
  93. class _ANSIX923PaddingContext(PaddingContext):
  94. _buffer: bytes | None
  95. def __init__(self, block_size: int):
  96. self.block_size = block_size
  97. # TODO: more copies than necessary, we should use zero-buffer (#193)
  98. self._buffer = b""
  99. def update(self, data: bytes) -> bytes:
  100. self._buffer, result = _byte_padding_update(
  101. self._buffer, data, self.block_size
  102. )
  103. return result
  104. def _padding(self, size: int) -> bytes:
  105. return bytes([0]) * (size - 1) + bytes([size])
  106. def finalize(self) -> bytes:
  107. result = _byte_padding_pad(
  108. self._buffer, self.block_size, self._padding
  109. )
  110. self._buffer = None
  111. return result
  112. class _ANSIX923UnpaddingContext(PaddingContext):
  113. _buffer: bytes | None
  114. def __init__(self, block_size: int):
  115. self.block_size = block_size
  116. # TODO: more copies than necessary, we should use zero-buffer (#193)
  117. self._buffer = b""
  118. def update(self, data: bytes) -> bytes:
  119. self._buffer, result = _byte_unpadding_update(
  120. self._buffer, data, self.block_size
  121. )
  122. return result
  123. def finalize(self) -> bytes:
  124. result = _byte_unpadding_check(
  125. self._buffer,
  126. self.block_size,
  127. check_ansix923_padding,
  128. )
  129. self._buffer = None
  130. return result