test_reflection.py 60 KB


  1. # testing/suite/test_reflection.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. import operator
  8. import re
  9. import sqlalchemy as sa
  10. from .. import config
  11. from .. import engines
  12. from .. import eq_
  13. from .. import expect_warnings
  14. from .. import fixtures
  15. from .. import is_
  16. from ..provision import get_temp_table_name
  17. from ..provision import temp_table_keyword_args
  18. from ..schema import Column
  19. from ..schema import Table
  20. from ... import event
  21. from ... import ForeignKey
  22. from ... import func
  23. from ... import Identity
  24. from ... import inspect
  25. from ... import Integer
  26. from ... import MetaData
  27. from ... import String
  28. from ... import testing
  29. from ... import types as sql_types
  30. from ...schema import DDL
  31. from ...schema import Index
  32. from ...sql.elements import quoted_name
  33. from ...sql.schema import BLANK_SCHEMA
  34. from ...testing import is_false
  35. from ...testing import is_true
  36. metadata, users = None, None
  37. class OneConnectionTablesTest(fixtures.TablesTest):
  38. @classmethod
  39. def setup_bind(cls):
  40. # TODO: when temp tables are subject to server reset,
  41. # this will also have to disable that server reset from
  42. # happening
  43. if config.requirements.independent_connections.enabled:
  44. from sqlalchemy import pool
  45. return engines.testing_engine(
  46. options=dict(poolclass=pool.StaticPool, scope="class"),
  47. )
  48. else:
  49. return config.db
  50. class HasTableTest(OneConnectionTablesTest):
  51. __backend__ = True
  52. @classmethod
  53. def define_tables(cls, metadata):
  54. Table(
  55. "test_table",
  56. metadata,
  57. Column("id", Integer, primary_key=True),
  58. Column("data", String(50)),
  59. )
  60. if testing.requires.schemas.enabled:
  61. Table(
  62. "test_table_s",
  63. metadata,
  64. Column("id", Integer, primary_key=True),
  65. Column("data", String(50)),
  66. schema=config.test_schema,
  67. )
  68. if testing.requires.view_reflection:
  69. cls.define_views(metadata)
  70. if testing.requires.has_temp_table.enabled:
  71. cls.define_temp_tables(metadata)
  72. @classmethod
  73. def define_views(cls, metadata):
  74. query = "CREATE VIEW vv AS SELECT id, data FROM test_table"
  75. event.listen(metadata, "after_create", DDL(query))
  76. event.listen(metadata, "before_drop", DDL("DROP VIEW vv"))
  77. if testing.requires.schemas.enabled:
  78. query = (
  79. "CREATE VIEW %s.vv AS SELECT id, data FROM %s.test_table_s"
  80. % (
  81. config.test_schema,
  82. config.test_schema,
  83. )
  84. )
  85. event.listen(metadata, "after_create", DDL(query))
  86. event.listen(
  87. metadata,
  88. "before_drop",
  89. DDL("DROP VIEW %s.vv" % (config.test_schema)),
  90. )
  91. @classmethod
  92. def temp_table_name(cls):
  93. return get_temp_table_name(
  94. config, config.db, "user_tmp_%s" % (config.ident,)
  95. )
  96. @classmethod
  97. def define_temp_tables(cls, metadata):
  98. kw = temp_table_keyword_args(config, config.db)
  99. table_name = cls.temp_table_name()
  100. user_tmp = Table(
  101. table_name,
  102. metadata,
  103. Column("id", sa.INT, primary_key=True),
  104. Column("name", sa.VARCHAR(50)),
  105. **kw
  106. )
  107. if (
  108. testing.requires.view_reflection.enabled
  109. and testing.requires.temporary_views.enabled
  110. ):
  111. event.listen(
  112. user_tmp,
  113. "after_create",
  114. DDL(
  115. "create temporary view user_tmp_v as "
  116. "select * from user_tmp_%s" % config.ident
  117. ),
  118. )
  119. event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
  120. def test_has_table(self):
  121. with config.db.begin() as conn:
  122. is_true(config.db.dialect.has_table(conn, "test_table"))
  123. is_false(config.db.dialect.has_table(conn, "test_table_s"))
  124. is_false(config.db.dialect.has_table(conn, "nonexistent_table"))
  125. @testing.requires.schemas
  126. def test_has_table_schema(self):
  127. with config.db.begin() as conn:
  128. is_false(
  129. config.db.dialect.has_table(
  130. conn, "test_table", schema=config.test_schema
  131. )
  132. )
  133. is_true(
  134. config.db.dialect.has_table(
  135. conn, "test_table_s", schema=config.test_schema
  136. )
  137. )
  138. is_false(
  139. config.db.dialect.has_table(
  140. conn, "nonexistent_table", schema=config.test_schema
  141. )
  142. )
  143. @testing.fails_on(
  144. "oracle",
  145. "per #8700 this remains at its previous behavior of not "
  146. "working within 1.4.",
  147. )
  148. @testing.requires.views
  149. def test_has_table_view(self, connection):
  150. insp = inspect(connection)
  151. is_true(insp.has_table("vv"))
  152. @testing.requires.has_temp_table
  153. def test_has_table_temp_table(self, connection):
  154. insp = inspect(connection)
  155. temp_table_name = self.temp_table_name()
  156. is_true(insp.has_table(temp_table_name))
  157. @testing.requires.has_temp_table
  158. @testing.requires.view_reflection
  159. @testing.requires.temporary_views
  160. def test_has_table_temp_view(self, connection):
  161. insp = inspect(connection)
  162. is_true(insp.has_table("user_tmp_v"))
  163. @testing.fails_on(
  164. "oracle",
  165. "per #8700 this remains at its previous behavior of not "
  166. "working within 1.4",
  167. )
  168. @testing.requires.views
  169. @testing.requires.schemas
  170. def test_has_table_view_schema(self, connection):
  171. insp = inspect(connection)
  172. is_true(insp.has_table("vv", config.test_schema))
  173. class HasIndexTest(fixtures.TablesTest):
  174. __backend__ = True
  175. @classmethod
  176. def define_tables(cls, metadata):
  177. tt = Table(
  178. "test_table",
  179. metadata,
  180. Column("id", Integer, primary_key=True),
  181. Column("data", String(50)),
  182. )
  183. Index("my_idx", tt.c.data)
  184. if testing.requires.schemas.enabled:
  185. tt = Table(
  186. "test_table",
  187. metadata,
  188. Column("id", Integer, primary_key=True),
  189. Column("data", String(50)),
  190. schema=config.test_schema,
  191. )
  192. Index("my_idx_s", tt.c.data)
  193. def test_has_index(self):
  194. with config.db.begin() as conn:
  195. assert config.db.dialect.has_index(conn, "test_table", "my_idx")
  196. assert not config.db.dialect.has_index(
  197. conn, "test_table", "my_idx_s"
  198. )
  199. assert not config.db.dialect.has_index(
  200. conn, "nonexistent_table", "my_idx"
  201. )
  202. assert not config.db.dialect.has_index(
  203. conn, "test_table", "nonexistent_idx"
  204. )
  205. @testing.requires.schemas
  206. def test_has_index_schema(self):
  207. with config.db.begin() as conn:
  208. assert config.db.dialect.has_index(
  209. conn, "test_table", "my_idx_s", schema=config.test_schema
  210. )
  211. assert not config.db.dialect.has_index(
  212. conn, "test_table", "my_idx", schema=config.test_schema
  213. )
  214. assert not config.db.dialect.has_index(
  215. conn,
  216. "nonexistent_table",
  217. "my_idx_s",
  218. schema=config.test_schema,
  219. )
  220. assert not config.db.dialect.has_index(
  221. conn,
  222. "test_table",
  223. "nonexistent_idx_s",
  224. schema=config.test_schema,
  225. )
  226. class QuotedNameArgumentTest(fixtures.TablesTest):
  227. run_create_tables = "once"
  228. __backend__ = True
  229. @classmethod
  230. def define_tables(cls, metadata):
  231. Table(
  232. "quote ' one",
  233. metadata,
  234. Column("id", Integer),
  235. Column("name", String(50)),
  236. Column("data", String(50)),
  237. Column("related_id", Integer),
  238. sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
  239. sa.Index("ix quote ' one", "name"),
  240. sa.UniqueConstraint(
  241. "data",
  242. name="uq quote' one",
  243. ),
  244. sa.ForeignKeyConstraint(
  245. ["id"], ["related.id"], name="fk quote ' one"
  246. ),
  247. sa.CheckConstraint("name != 'foo'", name="ck quote ' one"),
  248. comment=r"""quote ' one comment""",
  249. test_needs_fk=True,
  250. )
  251. if testing.requires.symbol_names_w_double_quote.enabled:
  252. Table(
  253. 'quote " two',
  254. metadata,
  255. Column("id", Integer),
  256. Column("name", String(50)),
  257. Column("data", String(50)),
  258. Column("related_id", Integer),
  259. sa.PrimaryKeyConstraint("id", name='pk quote " two'),
  260. sa.Index('ix quote " two', "name"),
  261. sa.UniqueConstraint(
  262. "data",
  263. name='uq quote" two',
  264. ),
  265. sa.ForeignKeyConstraint(
  266. ["id"], ["related.id"], name='fk quote " two'
  267. ),
  268. sa.CheckConstraint("name != 'foo'", name='ck quote " two '),
  269. comment=r"""quote " two comment""",
  270. test_needs_fk=True,
  271. )
  272. Table(
  273. "related",
  274. metadata,
  275. Column("id", Integer, primary_key=True),
  276. Column("related", Integer),
  277. test_needs_fk=True,
  278. )
  279. if testing.requires.view_column_reflection.enabled:
  280. if testing.requires.symbol_names_w_double_quote.enabled:
  281. names = [
  282. "quote ' one",
  283. 'quote " two',
  284. ]
  285. else:
  286. names = [
  287. "quote ' one",
  288. ]
  289. for name in names:
  290. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  291. config.db.dialect.identifier_preparer.quote(
  292. "view %s" % name
  293. ),
  294. config.db.dialect.identifier_preparer.quote(name),
  295. )
  296. event.listen(metadata, "after_create", DDL(query))
  297. event.listen(
  298. metadata,
  299. "before_drop",
  300. DDL(
  301. "DROP VIEW %s"
  302. % config.db.dialect.identifier_preparer.quote(
  303. "view %s" % name
  304. )
  305. ),
  306. )
  307. def quote_fixtures(fn):
  308. return testing.combinations(
  309. ("quote ' one",),
  310. ('quote " two', testing.requires.symbol_names_w_double_quote),
  311. )(fn)
  312. @quote_fixtures
  313. def test_get_table_options(self, name):
  314. insp = inspect(config.db)
  315. insp.get_table_options(name)
  316. @quote_fixtures
  317. @testing.requires.view_column_reflection
  318. def test_get_view_definition(self, name):
  319. insp = inspect(config.db)
  320. assert insp.get_view_definition("view %s" % name)
  321. @quote_fixtures
  322. def test_get_columns(self, name):
  323. insp = inspect(config.db)
  324. assert insp.get_columns(name)
  325. @quote_fixtures
  326. def test_get_pk_constraint(self, name):
  327. insp = inspect(config.db)
  328. assert insp.get_pk_constraint(name)
  329. @quote_fixtures
  330. def test_get_foreign_keys(self, name):
  331. insp = inspect(config.db)
  332. assert insp.get_foreign_keys(name)
  333. @quote_fixtures
  334. def test_get_indexes(self, name):
  335. insp = inspect(config.db)
  336. assert insp.get_indexes(name)
  337. @quote_fixtures
  338. @testing.requires.unique_constraint_reflection
  339. def test_get_unique_constraints(self, name):
  340. insp = inspect(config.db)
  341. assert insp.get_unique_constraints(name)
  342. @quote_fixtures
  343. @testing.requires.comment_reflection
  344. def test_get_table_comment(self, name):
  345. insp = inspect(config.db)
  346. assert insp.get_table_comment(name)
  347. @quote_fixtures
  348. @testing.requires.check_constraint_reflection
  349. def test_get_check_constraints(self, name):
  350. insp = inspect(config.db)
  351. assert insp.get_check_constraints(name)
