base.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. # This file is dual licensed under the terms of the Apache License, Version
  2. # 2.0, and the BSD License. See the LICENSE file in the root of this repository
  3. # for complete details.
  4. from __future__ import annotations
  5. import abc
  6. import typing
  7. from cryptography.hazmat.bindings._rust import openssl as rust_openssl
  8. from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
  9. from cryptography.hazmat.primitives.ciphers import modes
  10. class CipherContext(metaclass=abc.ABCMeta):
  11. @abc.abstractmethod
  12. def update(self, data: bytes) -> bytes:
  13. """
  14. Processes the provided bytes through the cipher and returns the results
  15. as bytes.
  16. """
  17. @abc.abstractmethod
  18. def update_into(self, data: bytes, buf: bytes) -> int:
  19. """
  20. Processes the provided bytes and writes the resulting data into the
  21. provided buffer. Returns the number of bytes written.
  22. """
  23. @abc.abstractmethod
  24. def finalize(self) -> bytes:
  25. """
  26. Returns the results of processing the final block as bytes.
  27. """
  28. @abc.abstractmethod
  29. def reset_nonce(self, nonce: bytes) -> None:
  30. """
  31. Resets the nonce for the cipher context to the provided value.
  32. Raises an exception if it does not support reset or if the
  33. provided nonce does not have a valid length.
  34. """
  35. class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
  36. @abc.abstractmethod
  37. def authenticate_additional_data(self, data: bytes) -> None:
  38. """
  39. Authenticates the provided bytes.
  40. """
  41. class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
  42. @abc.abstractmethod
  43. def finalize_with_tag(self, tag: bytes) -> bytes:
  44. """
  45. Returns the results of processing the final block as bytes and allows
  46. delayed passing of the authentication tag.
  47. """
  48. class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
  49. @property
  50. @abc.abstractmethod
  51. def tag(self) -> bytes:
  52. """
  53. Returns tag bytes. This is only available after encryption is
  54. finalized.
  55. """
  56. Mode = typing.TypeVar(
  57. "Mode", bound=typing.Optional[modes.Mode], covariant=True
  58. )
  59. class Cipher(typing.Generic[Mode]):
  60. def __init__(
  61. self,
  62. algorithm: CipherAlgorithm,
  63. mode: Mode,
  64. backend: typing.Any = None,
  65. ) -> None:
  66. if not isinstance(algorithm, CipherAlgorithm):
  67. raise TypeError("Expected interface of CipherAlgorithm.")
  68. if mode is not None:
  69. # mypy needs this assert to narrow the type from our generic
  70. # type. Maybe it won't some time in the future.
  71. assert isinstance(mode, modes.Mode)
  72. mode.validate_for_algorithm(algorithm)
  73. self.algorithm = algorithm
  74. self.mode = mode
  75. @typing.overload
  76. def encryptor(
  77. self: Cipher[modes.ModeWithAuthenticationTag],
  78. ) -> AEADEncryptionContext: ...
  79. @typing.overload
  80. def encryptor(
  81. self: _CIPHER_TYPE,
  82. ) -> CipherContext: ...
  83. def encryptor(self):
  84. if isinstance(self.mode, modes.ModeWithAuthenticationTag):
  85. if self.mode.tag is not None:
  86. raise ValueError(
  87. "Authentication tag must be None when encrypting."
  88. )
  89. return rust_openssl.ciphers.create_encryption_ctx(
  90. self.algorithm, self.mode
  91. )
  92. @typing.overload
  93. def decryptor(
  94. self: Cipher[modes.ModeWithAuthenticationTag],
  95. ) -> AEADDecryptionContext: ...
  96. @typing.overload
  97. def decryptor(
  98. self: _CIPHER_TYPE,
  99. ) -> CipherContext: ...
  100. def decryptor(self):
  101. return rust_openssl.ciphers.create_decryption_ctx(
  102. self.algorithm, self.mode
  103. )
  104. _CIPHER_TYPE = Cipher[
  105. typing.Union[
  106. modes.ModeWithNonce,
  107. modes.ModeWithTweak,
  108. None,
  109. modes.ECB,
  110. modes.ModeWithInitializationVector,
  111. ]
  112. ]
  113. CipherContext.register(rust_openssl.ciphers.CipherContext)
  114. AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
  115. AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)