test_cte.py 6.3 KB


  1. # testing/suite/test_cte.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from .. import fixtures
  8. from ..assertions import eq_
  9. from ..schema import Column
  10. from ..schema import Table
  11. from ... import ForeignKey
  12. from ... import Integer
  13. from ... import select
  14. from ... import String
  15. from ... import testing
  16. class CTETest(fixtures.TablesTest):
  17. __backend__ = True
  18. __requires__ = ("ctes",)
  19. run_inserts = "each"
  20. run_deletes = "each"
  21. @classmethod
  22. def define_tables(cls, metadata):
  23. Table(
  24. "some_table",
  25. metadata,
  26. Column("id", Integer, primary_key=True),
  27. Column("data", String(50)),
  28. Column("parent_id", ForeignKey("some_table.id")),
  29. )
  30. Table(
  31. "some_other_table",
  32. metadata,
  33. Column("id", Integer, primary_key=True),
  34. Column("data", String(50)),
  35. Column("parent_id", Integer),
  36. )
  37. @classmethod
  38. def insert_data(cls, connection):
  39. connection.execute(
  40. cls.tables.some_table.insert(),
  41. [
  42. {"id": 1, "data": "d1", "parent_id": None},
  43. {"id": 2, "data": "d2", "parent_id": 1},
  44. {"id": 3, "data": "d3", "parent_id": 1},
  45. {"id": 4, "data": "d4", "parent_id": 3},
  46. {"id": 5, "data": "d5", "parent_id": 3},
  47. ],
  48. )
  49. def test_select_nonrecursive_round_trip(self, connection):
  50. some_table = self.tables.some_table
  51. cte = (
  52. select(some_table)
  53. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  54. .cte("some_cte")
  55. )
  56. result = connection.execute(
  57. select(cte.c.data).where(cte.c.data.in_(["d4", "d5"]))
  58. )
  59. eq_(result.fetchall(), [("d4",)])
  60. def test_select_recursive_round_trip(self, connection):
  61. some_table = self.tables.some_table
  62. cte = (
  63. select(some_table)
  64. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  65. .cte("some_cte", recursive=True)
  66. )
  67. cte_alias = cte.alias("c1")
  68. st1 = some_table.alias()
  69. # note that SQL Server requires this to be UNION ALL,
  70. # can't be UNION
  71. cte = cte.union_all(
  72. select(st1).where(st1.c.id == cte_alias.c.parent_id)
  73. )
  74. result = connection.execute(
  75. select(cte.c.data)
  76. .where(cte.c.data != "d2")
  77. .order_by(cte.c.data.desc())
  78. )
  79. eq_(
  80. result.fetchall(),
  81. [("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)],
  82. )
  83. def test_insert_from_select_round_trip(self, connection):
  84. some_table = self.tables.some_table
  85. some_other_table = self.tables.some_other_table
  86. cte = (
  87. select(some_table)
  88. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  89. .cte("some_cte")
  90. )
  91. connection.execute(
  92. some_other_table.insert().from_select(
  93. ["id", "data", "parent_id"], select(cte)
  94. )
  95. )
  96. eq_(
  97. connection.execute(
  98. select(some_other_table).order_by(some_other_table.c.id)
  99. ).fetchall(),
  100. [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)],
  101. )
  102. @testing.requires.ctes_with_update_delete
  103. @testing.requires.update_from
  104. def test_update_from_round_trip(self, connection):
  105. some_table = self.tables.some_table
  106. some_other_table = self.tables.some_other_table
  107. connection.execute(
  108. some_other_table.insert().from_select(
  109. ["id", "data", "parent_id"], select(some_table)
  110. )
  111. )
  112. cte = (
  113. select(some_table)
  114. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  115. .cte("some_cte")
  116. )
  117. connection.execute(
  118. some_other_table.update()
  119. .values(parent_id=5)
  120. .where(some_other_table.c.data == cte.c.data)
  121. )
  122. eq_(
  123. connection.execute(
  124. select(some_other_table).order_by(some_other_table.c.id)
  125. ).fetchall(),
  126. [
  127. (1, "d1", None),
  128. (2, "d2", 5),
  129. (3, "d3", 5),
  130. (4, "d4", 5),
  131. (5, "d5", 3),
  132. ],
  133. )
  134. @testing.requires.ctes_with_update_delete
  135. @testing.requires.delete_from
  136. def test_delete_from_round_trip(self, connection):
  137. some_table = self.tables.some_table
  138. some_other_table = self.tables.some_other_table
  139. connection.execute(
  140. some_other_table.insert().from_select(
  141. ["id", "data", "parent_id"], select(some_table)
  142. )
  143. )
  144. cte = (
  145. select(some_table)
  146. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  147. .cte("some_cte")
  148. )
  149. connection.execute(
  150. some_other_table.delete().where(
  151. some_other_table.c.data == cte.c.data
  152. )
  153. )
  154. eq_(
  155. connection.execute(
  156. select(some_other_table).order_by(some_other_table.c.id)
  157. ).fetchall(),
  158. [(1, "d1", None), (5, "d5", 3)],
  159. )
  160. @testing.requires.ctes_with_update_delete
  161. def test_delete_scalar_subq_round_trip(self, connection):
  162. some_table = self.tables.some_table
  163. some_other_table = self.tables.some_other_table
  164. connection.execute(
  165. some_other_table.insert().from_select(
  166. ["id", "data", "parent_id"], select(some_table)
  167. )
  168. )
  169. cte = (
  170. select(some_table)
  171. .where(some_table.c.data.in_(["d2", "d3", "d4"]))
  172. .cte("some_cte")
  173. )
  174. connection.execute(
  175. some_other_table.delete().where(
  176. some_other_table.c.data
  177. == select(cte.c.data)
  178. .where(cte.c.id == some_other_table.c.id)
  179. .scalar_subquery()
  180. )
  181. )
  182. eq_(
  183. connection.execute(
  184. select(some_other_table).order_by(some_other_table.c.id)
  185. ).fetchall(),
  186. [(1, "d1", None), (5, "d5", 3)],
  187. )