provision.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # dialects/mssql/provision.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. from sqlalchemy import inspect
  8. from sqlalchemy import Integer
  9. from ... import create_engine
  10. from ... import exc
  11. from ...schema import Column
  12. from ...schema import DropConstraint
  13. from ...schema import ForeignKeyConstraint
  14. from ...schema import MetaData
  15. from ...schema import Table
  16. from ...testing.provision import create_db
  17. from ...testing.provision import drop_all_schema_objects_pre_tables
  18. from ...testing.provision import drop_db
  19. from ...testing.provision import get_temp_table_name
  20. from ...testing.provision import log
  21. from ...testing.provision import post_configure_engine
  22. from ...testing.provision import run_reap_dbs
  23. from ...testing.provision import temp_table_keyword_args
  24. @post_configure_engine.for_db("mssql")
  25. def post_configure_engine(url, engine, follower_ident):
  26. if engine.driver == "pyodbc":
  27. engine.dialect.dbapi.pooling = False
  28. @create_db.for_db("mssql")
  29. def _mssql_create_db(cfg, eng, ident):
  30. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  31. conn.exec_driver_sql("create database %s" % ident)
  32. conn.exec_driver_sql(
  33. "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
  34. )
  35. conn.exec_driver_sql(
  36. "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
  37. )
  38. conn.exec_driver_sql("use %s" % ident)
  39. conn.exec_driver_sql("create schema test_schema")
  40. conn.exec_driver_sql("create schema test_schema_2")
  41. @drop_db.for_db("mssql")
  42. def _mssql_drop_db(cfg, eng, ident):
  43. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  44. _mssql_drop_ignore(conn, ident)
  45. def _mssql_drop_ignore(conn, ident):
  46. try:
  47. # typically when this happens, we can't KILL the session anyway,
  48. # so let the cleanup process drop the DBs
  49. # for row in conn.exec_driver_sql(
  50. # "select session_id from sys.dm_exec_sessions "
  51. # "where database_id=db_id('%s')" % ident):
  52. # log.info("killing SQL server session %s", row['session_id'])
  53. # conn.exec_driver_sql("kill %s" % row['session_id'])
  54. conn.exec_driver_sql("drop database %s" % ident)
  55. log.info("Reaped db: %s", ident)
  56. return True
  57. except exc.DatabaseError as err:
  58. log.warning("couldn't drop db: %s", err)
  59. return False
  60. @run_reap_dbs.for_db("mssql")
  61. def _reap_mssql_dbs(url, idents):
  62. log.info("db reaper connecting to %r", url)
  63. eng = create_engine(url)
  64. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  65. log.info("identifiers in file: %s", ", ".join(idents))
  66. to_reap = conn.exec_driver_sql(
  67. "select d.name from sys.databases as d where name "
  68. "like 'TEST_%' and not exists (select session_id "
  69. "from sys.dm_exec_sessions "
  70. "where database_id=d.database_id)"
  71. )
  72. all_names = {dbname.lower() for (dbname,) in to_reap}
  73. to_drop = set()
  74. for name in all_names:
  75. if name in idents:
  76. to_drop.add(name)
  77. dropped = total = 0
  78. for total, dbname in enumerate(to_drop, 1):
  79. if _mssql_drop_ignore(conn, dbname):
  80. dropped += 1
  81. log.info(
  82. "Dropped %d out of %d stale databases detected", dropped, total
  83. )
  84. @temp_table_keyword_args.for_db("mssql")
  85. def _mssql_temp_table_keyword_args(cfg, eng):
  86. return {}
  87. @get_temp_table_name.for_db("mssql")
  88. def _mssql_get_temp_table_name(cfg, eng, base_name):
  89. return "##" + base_name
  90. @drop_all_schema_objects_pre_tables.for_db("mssql")
  91. def drop_all_schema_objects_pre_tables(cfg, eng):
  92. with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
  93. inspector = inspect(conn)
  94. for schema in (None, "dbo", cfg.test_schema, cfg.test_schema_2):
  95. for tname in inspector.get_table_names(schema=schema):
  96. tb = Table(
  97. tname,
  98. MetaData(),
  99. Column("x", Integer),
  100. Column("y", Integer),
  101. schema=schema,
  102. )
  103. for fk in inspect(conn).get_foreign_keys(tname, schema=schema):
  104. conn.execute(
  105. DropConstraint(
  106. ForeignKeyConstraint(
  107. [tb.c.x], [tb.c.y], name=fk["name"]
  108. )
  109. )
  110. )