annotation.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. # sql/annotation.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """The :class:`.Annotated` class and related routines; creates hash-equivalent
  8. copies of SQL constructs which contain context-specific markers and
  9. associations.
  10. """
  11. from . import operators
  12. from .base import HasCacheKey
  13. from .traversals import anon_map
  14. from .visitors import InternalTraversal
  15. from .. import util
  16. EMPTY_ANNOTATIONS = util.immutabledict()
  17. class SupportsAnnotations(object):
  18. _annotations = EMPTY_ANNOTATIONS
  19. @util.memoized_property
  20. def _annotations_cache_key(self):
  21. anon_map_ = anon_map()
  22. return self._gen_annotations_cache_key(anon_map_)
  23. def _gen_annotations_cache_key(self, anon_map):
  24. return (
  25. "_annotations",
  26. tuple(
  27. (
  28. key,
  29. value._gen_cache_key(anon_map, [])
  30. if isinstance(value, HasCacheKey)
  31. else value,
  32. )
  33. for key, value in [
  34. (key, self._annotations[key])
  35. for key in sorted(self._annotations)
  36. ]
  37. ),
  38. )
  39. class SupportsCloneAnnotations(SupportsAnnotations):
  40. _clone_annotations_traverse_internals = [
  41. ("_annotations", InternalTraversal.dp_annotations_key)
  42. ]
  43. def _annotate(self, values):
  44. """return a copy of this ClauseElement with annotations
  45. updated by the given dictionary.
  46. """
  47. new = self._clone()
  48. new._annotations = new._annotations.union(values)
  49. new.__dict__.pop("_annotations_cache_key", None)
  50. new.__dict__.pop("_generate_cache_key", None)
  51. return new
  52. def _with_annotations(self, values):
  53. """return a copy of this ClauseElement with annotations
  54. replaced by the given dictionary.
  55. """
  56. new = self._clone()
  57. new._annotations = util.immutabledict(values)
  58. new.__dict__.pop("_annotations_cache_key", None)
  59. new.__dict__.pop("_generate_cache_key", None)
  60. return new
  61. def _deannotate(self, values=None, clone=False):
  62. """return a copy of this :class:`_expression.ClauseElement`
  63. with annotations
  64. removed.
  65. :param values: optional tuple of individual values
  66. to remove.
  67. """
  68. if clone or self._annotations:
  69. # clone is used when we are also copying
  70. # the expression for a deep deannotation
  71. new = self._clone()
  72. new._annotations = util.immutabledict()
  73. new.__dict__.pop("_annotations_cache_key", None)
  74. return new
  75. else:
  76. return self
  77. class SupportsWrappingAnnotations(SupportsAnnotations):
  78. def _annotate(self, values):
  79. """return a copy of this ClauseElement with annotations
  80. updated by the given dictionary.
  81. """
  82. return Annotated(self, values)
  83. def _with_annotations(self, values):
  84. """return a copy of this ClauseElement with annotations
  85. replaced by the given dictionary.
  86. """
  87. return Annotated(self, values)
  88. def _deannotate(self, values=None, clone=False):
  89. """return a copy of this :class:`_expression.ClauseElement`
  90. with annotations
  91. removed.
  92. :param values: optional tuple of individual values
  93. to remove.
  94. """
  95. if clone:
  96. s = self._clone()
  97. return s
  98. else:
  99. return self
  100. class Annotated(object):
  101. """clones a SupportsAnnotated and applies an 'annotations' dictionary.
  102. Unlike regular clones, this clone also mimics __hash__() and
  103. __cmp__() of the original element so that it takes its place
  104. in hashed collections.
  105. A reference to the original element is maintained, for the important
  106. reason of keeping its hash value current. When GC'ed, the
  107. hash value may be reused, causing conflicts.
  108. .. note:: The rationale for Annotated producing a brand new class,
  109. rather than placing the functionality directly within ClauseElement,
  110. is **performance**. The __hash__() method is absent on plain
  111. ClauseElement which leads to significantly reduced function call
  112. overhead, as the use of sets and dictionaries against ClauseElement
  113. objects is prevalent, but most are not "annotated".
  114. """
  115. _is_column_operators = False
  116. def __new__(cls, *args):
  117. if not args:
  118. # clone constructor
  119. return object.__new__(cls)
  120. else:
  121. element, values = args
  122. # pull appropriate subclass from registry of annotated
  123. # classes
  124. try:
  125. cls = annotated_classes[element.__class__]
  126. except KeyError:
  127. cls = _new_annotation_type(element.__class__, cls)
  128. return object.__new__(cls)
  129. def __init__(self, element, values):
  130. self.__dict__ = element.__dict__.copy()
  131. self.__dict__.pop("_annotations_cache_key", None)
  132. self.__dict__.pop("_generate_cache_key", None)
  133. self.__element = element
  134. self._annotations = util.immutabledict(values)
  135. self._hash = hash(element)
  136. def _annotate(self, values):
  137. _values = self._annotations.union(values)
  138. return self._with_annotations(_values)
  139. def _with_annotations(self, values):
  140. clone = self.__class__.__new__(self.__class__)
  141. clone.__dict__ = self.__dict__.copy()
  142. clone.__dict__.pop("_annotations_cache_key", None)
  143. clone.__dict__.pop("_generate_cache_key", None)
  144. clone._annotations = values
  145. return clone
  146. def _deannotate(self, values=None, clone=True):
  147. if values is None:
  148. return self.__element
  149. else:
  150. return self._with_annotations(
  151. util.immutabledict(
  152. {
  153. key: value
  154. for key, value in self._annotations.items()
  155. if key not in values
  156. }
  157. )
  158. )
  159. def _compiler_dispatch(self, visitor, **kw):
  160. return self.__element.__class__._compiler_dispatch(self, visitor, **kw)
  161. @property
  162. def _constructor(self):
  163. return self.__element._constructor
  164. def _clone(self, **kw):
  165. clone = self.__element._clone(**kw)
  166. if clone is self.__element:
  167. # detect immutable, don't change anything
  168. return self
  169. else:
  170. # update the clone with any changes that have occurred
  171. # to this object's __dict__.
  172. clone.__dict__.update(self.__dict__)
  173. return self.__class__(clone, self._annotations)
  174. def __reduce__(self):
  175. return self.__class__, (self.__element, self._annotations)
  176. def __hash__(self):
  177. return self._hash
  178. def __eq__(self, other):
  179. if self._is_column_operators:
  180. return self.__element.__class__.__eq__(self, other)
  181. else:
  182. return hash(other) == hash(self)
  183. @property
  184. def entity_namespace(self):
  185. if "entity_namespace" in self._annotations:
  186. return self._annotations["entity_namespace"].entity_namespace
  187. else:
  188. return self.__element.entity_namespace
  189. # hard-generate Annotated subclasses. this technique
  190. # is used instead of on-the-fly types (i.e. type.__new__())
  191. # so that the resulting objects are pickleable; additionally, other
  192. # decisions can be made up front about the type of object being annotated
  193. # just once per class rather than per-instance.
  194. annotated_classes = {}
  195. def _safe_annotate(to_annotate, annotations):
  196. try:
  197. _annotate = to_annotate._annotate
  198. except AttributeError:
  199. # skip objects that don't actually have an `_annotate`
  200. # attribute, namely QueryableAttribute inside of a join
  201. # condition
  202. return to_annotate
  203. else:
  204. return _annotate(annotations)
  205. def _deep_annotate(
  206. element, annotations, exclude=None, detect_subquery_cols=False
  207. ):
  208. """Deep copy the given ClauseElement, annotating each element
  209. with the given annotations dictionary.
  210. Elements within the exclude collection will be cloned but not annotated.
  211. """
  212. # annotated objects hack the __hash__() method so if we want to
  213. # uniquely process them we have to use id()
  214. cloned_ids = {}
  215. def clone(elem, **kw):
  216. kw["detect_subquery_cols"] = detect_subquery_cols
  217. id_ = id(elem)
  218. if id_ in cloned_ids:
  219. return cloned_ids[id_]
  220. if (
  221. exclude
  222. and hasattr(elem, "proxy_set")
  223. and elem.proxy_set.intersection(exclude)
  224. ):
  225. newelem = elem._clone(clone=clone, **kw)
  226. elif annotations != elem._annotations:
  227. if detect_subquery_cols and elem._is_immutable:
  228. newelem = _safe_annotate(
  229. elem._clone(clone=clone, **kw), annotations
  230. )
  231. else:
  232. newelem = _safe_annotate(elem, annotations)
  233. else:
  234. newelem = elem
  235. newelem._copy_internals(clone=clone)
  236. cloned_ids[id_] = newelem
  237. return newelem
  238. if element is not None:
  239. element = clone(element)
  240. clone = None # remove gc cycles
  241. return element
  242. def _deep_deannotate(element, values=None):
  243. """Deep copy the given element, removing annotations."""
  244. cloned = {}
  245. def clone(elem, **kw):
  246. if values:
  247. key = id(elem)
  248. else:
  249. key = elem
  250. if key not in cloned:
  251. newelem = elem._deannotate(values=values, clone=True)
  252. newelem._copy_internals(clone=clone)
  253. cloned[key] = newelem
  254. return newelem
  255. else:
  256. return cloned[key]
  257. if element is not None:
  258. element = clone(element)
  259. clone = None # remove gc cycles
  260. return element
  261. def _shallow_annotate(element, annotations):
  262. """Annotate the given ClauseElement and copy its internals so that
  263. internal objects refer to the new annotated object.
  264. Basically used to apply a "don't traverse" annotation to a
  265. selectable, without digging throughout the whole
  266. structure wasting time.
  267. """
  268. element = element._annotate(annotations)
  269. element._copy_internals()
  270. return element
  271. def _new_annotation_type(cls, base_cls):
  272. if issubclass(cls, Annotated):
  273. return cls
  274. elif cls in annotated_classes:
  275. return annotated_classes[cls]
  276. for super_ in cls.__mro__:
  277. # check if an Annotated subclass more specific than
  278. # the given base_cls is already registered, such
  279. # as AnnotatedColumnElement.
  280. if super_ in annotated_classes:
  281. base_cls = annotated_classes[super_]
  282. break
  283. annotated_classes[cls] = anno_cls = type(
  284. "Annotated%s" % cls.__name__, (base_cls, cls), {}
  285. )
  286. globals()["Annotated%s" % cls.__name__] = anno_cls
  287. if "_traverse_internals" in cls.__dict__:
  288. anno_cls._traverse_internals = list(cls._traverse_internals) + [
  289. ("_annotations", InternalTraversal.dp_annotations_key)
  290. ]
  291. elif cls.__dict__.get("inherit_cache", False):
  292. anno_cls._traverse_internals = list(cls._traverse_internals) + [
  293. ("_annotations", InternalTraversal.dp_annotations_key)
  294. ]
  295. # some classes include this even if they have traverse_internals
  296. # e.g. BindParameter, add it if present.
  297. if cls.__dict__.get("inherit_cache", False):
  298. anno_cls.inherit_cache = True
  299. anno_cls._is_column_operators = issubclass(cls, operators.ColumnOperators)
  300. return anno_cls
  301. def _prepare_annotations(target_hierarchy, base_cls):
  302. for cls in util.walk_subclasses(target_hierarchy):
  303. _new_annotation_type(cls, base_cls)