test_insert.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. # testing/suite/test_insert.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from .. import config
  8. from .. import engines
  9. from .. import fixtures
  10. from ..assertions import eq_
  11. from ..config import requirements
  12. from ..schema import Column
  13. from ..schema import Table
  14. from ... import Integer
  15. from ... import literal
  16. from ... import literal_column
  17. from ... import select
  18. from ... import String
  19. class LastrowidTest(fixtures.TablesTest):
  20. run_deletes = "each"
  21. __backend__ = True
  22. __requires__ = "implements_get_lastrowid", "autoincrement_insert"
  23. __engine_options__ = {"implicit_returning": False}
  24. @classmethod
  25. def define_tables(cls, metadata):
  26. Table(
  27. "autoinc_pk",
  28. metadata,
  29. Column(
  30. "id", Integer, primary_key=True, test_needs_autoincrement=True
  31. ),
  32. Column("data", String(50)),
  33. )
  34. Table(
  35. "manual_pk",
  36. metadata,
  37. Column("id", Integer, primary_key=True, autoincrement=False),
  38. Column("data", String(50)),
  39. )
  40. def _assert_round_trip(self, table, conn):
  41. row = conn.execute(table.select()).first()
  42. eq_(
  43. row,
  44. (
  45. conn.dialect.default_sequence_base,
  46. "some data",
  47. ),
  48. )
  49. def test_autoincrement_on_insert(self, connection):
  50. connection.execute(
  51. self.tables.autoinc_pk.insert(), dict(data="some data")
  52. )
  53. self._assert_round_trip(self.tables.autoinc_pk, connection)
  54. def test_last_inserted_id(self, connection):
  55. r = connection.execute(
  56. self.tables.autoinc_pk.insert(), dict(data="some data")
  57. )
  58. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  59. eq_(r.inserted_primary_key, (pk,))
  60. @requirements.dbapi_lastrowid
  61. def test_native_lastrowid_autoinc(self, connection):
  62. r = connection.execute(
  63. self.tables.autoinc_pk.insert(), dict(data="some data")
  64. )
  65. lastrowid = r.lastrowid
  66. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  67. eq_(lastrowid, pk)
  68. class InsertBehaviorTest(fixtures.TablesTest):
  69. run_deletes = "each"
  70. __backend__ = True
  71. @classmethod
  72. def define_tables(cls, metadata):
  73. Table(
  74. "autoinc_pk",
  75. metadata,
  76. Column(
  77. "id", Integer, primary_key=True, test_needs_autoincrement=True
  78. ),
  79. Column("data", String(50)),
  80. )
  81. Table(
  82. "manual_pk",
  83. metadata,
  84. Column("id", Integer, primary_key=True, autoincrement=False),
  85. Column("data", String(50)),
  86. )
  87. Table(
  88. "includes_defaults",
  89. metadata,
  90. Column(
  91. "id", Integer, primary_key=True, test_needs_autoincrement=True
  92. ),
  93. Column("data", String(50)),
  94. Column("x", Integer, default=5),
  95. Column(
  96. "y",
  97. Integer,
  98. default=literal_column("2", type_=Integer) + literal(2),
  99. ),
  100. )
  101. @requirements.autoincrement_insert
  102. def test_autoclose_on_insert(self):
  103. if requirements.returning.enabled:
  104. engine = engines.testing_engine(
  105. options={"implicit_returning": False}
  106. )
  107. else:
  108. engine = config.db
  109. with engine.begin() as conn:
  110. r = conn.execute(
  111. self.tables.autoinc_pk.insert(), dict(data="some data")
  112. )
  113. assert r._soft_closed
  114. assert not r.closed
  115. assert r.is_insert
  116. # new as of I8091919d45421e3f53029b8660427f844fee0228; for the moment
  117. # an insert where the PK was taken from a row that the dialect
  118. # selected, as is the case for mssql/pyodbc, will still report
  119. # returns_rows as true because there's a cursor description. in that
  120. # case, the row had to have been consumed at least.
  121. assert not r.returns_rows or r.fetchone() is None
  122. @requirements.returning
  123. def test_autoclose_on_insert_implicit_returning(self, connection):
  124. r = connection.execute(
  125. self.tables.autoinc_pk.insert(), dict(data="some data")
  126. )
  127. assert r._soft_closed
  128. assert not r.closed
  129. assert r.is_insert
  130. # note we are experimenting with having this be True
  131. # as of I8091919d45421e3f53029b8660427f844fee0228 .
  132. # implicit returning has fetched the row, but it still is a
  133. # "returns rows"
  134. assert r.returns_rows
  135. # and we should be able to fetchone() on it, we just get no row
  136. eq_(r.fetchone(), None)
  137. # and the keys, etc.
  138. eq_(r.keys(), ["id"])
  139. # but the dialect took in the row already. not really sure
  140. # what the best behavior is.
  141. @requirements.empty_inserts
  142. def test_empty_insert(self, connection):
  143. r = connection.execute(self.tables.autoinc_pk.insert())
  144. assert r._soft_closed
  145. assert not r.closed
  146. r = connection.execute(
  147. self.tables.autoinc_pk.select().where(
  148. self.tables.autoinc_pk.c.id != None
  149. )
  150. )
  151. eq_(len(r.all()), 1)
  152. @requirements.empty_inserts_executemany
  153. def test_empty_insert_multiple(self, connection):
  154. r = connection.execute(self.tables.autoinc_pk.insert(), [{}, {}, {}])
  155. assert r._soft_closed
  156. assert not r.closed
  157. r = connection.execute(
  158. self.tables.autoinc_pk.select().where(
  159. self.tables.autoinc_pk.c.id != None
  160. )
  161. )
  162. eq_(len(r.all()), 3)
  163. @requirements.insert_from_select
  164. def test_insert_from_select_autoinc(self, connection):
  165. src_table = self.tables.manual_pk
  166. dest_table = self.tables.autoinc_pk
  167. connection.execute(
  168. src_table.insert(),
  169. [
  170. dict(id=1, data="data1"),
  171. dict(id=2, data="data2"),
  172. dict(id=3, data="data3"),
  173. ],
  174. )
  175. result = connection.execute(
  176. dest_table.insert().from_select(
  177. ("data",),
  178. select(src_table.c.data).where(
  179. src_table.c.data.in_(["data2", "data3"])
  180. ),
  181. )
  182. )
  183. eq_(result.inserted_primary_key, (None,))
  184. result = connection.execute(
  185. select(dest_table.c.data).order_by(dest_table.c.data)
  186. )
  187. eq_(result.fetchall(), [("data2",), ("data3",)])
  188. @requirements.insert_from_select
  189. def test_insert_from_select_autoinc_no_rows(self, connection):
  190. src_table = self.tables.manual_pk
  191. dest_table = self.tables.autoinc_pk
  192. result = connection.execute(
  193. dest_table.insert().from_select(
  194. ("data",),
  195. select(src_table.c.data).where(
  196. src_table.c.data.in_(["data2", "data3"])
  197. ),
  198. )
  199. )
  200. eq_(result.inserted_primary_key, (None,))
  201. result = connection.execute(
  202. select(dest_table.c.data).order_by(dest_table.c.data)
  203. )
  204. eq_(result.fetchall(), [])
  205. @requirements.insert_from_select
  206. def test_insert_from_select(self, connection):
  207. table = self.tables.manual_pk
  208. connection.execute(
  209. table.insert(),
  210. [
  211. dict(id=1, data="data1"),
  212. dict(id=2, data="data2"),
  213. dict(id=3, data="data3"),
  214. ],
  215. )
  216. connection.execute(
  217. table.insert()
  218. .inline()
  219. .from_select(
  220. ("id", "data"),
  221. select(table.c.id + 5, table.c.data).where(
  222. table.c.data.in_(["data2", "data3"])
  223. ),
  224. )
  225. )
  226. eq_(
  227. connection.execute(
  228. select(table.c.data).order_by(table.c.data)
  229. ).fetchall(),
  230. [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
  231. )
  232. @requirements.insert_from_select
  233. def test_insert_from_select_with_defaults(self, connection):
  234. table = self.tables.includes_defaults
  235. connection.execute(
  236. table.insert(),
  237. [
  238. dict(id=1, data="data1"),
  239. dict(id=2, data="data2"),
  240. dict(id=3, data="data3"),
  241. ],
  242. )
  243. connection.execute(
  244. table.insert()
  245. .inline()
  246. .from_select(
  247. ("id", "data"),
  248. select(table.c.id + 5, table.c.data).where(
  249. table.c.data.in_(["data2", "data3"])
  250. ),
  251. )
  252. )
  253. eq_(
  254. connection.execute(
  255. select(table).order_by(table.c.data, table.c.id)
  256. ).fetchall(),
  257. [
  258. (1, "data1", 5, 4),
  259. (2, "data2", 5, 4),
  260. (7, "data2", 5, 4),
  261. (3, "data3", 5, 4),
  262. (8, "data3", 5, 4),
  263. ],
  264. )
  265. class ReturningTest(fixtures.TablesTest):
  266. run_create_tables = "each"
  267. __requires__ = "returning", "autoincrement_insert"
  268. __backend__ = True
  269. __engine_options__ = {"implicit_returning": True}
  270. def _assert_round_trip(self, table, conn):
  271. row = conn.execute(table.select()).first()
  272. eq_(
  273. row,
  274. (
  275. conn.dialect.default_sequence_base,
  276. "some data",
  277. ),
  278. )
  279. @classmethod
  280. def define_tables(cls, metadata):
  281. Table(
  282. "autoinc_pk",
  283. metadata,
  284. Column(
  285. "id", Integer, primary_key=True, test_needs_autoincrement=True
  286. ),
  287. Column("data", String(50)),
  288. )
  289. @requirements.fetch_rows_post_commit
  290. def test_explicit_returning_pk_autocommit(self, connection):
  291. table = self.tables.autoinc_pk
  292. r = connection.execute(
  293. table.insert().returning(table.c.id), dict(data="some data")
  294. )
  295. pk = r.first()[0]
  296. fetched_pk = connection.scalar(select(table.c.id))
  297. eq_(fetched_pk, pk)
  298. def test_explicit_returning_pk_no_autocommit(self, connection):
  299. table = self.tables.autoinc_pk
  300. r = connection.execute(
  301. table.insert().returning(table.c.id), dict(data="some data")
  302. )
  303. pk = r.first()[0]
  304. fetched_pk = connection.scalar(select(table.c.id))
  305. eq_(fetched_pk, pk)
  306. def test_autoincrement_on_insert_implicit_returning(self, connection):
  307. connection.execute(
  308. self.tables.autoinc_pk.insert(), dict(data="some data")
  309. )
  310. self._assert_round_trip(self.tables.autoinc_pk, connection)
  311. def test_last_inserted_id_implicit_returning(self, connection):
  312. r = connection.execute(
  313. self.tables.autoinc_pk.insert(), dict(data="some data")
  314. )
  315. pk = connection.scalar(select(self.tables.autoinc_pk.c.id))
  316. eq_(r.inserted_primary_key, (pk,))
  317. __all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")