pytestplugin.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826
  1. # testing/plugin/pytestplugin.py
  2. # Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
  3. # <see AUTHORS file>
  4. #
  5. # This module is part of SQLAlchemy and is released under
  6. # the MIT License: https://www.opensource.org/licenses/mit-license.php
  7. try:
  8. # installed by bootstrap.py
  9. import sqla_plugin_base as plugin_base
  10. except ImportError:
  11. # assume we're a package, use traditional import
  12. from . import plugin_base
  13. import argparse
  14. import collections
  15. from functools import update_wrapper
  16. import inspect
  17. import itertools
  18. import operator
  19. import os
  20. import re
  21. import sys
  22. import uuid
  23. import pytest
  # True when running under a Python 2 interpreter.
  py2k = sys.version_info < (3, 0)
  if py2k:
      # The Python 2-only fixture shim is located the same way as plugin_base:
      # first as a standalone top-level module, then as a sibling module of
      # this package.
      try:
          import sqla_reinvent_fixtures as reinvent_fixtures_py2k
      except ImportError:
          from . import reinvent_fixtures_py2k
  def pytest_addoption(parser):
      """Register the test-suite command line options with pytest.

      ``plugin_base.setup_options`` is given a ``make_option`` callable.
      That callable accepts the extra keywords ``callback`` and
      ``zeroarg_callback``; when present they are converted into
      ``argparse.Action`` subclasses before the option is added to the
      "sqlalchemy" option group.  ``plugin_base.read_config`` is invoked
      once all options are registered.
      """
      group = parser.getgroup("sqlalchemy")

      def _value_action(callback_):
          # Action that forwards the option's value(s) to ``callback_``.
          class CallableAction(argparse.Action):
              def __call__(
                  self, parser, namespace, values, option_string=None
              ):
                  callback_(option_string, values, parser)

          return CallableAction

      def _flag_action(zeroarg_callback):
          # Action for options that consume no argument (nargs=0) and
          # simply trigger ``zeroarg_callback``.
          class CallableAction(argparse.Action):
              def __init__(
                  self,
                  option_strings,
                  dest,
                  default=False,
                  required=False,
                  help=None,  # noqa
              ):
                  super(CallableAction, self).__init__(
                      option_strings=option_strings,
                      dest=dest,
                      nargs=0,
                      const=True,
                      default=default,
                      required=required,
                      help=help,
                  )

              def __call__(
                  self, parser, namespace, values, option_string=None
              ):
                  zeroarg_callback(option_string, values, parser)

          return CallableAction

      def make_option(name, **kw):
          callback_ = kw.pop("callback", None)
          zeroarg_callback = kw.pop("zeroarg_callback", None)

          if callback_:
              kw["action"] = _value_action(callback_)
          # a zero-argument callback, when given, takes precedence
          if zeroarg_callback:
              kw["action"] = _flag_action(zeroarg_callback)

          group.addoption(name, **kw)

      plugin_base.setup_options(make_option)
      plugin_base.read_config()
  def pytest_configure(config):
      """pytest hook: configure the plugin for this process.

      Registers the xdist hooks when the xdist plugin is present.  When
      ``config`` carries a ``workerinput`` attribute, the follower
      configuration is restored from it; otherwise a leftover
      ``write_idents`` file is removed.  Afterwards the common plugin_base
      setup runs and the pyannotate flag is raised if requested.
      """
      global DUMP_PYANNOTATE

      options = config.option
      plugins = config.pluginmanager

      if plugins.hasplugin("xdist"):
          plugins.register(XDistHooks())

      if hasattr(config, "workerinput"):
          worker_input = config.workerinput
          plugin_base.restore_important_follower_config(worker_input)
          plugin_base.configure_follower(worker_input["follower_ident"])
      elif options.write_idents and os.path.exists(options.write_idents):
          os.remove(options.write_idents)

      plugin_base.pre_begin(options)

      coverage_enabled = bool(getattr(options, "cov_source", False))
      plugin_base.set_coverage_flag(coverage_enabled)

      plugin_base.set_fixture_functions(PytestFixtureFunctions)

      if options.dump_pyannotate:
          DUMP_PYANNOTATE = True
  # Switched to True by pytest_configure when the ``dump_pyannotate`` option
  # is set; consulted by collect_types_fixture around every test.
  DUMP_PYANNOTATE = False
  @pytest.fixture(autouse=True)
  def collect_types_fixture():
      """Autouse fixture that brackets each test with pyannotate collection.

      Does nothing unless ``DUMP_PYANNOTATE`` is true.  The flag is read
      once, before the test runs, so that setup and teardown always agree:
      ``collect_types.stop()`` is called exactly when
      ``collect_types.start()`` was, and ``collect_types`` is never
      referenced without having been imported.
      """
      collecting = DUMP_PYANNOTATE
      if collecting:
          from pyannotate_runtime import collect_types

          collect_types.start()
      yield
      if collecting:
          collect_types.stop()
  def pytest_sessionstart(session):
      """pytest hook: run ``plugin_base.post_begin`` at session start.

      The call is routed through ``asyncio._assume_async`` from
      ``sqlalchemy.testing``.
      """
      from sqlalchemy.testing import asyncio as sqla_asyncio

      sqla_asyncio._assume_async(plugin_base.post_begin)
  def pytest_sessionfinish(session):
      """pytest hook: final cleanup and optional pyannotate dump.

      ``plugin_base.final_process_cleanup`` always runs; when the
      ``dump_pyannotate`` option holds a value, the collected type
      statistics are written to it.
      """
      from sqlalchemy.testing import asyncio as sqla_asyncio

      sqla_asyncio._maybe_async_provisioning(plugin_base.final_process_cleanup)

      stats_target = session.config.option.dump_pyannotate
      if not stats_target:
          return

      from pyannotate_runtime import collect_types

      collect_types.dump_stats(stats_target)
  def pytest_collection_finish(session):
      """pytest hook: initialise pyannotate type collection, if requested.

      When the ``dump_pyannotate`` option is set, type collection is
      initialised with a filename filter that keeps only files whose common
      path with ``lib/sqlalchemy`` contains ``lib/sqlalchemy`` and whose
      name does not contain ``testing``.
      """
      if not session.config.option.dump_pyannotate:
          return

      from pyannotate_runtime import collect_types

      lib_sqlalchemy = os.path.abspath("lib/sqlalchemy")

      def _filter(filename):
          # returns the normalized filename to keep it, None to drop it
          filename = os.path.normpath(os.path.abspath(filename))
          shared_root = os.path.commonpath([filename, lib_sqlalchemy])
          if "lib/sqlalchemy" not in shared_root or "testing" in filename:
              return None
          return filename

      collect_types.init_types_collection(filter_filename=_filter)
  class XDistHooks(object):
      """Hooks registered only when the xdist plugin is available."""

      def pytest_configure_node(self, node):
          """Prepare ``node.workerinput`` and create the follower database."""
          from sqlalchemy.testing import provision
          from sqlalchemy.testing import asyncio as sqla_asyncio

          # the master for each node fills workerinput dictionary
          # which pytest-xdist will transfer to the subprocess
          worker_input = node.workerinput
          plugin_base.memoize_important_follower_config(worker_input)

          worker_input["follower_ident"] = "test_%s" % uuid.uuid4().hex[0:12]

          sqla_asyncio._maybe_async_provisioning(
              provision.create_follower_db, worker_input["follower_ident"]
          )

      def pytest_testnodedown(self, node, error):
          """Drop the follower database that belongs to ``node``."""
          from sqlalchemy.testing import provision
          from sqlalchemy.testing import asyncio as sqla_asyncio

          follower_ident = node.workerinput["follower_ident"]
          sqla_asyncio._maybe_async_provisioning(
              provision.drop_follower_db, follower_ident
          )
  def pytest_collection_modifyitems(session, config, items):
      """pytest hook: expand test classes into their generated sub-classes.

      ``items`` is modified in place.  Items without a parent class, or
      whose class name starts with an underscore, are dropped.  Each
      remaining class is passed to ``plugin_base.generate_sub_tests``; for
      every generated sub-class that differs from the original, the
      original items are replaced by the ones collected from the sub-class.
      The result is sorted by module name, class name and item name.
      """
      # look for all those classes that specify __backend__ and
      # expand them out into per-database test cases.

      # this is much easier to do within pytest_pycollect_makeitem, however
      # pytest is iterating through cls.__dict__ as makeitem is
      # called which causes a "dictionary changed size" error on py3k.
      # I'd submit a pullreq for them to turn it into a list first, but
      # it's to suit the rather odd use case here which is that we are adding
      # new classes to a module on the fly.
      from sqlalchemy.testing import asyncio as sqla_asyncio

      def _class_node(node):
          return node.getparent(pytest.Class)

      # original class -> test name -> items collected from sub-classes
      rebuilt_items = collections.defaultdict(
          lambda: collections.defaultdict(list)
      )

      kept_items = []
      for item in items:
          owner = _class_node(item)
          if owner is not None and not owner.name.startswith("_"):
              kept_items.append(item)
      items[:] = kept_items

      test_classes = {_class_node(item) for item in items}

      def collect(element):
          for child in element.collect():
              if not isinstance(child, pytest.Collector):
                  yield child
                  continue
              # no yield from in 2.7
              for leaf in collect(child):
                  yield leaf

      # support pytest 5.4.0 and above pytest.Class.from_parent
      make_class_node = getattr(pytest.Class, "from_parent", pytest.Class)

      def setup_test_classes():
          for test_class in test_classes:
              original_cls = test_class.cls
              sub_classes = plugin_base.generate_sub_tests(
                  original_cls, test_class.module
              )
              for sub_cls in sub_classes:
                  if sub_cls is original_cls:
                      continue

                  per_cls_dict = rebuilt_items[original_cls]
                  module_node = test_class.getparent(pytest.Module)
                  class_node = make_class_node(
                      name=sub_cls.__name__, parent=module_node
                  )
                  for fn in collect(class_node):
                      per_cls_dict[fn.name].append(fn)

      # class requirements will sometimes need to access the DB to check
      # capabilities, so need to do this for async
      sqla_asyncio._maybe_async_provisioning(setup_test_classes)

      newitems = []
      for item in items:
          if item.cls in rebuilt_items:
              newitems.extend(rebuilt_items[item.cls][item.name])
          else:
              newitems.append(item)

      if py2k:
          for item in newitems:
              reinvent_fixtures_py2k.scan_for_fixtures_to_use_for_class(item)

      # seems like the functions attached to a test class aren't sorted
      # already?  is that true and why's that? (when using unittest,
      # they're sorted)
      def _ordering(item):
          return (
              item.getparent(pytest.Module).name,
              item.getparent(pytest.Class).name,
              item.name,
          )

      items[:] = sorted(newitems, key=_ordering)
  def pytest_pycollect_makeitem(collector, name, obj):
      """pytest hook: decide how ``obj`` is collected.

      Returns a list of ``pytest.Class`` nodes (one per class produced by
      ``_parametrize_cls``) for wanted classes, ``None`` for wanted methods
      so that pytest applies its default logic, and an empty list for
      everything else so that it is skipped.
      """
      if inspect.isclass(obj) and plugin_base.want_class(name, obj):
          from sqlalchemy.testing import config

          if config.any_async:
              obj = _apply_maybe_async(obj)

          # support pytest 5.4.0 and above pytest.Class.from_parent
          ctor = getattr(pytest.Class, "from_parent", pytest.Class)
          class_nodes = []
          for parametrize_cls in _parametrize_cls(collector.module, obj):
              class_nodes.append(
                  ctor(name=parametrize_cls.__name__, parent=collector)
              )
          return class_nodes

      is_wanted_method = (
          inspect.isfunction(obj)
          and collector.cls is not None
          and plugin_base.want_method(collector.cls, obj)
      )
      if is_wanted_method:
          # None means, fall back to default logic, which includes
          # method-level parametrize
          return None

      # empty list means skip this item
      return []
  223. def _is_wrapped_coroutine_function(fn):
  224. while hasattr(fn, "__wrapped__"):
  225. fn = fn.__wrapped__
  226. return inspect.iscoroutinefunction(fn)
  def _apply_maybe_async(obj, recurse=True):
      """Wrap the ``test_*`` callables of class ``obj`` with ``_maybe_async``.

      Every attribute defined directly on ``obj`` that is callable (or a
      classmethod), is named ``test_...``, has not been wrapped before and
      is not a coroutine function is replaced by a wrapper that invokes it
      through ``asyncio._maybe_async``.  With ``recurse`` true, the same is
      applied to each class in the MRO other than ``obj`` itself and
      ``object``.  Returns ``obj``.
      """
      from sqlalchemy.testing import asyncio

      for name, value in vars(obj).items():
          if not (callable(value) or isinstance(value, classmethod)):
              continue
          if getattr(value, "_maybe_async_applied", False):
              continue
          if not name.startswith("test_"):
              continue
          if _is_wrapped_coroutine_function(value):
              continue

          is_classmethod = isinstance(value, classmethod)
          if is_classmethod:
              # wrap the underlying function, re-apply classmethod below
              value = value.__func__

          @_pytest_fn_decorator
          def make_async(fn, *args, **kwargs):
              return asyncio._maybe_async(fn, *args, **kwargs)

          do_async = make_async(value)
          if is_classmethod:
              do_async = classmethod(do_async)
          # marker that prevents wrapping the same attribute twice
          do_async._maybe_async_applied = True

          setattr(obj, name, do_async)

      if recurse:
          for base in obj.mro()[1:]:
              if base != object:
                  _apply_maybe_async(base, False)
      return obj
  253. def _parametrize_cls(module, cls):
  254. """implement a class-based version of pytest parametrize."""
  255. if "_sa_parametrize" not in cls.__dict__:
  256. return [cls]
  257. _sa_parametrize = cls._sa_parametrize
  258. classes = []
  259. for full_param_set in itertools.product(
  260. *[params for argname, params in _sa_parametrize]
  261. ):
  262. cls_variables = {}
  263. for argname, param in zip(
  264. [_sa_param[0] for _sa_param in _sa_parametrize], full_param_set
  265. ):
  266. if not argname:
  267. raise TypeError("need argnames for class-based combinations")
  268. argname_split = re.split(r",\s*", argname)
  269. for arg, val in zip(argname_split, param.values):
  270. cls_variables[arg] = val
  271. parametrized_name = "_".join(
  272. # token is a string, but in py2k pytest is giving us a unicode,
  273. # so call str() on it.
  274. str(re.sub(r"\W", "", token))
  275. for param in full_param_set
  276. for token in param.id.split("-")
  277. )
  278. name = "%s_%s" % (cls.__name__, parametrized_name)
  279. newcls = type.__new__(type, name, (cls,), cls_variables)
  280. setattr(module, name, newcls)
  281. classes.append(newcls)
  282. return classes
  # pytest.Class node of the test class currently in progress; assigned in
  # pytest_runtest_setup and reset to None in pytest_runtest_teardown.
  _current_class = None
  def pytest_runtest_setup(item):
      """pytest hook: start class-level setup outside the fixture system.

      For the first ``pytest.Function`` item seen while no class is
      current, ``plugin_base.start_test_class_outside_fixtures`` is run
      for the item's class and the item's class node becomes the current
      class.
      """
      from sqlalchemy.testing import asyncio as sqla_asyncio

      # pytest_runtest_setup runs *before* pytest fixtures with scope="class".
      # plugin_base.start_test_class_outside_fixtures may opt to raise
      # SkipTest for the whole class and has to run things that are across
      # all current databases, so we run this outside of the pytest fixture
      # system altogether and ensure asyncio greenlet if any engines are
      # async

      global _current_class

      if not isinstance(item, pytest.Function) or _current_class is not None:
          return

      sqla_asyncio._maybe_async_provisioning(
          plugin_base.start_test_class_outside_fixtures,
          item.cls,
      )
      _current_class = item.getparent(pytest.Class)
  @pytest.hookimpl(hookwrapper=True)
  def pytest_runtest_teardown(item, nextitem):
      """pytest hook wrapper: per-test teardown and class finalization.

      Before the wrapped teardown runs, ``plugin_base.after_test`` is
      invoked for ``item``.  After it, when ``item`` was the last test of
      the current class (``nextitem`` is None or belongs to another class),
      ``plugin_base.stop_test_class_outside_fixtures`` is run.

      If that finalization raises while the test's "call" report shows a
      failure, the original failure text is appended to the exception's
      arguments before it is re-raised.  When no "call" report exists (for
      example because the test never reached its call phase) the exception
      is re-raised unchanged.
      """
      global _current_class, _current_report

      # runs inside of pytest function fixture scope
      # after test function runs
      from sqlalchemy.testing import asyncio
      from sqlalchemy.util import string_types

      asyncio._maybe_async(plugin_base.after_test, item)

      yield

      # this is now after all the fixture teardown have run, the class can be
      # finalized. Since pytest v7 this finalizer can no longer be added in
      # pytest_runtest_setup since the class has not yet been setup at that
      # time.
      # See https://github.com/pytest-dev/pytest/issues/9343
      if _current_class is not None and (
          # last test or a new class
          nextitem is None
          or nextitem.getparent(pytest.Class) is not _current_class
      ):
          _current_class = None

          try:
              asyncio._maybe_async_provisioning(
                  plugin_base.stop_test_class_outside_fixtures, item.cls
              )
          except Exception as e:
              # in case of an exception during teardown attach the original
              # error to the exception message, otherwise it will get lost.
              # _current_report is None when no "call" report was logged;
              # guard against that so the teardown error itself is not
              # replaced by an AttributeError.
              if _current_report is not None and _current_report.failed:
                  if not e.args:
                      e.args = (
                          "__Original test failure__:\n"
                          + _current_report.longreprtext,
                      )
                  elif e.args[-1] and isinstance(e.args[-1], string_types):
                      args = list(e.args)
                      args[-1] += (
                          "\n__Original test failure__:\n"
                          + _current_report.longreprtext
                      )
                      e.args = tuple(args)
                  else:
                      e.args += (
                          "__Original test failure__",
                          _current_report.longreprtext,
                      )
              raise
          finally:
              _current_report = None
  def pytest_runtest_call(item):
      """pytest hook: invoke ``plugin_base.before_test`` for ``item``.

      The hook receives the item, the name of its module, its class and
      its own name.
      """
      # runs inside of pytest function fixture scope
      # before test function runs
      from sqlalchemy.testing import asyncio as sqla_asyncio

      before_test_args = (
          item,
          item.module.__name__,
          item.cls,
          item.name,
      )
      sqla_asyncio._maybe_async(plugin_base.before_test, *before_test_args)
  # Most recent "call"-phase test report, stored by pytest_runtest_logreport
  # and cleared by pytest_runtest_teardown after a class has been finalized.
  _current_report = None
  def pytest_runtest_logreport(report):
      """pytest hook: remember the report of the "call" phase.

      Reports for any other phase are ignored.
      """
      global _current_report

      if report.when != "call":
          return
      _current_report = report
  @pytest.fixture(scope="class")
  def setup_class_methods(request):
      """Class-scoped fixture running the class-level setup and teardown.

      Setup calls ``setup_test_class`` when the class defines it, followed
      by the py2k class fixture setup.  Teardown mirrors this in reverse
      order and finishes with ``plugin_base.stop_test_class``.
      """
      from sqlalchemy.testing import asyncio as sqla_asyncio

      test_cls = request.cls

      # --- setup ---
      if hasattr(test_cls, "setup_test_class"):
          sqla_asyncio._maybe_async(test_cls.setup_test_class)

      if py2k:
          reinvent_fixtures_py2k.run_class_fixture_setup(request)

      yield

      # --- teardown ---
      if py2k:
          reinvent_fixtures_py2k.run_class_fixture_teardown(request)

      if hasattr(test_cls, "teardown_test_class"):
          sqla_asyncio._maybe_async(test_cls.teardown_test_class)

      sqla_asyncio._maybe_async(plugin_base.stop_test_class, test_cls)
  @pytest.fixture(scope="function")
  def setup_test_methods(request):
      """Function-scoped fixture running the per-test setup and teardown.

      Setup calls, when present on the test instance, ``setup_test`` and
      then ``setUp``.  Teardown runs ``plugin_base.after_test_fixtures``
      followed by ``tearDown`` and ``teardown_test`` when present.  Under
      py2k the homegrown function fixtures wrap both phases.
      """
      from sqlalchemy.testing import asyncio as sqla_asyncio

      # called for each test

      test_instance = request.instance

      # before this fixture runs:

      # 1. function level "autouse" fixtures under py3k (examples: TablesTest
      #    define tables / data, MappedTest define tables / mappers / data)

      # 2. run homegrown function level "autouse" fixtures under py2k
      if py2k:
          reinvent_fixtures_py2k.run_fn_fixture_setup(request)

      # 3. run outer xdist-style setup
      if hasattr(test_instance, "setup_test"):
          sqla_asyncio._maybe_async(test_instance.setup_test)

      # alembic test suite is using setUp and tearDown
      # xdist methods; support these in the test suite
      # for the near term
      if hasattr(test_instance, "setUp"):
          sqla_asyncio._maybe_async(test_instance.setUp)

      # inside the yield:
      # 4. function level fixtures defined on test functions themselves,
      #    e.g. "connection", "metadata" run next

      # 5. pytest hook pytest_runtest_call then runs

      # 6. test itself runs

      yield

      # yield finishes:

      # 7. function level fixtures defined on test functions
      #    themselves, e.g. "connection" rolls back the transaction,
      #    "metadata" emits drop all

      # 8. pytest hook pytest_runtest_teardown hook runs, this is associated
      #    with fixtures close all sessions, provisioning.stop_test_class(),
      #    engines.testing_reaper -> ensure all connection pool connections
      #    are returned, engines created by testing_engine that aren't the
      #    config engine are disposed

      sqla_asyncio._maybe_async(plugin_base.after_test_fixtures, test_instance)

      # 10. run xdist-style teardown
      if hasattr(test_instance, "tearDown"):
          sqla_asyncio._maybe_async(test_instance.tearDown)

      if hasattr(test_instance, "teardown_test"):
          sqla_asyncio._maybe_async(test_instance.teardown_test)

      # 11. run homegrown function-level "autouse" fixtures under py2k
      if py2k:
          reinvent_fixtures_py2k.run_fn_fixture_teardown(request)

      # 12. function level "autouse" fixtures under py3k (examples: TablesTest
      #     / MappedTest delete table data, possibly drop tables and clear
      #     mappers depending on the flags defined by the test class)
  def getargspec(fn):
      """Return the argument specification of ``fn``.

      ``inspect.getfullargspec`` is used on Python 3 and
      ``inspect.getargspec`` on any other major version.
      """
      on_py3 = sys.version_info.major == 3
      # only the selected attribute is looked up on ``inspect``
      inspector = inspect.getfullargspec if on_py3 else inspect.getargspec
      return inspector(fn)
  def _pytest_fn_decorator(target):
      """Port of langhelpers.decorator with pytest-specific tricks.

      Returns a ``decorate(fn, add_positional_parameters=())`` callable.
      ``decorate`` generates, via ``exec``, a new function that has the
      same name as ``fn`` and an argument list derived from ``fn``'s
      argspec, and whose body calls ``target(fn, <arguments>)``.

      When ``add_positional_parameters`` is given, those names are appended
      to the generated function's positional arguments and only
      ``__module__``, ``__name__`` and ``pytestmark`` are copied from
      ``fn``; otherwise the full ``update_wrapper`` treatment is applied.
      """

      from sqlalchemy.util.langhelpers import format_argspec_plus
      from sqlalchemy.util.compat import inspect_getfullargspec

      def _exec_code_in_env(code, env, fn_name):
          # run the generated source and fetch the function it defined
          exec(code, env)
          return env[fn_name]

      def decorate(fn, add_positional_parameters=()):

          spec = inspect_getfullargspec(fn)
          if add_positional_parameters:
              spec.args.extend(add_positional_parameters)

          metadata = dict(
              __target_fn="__target_fn", __orig_fn="__orig_fn", name=fn.__name__
          )
          # presumably supplies the "args" and "apply_kw" entries that the
          # template below interpolates -- confirm against
          # format_argspec_plus
          metadata.update(format_argspec_plus(spec, grouped=False))

          # The template is spelled with explicit newlines and an explicit
          # four-space body indent so that the generated source is valid
          # regardless of how this file itself is laid out.
          code = (
              "def %(name)s(%(args)s):\n"
              "    return %(__target_fn)s(%(__orig_fn)s, %(apply_kw)s)\n"
          ) % metadata

          decorated = _exec_code_in_env(
              code, {"__target_fn": target, "__orig_fn": fn}, fn.__name__
          )
          if not add_positional_parameters:
              # fn may be a bound/unbound method; defaults live on __func__
              decorated.__defaults__ = getattr(fn, "__func__", fn).__defaults__
              decorated.__wrapped__ = fn
              return update_wrapper(decorated, fn)
          else:
              # this is the pytest hacky part.  don't do a full update wrapper
              # because pytest is really being sneaky about finding the args
              # for the wrapped function
              decorated.__module__ = fn.__module__
              decorated.__name__ = fn.__name__
              if hasattr(fn, "pytestmark"):
                  decorated.pytestmark = fn.pytestmark
              return decorated

      return decorate
  class PytestFixtureFunctions(plugin_base.FixtureFunctions):
      """pytest-backed implementation of ``plugin_base.FixtureFunctions``."""

      def skip_test_exception(self, *arg, **kw):
          """Return (not raise) a ``pytest.skip.Exception`` instance."""
          return pytest.skip.Exception(*arg, **kw)

      def mark_base_test_class(self):
          """Return the ``usefixtures`` mark applied to base test classes."""
          return pytest.mark.usefixtures(
              "setup_class_methods", "setup_test_methods"
          )

      # id template character -> function turning a parameter into the text
      # that is used for it in the generated test id
      _combination_id_fns = {
          "i": lambda obj: obj,
          "r": repr,
          "s": str,
          "n": lambda obj: obj.__name__
          if hasattr(obj, "__name__")
          else type(obj).__name__,
      }

      def combinations(self, *arg_sets, **kw):
          """Facade for pytest.mark.parametrize.

          Automatically derives argument names from the callable which in our
          case is always a method on a class with positional arguments.

          ids for parameter sets are derived using an optional template.

          Keyword arguments consumed here:

          * ``argnames`` - comma separated argument names; when omitted they
            are taken from the decorated function's signature, skipping the
            first argument.
          * ``id_`` - template string with one character per position of an
            argument set.  Positions marked ``n``, ``r``, ``s`` or ``a`` are
            passed to the test as parameters; positions whose character is
            a key of ``_combination_id_fns`` contribute to the id, joined
            with ``-``.

          ``exclusions.compound`` objects found inside an argument set are
          separated from the parameters and applied to the test function
          at call time.  They are not supported when decorating a class.

          Returns a decorator usable on functions and on classes.
          """
          from sqlalchemy.testing import exclusions

          # a single iterator argument is expanded into the argument sets
          if sys.version_info.major == 3:
              if len(arg_sets) == 1 and hasattr(arg_sets[0], "__next__"):
                  arg_sets = list(arg_sets[0])
          else:
              if len(arg_sets) == 1 and hasattr(arg_sets[0], "next"):
                  arg_sets = list(arg_sets[0])

          argnames = kw.pop("argnames", None)

          def _filter_exclusions(args):
              # split one argument set into (parameters, exclusions)
              result = []
              gathered_exclusions = []
              for a in args:
                  if isinstance(a, exclusions.compound):
                      gathered_exclusions.append(a)
                  else:
                      result.append(a)

              return result, gathered_exclusions

          id_ = kw.pop("id_", None)

          tobuild_pytest_params = []
          has_exclusions = False
          if id_:
              _combination_id_fns = self._combination_id_fns

              # because itemgetter is not consistent for one argument vs.
              # multiple, make it multiple in all cases and use a slice
              # to omit the first argument
              _arg_getter = operator.itemgetter(
                  0,
                  *[
                      idx
                      for idx, char in enumerate(id_)
                      if char in ("n", "r", "s", "a")
                  ]
              )
              fns = [
                  (operator.itemgetter(idx), _combination_id_fns[char])
                  for idx, char in enumerate(id_)
                  if char in _combination_id_fns
              ]

              for arg in arg_sets:
                  if not isinstance(arg, tuple):
                      arg = (arg,)

                  fn_params, param_exclusions = _filter_exclusions(arg)

                  parameters = _arg_getter(fn_params)[1:]

                  if param_exclusions:
                      has_exclusions = True

                  tobuild_pytest_params.append(
                      (
                          parameters,
                          param_exclusions,
                          "-".join(
                              comb_fn(getter(arg)) for getter, comb_fn in fns
                          ),
                      )
                  )

          else:

              for arg in arg_sets:
                  if not isinstance(arg, tuple):
                      arg = (arg,)

                  fn_params, param_exclusions = _filter_exclusions(arg)

                  if param_exclusions:
                      has_exclusions = True

                  tobuild_pytest_params.append(
                      (fn_params, param_exclusions, None)
                  )

          pytest_params = []
          for parameters, param_exclusions, id_ in tobuild_pytest_params:
              if has_exclusions:
                  # once any set has exclusions, every set carries its
                  # (possibly empty) exclusion list as a trailing parameter
                  parameters += (param_exclusions,)

              param = pytest.param(*parameters, id=id_)
              pytest_params.append(param)

          def decorate(fn):
              if inspect.isclass(fn):
                  if has_exclusions:
                      raise NotImplementedError(
                          "exclusions not supported for class level combinations"
                      )
                  # only extend a list defined on this very class, never
                  # one inherited from a base class
                  if "_sa_parametrize" not in fn.__dict__:
                      fn._sa_parametrize = []
                  fn._sa_parametrize.append((argnames, pytest_params))
                  return fn
              else:
                  if argnames is None:
                      _argnames = getargspec(fn).args[1:]
                  else:
                      _argnames = re.split(r", *", argnames)

                  if has_exclusions:
                      _argnames += ["_exclusions"]

                      @_pytest_fn_decorator
                      def check_exclusions(fn, *args, **kw):
                          # the trailing positional is the exclusion list
                          _exclusions = args[-1]
                          if _exclusions:
                              exlu = exclusions.compound().add(*_exclusions)
                              fn = exlu(fn)
                          return fn(*args[0:-1], **kw)

                      fn = check_exclusions(
                          fn, add_positional_parameters=("_exclusions",)
                      )

                  return pytest.mark.parametrize(_argnames, pytest_params)(fn)

          return decorate

      def param_ident(self, *parameters):
          """Build a ``pytest.param`` whose id is the first argument.

          The remaining arguments become the parameter values.
          """
          ident = parameters[0]
          return pytest.param(*parameters[1:], id=ident)

      def fixture(self, *arg, **kw):
          """Wrapper around ``pytest.fixture``.

          Usable both bare (``@fixture``) and with arguments
          (``@fixture(...)``).  The decorated function is additionally
          wrapped with ``asyncio._maybe_async_wrapper`` when
          ``config.any_async`` is set.  Under py2k, fixtures declared with
          an ``autouse`` keyword are registered with
          ``reinvent_fixtures_py2k`` instead of being handed to pytest.
          """
          from sqlalchemy.testing import config
          from sqlalchemy.testing import asyncio

          # wrapping pytest.fixture function.  determine if
          # decorator was called as @fixture or @fixture().
          if len(arg) > 0 and callable(arg[0]):
              # was called bare, as @fixture; we have the function to wrap.
              fn = arg[0]
              arg = arg[1:]
          else:
              # was called with arguments, as @fixture(...); don't have the
              # function yet.
              fn = None

          # create a pytest.fixture marker.  because the fn is not being
          # passed, this is always a pytest.FixtureFunctionMarker()
          # object (or whatever pytest is calling it when you read this)
          # that is waiting for a function.
          fixture = pytest.fixture(*arg, **kw)

          # now apply wrappers to the function, including fixture itself

          def wrap(fn):
              if config.any_async:
                  fn = asyncio._maybe_async_wrapper(fn)
              # other wrappers may be added here

              if py2k and "autouse" in kw:
                  # py2k workaround for too-slow collection of autouse fixtures
                  # in pytest 4.6.11.  See notes in reinvent_fixtures_py2k for
                  # rationale.

                  # comment this condition out in order to disable the
                  # py2k workaround entirely.
                  reinvent_fixtures_py2k.add_fixture(fn, fixture)
              else:
                  # now apply FixtureFunctionMarker
                  fn = fixture(fn)

              return fn

          if fn:
              return wrap(fn)
          else:
              return wrap

      def get_current_test_name(self):
          """Return the ``PYTEST_CURRENT_TEST`` environment variable, or None."""
          return os.environ.get("PYTEST_CURRENT_TEST")

      def async_test(self, fn):
          """Wrap ``fn`` so that it is run by ``asyncio._run_coroutine_function``.

          The wrapper keeps ``fn``'s signature; the value produced by the
          coroutine is not returned.
          """
          from sqlalchemy.testing import asyncio

          @_pytest_fn_decorator
          def decorate(fn, *args, **kwargs):
              asyncio._run_coroutine_function(fn, *args, **kwargs)

          return decorate(fn)