_manager.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. from __future__ import annotations
  2. import inspect
  3. import types
  4. from typing import Any
  5. from typing import Callable
  6. from typing import cast
  7. from typing import Final
  8. from typing import Iterable
  9. from typing import Mapping
  10. from typing import Sequence
  11. from typing import TYPE_CHECKING
  12. import warnings
  13. from . import _tracing
  14. from ._callers import _multicall
  15. from ._hooks import _HookImplFunction
  16. from ._hooks import _Namespace
  17. from ._hooks import _Plugin
  18. from ._hooks import _SubsetHookCaller
  19. from ._hooks import HookCaller
  20. from ._hooks import HookImpl
  21. from ._hooks import HookimplOpts
  22. from ._hooks import HookRelay
  23. from ._hooks import HookspecOpts
  24. from ._hooks import normalize_hookimpl_opts
  25. from ._result import Result
  26. if TYPE_CHECKING:
  27. # importlib.metadata import is slow, defer it.
  28. import importlib.metadata
  29. _BeforeTrace = Callable[[str, Sequence[HookImpl], Mapping[str, Any]], None]
  30. _AfterTrace = Callable[[Result[Any], str, Sequence[HookImpl], Mapping[str, Any]], None]
  31. def _warn_for_function(warning: Warning, function: Callable[..., object]) -> None:
  32. func = cast(types.FunctionType, function)
  33. warnings.warn_explicit(
  34. warning,
  35. type(warning),
  36. lineno=func.__code__.co_firstlineno,
  37. filename=func.__code__.co_filename,
  38. )
  39. class PluginValidationError(Exception):
  40. """Plugin failed validation.
  41. :param plugin: The plugin which failed validation.
  42. :param message: Error message.
  43. """
  44. def __init__(self, plugin: _Plugin, message: str) -> None:
  45. super().__init__(message)
  46. #: The plugin which failed validation.
  47. self.plugin = plugin
  48. class DistFacade:
  49. """Emulate a pkg_resources Distribution"""
  50. def __init__(self, dist: importlib.metadata.Distribution) -> None:
  51. self._dist = dist
  52. @property
  53. def project_name(self) -> str:
  54. name: str = self.metadata["name"]
  55. return name
  56. def __getattr__(self, attr: str, default=None):
  57. return getattr(self._dist, attr, default)
  58. def __dir__(self) -> list[str]:
  59. return sorted(dir(self._dist) + ["_dist", "project_name"])
  60. class PluginManager:
  61. """Core class which manages registration of plugin objects and 1:N hook
  62. calling.
  63. You can register new hooks by calling :meth:`add_hookspecs(module_or_class)
  64. <PluginManager.add_hookspecs>`.
  65. You can register plugin objects (which contain hook implementations) by
  66. calling :meth:`register(plugin) <PluginManager.register>`.
  67. For debugging purposes you can call :meth:`PluginManager.enable_tracing`
  68. which will subsequently send debug information to the trace helper.
  69. :param project_name:
  70. The short project name. Prefer snake case. Make sure it's unique!
  71. """
  72. def __init__(self, project_name: str) -> None:
  73. #: The project name.
  74. self.project_name: Final = project_name
  75. self._name2plugin: Final[dict[str, _Plugin]] = {}
  76. self._plugin_distinfo: Final[list[tuple[_Plugin, DistFacade]]] = []
  77. #: The "hook relay", used to call a hook on all registered plugins.
  78. #: See :ref:`calling`.
  79. self.hook: Final = HookRelay()
  80. #: The tracing entry point. See :ref:`tracing`.
  81. self.trace: Final[_tracing.TagTracerSub] = _tracing.TagTracer().get(
  82. "pluginmanage"
  83. )
  84. self._inner_hookexec = _multicall
  85. def _hookexec(
  86. self,
  87. hook_name: str,
  88. methods: Sequence[HookImpl],
  89. kwargs: Mapping[str, object],
  90. firstresult: bool,
  91. ) -> object | list[object]:
  92. # called from all hookcaller instances.
  93. # enable_tracing will set its own wrapping function at self._inner_hookexec
  94. return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
  95. def register(self, plugin: _Plugin, name: str | None = None) -> str | None:
  96. """Register a plugin and return its name.
  97. :param name:
  98. The name under which to register the plugin. If not specified, a
  99. name is generated using :func:`get_canonical_name`.
  100. :returns:
  101. The plugin name. If the name is blocked from registering, returns
  102. ``None``.
  103. If the plugin is already registered, raises a :exc:`ValueError`.
  104. """
  105. plugin_name = name or self.get_canonical_name(plugin)
  106. if plugin_name in self._name2plugin:
  107. if self._name2plugin.get(plugin_name, -1) is None:
  108. return None # blocked plugin, return None to indicate no registration
  109. raise ValueError(
  110. "Plugin name already registered: %s=%s\n%s"
  111. % (plugin_name, plugin, self._name2plugin)
  112. )
  113. if plugin in self._name2plugin.values():
  114. raise ValueError(
  115. "Plugin already registered under a different name: %s=%s\n%s"
  116. % (plugin_name, plugin, self._name2plugin)
  117. )
  118. # XXX if an error happens we should make sure no state has been
  119. # changed at point of return
  120. self._name2plugin[plugin_name] = plugin
  121. # register matching hook implementations of the plugin
  122. for name in dir(plugin):
  123. hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
  124. if hookimpl_opts is not None:
  125. normalize_hookimpl_opts(hookimpl_opts)
  126. method: _HookImplFunction[object] = getattr(plugin, name)
  127. hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
  128. name = hookimpl_opts.get("specname") or name
  129. hook: HookCaller | None = getattr(self.hook, name, None)
  130. if hook is None:
  131. hook = HookCaller(name, self._hookexec)
  132. setattr(self.hook, name, hook)
  133. elif hook.has_spec():
  134. self._verify_hook(hook, hookimpl)
  135. hook._maybe_apply_history(hookimpl)
  136. hook._add_hookimpl(hookimpl)
  137. return plugin_name
  138. def parse_hookimpl_opts(self, plugin: _Plugin, name: str) -> HookimplOpts | None:
  139. """Try to obtain a hook implementation from an item with the given name
  140. in the given plugin which is being searched for hook impls.
  141. :returns:
  142. The parsed hookimpl options, or None to skip the given item.
  143. This method can be overridden by ``PluginManager`` subclasses to
  144. customize how hook implementation are picked up. By default, returns the
  145. options for items decorated with :class:`HookimplMarker`.
  146. """
  147. method: object = getattr(plugin, name)
  148. if not inspect.isroutine(method):
  149. return None
  150. try:
  151. res: HookimplOpts | None = getattr(
  152. method, self.project_name + "_impl", None
  153. )
  154. except Exception:
  155. res = {} # type: ignore[assignment]
  156. if res is not None and not isinstance(res, dict):
  157. # false positive
  158. res = None # type:ignore[unreachable]
  159. return res
  160. def unregister(
  161. self, plugin: _Plugin | None = None, name: str | None = None
  162. ) -> Any | None:
  163. """Unregister a plugin and all of its hook implementations.
  164. The plugin can be specified either by the plugin object or the plugin
  165. name. If both are specified, they must agree.
  166. Returns the unregistered plugin, or ``None`` if not found.
  167. """
  168. if name is None:
  169. assert plugin is not None, "one of name or plugin needs to be specified"
  170. name = self.get_name(plugin)
  171. assert name is not None, "plugin is not registered"
  172. if plugin is None:
  173. plugin = self.get_plugin(name)
  174. if plugin is None:
  175. return None
  176. hookcallers = self.get_hookcallers(plugin)
  177. if hookcallers:
  178. for hookcaller in hookcallers:
  179. hookcaller._remove_plugin(plugin)
  180. # if self._name2plugin[name] == None registration was blocked: ignore
  181. if self._name2plugin.get(name):
  182. assert name is not None
  183. del self._name2plugin[name]
  184. return plugin
  185. def set_blocked(self, name: str) -> None:
  186. """Block registrations of the given name, unregister if already registered."""
  187. self.unregister(name=name)
  188. self._name2plugin[name] = None
  189. def is_blocked(self, name: str) -> bool:
  190. """Return whether the given plugin name is blocked."""
  191. return name in self._name2plugin and self._name2plugin[name] is None
  192. def unblock(self, name: str) -> bool:
  193. """Unblocks a name.
  194. Returns whether the name was actually blocked.
  195. """
  196. if self._name2plugin.get(name, -1) is None:
  197. del self._name2plugin[name]
  198. return True
  199. return False
  200. def add_hookspecs(self, module_or_class: _Namespace) -> None:
  201. """Add new hook specifications defined in the given ``module_or_class``.
  202. Functions are recognized as hook specifications if they have been
  203. decorated with a matching :class:`HookspecMarker`.
  204. """
  205. names = []
  206. for name in dir(module_or_class):
  207. spec_opts = self.parse_hookspec_opts(module_or_class, name)
  208. if spec_opts is not None:
  209. hc: HookCaller | None = getattr(self.hook, name, None)
  210. if hc is None:
  211. hc = HookCaller(name, self._hookexec, module_or_class, spec_opts)
  212. setattr(self.hook, name, hc)
  213. else:
  214. # Plugins registered this hook without knowing the spec.
  215. hc.set_specification(module_or_class, spec_opts)
  216. for hookfunction in hc.get_hookimpls():
  217. self._verify_hook(hc, hookfunction)
  218. names.append(name)
  219. if not names:
  220. raise ValueError(
  221. f"did not find any {self.project_name!r} hooks in {module_or_class!r}"
  222. )
  223. def parse_hookspec_opts(
  224. self, module_or_class: _Namespace, name: str
  225. ) -> HookspecOpts | None:
  226. """Try to obtain a hook specification from an item with the given name
  227. in the given module or class which is being searched for hook specs.
  228. :returns:
  229. The parsed hookspec options for defining a hook, or None to skip the
  230. given item.
  231. This method can be overridden by ``PluginManager`` subclasses to
  232. customize how hook specifications are picked up. By default, returns the
  233. options for items decorated with :class:`HookspecMarker`.
  234. """
  235. method = getattr(module_or_class, name)
  236. opts: HookspecOpts | None = getattr(method, self.project_name + "_spec", None)
  237. return opts
  238. def get_plugins(self) -> set[Any]:
  239. """Return a set of all registered plugin objects."""
  240. return {x for x in self._name2plugin.values() if x is not None}
  241. def is_registered(self, plugin: _Plugin) -> bool:
  242. """Return whether the plugin is already registered."""
  243. return any(plugin == val for val in self._name2plugin.values())
  244. def get_canonical_name(self, plugin: _Plugin) -> str:
  245. """Return a canonical name for a plugin object.
  246. Note that a plugin may be registered under a different name
  247. specified by the caller of :meth:`register(plugin, name) <register>`.
  248. To obtain the name of a registered plugin use :meth:`get_name(plugin)
  249. <get_name>` instead.
  250. """
  251. name: str | None = getattr(plugin, "__name__", None)
  252. return name or str(id(plugin))
  253. def get_plugin(self, name: str) -> Any | None:
  254. """Return the plugin registered under the given name, if any."""
  255. return self._name2plugin.get(name)
  256. def has_plugin(self, name: str) -> bool:
  257. """Return whether a plugin with the given name is registered."""
  258. return self.get_plugin(name) is not None
  259. def get_name(self, plugin: _Plugin) -> str | None:
  260. """Return the name the plugin is registered under, or ``None`` if
  261. is isn't."""
  262. for name, val in self._name2plugin.items():
  263. if plugin == val:
  264. return name
  265. return None
  266. def _verify_hook(self, hook: HookCaller, hookimpl: HookImpl) -> None:
  267. if hook.is_historic() and (hookimpl.hookwrapper or hookimpl.wrapper):
  268. raise PluginValidationError(
  269. hookimpl.plugin,
  270. "Plugin %r\nhook %r\nhistoric incompatible with yield/wrapper/hookwrapper"
  271. % (hookimpl.plugin_name, hook.name),
  272. )
  273. assert hook.spec is not None
  274. if hook.spec.warn_on_impl:
  275. _warn_for_function(hook.spec.warn_on_impl, hookimpl.function)
  276. # positional arg checking
  277. notinspec = set(hookimpl.argnames) - set(hook.spec.argnames)
  278. if notinspec:
  279. raise PluginValidationError(
  280. hookimpl.plugin,
  281. "Plugin %r for hook %r\nhookimpl definition: %s\n"
  282. "Argument(s) %s are declared in the hookimpl but "
  283. "can not be found in the hookspec"
  284. % (
  285. hookimpl.plugin_name,
  286. hook.name,
  287. _formatdef(hookimpl.function),
  288. notinspec,
  289. ),
  290. )
  291. if hook.spec.warn_on_impl_args:
  292. for hookimpl_argname in hookimpl.argnames:
  293. argname_warning = hook.spec.warn_on_impl_args.get(hookimpl_argname)
  294. if argname_warning is not None:
  295. _warn_for_function(argname_warning, hookimpl.function)
  296. if (
  297. hookimpl.wrapper or hookimpl.hookwrapper
  298. ) and not inspect.isgeneratorfunction(hookimpl.function):
  299. raise PluginValidationError(
  300. hookimpl.plugin,
  301. "Plugin %r for hook %r\nhookimpl definition: %s\n"
  302. "Declared as wrapper=True or hookwrapper=True "
  303. "but function is not a generator function"
  304. % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
  305. )
  306. if hookimpl.wrapper and hookimpl.hookwrapper:
  307. raise PluginValidationError(
  308. hookimpl.plugin,
  309. "Plugin %r for hook %r\nhookimpl definition: %s\n"
  310. "The wrapper=True and hookwrapper=True options are mutually exclusive"
  311. % (hookimpl.plugin_name, hook.name, _formatdef(hookimpl.function)),
  312. )
  313. def check_pending(self) -> None:
  314. """Verify that all hooks which have not been verified against a
  315. hook specification are optional, otherwise raise
  316. :exc:`PluginValidationError`."""
  317. for name in self.hook.__dict__:
  318. if name[0] != "_":
  319. hook: HookCaller = getattr(self.hook, name)
  320. if not hook.has_spec():
  321. for hookimpl in hook.get_hookimpls():
  322. if not hookimpl.optionalhook:
  323. raise PluginValidationError(
  324. hookimpl.plugin,
  325. "unknown hook %r in plugin %r"
  326. % (name, hookimpl.plugin),
  327. )
  328. def load_setuptools_entrypoints(self, group: str, name: str | None = None) -> int:
  329. """Load modules from querying the specified setuptools ``group``.
  330. :param group:
  331. Entry point group to load plugins.
  332. :param name:
  333. If given, loads only plugins with the given ``name``.
  334. :return:
  335. The number of plugins loaded by this call.
  336. """
  337. import importlib.metadata
  338. count = 0
  339. for dist in list(importlib.metadata.distributions()):
  340. for ep in dist.entry_points:
  341. if (
  342. ep.group != group
  343. or (name is not None and ep.name != name)
  344. # already registered
  345. or self.get_plugin(ep.name)
  346. or self.is_blocked(ep.name)
  347. ):
  348. continue
  349. plugin = ep.load()
  350. self.register(plugin, name=ep.name)
  351. self._plugin_distinfo.append((plugin, DistFacade(dist)))
  352. count += 1
  353. return count
  354. def list_plugin_distinfo(self) -> list[tuple[_Plugin, DistFacade]]:
  355. """Return a list of (plugin, distinfo) pairs for all
  356. setuptools-registered plugins."""
  357. return list(self._plugin_distinfo)
  358. def list_name_plugin(self) -> list[tuple[str, _Plugin]]:
  359. """Return a list of (name, plugin) pairs for all registered plugins."""
  360. return list(self._name2plugin.items())
  361. def get_hookcallers(self, plugin: _Plugin) -> list[HookCaller] | None:
  362. """Get all hook callers for the specified plugin.
  363. :returns:
  364. The hook callers, or ``None`` if ``plugin`` is not registered in
  365. this plugin manager.
  366. """
  367. if self.get_name(plugin) is None:
  368. return None
  369. hookcallers = []
  370. for hookcaller in self.hook.__dict__.values():
  371. for hookimpl in hookcaller.get_hookimpls():
  372. if hookimpl.plugin is plugin:
  373. hookcallers.append(hookcaller)
  374. return hookcallers
  375. def add_hookcall_monitoring(
  376. self, before: _BeforeTrace, after: _AfterTrace
  377. ) -> Callable[[], None]:
  378. """Add before/after tracing functions for all hooks.
  379. Returns an undo function which, when called, removes the added tracers.
  380. ``before(hook_name, hook_impls, kwargs)`` will be called ahead
  381. of all hook calls and receive a hookcaller instance, a list
  382. of HookImpl instances and the keyword arguments for the hook call.
  383. ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
  384. same arguments as ``before`` but also a :class:`~pluggy.Result` object
  385. which represents the result of the overall hook call.
  386. """
  387. oldcall = self._inner_hookexec
  388. def traced_hookexec(
  389. hook_name: str,
  390. hook_impls: Sequence[HookImpl],
  391. caller_kwargs: Mapping[str, object],
  392. firstresult: bool,
  393. ) -> object | list[object]:
  394. before(hook_name, hook_impls, caller_kwargs)
  395. outcome = Result.from_call(
  396. lambda: oldcall(hook_name, hook_impls, caller_kwargs, firstresult)
  397. )
  398. after(outcome, hook_name, hook_impls, caller_kwargs)
  399. return outcome.get_result()
  400. self._inner_hookexec = traced_hookexec
  401. def undo() -> None:
  402. self._inner_hookexec = oldcall
  403. return undo
  404. def enable_tracing(self) -> Callable[[], None]:
  405. """Enable tracing of hook calls.
  406. Returns an undo function which, when called, removes the added tracing.
  407. """
  408. hooktrace = self.trace.root.get("hook")
  409. def before(
  410. hook_name: str, methods: Sequence[HookImpl], kwargs: Mapping[str, object]
  411. ) -> None:
  412. hooktrace.root.indent += 1
  413. hooktrace(hook_name, kwargs)
  414. def after(
  415. outcome: Result[object],
  416. hook_name: str,
  417. methods: Sequence[HookImpl],
  418. kwargs: Mapping[str, object],
  419. ) -> None:
  420. if outcome.exception is None:
  421. hooktrace("finish", hook_name, "-->", outcome.get_result())
  422. hooktrace.root.indent -= 1
  423. return self.add_hookcall_monitoring(before, after)
  424. def subset_hook_caller(
  425. self, name: str, remove_plugins: Iterable[_Plugin]
  426. ) -> HookCaller:
  427. """Return a proxy :class:`~pluggy.HookCaller` instance for the named
  428. method which manages calls to all registered plugins except the ones
  429. from remove_plugins."""
  430. orig: HookCaller = getattr(self.hook, name)
  431. plugins_to_remove = {plug for plug in remove_plugins if hasattr(plug, name)}
  432. if plugins_to_remove:
  433. return _SubsetHookCaller(orig, plugins_to_remove)
  434. return orig
  435. def _formatdef(func: Callable[..., object]) -> str:
  436. return f"{func.__name__}{inspect.signature(func)}"