jwt_signer.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from datetime import timedelta
  19. from typing import Any
  20. import jwt
  21. from airflow.utils import timezone
  22. class JWTSigner:
  23. """
  24. Signs and verifies JWT Token. Used to authorise and verify requests.
  25. :param secret_key: key used to sign the request
  26. :param expiration_time_in_seconds: time after which the token becomes invalid (in seconds)
  27. :param audience: audience that the request is expected to have
  28. :param leeway_in_seconds: leeway that allows for a small clock skew between the two parties
  29. :param algorithm: algorithm used for signing
  30. """
  31. def __init__(
  32. self,
  33. secret_key: str,
  34. expiration_time_in_seconds: int,
  35. audience: str,
  36. leeway_in_seconds: int = 5,
  37. algorithm: str = "HS512",
  38. ):
  39. self._secret_key = secret_key
  40. self._expiration_time_in_seconds = expiration_time_in_seconds
  41. self._audience = audience
  42. self._leeway_in_seconds = leeway_in_seconds
  43. self._algorithm = algorithm
  44. def generate_signed_token(self, extra_payload: dict[str, Any]) -> str:
  45. """
  46. Generate JWT with extra payload added.
  47. :param extra_payload: extra payload that is added to the signed token
  48. :return: signed token
  49. """
  50. jwt_dict = {
  51. "aud": self._audience,
  52. "iat": timezone.utcnow(),
  53. "nbf": timezone.utcnow(),
  54. "exp": timezone.utcnow() + timedelta(seconds=self._expiration_time_in_seconds),
  55. }
  56. jwt_dict.update(extra_payload)
  57. token = jwt.encode(
  58. jwt_dict,
  59. self._secret_key,
  60. algorithm=self._algorithm,
  61. )
  62. return token
  63. def verify_token(self, token: str) -> dict[str, Any]:
  64. payload = jwt.decode(
  65. token,
  66. self._secret_key,
  67. leeway=timedelta(seconds=self._leeway_in_seconds),
  68. algorithms=[self._algorithm],
  69. options={
  70. "verify_signature": True,
  71. "require": ["exp", "iat", "nbf"],
  72. },
  73. audience=self._audience,
  74. )
  75. return payload