strategies.py 106 KB


  1. # orm/strategies.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. """sqlalchemy.orm.interfaces.LoaderStrategy
  8. implementations, and related MapperOptions."""
  9. from __future__ import absolute_import
  10. import collections
  11. import itertools
  12. from . import attributes
  13. from . import exc as orm_exc
  14. from . import interfaces
  15. from . import loading
  16. from . import path_registry
  17. from . import properties
  18. from . import query
  19. from . import relationships
  20. from . import unitofwork
  21. from . import util as orm_util
  22. from .base import _DEFER_FOR_STATE
  23. from .base import _RAISE_FOR_STATE
  24. from .base import _SET_DEFERRED_EXPIRED
  25. from .context import _column_descriptions
  26. from .context import ORMCompileState
  27. from .context import ORMSelectCompileState
  28. from .context import QueryContext
  29. from .interfaces import LoaderStrategy
  30. from .interfaces import StrategizedProperty
  31. from .session import _state_session
  32. from .state import InstanceState
  33. from .util import _none_set
  34. from .util import aliased
  35. from .. import event
  36. from .. import exc as sa_exc
  37. from .. import inspect
  38. from .. import log
  39. from .. import sql
  40. from .. import util
  41. from ..sql import util as sql_util
  42. from ..sql import visitors
  43. from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
  44. from ..sql.selectable import Select
  45. def _register_attribute(
  46. prop,
  47. mapper,
  48. useobject,
  49. compare_function=None,
  50. typecallable=None,
  51. callable_=None,
  52. proxy_property=None,
  53. active_history=False,
  54. impl_class=None,
  55. **kw
  56. ):
  57. listen_hooks = []
  58. uselist = useobject and prop.uselist
  59. if useobject and prop.single_parent:
  60. listen_hooks.append(single_parent_validator)
  61. if prop.key in prop.parent.validators:
  62. fn, opts = prop.parent.validators[prop.key]
  63. listen_hooks.append(
  64. lambda desc, prop: orm_util._validator_events(
  65. desc, prop.key, fn, **opts
  66. )
  67. )
  68. if useobject:
  69. listen_hooks.append(unitofwork.track_cascade_events)
  70. # need to assemble backref listeners
  71. # after the singleparentvalidator, mapper validator
  72. if useobject:
  73. backref = prop.back_populates
  74. if backref and prop._effective_sync_backref:
  75. listen_hooks.append(
  76. lambda desc, prop: attributes.backref_listeners(
  77. desc, backref, uselist
  78. )
  79. )
  80. # a single MapperProperty is shared down a class inheritance
  81. # hierarchy, so we set up attribute instrumentation and backref event
  82. # for each mapper down the hierarchy.
  83. # typically, "mapper" is the same as prop.parent, due to the way
  84. # the configure_mappers() process runs, however this is not strongly
  85. # enforced, and in the case of a second configure_mappers() run the
  86. # mapper here might not be prop.parent; also, a subclass mapper may
  87. # be called here before a superclass mapper. That is, can't depend
  88. # on mappers not already being set up so we have to check each one.
  89. for m in mapper.self_and_descendants:
  90. if prop is m._props.get(
  91. prop.key
  92. ) and not m.class_manager._attr_has_impl(prop.key):
  93. desc = attributes.register_attribute_impl(
  94. m.class_,
  95. prop.key,
  96. parent_token=prop,
  97. uselist=uselist,
  98. compare_function=compare_function,
  99. useobject=useobject,
  100. trackparent=useobject
  101. and (
  102. prop.single_parent
  103. or prop.direction is interfaces.ONETOMANY
  104. ),
  105. typecallable=typecallable,
  106. callable_=callable_,
  107. active_history=active_history,
  108. impl_class=impl_class,
  109. send_modified_events=not useobject or not prop.viewonly,
  110. doc=prop.doc,
  111. **kw
  112. )
  113. for hook in listen_hooks:
  114. hook(desc, prop)
  115. @properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
  116. class UninstrumentedColumnLoader(LoaderStrategy):
  117. """Represent a non-instrumented MapperProperty.
  118. The polymorphic_on argument of mapper() often results in this,
  119. if the argument is against the with_polymorphic selectable.
  120. """
  121. __slots__ = ("columns",)
  122. def __init__(self, parent, strategy_key):
  123. super(UninstrumentedColumnLoader, self).__init__(parent, strategy_key)
  124. self.columns = self.parent_property.columns
  125. def setup_query(
  126. self,
  127. compile_state,
  128. query_entity,
  129. path,
  130. loadopt,
  131. adapter,
  132. column_collection=None,
  133. **kwargs
  134. ):
  135. for c in self.columns:
  136. if adapter:
  137. c = adapter.columns[c]
  138. compile_state._append_dedupe_col_collection(c, column_collection)
  139. def create_row_processor(
  140. self,
  141. context,
  142. query_entity,
  143. path,
  144. loadopt,
  145. mapper,
  146. result,
  147. adapter,
  148. populators,
  149. ):
  150. pass
  151. @log.class_logger
  152. @properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
  153. class ColumnLoader(LoaderStrategy):
  154. """Provide loading behavior for a :class:`.ColumnProperty`."""
  155. __slots__ = "columns", "is_composite"
  156. def __init__(self, parent, strategy_key):
  157. super(ColumnLoader, self).__init__(parent, strategy_key)
  158. self.columns = self.parent_property.columns
  159. self.is_composite = hasattr(self.parent_property, "composite_class")
  160. def setup_query(
  161. self,
  162. compile_state,
  163. query_entity,
  164. path,
  165. loadopt,
  166. adapter,
  167. column_collection,
  168. memoized_populators,
  169. check_for_adapt=False,
  170. **kwargs
  171. ):
  172. for c in self.columns:
  173. if adapter:
  174. if check_for_adapt:
  175. c = adapter.adapt_check_present(c)
  176. if c is None:
  177. return
  178. else:
  179. c = adapter.columns[c]
  180. compile_state._append_dedupe_col_collection(c, column_collection)
  181. fetch = self.columns[0]
  182. if adapter:
  183. fetch = adapter.columns[fetch]
  184. memoized_populators[self.parent_property] = fetch
  185. def init_class_attribute(self, mapper):
  186. self.is_class_level = True
  187. coltype = self.columns[0].type
  188. # TODO: check all columns ? check for foreign key as well?
  189. active_history = (
  190. self.parent_property.active_history
  191. or self.columns[0].primary_key
  192. or (
  193. mapper.version_id_col is not None
  194. and mapper._columntoproperty.get(mapper.version_id_col, None)
  195. is self.parent_property
  196. )
  197. )
  198. _register_attribute(
  199. self.parent_property,
  200. mapper,
  201. useobject=False,
  202. compare_function=coltype.compare_values,
  203. active_history=active_history,
  204. )
  205. def create_row_processor(
  206. self,
  207. context,
  208. query_entity,
  209. path,
  210. loadopt,
  211. mapper,
  212. result,
  213. adapter,
  214. populators,
  215. ):
  216. # look through list of columns represented here
  217. # to see which, if any, is present in the row.
  218. for col in self.columns:
  219. if adapter:
  220. col = adapter.columns[col]
  221. getter = result._getter(col, False)
  222. if getter:
  223. populators["quick"].append((self.key, getter))
  224. break
  225. else:
  226. populators["expire"].append((self.key, True))
  227. @log.class_logger
  228. @properties.ColumnProperty.strategy_for(query_expression=True)
  229. class ExpressionColumnLoader(ColumnLoader):
  230. def __init__(self, parent, strategy_key):
  231. super(ExpressionColumnLoader, self).__init__(parent, strategy_key)
  232. # compare to the "default" expression that is mapped in
  233. # the column. If it's sql.null, we don't need to render
  234. # unless an expr is passed in the options.
  235. null = sql.null().label(None)
  236. self._have_default_expression = any(
  237. not c.compare(null) for c in self.parent_property.columns
  238. )
  239. def setup_query(
  240. self,
  241. compile_state,
  242. query_entity,
  243. path,
  244. loadopt,
  245. adapter,
  246. column_collection,
  247. memoized_populators,
  248. **kwargs
  249. ):
  250. columns = None
  251. if loadopt and "expression" in loadopt.local_opts:
  252. columns = [loadopt.local_opts["expression"]]
  253. elif self._have_default_expression:
  254. columns = self.parent_property.columns
  255. if columns is None:
  256. return
  257. for c in columns:
  258. if adapter:
  259. c = adapter.columns[c]
  260. compile_state._append_dedupe_col_collection(c, column_collection)
  261. fetch = columns[0]
  262. if adapter:
  263. fetch = adapter.columns[fetch]
  264. memoized_populators[self.parent_property] = fetch
  265. def create_row_processor(
  266. self,
  267. context,
  268. query_entity,
  269. path,
  270. loadopt,
  271. mapper,
  272. result,
  273. adapter,
  274. populators,
  275. ):
  276. # look through list of columns represented here
  277. # to see which, if any, is present in the row.
  278. if loadopt and "expression" in loadopt.local_opts:
  279. columns = [loadopt.local_opts["expression"]]
  280. for col in columns:
  281. if adapter:
  282. col = adapter.columns[col]
  283. getter = result._getter(col, False)
  284. if getter:
  285. populators["quick"].append((self.key, getter))
  286. break
  287. else:
  288. populators["expire"].append((self.key, True))
  289. def init_class_attribute(self, mapper):
  290. self.is_class_level = True
  291. _register_attribute(
  292. self.parent_property,
  293. mapper,
  294. useobject=False,
  295. compare_function=self.columns[0].type.compare_values,
  296. accepts_scalar_loader=False,
  297. )
  298. @log.class_logger
  299. @properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
  300. @properties.ColumnProperty.strategy_for(
  301. deferred=True, instrument=True, raiseload=True
  302. )
  303. @properties.ColumnProperty.strategy_for(do_nothing=True)
  304. class DeferredColumnLoader(LoaderStrategy):
  305. """Provide loading behavior for a deferred :class:`.ColumnProperty`."""
  306. __slots__ = "columns", "group", "raiseload"
  307. def __init__(self, parent, strategy_key):
  308. super(DeferredColumnLoader, self).__init__(parent, strategy_key)
  309. if hasattr(self.parent_property, "composite_class"):
  310. raise NotImplementedError(
  311. "Deferred loading for composite " "types not implemented yet"
  312. )
  313. self.raiseload = self.strategy_opts.get("raiseload", False)
  314. self.columns = self.parent_property.columns
  315. self.group = self.parent_property.group
  316. def create_row_processor(
  317. self,
  318. context,
  319. query_entity,
  320. path,
  321. loadopt,
  322. mapper,
  323. result,
  324. adapter,
  325. populators,
  326. ):
  327. # for a DeferredColumnLoader, this method is only used during a
  328. # "row processor only" query; see test_deferred.py ->
  329. # tests with "rowproc_only" in their name. As of the 1.0 series,
  330. # loading._instance_processor doesn't use a "row processing" function
  331. # to populate columns, instead it uses data in the "populators"
  332. # dictionary. Normally, the DeferredColumnLoader.setup_query()
  333. # sets up that data in the "memoized_populators" dictionary
  334. # and "create_row_processor()" here is never invoked.
  335. if (
  336. context.refresh_state
  337. and context.query._compile_options._only_load_props
  338. and self.key in context.query._compile_options._only_load_props
  339. ):
  340. self.parent_property._get_strategy(
  341. (("deferred", False), ("instrument", True))
  342. ).create_row_processor(
  343. context,
  344. query_entity,
  345. path,
  346. loadopt,
  347. mapper,
  348. result,
  349. adapter,
  350. populators,
  351. )
  352. elif not self.is_class_level:
  353. if self.raiseload:
  354. set_deferred_for_local_state = (
  355. self.parent_property._raise_column_loader
  356. )
  357. else:
  358. set_deferred_for_local_state = (
  359. self.parent_property._deferred_column_loader
  360. )
  361. populators["new"].append((self.key, set_deferred_for_local_state))
  362. else:
  363. populators["expire"].append((self.key, False))
  364. def init_class_attribute(self, mapper):
  365. self.is_class_level = True
  366. _register_attribute(
  367. self.parent_property,
  368. mapper,
  369. useobject=False,
  370. compare_function=self.columns[0].type.compare_values,
  371. callable_=self._load_for_state,
  372. load_on_unexpire=False,
  373. )
  374. def setup_query(
  375. self,
  376. compile_state,
  377. query_entity,
  378. path,
  379. loadopt,
  380. adapter,
  381. column_collection,
  382. memoized_populators,
  383. only_load_props=None,
  384. **kw
  385. ):
  386. if (
  387. (
  388. compile_state.compile_options._render_for_subquery
  389. and self.parent_property._renders_in_subqueries
  390. )
  391. or (
  392. loadopt
  393. and "undefer_pks" in loadopt.local_opts
  394. and set(self.columns).intersection(
  395. self.parent._should_undefer_in_wildcard
  396. )
  397. )
  398. or (
  399. loadopt
  400. and self.group
  401. and loadopt.local_opts.get(
  402. "undefer_group_%s" % self.group, False
  403. )
  404. )
  405. or (only_load_props and self.key in only_load_props)
  406. ):
  407. self.parent_property._get_strategy(
  408. (("deferred", False), ("instrument", True))
  409. ).setup_query(
  410. compile_state,
  411. query_entity,
  412. path,
  413. loadopt,
  414. adapter,
  415. column_collection,
  416. memoized_populators,
  417. **kw
  418. )
  419. elif self.is_class_level:
  420. memoized_populators[self.parent_property] = _SET_DEFERRED_EXPIRED
  421. elif not self.raiseload:
  422. memoized_populators[self.parent_property] = _DEFER_FOR_STATE
  423. else:
  424. memoized_populators[self.parent_property] = _RAISE_FOR_STATE
  425. def _load_for_state(self, state, passive):
  426. if not state.key:
  427. return attributes.ATTR_EMPTY
  428. if not passive & attributes.SQL_OK:
  429. return attributes.PASSIVE_NO_RESULT
  430. localparent = state.manager.mapper
  431. if self.group:
  432. toload = [
  433. p.key
  434. for p in localparent.iterate_properties
  435. if isinstance(p, StrategizedProperty)
  436. and isinstance(p.strategy, DeferredColumnLoader)
  437. and p.group == self.group
  438. ]
  439. else:
  440. toload = [self.key]
  441. # narrow the keys down to just those which have no history
  442. group = [k for k in toload if k in state.unmodified]
  443. session = _state_session(state)
  444. if session is None:
  445. raise orm_exc.DetachedInstanceError(
  446. "Parent instance %s is not bound to a Session; "
  447. "deferred load operation of attribute '%s' cannot proceed"
  448. % (orm_util.state_str(state), self.key)
  449. )
  450. if self.raiseload:
  451. self._invoke_raise_load(state, passive, "raise")
  452. if (
  453. loading.load_on_ident(
  454. session,
  455. sql.select(localparent).set_label_style(
  456. LABEL_STYLE_TABLENAME_PLUS_COL
  457. ),
  458. state.key,
  459. only_load_props=group,
  460. refresh_state=state,
  461. )
  462. is None
  463. ):
  464. raise orm_exc.ObjectDeletedError(state)
  465. return attributes.ATTR_WAS_SET
  466. def _invoke_raise_load(self, state, passive, lazy):
  467. raise sa_exc.InvalidRequestError(
  468. "'%s' is not available due to raiseload=True" % (self,)
  469. )
  470. class LoadDeferredColumns(object):
  471. """serializable loader object used by DeferredColumnLoader"""
  472. def __init__(self, key, raiseload=False):
  473. self.key = key
  474. self.raiseload = raiseload
  475. def __call__(self, state, passive=attributes.PASSIVE_OFF):
  476. key = self.key
  477. localparent = state.manager.mapper
  478. prop = localparent._props[key]
  479. if self.raiseload:
  480. strategy_key = (
  481. ("deferred", True),
  482. ("instrument", True),
  483. ("raiseload", True),
  484. )
  485. else:
  486. strategy_key = (("deferred", True), ("instrument", True))
  487. strategy = prop._get_strategy(strategy_key)
  488. return strategy._load_for_state(state, passive)
  489. class AbstractRelationshipLoader(LoaderStrategy):
  490. """LoaderStratgies which deal with related objects."""
  491. __slots__ = "mapper", "target", "uselist", "entity"
  492. def __init__(self, parent, strategy_key):
  493. super(AbstractRelationshipLoader, self).__init__(parent, strategy_key)
  494. self.mapper = self.parent_property.mapper
  495. self.entity = self.parent_property.entity
  496. self.target = self.parent_property.target
  497. self.uselist = self.parent_property.uselist
@log.class_logger
@relationships.RelationshipProperty.strategy_for(do_nothing=True)
class DoNothingLoader(LoaderStrategy):
    """Relationship loader that makes no change to the object's state.

    Compared to NoLoader, this loader does not initialize the
    collection/attribute to empty/none; the usual default LazyLoader will
    take effect.

    The class defines no methods of its own; it is registered for the
    ``do_nothing=True`` strategy key and otherwise uses whatever
    :class:`.LoaderStrategy` provides.

    """
  506. @log.class_logger
  507. @relationships.RelationshipProperty.strategy_for(lazy="noload")
  508. @relationships.RelationshipProperty.strategy_for(lazy=None)
  509. class NoLoader(AbstractRelationshipLoader):
  510. """Provide loading behavior for a :class:`.RelationshipProperty`
  511. with "lazy=None".
  512. """
  513. __slots__ = ()
  514. def init_class_attribute(self, mapper):
  515. self.is_class_level = True
  516. _register_attribute(
  517. self.parent_property,
  518. mapper,
  519. useobject=True,
  520. typecallable=self.parent_property.collection_class,
  521. )
  522. def create_row_processor(
  523. self,
  524. context,
  525. query_entity,
  526. path,
  527. loadopt,
  528. mapper,
  529. result,
  530. adapter,
  531. populators,
  532. ):
  533. def invoke_no_load(state, dict_, row):
  534. if self.uselist:
  535. attributes.init_state_collection(state, dict_, self.key)
  536. else:
  537. dict_[self.key] = None
  538. populators["new"].append((self.key, invoke_no_load))
@log.class_logger
@relationships.RelationshipProperty.strategy_for(lazy=True)
@relationships.RelationshipProperty.strategy_for(lazy="select")
@relationships.RelationshipProperty.strategy_for(lazy="raise")
@relationships.RelationshipProperty.strategy_for(lazy="raise_on_sql")
@relationships.RelationshipProperty.strategy_for(lazy="baked_select")
class LazyLoader(AbstractRelationshipLoader, util.MemoizedSlots):
    """Provide loading behavior for a :class:`.RelationshipProperty`
    with "lazy=True", that is loads when first accessed.

    The same class is registered for the ``"select"``, ``"raise"``,
    ``"raise_on_sql"`` and ``"baked_select"`` values of ``lazy``; the two
    "raise" variants are distinguished by the ``_raise_always`` and
    ``_raise_on_sql`` flags set in ``__init__``.

    """

    __slots__ = (
        "_lazywhere",
        "_rev_lazywhere",
        "_lazyload_reverse_option",
        "_order_by",
        "use_get",
        "is_aliased_class",
        "_bind_to_col",
        "_equated_columns",
        "_rev_bind_to_col",
        "_rev_equated_columns",
        "_simple_lazy_clause",
        "_raise_always",
        "_raise_on_sql",
    )

    def __init__(self, parent, strategy_key):
        """Build the lazy clauses (forward and reverse) from the parent
        property's join condition and decide whether loads can go through
        the primary-key "get" path (``use_get``)."""
        super(LazyLoader, self).__init__(parent, strategy_key)
        self._raise_always = self.strategy_opts["lazy"] == "raise"
        self._raise_on_sql = self.strategy_opts["lazy"] == "raise_on_sql"

        self.is_aliased_class = inspect(self.entity).is_aliased_class

        join_condition = self.parent_property._join_condition
        (
            self._lazywhere,
            self._bind_to_col,
            self._equated_columns,
        ) = join_condition.create_lazy_clause()

        (
            self._rev_lazywhere,
            self._rev_bind_to_col,
            self._rev_equated_columns,
        ) = join_condition.create_lazy_clause(reverse_direction=True)

        if self.parent_property.order_by:
            self._order_by = [
                sql_util._deep_annotate(elem, {"_orm_adapt": True})
                for elem in util.to_list(self.parent_property.order_by)
            ]
        else:
            self._order_by = None

        self.logger.info("%s lazy loading clause %s", self, self._lazywhere)

        # determine if our "lazywhere" clause is the same as the mapper's
        # get() clause.  then we can just use mapper.get()
        #
        # TODO: the "not self.uselist" can be taken out entirely; a m2o
        # load that populates for a list (very unusual, but is possible with
        # the API) can still set for "None" and the attribute system will
        # populate as an empty list.
        self.use_get = (
            not self.is_aliased_class
            and not self.uselist
            and self.entity._get_clause[0].compare(
                self._lazywhere,
                use_proxies=True,
                compare_keys=False,
                equivalents=self.mapper._equivalent_columns,
            )
        )

        if self.use_get:
            # also key the equated-columns mapping by every column the
            # mapper considers equivalent; iterate over a copy since the
            # dictionary is mutated inside the loop
            for col in list(self._equated_columns):
                if col in self.mapper._equivalent_columns:
                    for c in self.mapper._equivalent_columns[col]:
                        self._equated_columns[c] = self._equated_columns[col]

            self.logger.info(
                "%s will use Session.get() to " "optimize instance loads", self
            )

    def init_class_attribute(self, mapper):
        """Register the class-level attribute with ``_load_for_state`` as
        its loader callable, choosing the ``active_history`` /
        ``_deferred_history`` flags from the property configuration."""
        self.is_class_level = True

        _legacy_inactive_history_style = (
            self.parent_property._legacy_inactive_history_style
        )

        if self.parent_property.active_history:
            active_history = True
            _deferred_history = False

        elif (
            self.parent_property.direction is not interfaces.MANYTOONE
            or not self.use_get
        ):
            if _legacy_inactive_history_style:
                active_history = True
                _deferred_history = False
            else:
                active_history = False
                _deferred_history = True
        else:
            # many-to-one that can use the "get" path
            active_history = _deferred_history = False

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=True,
            callable_=self._load_for_state,
            typecallable=self.parent_property.collection_class,
            active_history=active_history,
            _deferred_history=_deferred_history,
        )

    def _memoized_attr__simple_lazy_clause(self):
        """Return ``(criterion, params)`` for the forward lazy clause.

        ``params`` is a list of ``(bind key, column, value)`` tuples: for
        a bind parameter found in ``_bind_to_col`` the column is given and
        the value is ``None``; for any other bind parameter without a
        callable the column is ``None`` and the parameter's own value is
        given.

        NOTE(review): the ``_memoized_attr_`` prefix presumably makes
        ``util.MemoizedSlots`` compute this once and cache it as
        ``_simple_lazy_clause`` -- confirm against MemoizedSlots.
        """

        lazywhere = sql_util._deep_annotate(
            self._lazywhere, {"_orm_adapt": True}
        )

        criterion, bind_to_col = (lazywhere, self._bind_to_col)

        params = []

        # first pass: clear the "unique" flag on every bind parameter
        def visit_bindparam(bindparam):
            bindparam.unique = False

        visitors.traverse(criterion, {}, {"bindparam": visit_bindparam})

        # second pass (the name is deliberately rebound): collect the
        # parameter descriptions while producing a cloned criterion
        def visit_bindparam(bindparam):
            if bindparam._identifying_key in bind_to_col:
                params.append(
                    (
                        bindparam.key,
                        bind_to_col[bindparam._identifying_key],
                        None,
                    )
                )
            elif bindparam.callable is None:
                params.append((bindparam.key, None, bindparam.value))

        criterion = visitors.cloned_traverse(
            criterion, {}, {"bindparam": visit_bindparam}
        )

        return criterion, params

    def _generate_lazy_clause(self, state, passive):
        """Return the lazy criterion for ``state``.

        With ``state`` of ``None`` the result of
        ``sql_util.adapt_criterion_to_null`` is returned directly.
        Otherwise a ``(criterion, params)`` tuple is returned where
        ``params`` maps each bind key to the parent's current (or
        committed, per ``LOAD_AGAINST_COMMITTED``) column value, or to
        the literal value recorded for the bind.
        """
        criterion, param_keys = self._simple_lazy_clause

        if state is None:
            return sql_util.adapt_criterion_to_null(
                criterion, [key for key, ident, value in param_keys]
            )

        mapper = self.parent_property.parent

        o = state.obj()  # strong ref
        dict_ = attributes.instance_dict(o)

        # strip the INIT_OK bit before fetching parent column values
        if passive & attributes.INIT_OK:
            passive ^= attributes.INIT_OK

        params = {}
        for key, ident, value in param_keys:
            if ident is not None:
                if passive and passive & attributes.LOAD_AGAINST_COMMITTED:
                    value = mapper._get_committed_state_attr_by_column(
                        state, dict_, ident, passive
                    )
                else:
                    value = mapper._get_state_attr_by_column(
                        state, dict_, ident, passive
                    )

            params[key] = value

        return criterion, params

    def _invoke_raise_load(self, state, passive, lazy):
        """Raise ``InvalidRequestError`` naming the ``lazy`` setting that
        forbids the load."""
        raise sa_exc.InvalidRequestError(
            "'%s' is not available due to lazy='%s'" % (self, lazy)
        )

    def _load_for_state(self, state, passive, loadopt=None, extra_criteria=()):
        """Attribute loader callable: produce the related value for
        ``state``.

        Depending on the ``passive`` flags and the state, this returns one
        of ``attributes.ATTR_EMPTY``, ``attributes.PASSIVE_NO_RESULT``,
        ``attributes.NEVER_SET``, ``attributes.ATTR_WAS_SET``, ``None``,
        an instance found through the session's identity lookup, or the
        result of :meth:`_emit_lazyload`.

        :raises InvalidRequestError: via ``_invoke_raise_load`` for
         ``lazy="raise"``.
        :raises DetachedInstanceError: when the state has no session and
         ``NO_RAISE`` is not set.
        """
        if not state.key and (
            (
                not self.parent_property.load_on_pending
                and not state._load_pending
            )
            or not state.session_id
        ):
            return attributes.ATTR_EMPTY

        pending = not state.key
        primary_key_identity = None

        # extra criteria on the loader option rule out the "get" path
        use_get = self.use_get and (not loadopt or not loadopt._extra_criteria)

        if (not passive & attributes.SQL_OK and not use_get) or (
            not passive & attributes.NON_PERSISTENT_OK and pending
        ):
            return attributes.PASSIVE_NO_RESULT

        if (
            # we were given lazy="raise"
            self._raise_always
            # the no_raise history-related flag was not passed
            and not passive & attributes.NO_RAISE
            and (
                # if we are use_get and related_object_ok is disabled,
                # which means we are at most looking in the identity map
                # for history purposes or otherwise returning
                # PASSIVE_NO_RESULT, don't raise.  This is also a
                # history-related flag
                not use_get
                or passive & attributes.RELATED_OBJECT_OK
            )
        ):

            self._invoke_raise_load(state, passive, "raise")

        session = _state_session(state)
        if not session:
            if passive & attributes.NO_RAISE:
                return attributes.PASSIVE_NO_RESULT

            raise orm_exc.DetachedInstanceError(
                "Parent instance %s is not bound to a Session; "
                "lazy load operation of attribute '%s' cannot proceed"
                % (orm_util.state_str(state), self.key)
            )

        # if we have a simple primary key load, check the
        # identity map without generating a Query at all
        if use_get:
            primary_key_identity = self._get_ident_for_use_get(
                session, state, passive
            )
            if attributes.PASSIVE_NO_RESULT in primary_key_identity:
                return attributes.PASSIVE_NO_RESULT
            elif attributes.NEVER_SET in primary_key_identity:
                return attributes.NEVER_SET

            if _none_set.issuperset(primary_key_identity):
                return None

            if (
                self.key in state.dict
                and not passive & attributes.DEFERRED_HISTORY_LOAD
            ):
                return attributes.ATTR_WAS_SET

            # look for this identity in the identity map.  Delegate to the
            # Query class in use, as it may have special rules for how it
            # does this, including how it decides what the correct
            # identity_token would be for this identity.

            instance = session._identity_lookup(
                self.entity,
                primary_key_identity,
                passive=passive,
                lazy_loaded_from=state,
            )

            if instance is not None:
                if instance is attributes.PASSIVE_CLASS_MISMATCH:
                    return None
                else:
                    return instance
            elif (
                not passive & attributes.SQL_OK
                or not passive & attributes.RELATED_OBJECT_OK
            ):
                return attributes.PASSIVE_NO_RESULT

        return self._emit_lazyload(
            session,
            state,
            primary_key_identity,
            passive,
            loadopt,
            extra_criteria,
        )

    def _get_ident_for_use_get(self, session, state, passive):
        """Return the list of parent attribute values, one per primary key
        column of the target mapper, forming the identity to look up.

        Committed values are read when ``LOAD_AGAINST_COMMITTED`` is set
        in ``passive``; ``session`` is not used by this method.
        """
        instance_mapper = state.manager.mapper

        if passive & attributes.LOAD_AGAINST_COMMITTED:
            get_attr = instance_mapper._get_committed_state_attr_by_column
        else:
            get_attr = instance_mapper._get_state_attr_by_column

        dict_ = state.dict

        return [
            get_attr(state, dict_, self._equated_columns[pk], passive=passive)
            for pk in self.mapper.primary_key
        ]

    @util.preload_module("sqlalchemy.orm.strategy_options")
    def _emit_lazyload(
        self,
        session,
        state,
        primary_key_identity,
        passive,
        loadopt,
        extra_criteria,
    ):
        """Build and run the SELECT for the lazy load.

        When the "get" path applies, the load is delegated to
        ``loading.load_on_pk_identity`` with ``primary_key_identity``.
        Otherwise the lazy clause is applied as the WHERE criteria and
        the statement is executed on ``session``; the full list of
        results is returned for ``uselist``, else the first result or
        ``None`` (with a warning when more than one row came back).

        May also return ``attributes.ATTR_WAS_SET`` or ``None`` without
        emitting SQL; see the inline checks.
        """
        strategy_options = util.preloaded.orm_strategy_options

        clauseelement = self.entity.__clause_element__()
        stmt = Select._create_raw_select(
            _raw_columns=[clauseelement],
            _propagate_attrs=clauseelement._propagate_attrs,
            _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
            _compile_options=ORMCompileState.default_compile_options,
        )
        load_options = QueryContext.default_load_options

        load_options += {
            "_invoke_all_eagers": False,
            "_lazy_loaded_from": state,
        }

        if self.parent_property.secondary is not None:
            stmt = stmt.select_from(
                self.mapper, self.parent_property.secondary
            )

        pending = not state.key

        # don't autoflush on pending
        if pending or passive & attributes.NO_AUTOFLUSH:
            stmt._execution_options = util.immutabledict({"autoflush": False})

        use_get = self.use_get

        if state.load_options or (loadopt and loadopt._extra_criteria):
            effective_path = state.load_path[self.parent_property]

            opts = tuple(state.load_options)

            if loadopt and loadopt._extra_criteria:
                # extra criteria cannot be honored by a primary key "get"
                use_get = False
                opts += (
                    orm_util.LoaderCriteriaOption(self.entity, extra_criteria),
                )

            stmt._with_options = opts
        else:
            # this path is used if there are not already any options
            # in the query, but an event may want to add them
            effective_path = state.mapper._path_registry[self.parent_property]

        stmt._compile_options += {"_current_path": effective_path}

        if use_get:
            if self._raise_on_sql and not passive & attributes.NO_RAISE:
                self._invoke_raise_load(state, passive, "raise_on_sql")

            return loading.load_on_pk_identity(
                session, stmt, primary_key_identity, load_options=load_options
            )

        if self._order_by:
            stmt._order_by_clauses = self._order_by

        def _lazyload_reverse(compile_context):
            for rev in self.parent_property._reverse_property:
                # reverse props that are MANYTOONE are loading *this*
                # object from get(), so don't need to eager out to those.
                if (
                    rev.direction is interfaces.MANYTOONE
                    and rev._use_get
                    and not isinstance(rev.strategy, LazyLoader)
                ):
                    strategy_options.Load.for_existing_path(
                        compile_context.compile_options._current_path[
                            rev.parent
                        ]
                    ).lazyload(rev).process_compile_state(compile_context)

        stmt._with_context_options += (
            (_lazyload_reverse, self.parent_property),
        )

        lazy_clause, params = self._generate_lazy_clause(state, passive)

        execution_options = {
            "_sa_orm_load_options": load_options,
        }

        # the attribute is already present in the state's dict and this
        # is not a deferred-history load: nothing to do
        if (
            self.key in state.dict
            and not passive & attributes.DEFERRED_HISTORY_LOAD
        ):
            return attributes.ATTR_WAS_SET

        # a None (pending parent) or never-set (persistent parent) bind
        # value means no row can match; skip the SQL
        if pending:
            if util.has_intersection(orm_util._none_set, params.values()):
                return None

        elif util.has_intersection(orm_util._never_set, params.values()):
            return None

        if self._raise_on_sql and not passive & attributes.NO_RAISE:
            self._invoke_raise_load(state, passive, "raise_on_sql")

        stmt._where_criteria = (lazy_clause,)

        result = session.execute(
            stmt, params, execution_options=execution_options
        )

        result = result.unique().scalars().all()

        if self.uselist:
            return result
        else:
            l = len(result)
            if l:
                if l > 1:
                    util.warn(
                        "Multiple rows returned with "
                        "uselist=False for lazily-loaded attribute '%s' "
                        % self.parent_property
                    )

                return result[0]
            else:
                return None

    def create_row_processor(
        self,
        context,
        query_entity,
        path,
        loadopt,
        mapper,
        result,
        adapter,
        populators,
    ):
        """Add a "new" populator when a per-instance lazy callable is
        needed, or when existing attribute state must be reset; otherwise
        add nothing."""
        key = self.key

        if not self.is_class_level or (loadopt and loadopt._extra_criteria):
            # we are not the primary manager for this attribute
            # on this class - set up a
            # per-instance lazyloader, which will override the
            # class-level behavior.
            # this currently only happens when using a
            # "lazyload" option on a "no load"
            # attribute - "eager" attributes always have a
            # class-level lazyloader installed.
            # NOTE(review): loadopt is dereferenced below without a None
            # check; this relies on a loader option always being present
            # when the strategy is not class-level -- confirm.
            set_lazy_callable = (
                InstanceState._instance_level_callable_processor
            )(
                mapper.class_manager,
                LoadLazyAttribute(
                    key,
                    self,
                    loadopt,
                    loadopt._generate_extra_criteria(context)
                    if loadopt._extra_criteria
                    else None,
                ),
                key,
            )

            populators["new"].append((self.key, set_lazy_callable))
        elif context.populate_existing or mapper.always_refresh:

            def reset_for_lazy_callable(state, dict_, row):
                # we are the primary manager for this attribute on
                # this class - reset its
                # per-instance attribute state, so that the class-level
                # lazy loader is
                # executed when next referenced on this instance.
                # this is needed in
                # populate_existing() types of scenarios to reset
                # any existing state.
                state._reset(dict_, key)

            populators["new"].append((self.key, reset_for_lazy_callable))
  945. class LoadLazyAttribute(object):
  946. """semi-serializable loader object used by LazyLoader
  947. Historically, this object would be carried along with instances that
  948. needed to run lazyloaders, so it had to be serializable to support
  949. cached instances.
  950. this is no longer a general requirement, and the case where this object
  951. is used is exactly the case where we can't really serialize easily,
  952. which is when extra criteria in the loader option is present.
  953. We can't reliably serialize that as it refers to mapped entities and
  954. AliasedClass objects that are local to the current process, which would
  955. need to be matched up on deserialize e.g. the sqlalchemy.ext.serializer
  956. approach.
  957. """
  958. def __init__(self, key, initiating_strategy, loadopt, extra_criteria):
  959. self.key = key
  960. self.strategy_key = initiating_strategy.strategy_key
  961. self.loadopt = loadopt
  962. self.extra_criteria = extra_criteria
  963. def __getstate__(self):
  964. if self.extra_criteria is not None:
  965. util.warn(
  966. "Can't reliably serialize a lazyload() option that "
  967. "contains additional criteria; please use eager loading "
  968. "for this case"
  969. )
  970. return {
  971. "key": self.key,
  972. "strategy_key": self.strategy_key,
  973. "loadopt": self.loadopt,
  974. "extra_criteria": (),
  975. }
  976. def __call__(self, state, passive=attributes.PASSIVE_OFF):
  977. key = self.key
  978. instance_mapper = state.manager.mapper
  979. prop = instance_mapper._props[key]
  980. strategy = prop._strategies[self.strategy_key]
  981. return strategy._load_for_state(
  982. state,
  983. passive,
  984. loadopt=self.loadopt,
  985. extra_criteria=self.extra_criteria,
  986. )
  987. class PostLoader(AbstractRelationshipLoader):
  988. """A relationship loader that emits a second SELECT statement."""
  989. def _check_recursive_postload(self, context, path, join_depth=None):
  990. effective_path = (
  991. context.compile_state.current_path or orm_util.PathRegistry.root
  992. ) + path
  993. if loading.PostLoad.path_exists(
  994. context, effective_path, self.parent_property
  995. ):
  996. return True
  997. path_w_prop = path[self.parent_property]
  998. effective_path_w_prop = effective_path[self.parent_property]
  999. if not path_w_prop.contains(context.attributes, "loader"):
  1000. if join_depth:
  1001. if effective_path_w_prop.length / 2 > join_depth:
  1002. return True
  1003. elif effective_path_w_prop.contains_mapper(self.mapper):
  1004. return True
  1005. return False
  1006. def _immediateload_create_row_processor(
  1007. self,
  1008. context,
  1009. query_entity,
  1010. path,
  1011. loadopt,
  1012. mapper,
  1013. result,
  1014. adapter,
  1015. populators,
  1016. ):
  1017. return self.parent_property._get_strategy(
  1018. (("lazy", "immediate"),)
  1019. ).create_row_processor(
  1020. context,
  1021. query_entity,
  1022. path,
  1023. loadopt,
  1024. mapper,
  1025. result,
  1026. adapter,
  1027. populators,
  1028. )
  1029. @relationships.RelationshipProperty.strategy_for(lazy="immediate")
  1030. class ImmediateLoader(PostLoader):
  1031. __slots__ = ()
  1032. def init_class_attribute(self, mapper):
  1033. self.parent_property._get_strategy(
  1034. (("lazy", "select"),)
  1035. ).init_class_attribute(mapper)
  1036. def create_row_processor(
  1037. self,
  1038. context,
  1039. query_entity,
  1040. path,
  1041. loadopt,
  1042. mapper,
  1043. result,
  1044. adapter,
  1045. populators,
  1046. ):
  1047. def load_immediate(state, dict_, row):
  1048. state.get_impl(self.key).get(state, dict_, flags)
  1049. if self._check_recursive_postload(context, path):
  1050. # this will not emit SQL and will only emit for a many-to-one
  1051. # "use get" load. the "_RELATED" part means it may return
  1052. # instance even if its expired, since this is a mutually-recursive
  1053. # load operation.
  1054. flags = attributes.PASSIVE_NO_FETCH_RELATED | attributes.NO_RAISE
  1055. else:
  1056. flags = attributes.PASSIVE_OFF | attributes.NO_RAISE
  1057. populators["delayed"].append((self.key, load_immediate))
  1058. @log.class_logger
  1059. @relationships.RelationshipProperty.strategy_for(lazy="subquery")
  1060. class SubqueryLoader(PostLoader):
  1061. __slots__ = ("join_depth",)
  1062. def __init__(self, parent, strategy_key):
  1063. super(SubqueryLoader, self).__init__(parent, strategy_key)
  1064. self.join_depth = self.parent_property.join_depth
  1065. def init_class_attribute(self, mapper):
  1066. self.parent_property._get_strategy(
  1067. (("lazy", "select"),)
  1068. ).init_class_attribute(mapper)
  1069. def _get_leftmost(
  1070. self,
  1071. orig_query_entity_index,
  1072. subq_path,
  1073. current_compile_state,
  1074. is_root,
  1075. ):
  1076. given_subq_path = subq_path
  1077. subq_path = subq_path.path
  1078. subq_mapper = orm_util._class_to_mapper(subq_path[0])
  1079. # determine attributes of the leftmost mapper
  1080. if (
  1081. self.parent.isa(subq_mapper)
  1082. and self.parent_property is subq_path[1]
  1083. ):
  1084. leftmost_mapper, leftmost_prop = self.parent, self.parent_property
  1085. else:
  1086. leftmost_mapper, leftmost_prop = subq_mapper, subq_path[1]
  1087. if is_root:
  1088. # the subq_path is also coming from cached state, so when we start
  1089. # building up this path, it has to also be converted to be in terms
  1090. # of the current state. this is for the specific case of the entity
  1091. # is an AliasedClass against a subquery that's not otherwise going
  1092. # to adapt
  1093. new_subq_path = current_compile_state._entities[
  1094. orig_query_entity_index
  1095. ].entity_zero._path_registry[leftmost_prop]
  1096. additional = len(subq_path) - len(new_subq_path)
  1097. if additional:
  1098. new_subq_path += path_registry.PathRegistry.coerce(
  1099. subq_path[-additional:]
  1100. )
  1101. else:
  1102. new_subq_path = given_subq_path
  1103. leftmost_cols = leftmost_prop.local_columns
  1104. leftmost_attr = [
  1105. getattr(
  1106. new_subq_path.path[0].entity,
  1107. leftmost_mapper._columntoproperty[c].key,
  1108. )
  1109. for c in leftmost_cols
  1110. ]
  1111. return leftmost_mapper, leftmost_attr, leftmost_prop, new_subq_path
    def _generate_from_original_query(
        self,
        orig_compile_state,
        orig_query,
        leftmost_mapper,
        leftmost_attr,
        leftmost_relationship,
        orig_entity,
    ):
        """Turn the original query into the aliased "left side" subquery.

        The original query is cloned, re-wrapped as a legacy ``Query``,
        reduced to the columns given by ``leftmost_attr`` (adapted through
        ``orig_compile_state``), optionally made DISTINCT according to
        ``leftmost_relationship.distinct_target_key``, stripped of its
        ORDER BY when it has no row-limiting clause, and finally rendered
        as a subquery.

        :param orig_compile_state: compile state used to adapt the target
         columns and to compute column descriptions.
        :param orig_query: the query being cloned; the clone is what gets
         mutated here.
        :param leftmost_mapper: mapper that the returned alias is made
         against.
        :param leftmost_attr: attributes that become the SELECT list.
        :param leftmost_relationship: relationship supplying
         ``distinct_target_key``.
        :param orig_entity: not referenced in this method body.
        :return: an ``AliasedClass`` of ``leftmost_mapper`` against the
         generated subquery.
        """
        # reformat the original query
        # to look only for significant columns
        q = orig_query._clone().correlate(None)

        # LEGACY: make a Query back from the select() !!
        # This suits at least two legacy cases:
        # 1. applications which expect before_compile() to be called
        #    below when we run .subquery() on this query (Keystone)
        # 2. applications which are doing subqueryload with complex
        #    from_self() queries, as query.subquery() / .statement
        #    has to do the full compile context for multiply-nested
        #    from_self() (Neutron) - see test_subqload_from_self
        #    for demo.
        q2 = query.Query.__new__(query.Query)
        q2.__dict__.update(q.__dict__)
        q = q2

        # set the query's "FROM" list explicitly to what the
        # FROM list would be in any case, as we will be limiting
        # the columns in the SELECT list which may no longer include
        # all entities mentioned in things like WHERE, JOIN, etc.
        if not q._from_obj:
            q._enable_assertions = False
            q.select_from.non_generative(
                q,
                *{
                    ent["entity"]
                    for ent in _column_descriptions(
                        orig_query, compile_state=orig_compile_state
                    )
                    if ent["entity"] is not None
                }
            )

        # select from the identity columns of the outer (specifically, these
        # are the 'local_cols' of the property).  This will remove other
        # columns from the query that might suggest the right entity which is
        # why we do set select_from above.   The attributes we have are
        # coerced and adapted using the original query's adapter, which is
        # needed only for the case of adapting a subclass column to
        # that of a polymorphic selectable, e.g. we have
        # Engineer.primary_language and the entity is Person.  All other
        # adaptations, e.g. from_self, select_entity_from(), will occur
        # within the new query when it compiles, as the compile_state we are
        # using here is only a partial one.  If the subqueryload is from a
        # with_polymorphic() or other aliased() object, left_attr will already
        # be the correct attributes so no adaptation is needed.
        target_cols = orig_compile_state._adapt_col_list(
            [
                sql.coercions.expect(sql.roles.ColumnsClauseRole, o)
                for o in leftmost_attr
            ],
            orig_compile_state._get_current_adapter(),
        )
        q._raw_columns = target_cols

        distinct_target_key = leftmost_relationship.distinct_target_key

        # True forces DISTINCT, None decides per table below; any other
        # value leaves q._distinct untouched.
        if distinct_target_key is True:
            q._distinct = True
        elif distinct_target_key is None:
            # if target_cols refer to a non-primary key or only
            # part of a composite primary key, set the q as distinct
            for t in set(c.table for c in target_cols):
                if not set(target_cols).issuperset(t.primary_key):
                    q._distinct = True
                    break

        # don't need ORDER BY if no limit/offset
        if not q._has_row_limiting_clause:
            q._order_by_clauses = ()

        if q._distinct is True and q._order_by_clauses:
            # the logic to automatically add the order by columns to the query
            # when distinct is True is deprecated in the query
            to_add = sql_util.expand_column_list_from_order_by(
                target_cols, q._order_by_clauses
            )
            if to_add:
                q._set_entities(target_cols + to_add)

        # the original query now becomes a subquery
        # which we'll join onto.
        # LEGACY: as "q" is a Query, the before_compile() event is invoked
        # here.
        embed_q = q.set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL).subquery()
        left_alias = orm_util.AliasedClass(
            leftmost_mapper, embed_q, use_mapper_path=True
        )
        return left_alias
  1203. def _prep_for_joins(self, left_alias, subq_path):
  1204. # figure out what's being joined. a.k.a. the fun part
  1205. to_join = []
  1206. pairs = list(subq_path.pairs())
  1207. for i, (mapper, prop) in enumerate(pairs):
  1208. if i > 0:
  1209. # look at the previous mapper in the chain -
  1210. # if it is as or more specific than this prop's
  1211. # mapper, use that instead.
  1212. # note we have an assumption here that
  1213. # the non-first element is always going to be a mapper,
  1214. # not an AliasedClass
  1215. prev_mapper = pairs[i - 1][1].mapper
  1216. to_append = prev_mapper if prev_mapper.isa(mapper) else mapper
  1217. else:
  1218. to_append = mapper
  1219. to_join.append((to_append, prop.key))
  1220. # determine the immediate parent class we are joining from,
  1221. # which needs to be aliased.
  1222. if len(to_join) < 2:
  1223. # in the case of a one level eager load, this is the
  1224. # leftmost "left_alias".
  1225. parent_alias = left_alias
  1226. else:
  1227. info = inspect(to_join[-1][0])
  1228. if info.is_aliased_class:
  1229. parent_alias = info.entity
  1230. else:
  1231. # alias a plain mapper as we may be
  1232. # joining multiple times
  1233. parent_alias = orm_util.AliasedClass(
  1234. info.entity, use_mapper_path=True
  1235. )
  1236. local_cols = self.parent_property.local_columns
  1237. local_attr = [
  1238. getattr(parent_alias, self.parent._columntoproperty[c].key)
  1239. for c in local_cols
  1240. ]
  1241. return to_join, local_attr, parent_alias
  1242. def _apply_joins(
  1243. self, q, to_join, left_alias, parent_alias, effective_entity
  1244. ):
  1245. ltj = len(to_join)
  1246. if ltj == 1:
  1247. to_join = [
  1248. getattr(left_alias, to_join[0][1]).of_type(effective_entity)
  1249. ]
  1250. elif ltj == 2:
  1251. to_join = [
  1252. getattr(left_alias, to_join[0][1]).of_type(parent_alias),
  1253. getattr(parent_alias, to_join[-1][1]).of_type(
  1254. effective_entity
  1255. ),
  1256. ]
  1257. elif ltj > 2:
  1258. middle = [
  1259. (
  1260. orm_util.AliasedClass(item[0])
  1261. if not inspect(item[0]).is_aliased_class
  1262. else item[0].entity,
  1263. item[1],
  1264. )
  1265. for item in to_join[1:-1]
  1266. ]
  1267. inner = []
  1268. while middle:
  1269. item = middle.pop(0)
  1270. attr = getattr(item[0], item[1])
  1271. if middle:
  1272. attr = attr.of_type(middle[0][0])
  1273. else:
  1274. attr = attr.of_type(parent_alias)
  1275. inner.append(attr)
  1276. to_join = (
  1277. [getattr(left_alias, to_join[0][1]).of_type(inner[0].parent)]
  1278. + inner
  1279. + [
  1280. getattr(parent_alias, to_join[-1][1]).of_type(
  1281. effective_entity
  1282. )
  1283. ]
  1284. )
  1285. for attr in to_join:
  1286. q = q.join(attr)
  1287. return q
  1288. def _setup_options(
  1289. self,
  1290. context,
  1291. q,
  1292. subq_path,
  1293. rewritten_path,
  1294. orig_query,
  1295. effective_entity,
  1296. loadopt,
  1297. ):
  1298. # note that because the subqueryload object
  1299. # does not re-use the cached query, instead always making
  1300. # use of the current invoked query, while we have two queries
  1301. # here (orig and context.query), they are both non-cached
  1302. # queries and we can transfer the options as is without
  1303. # adjusting for new criteria. Some work on #6881 / #6889
  1304. # brought this into question.
  1305. new_options = orig_query._with_options
  1306. if loadopt and loadopt._extra_criteria:
  1307. new_options += (
  1308. orm_util.LoaderCriteriaOption(
  1309. self.entity,
  1310. loadopt._generate_extra_criteria(context),
  1311. ),
  1312. )
  1313. # propagate loader options etc. to the new query.
  1314. # these will fire relative to subq_path.
  1315. q = q._with_current_path(rewritten_path)
  1316. q = q.options(*new_options)
  1317. return q
  1318. def _setup_outermost_orderby(self, q):
  1319. if self.parent_property.order_by:
  1320. def _setup_outermost_orderby(compile_context):
  1321. compile_context.eager_order_by += tuple(
  1322. util.to_list(self.parent_property.order_by)
  1323. )
  1324. q = q._add_context_option(
  1325. _setup_outermost_orderby, self.parent_property
  1326. )
  1327. return q
  1328. class _SubqCollections(object):
  1329. """Given a :class:`_query.Query` used to emit the "subquery load",
  1330. provide a load interface that executes the query at the
  1331. first moment a value is needed.
  1332. """
  1333. __slots__ = (
  1334. "session",
  1335. "execution_options",
  1336. "load_options",
  1337. "params",
  1338. "subq",
  1339. "_data",
  1340. )
  1341. def __init__(self, context, subq):
  1342. # avoid creating a cycle by storing context
  1343. # even though that's preferable
  1344. self.session = context.session
  1345. self.execution_options = context.execution_options
  1346. self.load_options = context.load_options
  1347. self.params = context.params or {}
  1348. self.subq = subq
  1349. self._data = None
  1350. def get(self, key, default):
  1351. if self._data is None:
  1352. self._load()
  1353. return self._data.get(key, default)
  1354. def _load(self):
  1355. self._data = collections.defaultdict(list)
  1356. q = self.subq
  1357. assert q.session is None
  1358. q = q.with_session(self.session)
  1359. if self.load_options._populate_existing:
  1360. q = q.populate_existing()
  1361. # to work with baked query, the parameters may have been
  1362. # updated since this query was created, so take these into account
  1363. rows = list(q.params(self.params))
  1364. for k, v in itertools.groupby(rows, lambda x: x[1:]):
  1365. self._data[k].extend(vv[0] for vv in v)
  1366. def loader(self, state, dict_, row):
  1367. if self._data is None:
  1368. self._load()
    def _setup_query_from_rowproc(
        self,
        context,
        query_entity,
        path,
        entity,
        loadopt,
        adapter,
    ):
        """Build the query that loads the related rows for this property.

        Returns ``None`` (nothing to load) when eager loads are disabled
        in the compile options, when compiling for a refresh, or when the
        depth / cycle check below rejects the path.  Otherwise returns a
        new ``Query`` against the effective target entity, joined from a
        subquery of the original query, with the original query's options
        and the relationship's ordering applied.

        As a side effect, ``context.loaders_require_buffering`` is set to
        True once the first early-exit check has passed.

        :param context: the query context of the enclosing load.
        :param query_entity: entity whose index within the compile
         state's entities is handed to ``_get_leftmost``.
        :param path: path to the parent entity; extended here with this
         relationship property.
        :param entity: passed through to
         ``_generate_from_original_query``.
        :param loadopt: loader option passed through to
         ``_setup_options``; may be falsy.
        :param adapter: not referenced in this method body.
        """
        compile_state = context.compile_state
        if (
            not compile_state.compile_options._enable_eagerloads
            or compile_state.compile_options._for_refresh_state
        ):
            return

        orig_query_entity_index = compile_state._entities.index(query_entity)
        context.loaders_require_buffering = True

        path = path[self.parent_property]

        # build up a path indicating the path from the leftmost
        # entity to the thing we're subquery loading.
        with_poly_entity = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            effective_entity = with_poly_entity
        else:
            effective_entity = self.entity

        # paths accumulated by an enclosing subquery load, if any, are
        # carried in the execution options of the query being run; both
        # default to the root path.
        subq_path, rewritten_path = context.query._execution_options.get(
            ("subquery_paths", None),
            (orm_util.PathRegistry.root, orm_util.PathRegistry.root),
        )
        is_root = subq_path is orm_util.PathRegistry.root
        subq_path = subq_path + path
        rewritten_path = rewritten_path + path

        # if not via query option, check for
        # a cycle
        # TODO: why is this here???  this is now handled
        # by the _check_recursive_postload call
        if not path.contains(compile_state.attributes, "loader"):
            if self.join_depth:
                if (
                    (
                        compile_state.current_path.length
                        if compile_state.current_path
                        else 0
                    )
                    + path.length
                ) / 2 > self.join_depth:
                    return
            elif subq_path.contains_mapper(self.mapper):
                return

        # use the current query being invoked, not the compile state
        # one.  this is so that we get the current parameters.  however,
        # it means we can't use the existing compile state, we have to make
        # a new one.    other approaches include possibly using the
        # compiled query but swapping the params, seems only marginally
        # less time spent but more complicated
        orig_query = context.query._execution_options.get(
            ("orig_query", SubqueryLoader), context.query
        )

        # make a new compile_state for the query that's probably cached, but
        # we're sort of undoing a bit of that caching :(
        compile_state_cls = ORMCompileState._get_plugin_class_for_plugin(
            orig_query, "orm"
        )

        if orig_query._is_lambda_element:
            # the warning is only emitted when this load was not itself
            # triggered from a lazy load
            if context.load_options._lazy_loaded_from is None:
                util.warn(
                    'subqueryloader for "%s" must invoke lambda callable '
                    "at %r in "
                    "order to produce a new query, decreasing the efficiency "
                    "of caching for this statement. Consider using "
                    "selectinload() for more effective full-lambda caching"
                    % (self, orig_query)
                )
            orig_query = orig_query._resolved

        # this is the more "quick" version, however it's not clear how
        # much of this we need.    in particular I can't get a test to
        # fail if the "set_base_alias" is missing and not sure why that is.
        orig_compile_state = compile_state_cls._create_entities_collection(
            orig_query, legacy=False
        )

        (
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            rewritten_path,
        ) = self._get_leftmost(
            orig_query_entity_index,
            rewritten_path,
            orig_compile_state,
            is_root,
        )

        # generate a new Query from the original, then
        # produce a subquery from it.
        left_alias = self._generate_from_original_query(
            orig_compile_state,
            orig_query,
            leftmost_mapper,
            leftmost_attr,
            leftmost_relationship,
            entity,
        )

        # generate another Query that will join the
        # left alias to the target relationships.
        # basically doing a longhand
        # "from_self()".  (from_self() itself not quite industrial
        # strength enough for all contingencies...but very close)
        q = query.Query(effective_entity)

        # record the original query and the accumulated paths on the new
        # query, under the same keys that are read back above
        q._execution_options = q._execution_options.union(
            {
                ("orig_query", SubqueryLoader): orig_query,
                ("subquery_paths", None): (subq_path, rewritten_path),
            }
        )

        q = q._set_enable_single_crit(False)
        to_join, local_attr, parent_alias = self._prep_for_joins(
            left_alias, subq_path
        )

        q = q.add_columns(*local_attr)
        q = self._apply_joins(
            q, to_join, left_alias, parent_alias, effective_entity
        )

        q = self._setup_options(
            context,
            q,
            subq_path,
            rewritten_path,
            orig_query,
            effective_entity,
            loadopt,
        )
        q = self._setup_outermost_orderby(q)

        return q
  1503. def create_row_processor(
  1504. self,
  1505. context,
  1506. query_entity,
  1507. path,
  1508. loadopt,
  1509. mapper,
  1510. result,
  1511. adapter,
  1512. populators,
  1513. ):
  1514. if context.refresh_state:
  1515. return self._immediateload_create_row_processor(
  1516. context,
  1517. query_entity,
  1518. path,
  1519. loadopt,
  1520. mapper,
  1521. result,
  1522. adapter,
  1523. populators,
  1524. )
  1525. # the subqueryloader does a similar check in setup_query() unlike
  1526. # the other post loaders, however we have this here for consistency
  1527. elif self._check_recursive_postload(context, path, self.join_depth):
  1528. return
  1529. elif not isinstance(context.compile_state, ORMSelectCompileState):
  1530. # issue 7505 - subqueryload() in 1.3 and previous would silently
  1531. # degrade for from_statement() without warning. this behavior
  1532. # is restored here
  1533. return
  1534. if not self.parent.class_manager[self.key].impl.supports_population:
  1535. raise sa_exc.InvalidRequestError(
  1536. "'%s' does not support object "
  1537. "population - eager loading cannot be applied." % self
  1538. )
  1539. # a little dance here as the "path" is still something that only
  1540. # semi-tracks the exact series of things we are loading, still not
  1541. # telling us about with_polymorphic() and stuff like that when it's at
  1542. # the root.. the initial MapperEntity is more accurate for this case.
  1543. if len(path) == 1:
  1544. if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
  1545. return
  1546. elif not orm_util._entity_isa(path[-1], self.parent):
  1547. return
  1548. subq = self._setup_query_from_rowproc(
  1549. context,
  1550. query_entity,
  1551. path,
  1552. path[-1],
  1553. loadopt,
  1554. adapter,
  1555. )
  1556. if subq is None:
  1557. return
  1558. assert subq.session is None
  1559. path = path[self.parent_property]
  1560. local_cols = self.parent_property.local_columns
  1561. # cache the loaded collections in the context
  1562. # so that inheriting mappers don't re-load when they
  1563. # call upon create_row_processor again
  1564. collections = path.get(context.attributes, "collections")
  1565. if collections is None:
  1566. collections = self._SubqCollections(context, subq)
  1567. path.set(context.attributes, "collections", collections)
  1568. if adapter:
  1569. local_cols = [adapter.columns[c] for c in local_cols]
  1570. if self.uselist:
  1571. self._create_collection_loader(
  1572. context, result, collections, local_cols, populators
  1573. )
  1574. else:
  1575. self._create_scalar_loader(
  1576. context, result, collections, local_cols, populators
  1577. )
  1578. def _create_collection_loader(
  1579. self, context, result, collections, local_cols, populators
  1580. ):
  1581. tuple_getter = result._tuple_getter(local_cols)
  1582. def load_collection_from_subq(state, dict_, row):
  1583. collection = collections.get(tuple_getter(row), ())
  1584. state.get_impl(self.key).set_committed_value(
  1585. state, dict_, collection
  1586. )
  1587. def load_collection_from_subq_existing_row(state, dict_, row):
  1588. if self.key not in dict_:
  1589. load_collection_from_subq(state, dict_, row)
  1590. populators["new"].append((self.key, load_collection_from_subq))
  1591. populators["existing"].append(
  1592. (self.key, load_collection_from_subq_existing_row)
  1593. )
  1594. if context.invoke_all_eagers:
  1595. populators["eager"].append((self.key, collections.loader))
  1596. def _create_scalar_loader(
  1597. self, context, result, collections, local_cols, populators
  1598. ):
  1599. tuple_getter = result._tuple_getter(local_cols)
  1600. def load_scalar_from_subq(state, dict_, row):
  1601. collection = collections.get(tuple_getter(row), (None,))
  1602. if len(collection) > 1:
  1603. util.warn(
  1604. "Multiple rows returned with "
  1605. "uselist=False for eagerly-loaded attribute '%s' " % self
  1606. )
  1607. scalar = collection[0]
  1608. state.get_impl(self.key).set_committed_value(state, dict_, scalar)
  1609. def load_scalar_from_subq_existing_row(state, dict_, row):
  1610. if self.key not in dict_:
  1611. load_scalar_from_subq(state, dict_, row)
  1612. populators["new"].append((self.key, load_scalar_from_subq))
  1613. populators["existing"].append(
  1614. (self.key, load_scalar_from_subq_existing_row)
  1615. )
  1616. if context.invoke_all_eagers:
  1617. populators["eager"].append((self.key, collections.loader))
  1618. @log.class_logger
  1619. @relationships.RelationshipProperty.strategy_for(lazy="joined")
  1620. @relationships.RelationshipProperty.strategy_for(lazy=False)
  1621. class JoinedLoader(AbstractRelationshipLoader):
  1622. """Provide loading behavior for a :class:`.RelationshipProperty`
  1623. using joined eager loading.
  1624. """
  1625. __slots__ = "join_depth", "_aliased_class_pool"
  1626. def __init__(self, parent, strategy_key):
  1627. super(JoinedLoader, self).__init__(parent, strategy_key)
  1628. self.join_depth = self.parent_property.join_depth
  1629. self._aliased_class_pool = []
  1630. def init_class_attribute(self, mapper):
  1631. self.parent_property._get_strategy(
  1632. (("lazy", "select"),)
  1633. ).init_class_attribute(mapper)
    def setup_query(
        self,
        compile_state,
        query_entity,
        path,
        loadopt,
        adapter,
        column_collection=None,
        parentmapper=None,
        chained_from_outerjoin=False,
        **kwargs
    ):
        """Add a left outer join to the statement that's being constructed.

        Does nothing when eager loads are disabled in the compile
        options.  For a collection (``self.uselist``),
        ``compile_state.multi_row_eager_loaders`` is set to True.

        Two paths follow.  If ``loadopt`` yields a user-defined adapter
        (anything other than ``False`` from
        ``_init_user_defined_eager_proc``), that adapter is set up and no
        JOIN is generated here.  Otherwise, unless the path carries an
        explicit "loader" entry, the method returns early when
        ``join_depth`` is exceeded or, with no ``join_depth``, when the
        path already contains this mapper; past that check the JOIN and
        row adapter are generated and
        ``compile_state.eager_adding_joins`` is set to True.

        In both paths the target entity's columns are then set up through
        ``loading._setup_entity_query``.

        Raises ``InvalidRequestError`` when a with_polymorphic entity is
        present on the path and ``None`` appears among
        ``compile_state.secondary_columns``.  Extra keyword arguments are
        accepted and ignored.
        """

        if not compile_state.compile_options._enable_eagerloads:
            return
        elif self.uselist:
            compile_state.multi_row_eager_loaders = True

        path = path[self.parent_property]

        with_polymorphic = None

        # False means "no user-defined adapter applies"; note that None
        # is a possible adapter value and is tested for separately below
        user_defined_adapter = (
            self._init_user_defined_eager_proc(
                loadopt, compile_state, compile_state.attributes
            )
            if loadopt
            else False
        )

        if user_defined_adapter is not False:

            # setup an adapter but dont create any JOIN, assume it's already
            # in the query
            (
                clauses,
                adapter,
                add_to_collection,
            ) = self._setup_query_on_user_defined_adapter(
                compile_state,
                query_entity,
                path,
                adapter,
                user_defined_adapter,
            )

            # don't do "wrap" for multi-row, we want to wrap
            # limited/distinct SELECT,
            # because we want to put the JOIN on the outside.

        else:
            # if not via query option, check for
            # a cycle
            if not path.contains(compile_state.attributes, "loader"):
                if self.join_depth:
                    if path.length / 2 > self.join_depth:
                        return
                elif path.contains_mapper(self.mapper):
                    return

            # add the JOIN and create an adapter
            (
                clauses,
                adapter,
                add_to_collection,
                chained_from_outerjoin,
            ) = self._generate_row_adapter(
                compile_state,
                query_entity,
                path,
                loadopt,
                adapter,
                column_collection,
                parentmapper,
                chained_from_outerjoin,
            )

            # for multi-row, we want to wrap limited/distinct SELECT,
            # because we want to put the JOIN on the outside.
            compile_state.eager_adding_joins = True

        with_poly_entity = path.get(
            compile_state.attributes, "path_with_polymorphic", None
        )
        if with_poly_entity is not None:
            with_polymorphic = inspect(
                with_poly_entity
            ).with_polymorphic_mappers
        else:
            with_polymorphic = None

        # from here on the path refers to the target entity rather than
        # the relationship property
        path = path[self.entity]
        loading._setup_entity_query(
            compile_state,
            self.mapper,
            query_entity,
            path,
            clauses,
            add_to_collection,
            with_polymorphic=with_polymorphic,
            parentmapper=self.mapper,
            chained_from_outerjoin=chained_from_outerjoin,
        )

        if with_poly_entity is not None and None in set(
            compile_state.secondary_columns
        ):
            raise sa_exc.InvalidRequestError(
                "Detected unaliased columns when generating joined "
                "load. Make sure to use aliased=True or flat=True "
                "when using joined loading with with_polymorphic()."
            )
  1734. def _init_user_defined_eager_proc(
  1735. self, loadopt, compile_state, target_attributes
  1736. ):
  1737. # check if the opt applies at all
  1738. if "eager_from_alias" not in loadopt.local_opts:
  1739. # nope
  1740. return False
  1741. path = loadopt.path.parent
  1742. # the option applies. check if the "user_defined_eager_row_processor"
  1743. # has been built up.
  1744. adapter = path.get(
  1745. compile_state.attributes, "user_defined_eager_row_processor", False
  1746. )
  1747. if adapter is not False:
  1748. # just return it
  1749. return adapter
  1750. # otherwise figure it out.
  1751. alias = loadopt.local_opts["eager_from_alias"]
  1752. root_mapper, prop = path[-2:]
  1753. if alias is not None:
  1754. if isinstance(alias, str):
  1755. alias = prop.target.alias(alias)
  1756. adapter = sql_util.ColumnAdapter(
  1757. alias, equivalents=prop.mapper._equivalent_columns
  1758. )
  1759. else:
  1760. if path.contains(
  1761. compile_state.attributes, "path_with_polymorphic"
  1762. ):
  1763. with_poly_entity = path.get(
  1764. compile_state.attributes, "path_with_polymorphic"
  1765. )
  1766. adapter = orm_util.ORMAdapter(
  1767. with_poly_entity,
  1768. equivalents=prop.mapper._equivalent_columns,
  1769. )
  1770. else:
  1771. adapter = compile_state._polymorphic_adapters.get(
  1772. prop.mapper, None
  1773. )
  1774. path.set(
  1775. target_attributes,
  1776. "user_defined_eager_row_processor",
  1777. adapter,
  1778. )
  1779. return adapter
  1780. def _setup_query_on_user_defined_adapter(
  1781. self, context, entity, path, adapter, user_defined_adapter
  1782. ):
  1783. # apply some more wrapping to the "user defined adapter"
  1784. # if we are setting up the query for SQL render.
  1785. adapter = entity._get_entity_clauses(context)
  1786. if adapter and user_defined_adapter:
  1787. user_defined_adapter = user_defined_adapter.wrap(adapter)
  1788. path.set(
  1789. context.attributes,
  1790. "user_defined_eager_row_processor",
  1791. user_defined_adapter,
  1792. )
  1793. elif adapter:
  1794. user_defined_adapter = adapter
  1795. path.set(
  1796. context.attributes,
  1797. "user_defined_eager_row_processor",
  1798. user_defined_adapter,
  1799. )
  1800. add_to_collection = context.primary_columns
  1801. return user_defined_adapter, adapter, add_to_collection
  1802. def _gen_pooled_aliased_class(self, context):
  1803. # keep a local pool of AliasedClass objects that get re-used.
  1804. # we need one unique AliasedClass per query per appearance of our
  1805. # entity in the query.
  1806. if inspect(self.entity).is_aliased_class:
  1807. alt_selectable = inspect(self.entity).selectable
  1808. else:
  1809. alt_selectable = None
  1810. key = ("joinedloader_ac", self)
  1811. if key not in context.attributes:
  1812. context.attributes[key] = idx = 0
  1813. else:
  1814. context.attributes[key] = idx = context.attributes[key] + 1
  1815. if idx >= len(self._aliased_class_pool):
  1816. to_adapt = orm_util.AliasedClass(
  1817. self.mapper,
  1818. alias=alt_selectable._anonymous_fromclause(flat=True)
  1819. if alt_selectable is not None
  1820. else None,
  1821. flat=True,
  1822. use_mapper_path=True,
  1823. )
  1824. # load up the .columns collection on the Alias() before
  1825. # the object becomes shared among threads. this prevents
  1826. # races for column identities.
  1827. inspect(to_adapt).selectable.c
  1828. self._aliased_class_pool.append(to_adapt)
  1829. return self._aliased_class_pool[idx]
  1830. def _generate_row_adapter(
  1831. self,
  1832. compile_state,
  1833. entity,
  1834. path,
  1835. loadopt,
  1836. adapter,
  1837. column_collection,
  1838. parentmapper,
  1839. chained_from_outerjoin,
  1840. ):
  1841. with_poly_entity = path.get(
  1842. compile_state.attributes, "path_with_polymorphic", None
  1843. )
  1844. if with_poly_entity:
  1845. to_adapt = with_poly_entity
  1846. else:
  1847. to_adapt = self._gen_pooled_aliased_class(compile_state)
  1848. clauses = inspect(to_adapt)._memo(
  1849. ("joinedloader_ormadapter", self),
  1850. orm_util.ORMAdapter,
  1851. to_adapt,
  1852. equivalents=self.mapper._equivalent_columns,
  1853. adapt_required=True,
  1854. allow_label_resolve=False,
  1855. anonymize_labels=True,
  1856. )
  1857. assert clauses.aliased_class is not None
  1858. innerjoin = (
  1859. loadopt.local_opts.get("innerjoin", self.parent_property.innerjoin)
  1860. if loadopt is not None
  1861. else self.parent_property.innerjoin
  1862. )
  1863. if not innerjoin:
  1864. # if this is an outer join, all non-nested eager joins from
  1865. # this path must also be outer joins
  1866. chained_from_outerjoin = True
  1867. compile_state.create_eager_joins.append(
  1868. (
  1869. self._create_eager_join,
  1870. entity,
  1871. path,
  1872. adapter,
  1873. parentmapper,
  1874. clauses,
  1875. innerjoin,
  1876. chained_from_outerjoin,
  1877. loadopt._extra_criteria if loadopt else (),
  1878. )
  1879. )
  1880. add_to_collection = compile_state.secondary_columns
  1881. path.set(compile_state.attributes, "eager_row_processor", clauses)
  1882. return clauses, adapter, add_to_collection, chained_from_outerjoin
  1883. def _create_eager_join(
  1884. self,
  1885. compile_state,
  1886. query_entity,
  1887. path,
  1888. adapter,
  1889. parentmapper,
  1890. clauses,
  1891. innerjoin,
  1892. chained_from_outerjoin,
  1893. extra_criteria,
  1894. ):
  1895. if parentmapper is None:
  1896. localparent = query_entity.mapper
  1897. else:
  1898. localparent = parentmapper
  1899. # whether or not the Query will wrap the selectable in a subquery,
  1900. # and then attach eager load joins to that (i.e., in the case of
  1901. # LIMIT/OFFSET etc.)
  1902. should_nest_selectable = (
  1903. compile_state.multi_row_eager_loaders
  1904. and compile_state._should_nest_selectable
  1905. )
  1906. query_entity_key = None
  1907. if (
  1908. query_entity not in compile_state.eager_joins
  1909. and not should_nest_selectable
  1910. and compile_state.from_clauses
  1911. ):
  1912. indexes = sql_util.find_left_clause_that_matches_given(
  1913. compile_state.from_clauses, query_entity.selectable
  1914. )
  1915. if len(indexes) > 1:
  1916. # for the eager load case, I can't reproduce this right
  1917. # now. For query.join() I can.
  1918. raise sa_exc.InvalidRequestError(
  1919. "Can't identify which query entity in which to joined "
  1920. "eager load from. Please use an exact match when "
  1921. "specifying the join path."
  1922. )
  1923. if indexes:
  1924. clause = compile_state.from_clauses[indexes[0]]
  1925. # join to an existing FROM clause on the query.
  1926. # key it to its list index in the eager_joins dict.
  1927. # Query._compile_context will adapt as needed and
  1928. # append to the FROM clause of the select().
  1929. query_entity_key, default_towrap = indexes[0], clause
  1930. if query_entity_key is None:
  1931. query_entity_key, default_towrap = (
  1932. query_entity,
  1933. query_entity.selectable,
  1934. )
  1935. towrap = compile_state.eager_joins.setdefault(
  1936. query_entity_key, default_towrap
  1937. )
  1938. if adapter:
  1939. if getattr(adapter, "aliased_class", None):
  1940. # joining from an adapted entity. The adapted entity
  1941. # might be a "with_polymorphic", so resolve that to our
  1942. # specific mapper's entity before looking for our attribute
  1943. # name on it.
  1944. efm = inspect(adapter.aliased_class)._entity_for_mapper(
  1945. localparent
  1946. if localparent.isa(self.parent)
  1947. else self.parent
  1948. )
  1949. # look for our attribute on the adapted entity, else fall back
  1950. # to our straight property
  1951. onclause = getattr(efm.entity, self.key, self.parent_property)
  1952. else:
  1953. onclause = getattr(
  1954. orm_util.AliasedClass(
  1955. self.parent, adapter.selectable, use_mapper_path=True
  1956. ),
  1957. self.key,
  1958. self.parent_property,
  1959. )
  1960. else:
  1961. onclause = self.parent_property
  1962. assert clauses.aliased_class is not None
  1963. attach_on_outside = (
  1964. not chained_from_outerjoin
  1965. or not innerjoin
  1966. or innerjoin == "unnested"
  1967. or query_entity.entity_zero.represents_outer_join
  1968. )
  1969. extra_join_criteria = extra_criteria
  1970. additional_entity_criteria = compile_state.global_attributes.get(
  1971. ("additional_entity_criteria", self.mapper), ()
  1972. )
  1973. if additional_entity_criteria:
  1974. extra_join_criteria += tuple(
  1975. ae._resolve_where_criteria(self.mapper)
  1976. for ae in additional_entity_criteria
  1977. if ae.propagate_to_loaders
  1978. )
  1979. if attach_on_outside:
  1980. # this is the "classic" eager join case.
  1981. eagerjoin = orm_util._ORMJoin(
  1982. towrap,
  1983. clauses.aliased_class,
  1984. onclause,
  1985. isouter=not innerjoin
  1986. or query_entity.entity_zero.represents_outer_join
  1987. or (chained_from_outerjoin and isinstance(towrap, sql.Join)),
  1988. _left_memo=self.parent,
  1989. _right_memo=self.mapper,
  1990. _extra_criteria=extra_join_criteria,
  1991. )
  1992. else:
  1993. # all other cases are innerjoin=='nested' approach
  1994. eagerjoin = self._splice_nested_inner_join(
  1995. path, towrap, clauses, onclause, extra_join_criteria
  1996. )
  1997. compile_state.eager_joins[query_entity_key] = eagerjoin
  1998. # send a hint to the Query as to where it may "splice" this join
  1999. eagerjoin.stop_on = query_entity.selectable
  2000. if not parentmapper:
  2001. # for parentclause that is the non-eager end of the join,
  2002. # ensure all the parent cols in the primaryjoin are actually
  2003. # in the
  2004. # columns clause (i.e. are not deferred), so that aliasing applied
  2005. # by the Query propagates those columns outward.
  2006. # This has the effect
  2007. # of "undefering" those columns.
  2008. for col in sql_util._find_columns(
  2009. self.parent_property.primaryjoin
  2010. ):
  2011. if localparent.persist_selectable.c.contains_column(col):
  2012. if adapter:
  2013. col = adapter.columns[col]
  2014. compile_state._append_dedupe_col_collection(
  2015. col, compile_state.primary_columns
  2016. )
  2017. if self.parent_property.order_by:
  2018. compile_state.eager_order_by += tuple(
  2019. (eagerjoin._target_adapter.copy_and_process)(
  2020. util.to_list(self.parent_property.order_by)
  2021. )
  2022. )
  2023. def _splice_nested_inner_join(
  2024. self, path, join_obj, clauses, onclause, extra_criteria, splicing=False
  2025. ):
  2026. # recursive fn to splice a nested join into an existing one.
  2027. # splicing=False means this is the outermost call, and it
  2028. # should return a value. splicing=<from object> is the recursive
  2029. # form, where it can return None to indicate the end of the recursion
  2030. if splicing is False:
  2031. # first call is always handed a join object
  2032. # from the outside
  2033. assert isinstance(join_obj, orm_util._ORMJoin)
  2034. elif isinstance(join_obj, sql.selectable.FromGrouping):
  2035. return self._splice_nested_inner_join(
  2036. path,
  2037. join_obj.element,
  2038. clauses,
  2039. onclause,
  2040. extra_criteria,
  2041. splicing,
  2042. )
  2043. elif not isinstance(join_obj, orm_util._ORMJoin):
  2044. if path[-2].isa(splicing):
  2045. return orm_util._ORMJoin(
  2046. join_obj,
  2047. clauses.aliased_class,
  2048. onclause,
  2049. isouter=False,
  2050. _left_memo=splicing,
  2051. _right_memo=path[-1].mapper,
  2052. _extra_criteria=extra_criteria,
  2053. )
  2054. else:
  2055. return None
  2056. target_join = self._splice_nested_inner_join(
  2057. path,
  2058. join_obj.right,
  2059. clauses,
  2060. onclause,
  2061. extra_criteria,
  2062. join_obj._right_memo,
  2063. )
  2064. if target_join is None:
  2065. right_splice = False
  2066. target_join = self._splice_nested_inner_join(
  2067. path,
  2068. join_obj.left,
  2069. clauses,
  2070. onclause,
  2071. extra_criteria,
  2072. join_obj._left_memo,
  2073. )
  2074. if target_join is None:
  2075. # should only return None when recursively called,
  2076. # e.g. splicing refers to a from obj
  2077. assert (
  2078. splicing is not False
  2079. ), "assertion failed attempting to produce joined eager loads"
  2080. return None
  2081. else:
  2082. right_splice = True
  2083. if right_splice:
  2084. # for a right splice, attempt to flatten out
  2085. # a JOIN b JOIN c JOIN .. to avoid needless
  2086. # parenthesis nesting
  2087. if not join_obj.isouter and not target_join.isouter:
  2088. eagerjoin = join_obj._splice_into_center(target_join)
  2089. else:
  2090. eagerjoin = orm_util._ORMJoin(
  2091. join_obj.left,
  2092. target_join,
  2093. join_obj.onclause,
  2094. isouter=join_obj.isouter,
  2095. _left_memo=join_obj._left_memo,
  2096. )
  2097. else:
  2098. eagerjoin = orm_util._ORMJoin(
  2099. target_join,
  2100. join_obj.right,
  2101. join_obj.onclause,
  2102. isouter=join_obj.isouter,
  2103. _right_memo=join_obj._right_memo,
  2104. )
  2105. eagerjoin._target_adapter = target_join._target_adapter
  2106. return eagerjoin
  2107. def _create_eager_adapter(self, context, result, adapter, path, loadopt):
  2108. compile_state = context.compile_state
  2109. user_defined_adapter = (
  2110. self._init_user_defined_eager_proc(
  2111. loadopt, compile_state, context.attributes
  2112. )
  2113. if loadopt
  2114. else False
  2115. )
  2116. if user_defined_adapter is not False:
  2117. decorator = user_defined_adapter
  2118. # user defined eagerloads are part of the "primary"
  2119. # portion of the load.
  2120. # the adapters applied to the Query should be honored.
  2121. if compile_state.compound_eager_adapter and decorator:
  2122. decorator = decorator.wrap(
  2123. compile_state.compound_eager_adapter
  2124. )
  2125. elif compile_state.compound_eager_adapter:
  2126. decorator = compile_state.compound_eager_adapter
  2127. else:
  2128. decorator = path.get(
  2129. compile_state.attributes, "eager_row_processor"
  2130. )
  2131. if decorator is None:
  2132. return False
  2133. if self.mapper._result_has_identity_key(result, decorator):
  2134. return decorator
  2135. else:
  2136. # no identity key - don't return a row
  2137. # processor, will cause a degrade to lazy
  2138. return False
  2139. def create_row_processor(
  2140. self,
  2141. context,
  2142. query_entity,
  2143. path,
  2144. loadopt,
  2145. mapper,
  2146. result,
  2147. adapter,
  2148. populators,
  2149. ):
  2150. if not self.parent.class_manager[self.key].impl.supports_population:
  2151. raise sa_exc.InvalidRequestError(
  2152. "'%s' does not support object "
  2153. "population - eager loading cannot be applied." % self
  2154. )
  2155. if self.uselist:
  2156. context.loaders_require_uniquing = True
  2157. our_path = path[self.parent_property]
  2158. eager_adapter = self._create_eager_adapter(
  2159. context, result, adapter, our_path, loadopt
  2160. )
  2161. if eager_adapter is not False:
  2162. key = self.key
  2163. _instance = loading._instance_processor(
  2164. query_entity,
  2165. self.mapper,
  2166. context,
  2167. result,
  2168. our_path[self.entity],
  2169. eager_adapter,
  2170. )
  2171. if not self.uselist:
  2172. self._create_scalar_loader(context, key, _instance, populators)
  2173. else:
  2174. self._create_collection_loader(
  2175. context, key, _instance, populators
  2176. )
  2177. else:
  2178. self.parent_property._get_strategy(
  2179. (("lazy", "select"),)
  2180. ).create_row_processor(
  2181. context,
  2182. query_entity,
  2183. path,
  2184. loadopt,
  2185. mapper,
  2186. result,
  2187. adapter,
  2188. populators,
  2189. )
  2190. def _create_collection_loader(self, context, key, _instance, populators):
  2191. def load_collection_from_joined_new_row(state, dict_, row):
  2192. # note this must unconditionally clear out any existing collection.
  2193. # an existing collection would be present only in the case of
  2194. # populate_existing().
  2195. collection = attributes.init_state_collection(state, dict_, key)
  2196. result_list = util.UniqueAppender(
  2197. collection, "append_without_event"
  2198. )
  2199. context.attributes[(state, key)] = result_list
  2200. inst = _instance(row)
  2201. if inst is not None:
  2202. result_list.append(inst)
  2203. def load_collection_from_joined_existing_row(state, dict_, row):
  2204. if (state, key) in context.attributes:
  2205. result_list = context.attributes[(state, key)]
  2206. else:
  2207. # appender_key can be absent from context.attributes
  2208. # with isnew=False when self-referential eager loading
  2209. # is used; the same instance may be present in two
  2210. # distinct sets of result columns
  2211. collection = attributes.init_state_collection(
  2212. state, dict_, key
  2213. )
  2214. result_list = util.UniqueAppender(
  2215. collection, "append_without_event"
  2216. )
  2217. context.attributes[(state, key)] = result_list
  2218. inst = _instance(row)
  2219. if inst is not None:
  2220. result_list.append(inst)
  2221. def load_collection_from_joined_exec(state, dict_, row):
  2222. _instance(row)
  2223. populators["new"].append(
  2224. (self.key, load_collection_from_joined_new_row)
  2225. )
  2226. populators["existing"].append(
  2227. (self.key, load_collection_from_joined_existing_row)
  2228. )
  2229. if context.invoke_all_eagers:
  2230. populators["eager"].append(
  2231. (self.key, load_collection_from_joined_exec)
  2232. )
  2233. def _create_scalar_loader(self, context, key, _instance, populators):
  2234. def load_scalar_from_joined_new_row(state, dict_, row):
  2235. # set a scalar object instance directly on the parent
  2236. # object, bypassing InstrumentedAttribute event handlers.
  2237. dict_[key] = _instance(row)
  2238. def load_scalar_from_joined_existing_row(state, dict_, row):
  2239. # call _instance on the row, even though the object has
  2240. # been created, so that we further descend into properties
  2241. existing = _instance(row)
  2242. # conflicting value already loaded, this shouldn't happen
  2243. if key in dict_:
  2244. if existing is not dict_[key]:
  2245. util.warn(
  2246. "Multiple rows returned with "
  2247. "uselist=False for eagerly-loaded attribute '%s' "
  2248. % self
  2249. )
  2250. else:
  2251. # this case is when one row has multiple loads of the
  2252. # same entity (e.g. via aliasing), one has an attribute
  2253. # that the other doesn't.
  2254. dict_[key] = existing
  2255. def load_scalar_from_joined_exec(state, dict_, row):
  2256. _instance(row)
  2257. populators["new"].append((self.key, load_scalar_from_joined_new_row))
  2258. populators["existing"].append(
  2259. (self.key, load_scalar_from_joined_existing_row)
  2260. )
  2261. if context.invoke_all_eagers:
  2262. populators["eager"].append(
  2263. (self.key, load_scalar_from_joined_exec)
  2264. )
  2265. @log.class_logger
  2266. @relationships.RelationshipProperty.strategy_for(lazy="selectin")
  2267. class SelectInLoader(PostLoader, util.MemoizedSlots):
  2268. __slots__ = (
  2269. "join_depth",
  2270. "omit_join",
  2271. "_parent_alias",
  2272. "_query_info",
  2273. "_fallback_query_info",
  2274. )
  2275. query_info = collections.namedtuple(
  2276. "queryinfo",
  2277. [
  2278. "load_only_child",
  2279. "load_with_join",
  2280. "in_expr",
  2281. "pk_cols",
  2282. "zero_idx",
  2283. "child_lookup_cols",
  2284. ],
  2285. )
  2286. _chunksize = 500
  2287. def __init__(self, parent, strategy_key):
  2288. super(SelectInLoader, self).__init__(parent, strategy_key)
  2289. self.join_depth = self.parent_property.join_depth
  2290. is_m2o = self.parent_property.direction is interfaces.MANYTOONE
  2291. if self.parent_property.omit_join is not None:
  2292. self.omit_join = self.parent_property.omit_join
  2293. else:
  2294. lazyloader = self.parent_property._get_strategy(
  2295. (("lazy", "select"),)
  2296. )
  2297. if is_m2o:
  2298. self.omit_join = lazyloader.use_get
  2299. else:
  2300. self.omit_join = self.parent._get_clause[0].compare(
  2301. lazyloader._rev_lazywhere,
  2302. use_proxies=True,
  2303. compare_keys=False,
  2304. equivalents=self.parent._equivalent_columns,
  2305. )
  2306. if self.omit_join:
  2307. if is_m2o:
  2308. self._query_info = self._init_for_omit_join_m2o()
  2309. self._fallback_query_info = self._init_for_join()
  2310. else:
  2311. self._query_info = self._init_for_omit_join()
  2312. else:
  2313. self._query_info = self._init_for_join()
  2314. def _init_for_omit_join(self):
  2315. pk_to_fk = dict(
  2316. self.parent_property._join_condition.local_remote_pairs
  2317. )
  2318. pk_to_fk.update(
  2319. (equiv, pk_to_fk[k])
  2320. for k in list(pk_to_fk)
  2321. for equiv in self.parent._equivalent_columns.get(k, ())
  2322. )
  2323. pk_cols = fk_cols = [
  2324. pk_to_fk[col] for col in self.parent.primary_key if col in pk_to_fk
  2325. ]
  2326. if len(fk_cols) > 1:
  2327. in_expr = sql.tuple_(*fk_cols)
  2328. zero_idx = False
  2329. else:
  2330. in_expr = fk_cols[0]
  2331. zero_idx = True
  2332. return self.query_info(False, False, in_expr, pk_cols, zero_idx, None)
  2333. def _init_for_omit_join_m2o(self):
  2334. pk_cols = self.mapper.primary_key
  2335. if len(pk_cols) > 1:
  2336. in_expr = sql.tuple_(*pk_cols)
  2337. zero_idx = False
  2338. else:
  2339. in_expr = pk_cols[0]
  2340. zero_idx = True
  2341. lazyloader = self.parent_property._get_strategy((("lazy", "select"),))
  2342. lookup_cols = [lazyloader._equated_columns[pk] for pk in pk_cols]
  2343. return self.query_info(
  2344. True, False, in_expr, pk_cols, zero_idx, lookup_cols
  2345. )
  2346. def _init_for_join(self):
  2347. self._parent_alias = aliased(self.parent.class_)
  2348. pa_insp = inspect(self._parent_alias)
  2349. pk_cols = [
  2350. pa_insp._adapt_element(col) for col in self.parent.primary_key
  2351. ]
  2352. if len(pk_cols) > 1:
  2353. in_expr = sql.tuple_(*pk_cols)
  2354. zero_idx = False
  2355. else:
  2356. in_expr = pk_cols[0]
  2357. zero_idx = True
  2358. return self.query_info(False, True, in_expr, pk_cols, zero_idx, None)
  2359. def init_class_attribute(self, mapper):
  2360. self.parent_property._get_strategy(
  2361. (("lazy", "select"),)
  2362. ).init_class_attribute(mapper)
  2363. def create_row_processor(
  2364. self,
  2365. context,
  2366. query_entity,
  2367. path,
  2368. loadopt,
  2369. mapper,
  2370. result,
  2371. adapter,
  2372. populators,
  2373. ):
  2374. if context.refresh_state:
  2375. return self._immediateload_create_row_processor(
  2376. context,
  2377. query_entity,
  2378. path,
  2379. loadopt,
  2380. mapper,
  2381. result,
  2382. adapter,
  2383. populators,
  2384. )
  2385. elif self._check_recursive_postload(context, path, self.join_depth):
  2386. return
  2387. if not self.parent.class_manager[self.key].impl.supports_population:
  2388. raise sa_exc.InvalidRequestError(
  2389. "'%s' does not support object "
  2390. "population - eager loading cannot be applied." % self
  2391. )
  2392. # a little dance here as the "path" is still something that only
  2393. # semi-tracks the exact series of things we are loading, still not
  2394. # telling us about with_polymorphic() and stuff like that when it's at
  2395. # the root.. the initial MapperEntity is more accurate for this case.
  2396. if len(path) == 1:
  2397. if not orm_util._entity_isa(query_entity.entity_zero, self.parent):
  2398. return
  2399. elif not orm_util._entity_isa(path[-1], self.parent):
  2400. return
  2401. selectin_path = (
  2402. context.compile_state.current_path or orm_util.PathRegistry.root
  2403. ) + path
  2404. path_w_prop = path[self.parent_property]
  2405. # build up a path indicating the path from the leftmost
  2406. # entity to the thing we're subquery loading.
  2407. with_poly_entity = path_w_prop.get(
  2408. context.attributes, "path_with_polymorphic", None
  2409. )
  2410. if with_poly_entity is not None:
  2411. effective_entity = inspect(with_poly_entity)
  2412. else:
  2413. effective_entity = self.entity
  2414. loading.PostLoad.callable_for_path(
  2415. context,
  2416. selectin_path,
  2417. self.parent,
  2418. self.parent_property,
  2419. self._load_for_path,
  2420. effective_entity,
  2421. loadopt,
  2422. )
  2423. def _load_for_path(
  2424. self, context, path, states, load_only, effective_entity, loadopt
  2425. ):
  2426. if load_only and self.key not in load_only:
  2427. return
  2428. query_info = self._query_info
  2429. if query_info.load_only_child:
  2430. our_states = collections.defaultdict(list)
  2431. none_states = []
  2432. mapper = self.parent
  2433. for state, overwrite in states:
  2434. state_dict = state.dict
  2435. related_ident = tuple(
  2436. mapper._get_state_attr_by_column(
  2437. state,
  2438. state_dict,
  2439. lk,
  2440. passive=attributes.PASSIVE_NO_FETCH,
  2441. )
  2442. for lk in query_info.child_lookup_cols
  2443. )
  2444. # if the loaded parent objects do not have the foreign key
  2445. # to the related item loaded, then degrade into the joined
  2446. # version of selectinload
  2447. if attributes.PASSIVE_NO_RESULT in related_ident:
  2448. query_info = self._fallback_query_info
  2449. break
  2450. # organize states into lists keyed to particular foreign
  2451. # key values.
  2452. if None not in related_ident:
  2453. our_states[related_ident].append(
  2454. (state, state_dict, overwrite)
  2455. )
  2456. else:
  2457. # For FK values that have None, add them to a
  2458. # separate collection that will be populated separately
  2459. none_states.append((state, state_dict, overwrite))
  2460. # note the above conditional may have changed query_info
  2461. if not query_info.load_only_child:
  2462. our_states = [
  2463. (state.key[1], state, state.dict, overwrite)
  2464. for state, overwrite in states
  2465. ]
  2466. pk_cols = query_info.pk_cols
  2467. in_expr = query_info.in_expr
  2468. if not query_info.load_with_join:
  2469. # in "omit join" mode, the primary key column and the
  2470. # "in" expression are in terms of the related entity. So
  2471. # if the related entity is polymorphic or otherwise aliased,
  2472. # we need to adapt our "pk_cols" and "in_expr" to that
  2473. # entity. in non-"omit join" mode, these are against the
  2474. # parent entity and do not need adaption.
  2475. if effective_entity.is_aliased_class:
  2476. pk_cols = [
  2477. effective_entity._adapt_element(col) for col in pk_cols
  2478. ]
  2479. in_expr = effective_entity._adapt_element(in_expr)
  2480. bundle_ent = orm_util.Bundle("pk", *pk_cols)
  2481. bundle_sql = bundle_ent.__clause_element__()
  2482. entity_sql = effective_entity.__clause_element__()
  2483. q = Select._create_raw_select(
  2484. _raw_columns=[bundle_sql, entity_sql],
  2485. _label_style=LABEL_STYLE_TABLENAME_PLUS_COL,
  2486. _compile_options=ORMCompileState.default_compile_options,
  2487. _propagate_attrs={
  2488. "compile_state_plugin": "orm",
  2489. "plugin_subject": effective_entity,
  2490. },
  2491. )
  2492. if not query_info.load_with_join:
  2493. # the Bundle we have in the "omit_join" case is against raw, non
  2494. # annotated columns, so to ensure the Query knows its primary
  2495. # entity, we add it explicitly. If we made the Bundle against
  2496. # annotated columns, we hit a performance issue in this specific
  2497. # case, which is detailed in issue #4347.
  2498. q = q.select_from(effective_entity)
  2499. else:
  2500. # in the non-omit_join case, the Bundle is against the annotated/
  2501. # mapped column of the parent entity, but the #4347 issue does not
  2502. # occur in this case.
  2503. q = q.select_from(self._parent_alias).join(
  2504. getattr(self._parent_alias, self.parent_property.key).of_type(
  2505. effective_entity
  2506. )
  2507. )
  2508. q = q.filter(in_expr.in_(sql.bindparam("primary_keys")))
  2509. # a test which exercises what these comments talk about is
  2510. # test_selectin_relations.py -> test_twolevel_selectin_w_polymorphic
  2511. #
  2512. # effective_entity above is given to us in terms of the cached
  2513. # statement, namely this one:
  2514. orig_query = context.compile_state.select_statement
  2515. # the actual statement that was requested is this one:
  2516. # context_query = context.query
  2517. #
  2518. # that's not the cached one, however. So while it is of the identical
  2519. # structure, if it has entities like AliasedInsp, which we get from
  2520. # aliased() or with_polymorphic(), the AliasedInsp will likely be a
  2521. # different object identity each time, and will not match up
  2522. # hashing-wise to the corresponding AliasedInsp that's in the
  2523. # cached query, meaning it won't match on paths and loader lookups
  2524. # and loaders like this one will be skipped if it is used in options.
  2525. #
  2526. # as it turns out, standard loader options like selectinload(),
  2527. # lazyload() that have a path need
  2528. # to come from the cached query so that the AliasedInsp etc. objects
  2529. # that are in the query line up with the object that's in the path
  2530. # of the strategy object. however other options like
  2531. # with_loader_criteria() that doesn't have a path (has a fixed entity)
  2532. # and needs to have access to the latest closure state in order to
  2533. # be correct, we need to use the uncached one.
  2534. #
  2535. # as of #8399 we let the loader option itself figure out what it
  2536. # wants to do given cached and uncached version of itself.
  2537. effective_path = path[self.parent_property]
  2538. if orig_query is context.query:
  2539. new_options = orig_query._with_options
  2540. else:
  2541. cached_options = orig_query._with_options
  2542. uncached_options = context.query._with_options
  2543. # propagate compile state options from the original query,
  2544. # updating their "extra_criteria" as necessary.
  2545. # note this will create a different cache key than
  2546. # "orig" options if extra_criteria is present, because the copy
  2547. # of extra_criteria will have different boundparam than that of
  2548. # the QueryableAttribute in the path
  2549. new_options = [
  2550. orig_opt._adapt_cached_option_to_uncached_option(
  2551. context, uncached_opt
  2552. )
  2553. for orig_opt, uncached_opt in zip(
  2554. cached_options, uncached_options
  2555. )
  2556. ]
  2557. if loadopt and loadopt._extra_criteria:
  2558. new_options += (
  2559. orm_util.LoaderCriteriaOption(
  2560. effective_entity,
  2561. loadopt._generate_extra_criteria(context),
  2562. ),
  2563. )
  2564. q = q.options(*new_options)
  2565. q = q._update_compile_options({"_current_path": effective_path})
  2566. if context.populate_existing:
  2567. q = q.execution_options(populate_existing=True)
  2568. if self.parent_property.order_by:
  2569. if not query_info.load_with_join:
  2570. eager_order_by = self.parent_property.order_by
  2571. if effective_entity.is_aliased_class:
  2572. eager_order_by = [
  2573. effective_entity._adapt_element(elem)
  2574. for elem in eager_order_by
  2575. ]
  2576. q = q.order_by(*eager_order_by)
  2577. else:
  2578. def _setup_outermost_orderby(compile_context):
  2579. compile_context.eager_order_by += tuple(
  2580. util.to_list(self.parent_property.order_by)
  2581. )
  2582. q = q._add_context_option(
  2583. _setup_outermost_orderby, self.parent_property
  2584. )
  2585. if query_info.load_only_child:
  2586. self._load_via_child(
  2587. our_states, none_states, query_info, q, context
  2588. )
  2589. else:
  2590. self._load_via_parent(our_states, query_info, q, context)
  2591. def _load_via_child(self, our_states, none_states, query_info, q, context):
  2592. uselist = self.uselist
  2593. # this sort is really for the benefit of the unit tests
  2594. our_keys = sorted(our_states)
  2595. while our_keys:
  2596. chunk = our_keys[0 : self._chunksize]
  2597. our_keys = our_keys[self._chunksize :]
  2598. data = {
  2599. k: v
  2600. for k, v in context.session.execute(
  2601. q,
  2602. params={
  2603. "primary_keys": [
  2604. key[0] if query_info.zero_idx else key
  2605. for key in chunk
  2606. ]
  2607. },
  2608. ).unique()
  2609. }
  2610. for key in chunk:
  2611. # for a real foreign key and no concurrent changes to the
  2612. # DB while running this method, "key" is always present in
  2613. # data. However, for primaryjoins without real foreign keys
  2614. # a non-None primaryjoin condition may still refer to no
  2615. # related object.
  2616. related_obj = data.get(key, None)
  2617. for state, dict_, overwrite in our_states[key]:
  2618. if not overwrite and self.key in dict_:
  2619. continue
  2620. state.get_impl(self.key).set_committed_value(
  2621. state,
  2622. dict_,
  2623. related_obj if not uselist else [related_obj],
  2624. )
  2625. # populate none states with empty value / collection
  2626. for state, dict_, overwrite in none_states:
  2627. if not overwrite and self.key in dict_:
  2628. continue
  2629. # note it's OK if this is a uselist=True attribute, the empty
  2630. # collection will be populated
  2631. state.get_impl(self.key).set_committed_value(state, dict_, None)
  2632. def _load_via_parent(self, our_states, query_info, q, context):
  2633. uselist = self.uselist
  2634. _empty_result = () if uselist else None
  2635. while our_states:
  2636. chunk = our_states[0 : self._chunksize]
  2637. our_states = our_states[self._chunksize :]
  2638. primary_keys = [
  2639. key[0] if query_info.zero_idx else key
  2640. for key, state, state_dict, overwrite in chunk
  2641. ]
  2642. data = collections.defaultdict(list)
  2643. for k, v in itertools.groupby(
  2644. context.session.execute(
  2645. q, params={"primary_keys": primary_keys}
  2646. ).unique(),
  2647. lambda x: x[0],
  2648. ):
  2649. data[k].extend(vv[1] for vv in v)
  2650. for key, state, state_dict, overwrite in chunk:
  2651. if not overwrite and self.key in state_dict:
  2652. continue
  2653. collection = data.get(key, _empty_result)
  2654. if not uselist and collection:
  2655. if len(collection) > 1:
  2656. util.warn(
  2657. "Multiple rows returned with "
  2658. "uselist=False for eagerly-loaded "
  2659. "attribute '%s' " % self
  2660. )
  2661. state.get_impl(self.key).set_committed_value(
  2662. state, state_dict, collection[0]
  2663. )
  2664. else:
  2665. # note that empty tuple set on uselist=False sets the
  2666. # value to None
  2667. state.get_impl(self.key).set_committed_value(
  2668. state, state_dict, collection
  2669. )
  2670. def single_parent_validator(desc, prop):
  2671. def _do_check(state, value, oldvalue, initiator):
  2672. if value is not None and initiator.key == prop.key:
  2673. hasparent = initiator.hasparent(attributes.instance_state(value))
  2674. if hasparent and oldvalue is not value:
  2675. raise sa_exc.InvalidRequestError(
  2676. "Instance %s is already associated with an instance "
  2677. "of %s via its %s attribute, and is only allowed a "
  2678. "single parent."
  2679. % (orm_util.instance_str(value), state.class_, prop),
  2680. code="bbf1",
  2681. )
  2682. return value
  2683. def append(state, value, initiator):
  2684. return _do_check(state, value, None, initiator)
  2685. def set_(state, value, oldvalue, initiator):
  2686. return _do_check(state, value, oldvalue, initiator)
  2687. event.listen(
  2688. desc, "append", append, raw=True, retval=True, active_history=True
  2689. )
  2690. event.listen(desc, "set", set_, raw=True, retval=True, active_history=True)