internal_utils.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import json
  2. from typing import Any
  3. from typing import Type
  4. from typing import TYPE_CHECKING
  5. from flask import current_app
  6. from flask import Flask
  7. from flask_jwt_extended.exceptions import RevokedTokenError
  8. from flask_jwt_extended.exceptions import UserClaimsVerificationError
  9. from flask_jwt_extended.exceptions import WrongTokenError
  10. try:
  11. from flask.json.provider import DefaultJSONProvider
  12. HAS_JSON_PROVIDER = True
  13. except ModuleNotFoundError: # pragma: no cover
  14. # The flask.json.provider module was added in Flask 2.2.
  15. # Further details are handled in get_json_encoder.
  16. HAS_JSON_PROVIDER = False
  17. if TYPE_CHECKING: # pragma: no cover
  18. from flask_jwt_extended import JWTManager
  19. def get_jwt_manager() -> "JWTManager":
  20. try:
  21. return current_app.extensions["flask-jwt-extended"]
  22. except KeyError: # pragma: no cover
  23. raise RuntimeError(
  24. "You must initialize a JWTManager with this flask "
  25. "application before using this method"
  26. ) from None
  27. def has_user_lookup() -> bool:
  28. jwt_manager = get_jwt_manager()
  29. return jwt_manager._user_lookup_callback is not None
  30. def user_lookup(*args, **kwargs) -> Any:
  31. jwt_manager = get_jwt_manager()
  32. return jwt_manager._user_lookup_callback and jwt_manager._user_lookup_callback(
  33. *args, **kwargs
  34. )
  35. def verify_token_type(decoded_token: dict, refresh: bool) -> None:
  36. if not refresh and decoded_token["type"] == "refresh":
  37. raise WrongTokenError("Only non-refresh tokens are allowed")
  38. elif refresh and decoded_token["type"] != "refresh":
  39. raise WrongTokenError("Only refresh tokens are allowed")
  40. def verify_token_not_blocklisted(jwt_header: dict, jwt_data: dict) -> None:
  41. jwt_manager = get_jwt_manager()
  42. if jwt_manager._token_in_blocklist_callback(jwt_header, jwt_data):
  43. raise RevokedTokenError(jwt_header, jwt_data)
  44. def custom_verification_for_token(jwt_header: dict, jwt_data: dict) -> None:
  45. jwt_manager = get_jwt_manager()
  46. if not jwt_manager._token_verification_callback(jwt_header, jwt_data):
  47. error_msg = "User claims verification failed"
  48. raise UserClaimsVerificationError(error_msg, jwt_header, jwt_data)
  49. class JSONEncoder(json.JSONEncoder):
  50. """A JSON encoder which uses the app.json_provider_class for the default"""
  51. def default(self, o: Any) -> Any:
  52. # If the registered JSON provider does not implement a default classmethod
  53. # use the method defined by the DefaultJSONProvider
  54. default = getattr(
  55. current_app.json_provider_class, "default", DefaultJSONProvider.default
  56. )
  57. return default(o)
  58. def get_json_encoder(app: Flask) -> Type[json.JSONEncoder]:
  59. """Get the JSON Encoder for the provided flask app
  60. Starting with flask version 2.2 the flask application provides a
  61. interface to register a custom JSON Encoder/Decoder under the json_provider_class.
  62. As this interface is not compatible with the standard JSONEncoder, the `default`
  63. method of the class is wrapped.
  64. Lookup Order:
  65. - app.json_encoder - For Flask < 2.2
  66. - app.json_provider_class.default
  67. - flask.json.provider.DefaultJSONProvider.default
  68. """
  69. if not HAS_JSON_PROVIDER: # pragma: no cover
  70. return app.json_encoder # type: ignore
  71. return JSONEncoder