jsonfield.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # Copyright 2017-2022 Alexey Stepanov aka penguinolog
  2. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  3. # not use this file except in compliance with the License. You may obtain
  4. # a copy of the License at
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. # Unless required by applicable law or agreed to in writing, software
  7. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  8. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  9. # License for the specific language governing permissions and limitations
  10. # under the License.
  11. """JSONField implementation for SQLAlchemy."""
  12. from __future__ import annotations
  13. import json
  14. import typing
  15. import sqlalchemy.ext.mutable
  16. import sqlalchemy.types
  17. if typing.TYPE_CHECKING:
  18. import types
  19. from sqlalchemy.engine import Dialect
  20. from sqlalchemy.sql.type_api import TypeEngine
  21. __all__ = ("JSONField", "mutable_json_field")
  22. # noinspection PyAbstractClass
  23. class JSONField(sqlalchemy.types.TypeDecorator): # type: ignore[type-arg] # pylint: disable=abstract-method
  24. """Represent an immutable structure as a json-encoded string or json.
  25. Usage::
  26. JSONField(enforce_string=True|False, enforce_unicode=True|False)
  27. """
  28. def process_literal_param(self, value: typing.Any, dialect: Dialect) -> typing.Any:
  29. """Re-use of process_bind_param.
  30. :return: encoded value if required
  31. :rtype: typing.Union[str, typing.Any]
  32. """
  33. return self.process_bind_param(value, dialect)
  34. impl = sqlalchemy.types.TypeEngine # Special placeholder
  35. cache_ok = False # Cache complexity due to requerement of value re-serialization and mutability
  36. def __init__( # pylint: disable=keyword-arg-before-vararg
  37. self,
  38. enforce_string: bool = False,
  39. enforce_unicode: bool = False,
  40. json: types.ModuleType | typing.Any = json, # pylint: disable=redefined-outer-name
  41. json_type: TypeEngine[typing.Any] | type[TypeEngine[typing.Any]] = sqlalchemy.JSON,
  42. *args: typing.Any,
  43. **kwargs: typing.Any,
  44. ) -> None:
  45. """JSONField.
  46. :param enforce_string: enforce String(UnicodeText) type usage
  47. :type enforce_string: bool
  48. :param enforce_unicode: do not encode non-ascii data
  49. :type enforce_unicode: bool
  50. :param json: JSON encoding/decoding library. By default: standard json package.
  51. :param json_type: the sqlalchemy/dialect class that will be used to render the DB JSON type.
  52. By default: sqlalchemy.JSON
  53. :param args: extra baseclass arguments
  54. :type args: typing.Any
  55. :param kwargs: extra baseclass keyworded arguments
  56. :type kwargs: typing.Any
  57. """
  58. self.__enforce_string = enforce_string
  59. self.__enforce_unicode = enforce_unicode
  60. self.__json_codec = json
  61. self.__json_type = json_type
  62. super().__init__(*args, **kwargs)
  63. def __use_json(self, dialect: Dialect) -> bool:
  64. """Helper to determine, which encoder to use.
  65. :return: use engine-based json encoder
  66. :rtype: bool
  67. """
  68. return hasattr(dialect, "_json_serializer") and not self.__enforce_string
  69. def load_dialect_impl(self, dialect: Dialect) -> TypeEngine[typing.Any]:
  70. """Select impl by dialect.
  71. :return: dialect implementation depends on decoding method
  72. :rtype: TypeEngine
  73. """
  74. # types are handled by DefaultDialect, Dialect class is abstract
  75. if self.__use_json(dialect):
  76. return dialect.type_descriptor(self.__json_type) # type: ignore[arg-type]
  77. return dialect.type_descriptor(sqlalchemy.UnicodeText) # type: ignore[arg-type]
  78. def process_bind_param(self, value: typing.Any, dialect: Dialect) -> str | typing.Any:
  79. """Encode data, if required.
  80. :return: encoded value if required
  81. :rtype: typing.Union[str, typing.Any]
  82. """
  83. if self.__use_json(dialect) or value is None:
  84. return value
  85. return self.__json_codec.dumps(value, ensure_ascii=not self.__enforce_unicode)
  86. def process_result_value(self, value: str | typing.Any, dialect: Dialect) -> typing.Any:
  87. """Decode data, if required.
  88. :return: decoded result value if required
  89. :rtype: typing.Any
  90. """
  91. if self.__use_json(dialect) or value is None:
  92. return value
  93. return self.__json_codec.loads(value)
  94. def mutable_json_field( # pylint: disable=keyword-arg-before-vararg, redefined-outer-name
  95. enforce_string: bool = False,
  96. enforce_unicode: bool = False,
  97. json: types.ModuleType | typing.Any = json,
  98. *args: typing.Any,
  99. **kwargs: typing.Any,
  100. ) -> JSONField:
  101. """Mutable JSONField creator.
  102. :param enforce_string: enforce String(UnicodeText) type usage
  103. :type enforce_string: bool
  104. :param enforce_unicode: do not encode non-ascii data
  105. :type enforce_unicode: bool
  106. :param json: JSON encoding/decoding library.
  107. By default: standard json package.
  108. :param args: extra baseclass arguments
  109. :type args: typing.Any
  110. :param kwargs: extra baseclass keyworded arguments
  111. :type kwargs: typing.Any
  112. :return: Mutable JSONField via MutableDict.as_mutable
  113. :rtype: JSONField
  114. """
  115. return sqlalchemy.ext.mutable.MutableDict.as_mutable( # type: ignore[return-value]
  116. JSONField( # type: ignore[misc]
  117. enforce_string=enforce_string,
  118. enforce_unicode=enforce_unicode,
  119. json=json,
  120. *args, # noqa: B026
  121. **kwargs,
  122. )
  123. )