class ComponentReflectionTest(OneConnectionTablesTest):
    """Inspector reflection tests over the ``define_tables`` fixtures."""

    # Both hooks are set to None; presumably this switches off the
    # insert / delete fixture steps -- confirm against fixtures.TablesTest.
    run_inserts = run_deletes = None
    # NOTE(review): looks like the marker that flags the class as a
    # per-backend test -- confirm against the test plugin.
    __backend__ = True
  355. @classmethod
  356. def define_tables(cls, metadata):
  357. cls.define_reflected_tables(metadata, None)
  358. if testing.requires.schemas.enabled:
  359. cls.define_reflected_tables(metadata, testing.config.test_schema)
  360. @classmethod
  361. def define_reflected_tables(cls, metadata, schema):
  362. if schema:
  363. schema_prefix = schema + "."
  364. else:
  365. schema_prefix = ""
  366. if testing.requires.self_referential_foreign_keys.enabled:
  367. users = Table(
  368. "users",
  369. metadata,
  370. Column("user_id", sa.INT, primary_key=True),
  371. Column("test1", sa.CHAR(5), nullable=False),
  372. Column("test2", sa.Float(5), nullable=False),
  373. Column(
  374. "parent_user_id",
  375. sa.Integer,
  376. sa.ForeignKey(
  377. "%susers.user_id" % schema_prefix, name="user_id_fk"
  378. ),
  379. ),
  380. schema=schema,
  381. test_needs_fk=True,
  382. )
  383. else:
  384. users = Table(
  385. "users",
  386. metadata,
  387. Column("user_id", sa.INT, primary_key=True),
  388. Column("test1", sa.CHAR(5), nullable=False),
  389. Column("test2", sa.Float(5), nullable=False),
  390. schema=schema,
  391. test_needs_fk=True,
  392. )
  393. Table(
  394. "dingalings",
  395. metadata,
  396. Column("dingaling_id", sa.Integer, primary_key=True),
  397. Column(
  398. "address_id",
  399. sa.Integer,
  400. sa.ForeignKey("%semail_addresses.address_id" % schema_prefix),
  401. ),
  402. Column("data", sa.String(30)),
  403. schema=schema,
  404. test_needs_fk=True,
  405. )
  406. Table(
  407. "email_addresses",
  408. metadata,
  409. Column("address_id", sa.Integer),
  410. Column(
  411. "remote_user_id", sa.Integer, sa.ForeignKey(users.c.user_id)
  412. ),
  413. Column("email_address", sa.String(20)),
  414. sa.PrimaryKeyConstraint("address_id", name="email_ad_pk"),
  415. schema=schema,
  416. test_needs_fk=True,
  417. )
  418. Table(
  419. "comment_test",
  420. metadata,
  421. Column("id", sa.Integer, primary_key=True, comment="id comment"),
  422. Column("data", sa.String(20), comment="data % comment"),
  423. Column(
  424. "d2",
  425. sa.String(20),
  426. comment=r"""Comment types type speedily ' " \ '' Fun!""",
  427. ),
  428. schema=schema,
  429. comment=r"""the test % ' " \ table comment""",
  430. )
  431. if testing.requires.cross_schema_fk_reflection.enabled:
  432. if schema is None:
  433. Table(
  434. "local_table",
  435. metadata,
  436. Column("id", sa.Integer, primary_key=True),
  437. Column("data", sa.String(20)),
  438. Column(
  439. "remote_id",
  440. ForeignKey(
  441. "%s.remote_table_2.id" % testing.config.test_schema
  442. ),
  443. ),
  444. test_needs_fk=True,
  445. schema=config.db.dialect.default_schema_name,
  446. )
  447. else:
  448. Table(
  449. "remote_table",
  450. metadata,
  451. Column("id", sa.Integer, primary_key=True),
  452. Column(
  453. "local_id",
  454. ForeignKey(
  455. "%s.local_table.id"
  456. % config.db.dialect.default_schema_name
  457. ),
  458. ),
  459. Column("data", sa.String(20)),
  460. schema=schema,
  461. test_needs_fk=True,
  462. )
  463. Table(
  464. "remote_table_2",
  465. metadata,
  466. Column("id", sa.Integer, primary_key=True),
  467. Column("data", sa.String(20)),
  468. schema=schema,
  469. test_needs_fk=True,
  470. )
  471. if testing.requires.index_reflection.enabled:
  472. cls.define_index(metadata, users)
  473. if not schema:
  474. # test_needs_fk is at the moment to force MySQL InnoDB
  475. noncol_idx_test_nopk = Table(
  476. "noncol_idx_test_nopk",
  477. metadata,
  478. Column("q", sa.String(5)),
  479. test_needs_fk=True,
  480. )
  481. noncol_idx_test_pk = Table(
  482. "noncol_idx_test_pk",
  483. metadata,
  484. Column("id", sa.Integer, primary_key=True),
  485. Column("q", sa.String(5)),
  486. test_needs_fk=True,
  487. )
  488. if testing.requires.indexes_with_ascdesc.enabled:
  489. Index("noncol_idx_nopk", noncol_idx_test_nopk.c.q.desc())
  490. Index("noncol_idx_pk", noncol_idx_test_pk.c.q.desc())
  491. if testing.requires.view_column_reflection.enabled:
  492. cls.define_views(metadata, schema)
  493. if not schema and testing.requires.temp_table_reflection.enabled:
  494. cls.define_temp_tables(metadata)
  495. @classmethod
  496. def define_temp_tables(cls, metadata):
  497. kw = temp_table_keyword_args(config, config.db)
  498. table_name = get_temp_table_name(
  499. config, config.db, "user_tmp_%s" % config.ident
  500. )
  501. user_tmp = Table(
  502. table_name,
  503. metadata,
  504. Column("id", sa.INT, primary_key=True),
  505. Column("name", sa.VARCHAR(50)),
  506. Column("foo", sa.INT),
  507. # disambiguate temp table unique constraint names. this is
  508. # pretty arbitrary for a generic dialect however we are doing
  509. # it to suit SQL Server which will produce name conflicts for
  510. # unique constraints created against temp tables in different
  511. # databases.
  512. # https://www.arbinada.com/en/node/1645
  513. sa.UniqueConstraint("name", name="user_tmp_uq_%s" % config.ident),
  514. sa.Index("user_tmp_ix", "foo"),
  515. **kw
  516. )
  517. if (
  518. testing.requires.view_reflection.enabled
  519. and testing.requires.temporary_views.enabled
  520. ):
  521. event.listen(
  522. user_tmp,
  523. "after_create",
  524. DDL(
  525. "create temporary view user_tmp_v as "
  526. "select * from user_tmp_%s" % config.ident
  527. ),
  528. )
  529. event.listen(user_tmp, "before_drop", DDL("drop view user_tmp_v"))
  530. @classmethod
  531. def define_index(cls, metadata, users):
  532. Index("users_t_idx", users.c.test1, users.c.test2)
  533. Index("users_all_idx", users.c.user_id, users.c.test2, users.c.test1)
  534. @classmethod
  535. def define_views(cls, metadata, schema):
  536. for table_name in ("users", "email_addresses"):
  537. fullname = table_name
  538. if schema:
  539. fullname = "%s.%s" % (schema, table_name)
  540. view_name = fullname + "_v"
  541. query = "CREATE VIEW %s AS SELECT * FROM %s" % (
  542. view_name,
  543. fullname,
  544. )
  545. event.listen(metadata, "after_create", DDL(query))
  546. event.listen(
  547. metadata, "before_drop", DDL("DROP VIEW %s" % view_name)
  548. )
  549. @testing.requires.schema_reflection
  550. def test_get_schema_names(self):
  551. insp = inspect(self.bind)
  552. self.assert_(testing.config.test_schema in insp.get_schema_names())
  553. @testing.requires.schema_reflection
  554. def test_get_schema_names_w_translate_map(self, connection):
  555. """test #7300"""
  556. connection = connection.execution_options(
  557. schema_translate_map={
  558. "foo": "bar",
  559. BLANK_SCHEMA: testing.config.test_schema,
  560. }
  561. )
  562. insp = inspect(connection)
  563. self.assert_(testing.config.test_schema in insp.get_schema_names())
  564. @testing.requires.schema_reflection
  565. def test_dialect_initialize(self):
  566. engine = engines.testing_engine()
  567. inspect(engine)
  568. assert hasattr(engine.dialect, "default_schema_name")
  569. @testing.requires.schema_reflection
  570. def test_get_default_schema_name(self):
  571. insp = inspect(self.bind)
  572. eq_(insp.default_schema_name, self.bind.dialect.default_schema_name)
  573. @testing.requires.foreign_key_constraint_reflection
  574. @testing.combinations(
  575. (None, True, False, False),
  576. (None, True, False, True, testing.requires.schemas),
  577. ("foreign_key", True, False, False),
  578. (None, False, True, False),
  579. (None, False, True, True, testing.requires.schemas),
  580. (None, True, True, False),
  581. (None, True, True, True, testing.requires.schemas),
  582. argnames="order_by,include_plain,include_views,use_schema",
  583. )
  584. def test_get_table_names(
  585. self, connection, order_by, include_plain, include_views, use_schema
  586. ):
  587. if use_schema:
  588. schema = config.test_schema
  589. else:
  590. schema = None
  591. _ignore_tables = [
  592. "comment_test",
  593. "noncol_idx_test_pk",
  594. "noncol_idx_test_nopk",
  595. "local_table",
  596. "remote_table",
  597. "remote_table_2",
  598. ]
  599. insp = inspect(connection)
  600. if include_views:
  601. table_names = insp.get_view_names(schema)
  602. table_names.sort()
  603. answer = ["email_addresses_v", "users_v"]
  604. eq_(sorted(table_names), answer)
  605. if include_plain:
  606. if order_by:
  607. tables = [
  608. rec[0]
  609. for rec in insp.get_sorted_table_and_fkc_names(schema)
  610. if rec[0]
  611. ]
  612. else:
  613. tables = insp.get_table_names(schema)
  614. table_names = [t for t in tables if t not in _ignore_tables]
  615. if order_by == "foreign_key":
  616. answer = ["users", "email_addresses", "dingalings"]
  617. eq_(table_names, answer)
  618. else:
  619. answer = ["dingalings", "email_addresses", "users"]
  620. eq_(sorted(table_names), answer)
  621. @testing.requires.temp_table_names
  622. def test_get_temp_table_names(self):
  623. insp = inspect(self.bind)
  624. temp_table_names = insp.get_temp_table_names()
  625. eq_(sorted(temp_table_names), ["user_tmp_%s" % config.ident])
  626. @testing.requires.view_reflection
  627. @testing.requires.temp_table_names
  628. @testing.requires.temporary_views
  629. def test_get_temp_view_names(self):
  630. insp = inspect(self.bind)
  631. temp_table_names = insp.get_temp_view_names()
  632. eq_(sorted(temp_table_names), ["user_tmp_v"])
  633. @testing.requires.comment_reflection
  634. def test_get_comments(self):
  635. self._test_get_comments()
  636. @testing.requires.comment_reflection
  637. @testing.requires.schemas
  638. def test_get_comments_with_schema(self):
  639. self._test_get_comments(testing.config.test_schema)
  640. def _test_get_comments(self, schema=None):
  641. insp = inspect(self.bind)
  642. eq_(
  643. insp.get_table_comment("comment_test", schema=schema),
  644. {"text": r"""the test % ' " \ table comment"""},
  645. )
  646. eq_(insp.get_table_comment("users", schema=schema), {"text": None})
  647. eq_(
  648. [
  649. {"name": rec["name"], "comment": rec["comment"]}
  650. for rec in insp.get_columns("comment_test", schema=schema)
  651. ],
  652. [
  653. {"comment": "id comment", "name": "id"},
  654. {"comment": "data % comment", "name": "data"},
  655. {
  656. "comment": (
  657. r"""Comment types type speedily ' " \ '' Fun!"""
  658. ),
  659. "name": "d2",
  660. },
  661. ],
  662. )
  663. @testing.combinations(
  664. (False, False),
  665. (False, True, testing.requires.schemas),
  666. (True, False, testing.requires.view_reflection),
  667. (
  668. True,
  669. True,
  670. testing.requires.schemas + testing.requires.view_reflection,
  671. ),
  672. argnames="use_views,use_schema",
  673. )
  674. def test_get_columns(self, connection, use_views, use_schema):
  675. if use_schema:
  676. schema = config.test_schema
  677. else:
  678. schema = None
  679. users, addresses = (self.tables.users, self.tables.email_addresses)
  680. if use_views:
  681. table_names = ["users_v", "email_addresses_v"]
  682. else:
  683. table_names = ["users", "email_addresses"]
  684. insp = inspect(connection)
  685. for table_name, table in zip(table_names, (users, addresses)):
  686. schema_name = schema
  687. cols = insp.get_columns(table_name, schema=schema_name)
  688. self.assert_(len(cols) > 0, len(cols))
  689. # should be in order
  690. for i, col in enumerate(table.columns):
  691. eq_(col.name, cols[i]["name"])
  692. ctype = cols[i]["type"].__class__
  693. ctype_def = col.type
  694. if isinstance(ctype_def, sa.types.TypeEngine):
  695. ctype_def = ctype_def.__class__
  696. # Oracle returns Date for DateTime.
  697. if testing.against("oracle") and ctype_def in (
  698. sql_types.Date,
  699. sql_types.DateTime,
  700. ):
  701. ctype_def = sql_types.Date
  702. # assert that the desired type and return type share
  703. # a base within one of the generic types.
  704. self.assert_(
  705. len(
  706. set(ctype.__mro__)
  707. .intersection(ctype_def.__mro__)
  708. .intersection(
  709. [
  710. sql_types.Integer,
  711. sql_types.Numeric,
  712. sql_types.DateTime,
  713. sql_types.Date,
  714. sql_types.Time,
  715. sql_types.String,
  716. sql_types._Binary,
  717. ]
  718. )
  719. )
  720. > 0,
  721. "%s(%s), %s(%s)"
  722. % (col.name, col.type, cols[i]["name"], ctype),
  723. )
  724. if not col.primary_key:
  725. assert cols[i]["default"] is None
  726. @testing.requires.temp_table_reflection
  727. def test_get_temp_table_columns(self):
  728. table_name = get_temp_table_name(
  729. config, self.bind, "user_tmp_%s" % config.ident
  730. )
  731. user_tmp = self.tables[table_name]
  732. insp = inspect(self.bind)
  733. cols = insp.get_columns(table_name)
  734. self.assert_(len(cols) > 0, len(cols))
  735. for i, col in enumerate(user_tmp.columns):
  736. eq_(col.name, cols[i]["name"])
  737. @testing.requires.temp_table_reflection
  738. @testing.requires.view_column_reflection
  739. @testing.requires.temporary_views
  740. def test_get_temp_view_columns(self):
  741. insp = inspect(self.bind)
  742. cols = insp.get_columns("user_tmp_v")
  743. eq_([col["name"] for col in cols], ["id", "name", "foo"])
  744. @testing.combinations(
  745. (False,), (True, testing.requires.schemas), argnames="use_schema"
  746. )
  747. @testing.requires.primary_key_constraint_reflection
  748. def test_get_pk_constraint(self, connection, use_schema):
  749. if use_schema:
  750. schema = testing.config.test_schema
  751. else:
  752. schema = None
  753. users, addresses = self.tables.users, self.tables.email_addresses
  754. insp = inspect(connection)
  755. users_cons = insp.get_pk_constraint(users.name, schema=schema)
  756. users_pkeys = users_cons["constrained_columns"]
  757. eq_(users_pkeys, ["user_id"])
  758. addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
  759. addr_pkeys = addr_cons["constrained_columns"]
  760. eq_(addr_pkeys, ["address_id"])
  761. with testing.requires.reflects_pk_names.fail_if():
  762. eq_(addr_cons["name"], "email_ad_pk")
  763. @testing.combinations(
  764. (False,), (True, testing.requires.schemas), argnames="use_schema"
  765. )
  766. @testing.requires.foreign_key_constraint_reflection
  767. def test_get_foreign_keys(self, connection, use_schema):
  768. if use_schema:
  769. schema = config.test_schema
  770. else:
  771. schema = None
  772. users, addresses = (self.tables.users, self.tables.email_addresses)
  773. insp = inspect(connection)
  774. expected_schema = schema
  775. # users
  776. if testing.requires.self_referential_foreign_keys.enabled:
  777. users_fkeys = insp.get_foreign_keys(users.name, schema=schema)
  778. fkey1 = users_fkeys[0]
  779. with testing.requires.named_constraints.fail_if():
  780. eq_(fkey1["name"], "user_id_fk")
  781. eq_(fkey1["referred_schema"], expected_schema)
  782. eq_(fkey1["referred_table"], users.name)
  783. eq_(fkey1["referred_columns"], ["user_id"])
  784. if testing.requires.self_referential_foreign_keys.enabled:
  785. eq_(fkey1["constrained_columns"], ["parent_user_id"])
  786. # addresses
  787. addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema)
  788. fkey1 = addr_fkeys[0]
  789. with testing.requires.implicitly_named_constraints.fail_if():
  790. self.assert_(fkey1["name"] is not None)
  791. eq_(fkey1["referred_schema"], expected_schema)
  792. eq_(fkey1["referred_table"], users.name)
  793. eq_(fkey1["referred_columns"], ["user_id"])
  794. eq_(fkey1["constrained_columns"], ["remote_user_id"])
  795. @testing.requires.cross_schema_fk_reflection
  796. @testing.requires.schemas
  797. def test_get_inter_schema_foreign_keys(self):
  798. local_table, remote_table, remote_table_2 = self.tables(
  799. "%s.local_table" % self.bind.dialect.default_schema_name,
  800. "%s.remote_table" % testing.config.test_schema,
  801. "%s.remote_table_2" % testing.config.test_schema,
  802. )
  803. insp = inspect(self.bind)
  804. local_fkeys = insp.get_foreign_keys(local_table.name)
  805. eq_(len(local_fkeys), 1)
  806. fkey1 = local_fkeys[0]
  807. eq_(fkey1["referred_schema"], testing.config.test_schema)
  808. eq_(fkey1["referred_table"], remote_table_2.name)
  809. eq_(fkey1["referred_columns"], ["id"])
  810. eq_(fkey1["constrained_columns"], ["remote_id"])
  811. remote_fkeys = insp.get_foreign_keys(
  812. remote_table.name, schema=testing.config.test_schema
  813. )
  814. eq_(len(remote_fkeys), 1)
  815. fkey2 = remote_fkeys[0]
  816. assert fkey2["referred_schema"] in (
  817. None,
  818. self.bind.dialect.default_schema_name,
  819. )
  820. eq_(fkey2["referred_table"], local_table.name)
  821. eq_(fkey2["referred_columns"], ["id"])
  822. eq_(fkey2["constrained_columns"], ["local_id"])
  823. def _assert_insp_indexes(self, indexes, expected_indexes):
  824. index_names = [d["name"] for d in indexes]
  825. for e_index in expected_indexes:
  826. assert e_index["name"] in index_names
  827. index = indexes[index_names.index(e_index["name"])]
  828. for key in e_index:
  829. eq_(e_index[key], index[key])
  830. @testing.combinations(
  831. (False,), (True, testing.requires.schemas), argnames="use_schema"
  832. )
  833. def test_get_indexes(self, connection, use_schema):
  834. if use_schema:
  835. schema = config.test_schema
  836. else:
  837. schema = None
  838. # The database may decide to create indexes for foreign keys, etc.
  839. # so there may be more indexes than expected.
  840. insp = inspect(self.bind)
  841. indexes = insp.get_indexes("users", schema=schema)
  842. expected_indexes = [
  843. {
  844. "unique": False,
  845. "column_names": ["test1", "test2"],
  846. "name": "users_t_idx",
  847. },
  848. {
  849. "unique": False,
  850. "column_names": ["user_id", "test2", "test1"],
  851. "name": "users_all_idx",
  852. },
  853. ]
  854. self._assert_insp_indexes(indexes, expected_indexes)
  855. @testing.combinations(
  856. ("noncol_idx_test_nopk", "noncol_idx_nopk"),
  857. ("noncol_idx_test_pk", "noncol_idx_pk"),
  858. argnames="tname,ixname",
  859. )
  860. @testing.requires.index_reflection
  861. @testing.requires.indexes_with_ascdesc
  862. def test_get_noncol_index(self, connection, tname, ixname):
  863. insp = inspect(connection)
  864. indexes = insp.get_indexes(tname)
  865. # reflecting an index that has "x DESC" in it as the column.
  866. # the DB may or may not give us "x", but make sure we get the index
  867. # back, it has a name, it's connected to the table.
  868. expected_indexes = [{"unique": False, "name": ixname}]
  869. self._assert_insp_indexes(indexes, expected_indexes)
  870. t = Table(tname, MetaData(), autoload_with=connection)
  871. eq_(len(t.indexes), 1)
  872. is_(list(t.indexes)[0].table, t)
  873. eq_(list(t.indexes)[0].name, ixname)
  874. @testing.requires.temp_table_reflection
  875. @testing.requires.unique_constraint_reflection
  876. def test_get_temp_table_unique_constraints(self):
  877. insp = inspect(self.bind)
  878. reflected = insp.get_unique_constraints("user_tmp_%s" % config.ident)
  879. for refl in reflected:
  880. # Different dialects handle duplicate index and constraints
  881. # differently, so ignore this flag
  882. refl.pop("duplicates_index", None)
  883. eq_(
  884. reflected,
  885. [
  886. {
  887. "column_names": ["name"],
  888. "name": "user_tmp_uq_%s" % config.ident,
  889. }
  890. ],
  891. )
  892. @testing.requires.temp_table_reflect_indexes
  893. def test_get_temp_table_indexes(self):
  894. insp = inspect(self.bind)
  895. table_name = get_temp_table_name(
  896. config, config.db, "user_tmp_%s" % config.ident
  897. )
  898. indexes = insp.get_indexes(table_name)
  899. for ind in indexes:
  900. ind.pop("dialect_options", None)
  901. expected = [
  902. {"unique": False, "column_names": ["foo"], "name": "user_tmp_ix"}
  903. ]
  904. if testing.requires.index_reflects_included_columns.enabled:
  905. expected[0]["include_columns"] = []
  906. eq_(
  907. [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
  908. expected,
  909. )
  910. @testing.combinations(
  911. (True, testing.requires.schemas), (False,), argnames="use_schema"
  912. )
  913. @testing.requires.unique_constraint_reflection
  914. def test_get_unique_constraints(self, metadata, connection, use_schema):
  915. # SQLite dialect needs to parse the names of the constraints
  916. # separately from what it gets from PRAGMA index_list(), and
  917. # then matches them up. so same set of column_names in two
  918. # constraints will confuse it. Perhaps we should no longer
  919. # bother with index_list() here since we have the whole
  920. # CREATE TABLE?
  921. if use_schema:
  922. schema = config.test_schema
  923. else:
  924. schema = None
  925. uniques = sorted(
  926. [
  927. {"name": "unique_a", "column_names": ["a"]},
  928. {"name": "unique_a_b_c", "column_names": ["a", "b", "c"]},
  929. {"name": "unique_c_a_b", "column_names": ["c", "a", "b"]},
  930. {"name": "unique_asc_key", "column_names": ["asc", "key"]},
  931. {"name": "i.have.dots", "column_names": ["b"]},
  932. {"name": "i have spaces", "column_names": ["c"]},
  933. ],
  934. key=operator.itemgetter("name"),
  935. )
  936. table = Table(
  937. "testtbl",
  938. metadata,
  939. Column("a", sa.String(20)),
  940. Column("b", sa.String(30)),
  941. Column("c", sa.Integer),
  942. # reserved identifiers
  943. Column("asc", sa.String(30)),
  944. Column("key", sa.String(30)),
  945. schema=schema,
  946. )
  947. for uc in uniques:
  948. table.append_constraint(
  949. sa.UniqueConstraint(*uc["column_names"], name=uc["name"])
  950. )
  951. table.create(connection)
  952. inspector = inspect(connection)
  953. reflected = sorted(
  954. inspector.get_unique_constraints("testtbl", schema=schema),
  955. key=operator.itemgetter("name"),
  956. )
  957. names_that_duplicate_index = set()
  958. eq_(len(uniques), len(reflected))
  959. for orig, refl in zip(uniques, reflected):
  960. # Different dialects handle duplicate index and constraints
  961. # differently, so ignore this flag
  962. dupe = refl.pop("duplicates_index", None)
  963. if dupe:
  964. names_that_duplicate_index.add(dupe)
  965. eq_(orig, refl)
  966. reflected_metadata = MetaData()
  967. reflected = Table(
  968. "testtbl",
  969. reflected_metadata,
  970. autoload_with=connection,
  971. schema=schema,
  972. )
  973. # test "deduplicates for index" logic. MySQL and Oracle
  974. # "unique constraints" are actually unique indexes (with possible
  975. # exception of a unique that is a dupe of another one in the case
  976. # of Oracle). make sure # they aren't duplicated.
  977. idx_names = set([idx.name for idx in reflected.indexes])
  978. uq_names = set(
  979. [
  980. uq.name
  981. for uq in reflected.constraints
  982. if isinstance(uq, sa.UniqueConstraint)
  983. ]
  984. ).difference(["unique_c_a_b"])
  985. assert not idx_names.intersection(uq_names)
  986. if names_that_duplicate_index:
  987. eq_(names_that_duplicate_index, idx_names)
  988. eq_(uq_names, set())
  989. @testing.requires.view_reflection
  990. @testing.combinations(
  991. (False,), (True, testing.requires.schemas), argnames="use_schema"
  992. )
  993. def test_get_view_definition(self, connection, use_schema):
  994. if use_schema:
  995. schema = config.test_schema
  996. else:
  997. schema = None
  998. view_name1 = "users_v"
  999. view_name2 = "email_addresses_v"
  1000. insp = inspect(connection)
  1001. v1 = insp.get_view_definition(view_name1, schema=schema)
  1002. self.assert_(v1)
  1003. v2 = insp.get_view_definition(view_name2, schema=schema)
  1004. self.assert_(v2)
  1005. # why is this here if it's PG specific ?
  1006. @testing.combinations(
  1007. ("users", False),
  1008. ("users", True, testing.requires.schemas),
  1009. argnames="table_name,use_schema",
  1010. )
  1011. @testing.only_on("postgresql", "PG specific feature")
  1012. def test_get_table_oid(self, connection, table_name, use_schema):
  1013. if use_schema:
  1014. schema = config.test_schema
  1015. else:
  1016. schema = None
  1017. insp = inspect(connection)
  1018. oid = insp.get_table_oid(table_name, schema)
  1019. self.assert_(isinstance(oid, int))
  1020. @testing.requires.table_reflection
  1021. def test_autoincrement_col(self):
  1022. """test that 'autoincrement' is reflected according to sqla's policy.
  1023. Don't mark this test as unsupported for any backend !
  1024. (technically it fails with MySQL InnoDB since "id" comes before "id2")
  1025. A backend is better off not returning "autoincrement" at all,
  1026. instead of potentially returning "False" for an auto-incrementing
  1027. primary key column.
  1028. """
  1029. insp = inspect(self.bind)
  1030. for tname, cname in [
  1031. ("users", "user_id"),
  1032. ("email_addresses", "address_id"),
  1033. ("dingalings", "dingaling_id"),
  1034. ]:
  1035. cols = insp.get_columns(tname)
  1036. id_ = {c["name"]: c for c in cols}[cname]
  1037. assert id_.get("autoincrement", True)
  1038. class TableNoColumnsTest(fixtures.TestBase):
  1039. __requires__ = ("reflect_tables_no_columns",)
  1040. __backend__ = True
  1041. @testing.fixture
  1042. def table_no_columns(self, connection, metadata):
  1043. Table("empty", metadata)
  1044. metadata.create_all(connection)
  1045. @testing.fixture
  1046. def view_no_columns(self, connection, metadata):
  1047. Table("empty", metadata)
  1048. metadata.create_all(connection)
  1049. Table("empty", metadata)
  1050. event.listen(
  1051. metadata,
  1052. "after_create",
  1053. DDL("CREATE VIEW empty_v AS SELECT * FROM empty"),
  1054. )
  1055. # for transactional DDL the transaction is rolled back before this
  1056. # drop statement is invoked
  1057. event.listen(
  1058. metadata, "before_drop", DDL("DROP VIEW IF EXISTS empty_v")
  1059. )
  1060. metadata.create_all(connection)
  1061. @testing.requires.reflect_tables_no_columns
  1062. def test_reflect_table_no_columns(self, connection, table_no_columns):
  1063. t2 = Table("empty", MetaData(), autoload_with=connection)
  1064. eq_(list(t2.c), [])
  1065. @testing.requires.reflect_tables_no_columns
  1066. def test_get_columns_table_no_columns(self, connection, table_no_columns):
  1067. eq_(inspect(connection).get_columns("empty"), [])
  1068. @testing.requires.reflect_tables_no_columns
  1069. def test_reflect_incl_table_no_columns(self, connection, table_no_columns):
  1070. m = MetaData()
  1071. m.reflect(connection)
  1072. assert set(m.tables).intersection(["empty"])
  1073. @testing.requires.views
  1074. @testing.requires.reflect_tables_no_columns
  1075. def test_reflect_view_no_columns(self, connection, view_no_columns):
  1076. t2 = Table("empty_v", MetaData(), autoload_with=connection)
  1077. eq_(list(t2.c), [])
  1078. @testing.requires.views
  1079. @testing.requires.reflect_tables_no_columns
  1080. def test_get_columns_view_no_columns(self, connection, view_no_columns):
  1081. eq_(inspect(connection).get_columns("empty_v"), [])
  1082. class ComponentReflectionTestExtra(fixtures.TestBase):
  1083. __backend__ = True
  1084. @testing.combinations(
  1085. (True, testing.requires.schemas), (False,), argnames="use_schema"
  1086. )
  1087. @testing.requires.check_constraint_reflection
  1088. def test_get_check_constraints(self, metadata, connection, use_schema):
  1089. if use_schema:
  1090. schema = config.test_schema
  1091. else:
  1092. schema = None
  1093. Table(
  1094. "sa_cc",
  1095. metadata,
  1096. Column("a", Integer()),
  1097. sa.CheckConstraint("a > 1 AND a < 5", name="cc1"),
  1098. sa.CheckConstraint(
  1099. "a = 1 OR (a > 2 AND a < 5)", name="UsesCasing"
  1100. ),
  1101. schema=schema,
  1102. )
  1103. metadata.create_all(connection)
  1104. inspector = inspect(connection)
  1105. reflected = sorted(
  1106. inspector.get_check_constraints("sa_cc", schema=schema),
  1107. key=operator.itemgetter("name"),
  1108. )
  1109. # trying to minimize effect of quoting, parenthesis, etc.
  1110. # may need to add more to this as new dialects get CHECK
  1111. # constraint reflection support
  1112. def normalize(sqltext):
  1113. return " ".join(
  1114. re.findall(r"and|\d|=|a|or|<|>", sqltext.lower(), re.I)
  1115. )
  1116. reflected = [
  1117. {"name": item["name"], "sqltext": normalize(item["sqltext"])}
  1118. for item in reflected
  1119. ]
  1120. eq_(
  1121. reflected,
  1122. [
  1123. {"name": "UsesCasing", "sqltext": "a = 1 or a > 2 and a < 5"},
  1124. {"name": "cc1", "sqltext": "a > 1 and a < 5"},
  1125. ],
  1126. )
  1127. @testing.requires.indexes_with_expressions
  1128. def test_reflect_expression_based_indexes(self, metadata, connection):
  1129. t = Table(
  1130. "t",
  1131. metadata,
  1132. Column("x", String(30)),
  1133. Column("y", String(30)),
  1134. )
  1135. Index("t_idx", func.lower(t.c.x), func.lower(t.c.y))
  1136. Index("t_idx_2", t.c.x)
  1137. metadata.create_all(connection)
  1138. insp = inspect(connection)
  1139. expected = [
  1140. {
  1141. "name": "t_idx_2",
  1142. "column_names": ["x"],
  1143. "unique": False,
  1144. "dialect_options": {},
  1145. }
  1146. ]
  1147. if testing.requires.index_reflects_included_columns.enabled:
  1148. expected[0]["include_columns"] = []
  1149. expected[0]["dialect_options"] = {
  1150. "%s_include" % connection.engine.name: []
  1151. }
  1152. with expect_warnings(
  1153. "Skipped unsupported reflection of expression-based index t_idx"
  1154. ):
  1155. eq_(insp.get_indexes("t"), expected)
  1156. @testing.requires.index_reflects_included_columns
  1157. def test_reflect_covering_index(self, metadata, connection):
  1158. t = Table(
  1159. "t",
  1160. metadata,
  1161. Column("x", String(30)),
  1162. Column("y", String(30)),
  1163. )
  1164. idx = Index("t_idx", t.c.x)
  1165. idx.dialect_options[connection.engine.name]["include"] = ["y"]
  1166. metadata.create_all(connection)
  1167. insp = inspect(connection)
  1168. eq_(
  1169. insp.get_indexes("t"),
  1170. [
  1171. {
  1172. "name": "t_idx",
  1173. "column_names": ["x"],
  1174. "include_columns": ["y"],
  1175. "unique": False,
  1176. "dialect_options": {
  1177. "%s_include" % connection.engine.name: ["y"]
  1178. },
  1179. }
  1180. ],
  1181. )
  1182. t2 = Table("t", MetaData(), autoload_with=connection)
  1183. eq_(
  1184. list(t2.indexes)[0].dialect_options[connection.engine.name][
  1185. "include"
  1186. ],
  1187. ["y"],
  1188. )
  1189. def _type_round_trip(self, connection, metadata, *types):
  1190. t = Table(
  1191. "t",
  1192. metadata,
  1193. *[Column("t%d" % i, type_) for i, type_ in enumerate(types)]
  1194. )
  1195. t.create(connection)
  1196. return [c["type"] for c in inspect(connection).get_columns("t")]
  1197. @testing.requires.table_reflection
  1198. def test_numeric_reflection(self, connection, metadata):
  1199. for typ in self._type_round_trip(
  1200. connection, metadata, sql_types.Numeric(18, 5)
  1201. ):
  1202. assert isinstance(typ, sql_types.Numeric)
  1203. eq_(typ.precision, 18)
  1204. eq_(typ.scale, 5)
  1205. @testing.requires.table_reflection
  1206. def test_varchar_reflection(self, connection, metadata):
  1207. typ = self._type_round_trip(
  1208. connection, metadata, sql_types.String(52)
  1209. )[0]
  1210. assert isinstance(typ, sql_types.String)
  1211. eq_(typ.length, 52)
  1212. @testing.requires.table_reflection
  1213. def test_nullable_reflection(self, connection, metadata):
  1214. t = Table(
  1215. "t",
  1216. metadata,
  1217. Column("a", Integer, nullable=True),
  1218. Column("b", Integer, nullable=False),
  1219. )
  1220. t.create(connection)
  1221. eq_(
  1222. dict(
  1223. (col["name"], col["nullable"])
  1224. for col in inspect(connection).get_columns("t")
  1225. ),
  1226. {"a": True, "b": False},
  1227. )
  1228. @testing.combinations(
  1229. (
  1230. None,
  1231. "CASCADE",
  1232. None,
  1233. testing.requires.foreign_key_constraint_option_reflection_ondelete,
  1234. ),
  1235. (
  1236. None,
  1237. None,
  1238. "SET NULL",
  1239. testing.requires.foreign_key_constraint_option_reflection_onupdate,
  1240. ),
  1241. (
  1242. {},
  1243. None,
  1244. "NO ACTION",
  1245. testing.requires.foreign_key_constraint_option_reflection_onupdate,
  1246. ),
  1247. (
  1248. {},
  1249. "NO ACTION",
  1250. None,
  1251. testing.requires.fk_constraint_option_reflection_ondelete_noaction,
  1252. ),
  1253. (
  1254. None,
  1255. None,
  1256. "RESTRICT",
  1257. testing.requires.fk_constraint_option_reflection_onupdate_restrict,
  1258. ),
  1259. (
  1260. None,
  1261. "RESTRICT",
  1262. None,
  1263. testing.requires.fk_constraint_option_reflection_ondelete_restrict,
  1264. ),
  1265. argnames="expected,ondelete,onupdate",
  1266. )
  1267. def test_get_foreign_key_options(
  1268. self, connection, metadata, expected, ondelete, onupdate
  1269. ):
  1270. options = {}
  1271. if ondelete:
  1272. options["ondelete"] = ondelete
  1273. if onupdate:
  1274. options["onupdate"] = onupdate
  1275. if expected is None:
  1276. expected = options
  1277. Table(
  1278. "x",
  1279. metadata,
  1280. Column("id", Integer, primary_key=True),
  1281. test_needs_fk=True,
  1282. )
  1283. Table(
  1284. "table",
  1285. metadata,
  1286. Column("id", Integer, primary_key=True),
  1287. Column("x_id", Integer, sa.ForeignKey("x.id", name="xid")),
  1288. Column("test", String(10)),
  1289. test_needs_fk=True,
  1290. )
  1291. Table(
  1292. "user",
  1293. metadata,
  1294. Column("id", Integer, primary_key=True),
  1295. Column("name", String(50), nullable=False),
  1296. Column("tid", Integer),
  1297. sa.ForeignKeyConstraint(
  1298. ["tid"], ["table.id"], name="myfk", **options
  1299. ),
  1300. test_needs_fk=True,
  1301. )
  1302. metadata.create_all(connection)
  1303. insp = inspect(connection)
  1304. # test 'options' is always present for a backend
  1305. # that can reflect these, since alembic looks for this
  1306. opts = insp.get_foreign_keys("table")[0]["options"]
  1307. eq_(dict((k, opts[k]) for k in opts if opts[k]), {})
  1308. opts = insp.get_foreign_keys("user")[0]["options"]
  1309. eq_(opts, expected)
  1310. # eq_(dict((k, opts[k]) for k in opts if opts[k]), expected)
  1311. class NormalizedNameTest(fixtures.TablesTest):
  1312. __requires__ = ("denormalized_names",)
  1313. __backend__ = True
  1314. @classmethod
  1315. def define_tables(cls, metadata):
  1316. Table(
  1317. quoted_name("t1", quote=True),
  1318. metadata,
  1319. Column("id", Integer, primary_key=True),
  1320. )
  1321. Table(
  1322. quoted_name("t2", quote=True),
  1323. metadata,
  1324. Column("id", Integer, primary_key=True),
  1325. Column("t1id", ForeignKey("t1.id")),
  1326. )
  1327. def test_reflect_lowercase_forced_tables(self):
  1328. m2 = MetaData()
  1329. t2_ref = Table(
  1330. quoted_name("t2", quote=True), m2, autoload_with=config.db
  1331. )
  1332. t1_ref = m2.tables["t1"]
  1333. assert t2_ref.c.t1id.references(t1_ref.c.id)
  1334. m3 = MetaData()
  1335. m3.reflect(
  1336. config.db, only=lambda name, m: name.lower() in ("t1", "t2")
  1337. )
  1338. assert m3.tables["t2"].c.t1id.references(m3.tables["t1"].c.id)
  1339. def test_get_table_names(self):
  1340. tablenames = [
  1341. t
  1342. for t in inspect(config.db).get_table_names()
  1343. if t.lower() in ("t1", "t2")
  1344. ]
  1345. eq_(tablenames[0].upper(), tablenames[0].lower())
  1346. eq_(tablenames[1].upper(), tablenames[1].lower())
  1347. class ComputedReflectionTest(fixtures.ComputedReflectionFixtureTest):
  1348. def test_computed_col_default_not_set(self):
  1349. insp = inspect(config.db)
  1350. cols = insp.get_columns("computed_default_table")
  1351. col_data = {c["name"]: c for c in cols}
  1352. is_true("42" in col_data["with_default"]["default"])
  1353. is_(col_data["normal"]["default"], None)
  1354. is_(col_data["computed_col"]["default"], None)
  1355. def test_get_column_returns_computed(self):
  1356. insp = inspect(config.db)
  1357. cols = insp.get_columns("computed_default_table")
  1358. data = {c["name"]: c for c in cols}
  1359. for key in ("id", "normal", "with_default"):
  1360. is_true("computed" not in data[key])
  1361. compData = data["computed_col"]
  1362. is_true("computed" in compData)
  1363. is_true("sqltext" in compData["computed"])
  1364. eq_(self.normalize(compData["computed"]["sqltext"]), "normal+42")
  1365. eq_(
  1366. "persisted" in compData["computed"],
  1367. testing.requires.computed_columns_reflect_persisted.enabled,
  1368. )
  1369. if testing.requires.computed_columns_reflect_persisted.enabled:
  1370. eq_(
  1371. compData["computed"]["persisted"],
  1372. testing.requires.computed_columns_default_persisted.enabled,
  1373. )
  1374. def check_column(self, data, column, sqltext, persisted):
  1375. is_true("computed" in data[column])
  1376. compData = data[column]["computed"]
  1377. eq_(self.normalize(compData["sqltext"]), sqltext)
  1378. if testing.requires.computed_columns_reflect_persisted.enabled:
  1379. is_true("persisted" in compData)
  1380. is_(compData["persisted"], persisted)
  1381. def test_get_column_returns_persisted(self):
  1382. insp = inspect(config.db)
  1383. cols = insp.get_columns("computed_column_table")
  1384. data = {c["name"]: c for c in cols}
  1385. self.check_column(
  1386. data,
  1387. "computed_no_flag",
  1388. "normal+42",
  1389. testing.requires.computed_columns_default_persisted.enabled,
  1390. )
  1391. if testing.requires.computed_columns_virtual.enabled:
  1392. self.check_column(
  1393. data,
  1394. "computed_virtual",
  1395. "normal+2",
  1396. False,
  1397. )
  1398. if testing.requires.computed_columns_stored.enabled:
  1399. self.check_column(
  1400. data,
  1401. "computed_stored",
  1402. "normal-42",
  1403. True,
  1404. )
  1405. @testing.requires.schemas
  1406. def test_get_column_returns_persisted_with_schema(self):
  1407. insp = inspect(config.db)
  1408. cols = insp.get_columns(
  1409. "computed_column_table", schema=config.test_schema
  1410. )
  1411. data = {c["name"]: c for c in cols}
  1412. self.check_column(
  1413. data,
  1414. "computed_no_flag",
  1415. "normal/42",
  1416. testing.requires.computed_columns_default_persisted.enabled,
  1417. )
  1418. if testing.requires.computed_columns_virtual.enabled:
  1419. self.check_column(
  1420. data,
  1421. "computed_virtual",
  1422. "normal/2",
  1423. False,
  1424. )
  1425. if testing.requires.computed_columns_stored.enabled:
  1426. self.check_column(
  1427. data,
  1428. "computed_stored",
  1429. "normal*42",
  1430. True,
  1431. )
  1432. class IdentityReflectionTest(fixtures.TablesTest):
  1433. run_inserts = run_deletes = None
  1434. __backend__ = True
  1435. __requires__ = ("identity_columns", "table_reflection")
  1436. @classmethod
  1437. def define_tables(cls, metadata):
  1438. Table(
  1439. "t1",
  1440. metadata,
  1441. Column("normal", Integer),
  1442. Column("id1", Integer, Identity()),
  1443. )
  1444. Table(
  1445. "t2",
  1446. metadata,
  1447. Column(
  1448. "id2",
  1449. Integer,
  1450. Identity(
  1451. always=True,
  1452. start=2,
  1453. increment=3,
  1454. minvalue=-2,
  1455. maxvalue=42,
  1456. cycle=True,
  1457. cache=4,
  1458. ),
  1459. ),
  1460. )
  1461. if testing.requires.schemas.enabled:
  1462. Table(
  1463. "t1",
  1464. metadata,
  1465. Column("normal", Integer),
  1466. Column("id1", Integer, Identity(always=True, start=20)),
  1467. schema=config.test_schema,
  1468. )
  1469. def check(self, value, exp, approx):
  1470. if testing.requires.identity_columns_standard.enabled:
  1471. common_keys = (
  1472. "always",
  1473. "start",
  1474. "increment",
  1475. "minvalue",
  1476. "maxvalue",
  1477. "cycle",
  1478. "cache",
  1479. )
  1480. for k in list(value):
  1481. if k not in common_keys:
  1482. value.pop(k)
  1483. if approx:
  1484. eq_(len(value), len(exp))
  1485. for k in value:
  1486. if k == "minvalue":
  1487. is_true(value[k] <= exp[k])
  1488. elif k in {"maxvalue", "cache"}:
  1489. is_true(value[k] >= exp[k])
  1490. else:
  1491. eq_(value[k], exp[k], k)
  1492. else:
  1493. eq_(value, exp)
  1494. else:
  1495. eq_(value["start"], exp["start"])
  1496. eq_(value["increment"], exp["increment"])
  1497. def test_reflect_identity(self):
  1498. insp = inspect(config.db)
  1499. cols = insp.get_columns("t1") + insp.get_columns("t2")
  1500. for col in cols:
  1501. if col["name"] == "normal":
  1502. is_false("identity" in col)
  1503. elif col["name"] == "id1":
  1504. is_true(col["autoincrement"] in (True, "auto"))
  1505. eq_(col["default"], None)
  1506. is_true("identity" in col)
  1507. self.check(
  1508. col["identity"],
  1509. dict(
  1510. always=False,
  1511. start=1,
  1512. increment=1,
  1513. minvalue=1,
  1514. maxvalue=2147483647,
  1515. cycle=False,
  1516. cache=1,
  1517. ),
  1518. approx=True,
  1519. )
  1520. elif col["name"] == "id2":
  1521. is_true(col["autoincrement"] in (True, "auto"))
  1522. eq_(col["default"], None)
  1523. is_true("identity" in col)
  1524. self.check(
  1525. col["identity"],
  1526. dict(
  1527. always=True,
  1528. start=2,
  1529. increment=3,
  1530. minvalue=-2,
  1531. maxvalue=42,
  1532. cycle=True,
  1533. cache=4,
  1534. ),
  1535. approx=False,
  1536. )
  1537. @testing.requires.schemas
  1538. def test_reflect_identity_schema(self):
  1539. insp = inspect(config.db)
  1540. cols = insp.get_columns("t1", schema=config.test_schema)
  1541. for col in cols:
  1542. if col["name"] == "normal":
  1543. is_false("identity" in col)
  1544. elif col["name"] == "id1":
  1545. is_true(col["autoincrement"] in (True, "auto"))
  1546. eq_(col["default"], None)
  1547. is_true("identity" in col)
  1548. self.check(
  1549. col["identity"],
  1550. dict(
  1551. always=True,
  1552. start=20,
  1553. increment=1,
  1554. minvalue=1,
  1555. maxvalue=2147483647,
  1556. cycle=False,
  1557. cache=1,
  1558. ),
  1559. approx=True,
  1560. )
  1561. class CompositeKeyReflectionTest(fixtures.TablesTest):
  1562. __backend__ = True
  1563. @classmethod
  1564. def define_tables(cls, metadata):
  1565. tb1 = Table(
  1566. "tb1",
  1567. metadata,
  1568. Column("id", Integer),
  1569. Column("attr", Integer),
  1570. Column("name", sql_types.VARCHAR(20)),
  1571. sa.PrimaryKeyConstraint("name", "id", "attr", name="pk_tb1"),
  1572. schema=None,
  1573. test_needs_fk=True,
  1574. )
  1575. Table(
  1576. "tb2",
  1577. metadata,
  1578. Column("id", Integer, primary_key=True),
  1579. Column("pid", Integer),
  1580. Column("pattr", Integer),
  1581. Column("pname", sql_types.VARCHAR(20)),
  1582. sa.ForeignKeyConstraint(
  1583. ["pname", "pid", "pattr"],
  1584. [tb1.c.name, tb1.c.id, tb1.c.attr],
  1585. name="fk_tb1_name_id_attr",
  1586. ),
  1587. schema=None,
  1588. test_needs_fk=True,
  1589. )
  1590. @testing.requires.primary_key_constraint_reflection
  1591. def test_pk_column_order(self):
  1592. # test for issue #5661
  1593. insp = inspect(self.bind)
  1594. primary_key = insp.get_pk_constraint(self.tables.tb1.name)
  1595. eq_(primary_key.get("constrained_columns"), ["name", "id", "attr"])
  1596. @testing.requires.foreign_key_constraint_reflection
  1597. def test_fk_column_order(self):
  1598. # test for issue #5661
  1599. insp = inspect(self.bind)
  1600. foreign_keys = insp.get_foreign_keys(self.tables.tb2.name)
  1601. eq_(len(foreign_keys), 1)
  1602. fkey1 = foreign_keys[0]
  1603. eq_(fkey1.get("referred_columns"), ["name", "id", "attr"])
  1604. eq_(fkey1.get("constrained_columns"), ["pname", "pid", "pattr"])
  1605. __all__ = (
  1606. "ComponentReflectionTest",
  1607. "ComponentReflectionTestExtra",
  1608. "TableNoColumnsTest",
  1609. "QuotedNameArgumentTest",
  1610. "HasTableTest",
  1611. "HasIndexTest",
  1612. "NormalizedNameTest",
  1613. "ComputedReflectionTest",
  1614. "IdentityReflectionTest",
  1615. "CompositeKeyReflectionTest",
  1616. )