ocsp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
  4. from __future__ import annotations
  5. import datetime
  6. import typing
  7. from cryptography import utils, x509
  8. from cryptography.hazmat.bindings._rust import ocsp
  9. from cryptography.hazmat.primitives import hashes
  10. from cryptography.hazmat.primitives.asymmetric.types import (
  11. CertificateIssuerPrivateKeyTypes,
  12. )
  13. from cryptography.x509.base import (
  14. _EARLIEST_UTC_TIME,
  15. _convert_to_naive_utc_time,
  16. _reject_duplicate_extension,
  17. )
  18. class OCSPResponderEncoding(utils.Enum):
  19. HASH = "By Hash"
  20. NAME = "By Name"
  21. class OCSPResponseStatus(utils.Enum):
  22. SUCCESSFUL = 0
  23. MALFORMED_REQUEST = 1
  24. INTERNAL_ERROR = 2
  25. TRY_LATER = 3
  26. SIG_REQUIRED = 5
  27. UNAUTHORIZED = 6
  28. _ALLOWED_HASHES = (
  29. hashes.SHA1,
  30. hashes.SHA224,
  31. hashes.SHA256,
  32. hashes.SHA384,
  33. hashes.SHA512,
  34. )
  35. def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None:
  36. if not isinstance(algorithm, _ALLOWED_HASHES):
  37. raise ValueError(
  38. "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
  39. )
  40. class OCSPCertStatus(utils.Enum):
  41. GOOD = 0
  42. REVOKED = 1
  43. UNKNOWN = 2
  44. class _SingleResponse:
  45. def __init__(
  46. self,
  47. cert: x509.Certificate,
  48. issuer: x509.Certificate,
  49. algorithm: hashes.HashAlgorithm,
  50. cert_status: OCSPCertStatus,
  51. this_update: datetime.datetime,
  52. next_update: datetime.datetime | None,
  53. revocation_time: datetime.datetime | None,
  54. revocation_reason: x509.ReasonFlags | None,
  55. ):
  56. if not isinstance(cert, x509.Certificate) or not isinstance(
  57. issuer, x509.Certificate
  58. ):
  59. raise TypeError("cert and issuer must be a Certificate")
  60. _verify_algorithm(algorithm)
  61. if not isinstance(this_update, datetime.datetime):
  62. raise TypeError("this_update must be a datetime object")
  63. if next_update is not None and not isinstance(
  64. next_update, datetime.datetime
  65. ):
  66. raise TypeError("next_update must be a datetime object or None")
  67. self._cert = cert
  68. self._issuer = issuer
  69. self._algorithm = algorithm
  70. self._this_update = this_update
  71. self._next_update = next_update
  72. if not isinstance(cert_status, OCSPCertStatus):
  73. raise TypeError(
  74. "cert_status must be an item from the OCSPCertStatus enum"
  75. )
  76. if cert_status is not OCSPCertStatus.REVOKED:
  77. if revocation_time is not None:
  78. raise ValueError(
  79. "revocation_time can only be provided if the certificate "
  80. "is revoked"
  81. )
  82. if revocation_reason is not None:
  83. raise ValueError(
  84. "revocation_reason can only be provided if the certificate"
  85. " is revoked"
  86. )
  87. else:
  88. if not isinstance(revocation_time, datetime.datetime):
  89. raise TypeError("revocation_time must be a datetime object")
  90. revocation_time = _convert_to_naive_utc_time(revocation_time)
  91. if revocation_time < _EARLIEST_UTC_TIME:
  92. raise ValueError(
  93. "The revocation_time must be on or after"
  94. " 1950 January 1."
  95. )
  96. if revocation_reason is not None and not isinstance(
  97. revocation_reason, x509.ReasonFlags
  98. ):
  99. raise TypeError(
  100. "revocation_reason must be an item from the ReasonFlags "
  101. "enum or None"
  102. )
  103. self._cert_status = cert_status
  104. self._revocation_time = revocation_time
  105. self._revocation_reason = revocation_reason
  106. OCSPRequest = ocsp.OCSPRequest
  107. OCSPResponse = ocsp.OCSPResponse
  108. OCSPSingleResponse = ocsp.OCSPSingleResponse
  109. class OCSPRequestBuilder:
  110. def __init__(
  111. self,
  112. request: tuple[
  113. x509.Certificate, x509.Certificate, hashes.HashAlgorithm
  114. ]
  115. | None = None,
  116. request_hash: tuple[bytes, bytes, int, hashes.HashAlgorithm]
  117. | None = None,
  118. extensions: list[x509.Extension[x509.ExtensionType]] = [],
  119. ) -> None:
  120. self._request = request
  121. self._request_hash = request_hash
  122. self._extensions = extensions
  123. def add_certificate(
  124. self,
  125. cert: x509.Certificate,
  126. issuer: x509.Certificate,
  127. algorithm: hashes.HashAlgorithm,
  128. ) -> OCSPRequestBuilder:
  129. if self._request is not None or self._request_hash is not None:
  130. raise ValueError("Only one certificate can be added to a request")
  131. _verify_algorithm(algorithm)
  132. if not isinstance(cert, x509.Certificate) or not isinstance(
  133. issuer, x509.Certificate
  134. ):
  135. raise TypeError("cert and issuer must be a Certificate")
  136. return OCSPRequestBuilder(
  137. (cert, issuer, algorithm), self._request_hash, self._extensions
  138. )
  139. def add_certificate_by_hash(
  140. self,
  141. issuer_name_hash: bytes,
  142. issuer_key_hash: bytes,
  143. serial_number: int,
  144. algorithm: hashes.HashAlgorithm,
  145. ) -> OCSPRequestBuilder:
  146. if self._request is not None or self._request_hash is not None:
  147. raise ValueError("Only one certificate can be added to a request")
  148. if not isinstance(serial_number, int):
  149. raise TypeError("serial_number must be an integer")
  150. _verify_algorithm(algorithm)
  151. utils._check_bytes("issuer_name_hash", issuer_name_hash)
  152. utils._check_bytes("issuer_key_hash", issuer_key_hash)
  153. if algorithm.digest_size != len(
  154. issuer_name_hash
  155. ) or algorithm.digest_size != len(issuer_key_hash):
  156. raise ValueError(
  157. "issuer_name_hash and issuer_key_hash must be the same length "
  158. "as the digest size of the algorithm"
  159. )
  160. return OCSPRequestBuilder(
  161. self._request,
  162. (issuer_name_hash, issuer_key_hash, serial_number, algorithm),
  163. self._extensions,
  164. )
  165. def add_extension(
  166. self, extval: x509.ExtensionType, critical: bool
  167. ) -> OCSPRequestBuilder:
  168. if not isinstance(extval, x509.ExtensionType):
  169. raise TypeError("extension must be an ExtensionType")
  170. extension = x509.Extension(extval.oid, critical, extval)
  171. _reject_duplicate_extension(extension, self._extensions)
  172. return OCSPRequestBuilder(
  173. self._request, self._request_hash, [*self._extensions, extension]
  174. )
  175. def build(self) -> OCSPRequest:
  176. if self._request is None and self._request_hash is None:
  177. raise ValueError("You must add a certificate before building")
  178. return ocsp.create_ocsp_request(self)
  179. class OCSPResponseBuilder:
  180. def __init__(
  181. self,
  182. response: _SingleResponse | None = None,
  183. responder_id: tuple[x509.Certificate, OCSPResponderEncoding]
  184. | None = None,
  185. certs: list[x509.Certificate] | None = None,
  186. extensions: list[x509.Extension[x509.ExtensionType]] = [],
  187. ):
  188. self._response = response
  189. self._responder_id = responder_id
  190. self._certs = certs
  191. self._extensions = extensions
  192. def add_response(
  193. self,
  194. cert: x509.Certificate,
  195. issuer: x509.Certificate,
  196. algorithm: hashes.HashAlgorithm,
  197. cert_status: OCSPCertStatus,
  198. this_update: datetime.datetime,
  199. next_update: datetime.datetime | None,
  200. revocation_time: datetime.datetime | None,
  201. revocation_reason: x509.ReasonFlags | None,
  202. ) -> OCSPResponseBuilder:
  203. if self._response is not None:
  204. raise ValueError("Only one response per OCSPResponse.")
  205. singleresp = _SingleResponse(
  206. cert,
  207. issuer,
  208. algorithm,
  209. cert_status,
  210. this_update,
  211. next_update,
  212. revocation_time,
  213. revocation_reason,
  214. )
  215. return OCSPResponseBuilder(
  216. singleresp,
  217. self._responder_id,
  218. self._certs,
  219. self._extensions,
  220. )
  221. def responder_id(
  222. self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate
  223. ) -> OCSPResponseBuilder:
  224. if self._responder_id is not None:
  225. raise ValueError("responder_id can only be set once")
  226. if not isinstance(responder_cert, x509.Certificate):
  227. raise TypeError("responder_cert must be a Certificate")
  228. if not isinstance(encoding, OCSPResponderEncoding):
  229. raise TypeError(
  230. "encoding must be an element from OCSPResponderEncoding"
  231. )
  232. return OCSPResponseBuilder(
  233. self._response,
  234. (responder_cert, encoding),
  235. self._certs,
  236. self._extensions,
  237. )
  238. def certificates(
  239. self, certs: typing.Iterable[x509.Certificate]
  240. ) -> OCSPResponseBuilder:
  241. if self._certs is not None:
  242. raise ValueError("certificates may only be set once")
  243. certs = list(certs)
  244. if len(certs) == 0:
  245. raise ValueError("certs must not be an empty list")
  246. if not all(isinstance(x, x509.Certificate) for x in certs):
  247. raise TypeError("certs must be a list of Certificates")
  248. return OCSPResponseBuilder(
  249. self._response,
  250. self._responder_id,
  251. certs,
  252. self._extensions,
  253. )
  254. def add_extension(
  255. self, extval: x509.ExtensionType, critical: bool
  256. ) -> OCSPResponseBuilder:
  257. if not isinstance(extval, x509.ExtensionType):
  258. raise TypeError("extension must be an ExtensionType")
  259. extension = x509.Extension(extval.oid, critical, extval)
  260. _reject_duplicate_extension(extension, self._extensions)
  261. return OCSPResponseBuilder(
  262. self._response,
  263. self._responder_id,
  264. self._certs,
  265. [*self._extensions, extension],
  266. )
  267. def sign(
  268. self,
  269. private_key: CertificateIssuerPrivateKeyTypes,
  270. algorithm: hashes.HashAlgorithm | None,
  271. ) -> OCSPResponse:
  272. if self._response is None:
  273. raise ValueError("You must add a response before signing")
  274. if self._responder_id is None:
  275. raise ValueError("You must add a responder_id before signing")
  276. return ocsp.create_ocsp_response(
  277. OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
  278. )
  279. @classmethod
  280. def build_unsuccessful(
  281. cls, response_status: OCSPResponseStatus
  282. ) -> OCSPResponse:
  283. if not isinstance(response_status, OCSPResponseStatus):
  284. raise TypeError(
  285. "response_status must be an item from OCSPResponseStatus"
  286. )
  287. if response_status is OCSPResponseStatus.SUCCESSFUL:
  288. raise ValueError("response_status cannot be SUCCESSFUL")
  289. return ocsp.create_ocsp_response(response_status, None, None, None)
  290. load_der_ocsp_request = ocsp.load_der_ocsp_request
  291. load_der_ocsp_response = ocsp.load_der_ocsp_response