provision.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # dialects/oracle/provision.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from ... import create_engine
  8. from ... import exc
  9. from ...engine import url as sa_url
  10. from ...testing.provision import configure_follower
  11. from ...testing.provision import create_db
  12. from ...testing.provision import drop_db
  13. from ...testing.provision import follower_url_from_main
  14. from ...testing.provision import log
  15. from ...testing.provision import post_configure_engine
  16. from ...testing.provision import run_reap_dbs
  17. from ...testing.provision import set_default_schema_on_connection
  18. from ...testing.provision import stop_test_class_outside_fixtures
  19. from ...testing.provision import temp_table_keyword_args
  20. @create_db.for_db("oracle")
  21. def _oracle_create_db(cfg, eng, ident):
  22. # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
  23. # similar, so that the default tablespace is not "system"; reflection will
  24. # fail otherwise
  25. with eng.begin() as conn:
  26. conn.exec_driver_sql("create user %s identified by xe" % ident)
  27. conn.exec_driver_sql("create user %s_ts1 identified by xe" % ident)
  28. conn.exec_driver_sql("create user %s_ts2 identified by xe" % ident)
  29. conn.exec_driver_sql("grant dba to %s" % (ident,))
  30. conn.exec_driver_sql("grant unlimited tablespace to %s" % ident)
  31. conn.exec_driver_sql("grant unlimited tablespace to %s_ts1" % ident)
  32. conn.exec_driver_sql("grant unlimited tablespace to %s_ts2" % ident)
  33. @configure_follower.for_db("oracle")
  34. def _oracle_configure_follower(config, ident):
  35. config.test_schema = "%s_ts1" % ident
  36. config.test_schema_2 = "%s_ts2" % ident
  37. def _ora_drop_ignore(conn, dbname):
  38. try:
  39. conn.exec_driver_sql("drop user %s cascade" % dbname)
  40. log.info("Reaped db: %s", dbname)
  41. return True
  42. except exc.DatabaseError as err:
  43. log.warning("couldn't drop db: %s", err)
  44. return False
  45. @drop_db.for_db("oracle")
  46. def _oracle_drop_db(cfg, eng, ident):
  47. with eng.begin() as conn:
  48. # cx_Oracle seems to occasionally leak open connections when a large
  49. # suite it run, even if we confirm we have zero references to
  50. # connection objects.
  51. # while there is a "kill session" command in Oracle,
  52. # it unfortunately does not release the connection sufficiently.
  53. _ora_drop_ignore(conn, ident)
  54. _ora_drop_ignore(conn, "%s_ts1" % ident)
  55. _ora_drop_ignore(conn, "%s_ts2" % ident)
  56. @stop_test_class_outside_fixtures.for_db("oracle")
  57. def stop_test_class_outside_fixtures(config, db, cls):
  58. try:
  59. with db.begin() as conn:
  60. # run magic command to get rid of identity sequences
  61. # https://floo.bar/2019/11/29/drop-the-underlying-sequence-of-an-identity-column/ # noqa: E501
  62. conn.exec_driver_sql("purge recyclebin")
  63. except exc.DatabaseError as err:
  64. log.warning("purge recyclebin command failed: %s", err)
  65. # clear statement cache on all connections that were used
  66. # https://github.com/oracle/python-cx_Oracle/issues/519
  67. for cx_oracle_conn in _all_conns:
  68. try:
  69. sc = cx_oracle_conn.stmtcachesize
  70. except db.dialect.dbapi.InterfaceError:
  71. # connection closed
  72. pass
  73. else:
  74. cx_oracle_conn.stmtcachesize = 0
  75. cx_oracle_conn.stmtcachesize = sc
  76. _all_conns.clear()
  77. _all_conns = set()
  78. @post_configure_engine.for_db("oracle")
  79. def _oracle_post_configure_engine(url, engine, follower_ident):
  80. from sqlalchemy import event
  81. @event.listens_for(engine, "checkout")
  82. def checkout(dbapi_con, con_record, con_proxy):
  83. _all_conns.add(dbapi_con)
  84. @event.listens_for(engine, "checkin")
  85. def checkin(dbapi_connection, connection_record):
  86. # work around cx_Oracle issue:
  87. # https://github.com/oracle/python-cx_Oracle/issues/530
  88. # invalidate oracle connections that had 2pc set up
  89. if "cx_oracle_xid" in connection_record.info:
  90. connection_record.invalidate()
  91. @run_reap_dbs.for_db("oracle")
  92. def _reap_oracle_dbs(url, idents):
  93. log.info("db reaper connecting to %r", url)
  94. eng = create_engine(url)
  95. with eng.begin() as conn:
  96. log.info("identifiers in file: %s", ", ".join(idents))
  97. to_reap = conn.exec_driver_sql(
  98. "select u.username from all_users u where username "
  99. "like 'TEST_%' and not exists (select username "
  100. "from v$session where username=u.username)"
  101. )
  102. all_names = {username.lower() for (username,) in to_reap}
  103. to_drop = set()
  104. for name in all_names:
  105. if name.endswith("_ts1") or name.endswith("_ts2"):
  106. continue
  107. elif name in idents:
  108. to_drop.add(name)
  109. if "%s_ts1" % name in all_names:
  110. to_drop.add("%s_ts1" % name)
  111. if "%s_ts2" % name in all_names:
  112. to_drop.add("%s_ts2" % name)
  113. dropped = total = 0
  114. for total, username in enumerate(to_drop, 1):
  115. if _ora_drop_ignore(conn, username):
  116. dropped += 1
  117. log.info(
  118. "Dropped %d out of %d stale databases detected", dropped, total
  119. )
  120. @follower_url_from_main.for_db("oracle")
  121. def _oracle_follower_url_from_main(url, ident):
  122. url = sa_url.make_url(url)
  123. return url.set(username=ident, password="xe")
  124. @temp_table_keyword_args.for_db("oracle")
  125. def _oracle_temp_table_keyword_args(cfg, eng):
  126. return {
  127. "prefixes": ["GLOBAL TEMPORARY"],
  128. "oracle_on_commit": "PRESERVE ROWS",
  129. }
  130. @set_default_schema_on_connection.for_db("oracle")
  131. def _oracle_set_default_schema_on_connection(
  132. cfg, dbapi_connection, schema_name
  133. ):
  134. cursor = dbapi_connection.cursor()
  135. cursor.execute("ALTER SESSION SET CURRENT_SCHEMA=%s" % schema_name)
  136. cursor.close()