provision.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. # dialects/sqlite/provision.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. import os
  8. import re
  9. from ... import exc
  10. from ...engine import url as sa_url
  11. from ...testing.provision import create_db
  12. from ...testing.provision import drop_db
  13. from ...testing.provision import follower_url_from_main
  14. from ...testing.provision import generate_driver_url
  15. from ...testing.provision import log
  16. from ...testing.provision import post_configure_engine
  17. from ...testing.provision import run_reap_dbs
  18. from ...testing.provision import stop_test_class_outside_fixtures
  19. from ...testing.provision import temp_table_keyword_args
  20. # TODO: I can't get this to build dynamically with pytest-xdist procs
  21. _drivernames = {"pysqlite", "aiosqlite", "pysqlcipher"}
  22. @generate_driver_url.for_db("sqlite")
  23. def generate_driver_url(url, driver, query_str):
  24. if driver == "pysqlcipher" and url.get_driver_name() != "pysqlcipher":
  25. if url.database:
  26. url = url.set(database=url.database + ".enc")
  27. url = url.set(password="test")
  28. url = url.set(drivername="sqlite+%s" % (driver,))
  29. try:
  30. url.get_dialect()
  31. except exc.NoSuchModuleError:
  32. return None
  33. else:
  34. return url
  35. @follower_url_from_main.for_db("sqlite")
  36. def _sqlite_follower_url_from_main(url, ident):
  37. url = sa_url.make_url(url)
  38. if not url.database or url.database == ":memory:":
  39. return url
  40. else:
  41. m = re.match(r"(.+?)\.(.+)$", url.database)
  42. name, ext = m.group(1, 2)
  43. drivername = url.get_driver_name()
  44. return sa_url.make_url(
  45. "sqlite+%s:///%s_%s.%s" % (drivername, drivername, ident, ext)
  46. )
  47. @post_configure_engine.for_db("sqlite")
  48. def _sqlite_post_configure_engine(url, engine, follower_ident):
  49. from sqlalchemy import event
  50. @event.listens_for(engine, "connect")
  51. def connect(dbapi_connection, connection_record):
  52. # use file DBs in all cases, memory acts kind of strangely
  53. # as an attached
  54. if not follower_ident:
  55. # note this test_schema.db gets created for all test runs.
  56. # there's not any dedicated cleanup step for it. it in some
  57. # ways corresponds to the "test.test_schema" schema that's
  58. # expected to be already present, so for now it just stays
  59. # in a given checkout directory.
  60. dbapi_connection.execute(
  61. 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
  62. % (engine.driver,)
  63. )
  64. else:
  65. dbapi_connection.execute(
  66. 'ATTACH DATABASE "%s_%s_test_schema.db" AS test_schema'
  67. % (follower_ident, engine.driver)
  68. )
  69. @create_db.for_db("sqlite")
  70. def _sqlite_create_db(cfg, eng, ident):
  71. pass
  72. @drop_db.for_db("sqlite")
  73. def _sqlite_drop_db(cfg, eng, ident):
  74. for path in [
  75. "%s.db" % ident,
  76. "%s_%s_test_schema.db" % (ident, eng.driver),
  77. ]:
  78. if os.path.exists(path):
  79. log.info("deleting SQLite database file: %s" % path)
  80. os.remove(path)
  81. @stop_test_class_outside_fixtures.for_db("sqlite")
  82. def stop_test_class_outside_fixtures(config, db, cls):
  83. with db.connect() as conn:
  84. files = [
  85. row.file
  86. for row in conn.exec_driver_sql("PRAGMA database_list")
  87. if row.file
  88. ]
  89. if files:
  90. db.dispose()
  91. # some sqlite file tests are not cleaning up well yet, so do this
  92. # just to make things simple for now
  93. for file_ in files:
  94. if file_ and os.path.exists(file_):
  95. os.remove(file_)
  96. @temp_table_keyword_args.for_db("sqlite")
  97. def _sqlite_temp_table_keyword_args(cfg, eng):
  98. return {"prefixes": ["TEMPORARY"]}
  99. @run_reap_dbs.for_db("sqlite")
  100. def _reap_sqlite_dbs(url, idents):
  101. log.info("db reaper connecting to %r", url)
  102. log.info("identifiers in file: %s", ", ".join(idents))
  103. for ident in idents:
  104. # we don't have a config so we can't call _sqlite_drop_db due to the
  105. # decorator
  106. for ext in ("db", "db.enc"):
  107. for path in (
  108. ["%s.%s" % (ident, ext)]
  109. + [
  110. "%s_%s.%s" % (drivername, ident, ext)
  111. for drivername in _drivernames
  112. ]
  113. + [
  114. "%s_test_schema.%s" % (drivername, ext)
  115. for drivername in _drivernames
  116. ]
  117. + [
  118. "%s_%s_test_schema.%s" % (ident, drivername, ext)
  119. for drivername in _drivernames
  120. ]
  121. ):
  122. if os.path.exists(path):
  123. log.info("deleting SQLite database file: %s" % path)
  124. os.remove(path)