providers_manager.py 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Manages all providers."""
  19. from __future__ import annotations
  20. import fnmatch
  21. import functools
  22. import inspect
  23. import json
  24. import logging
  25. import os
  26. import sys
  27. import traceback
  28. import warnings
  29. from dataclasses import dataclass
  30. from functools import wraps
  31. from time import perf_counter
  32. from typing import TYPE_CHECKING, Any, Callable, MutableMapping, NamedTuple, NoReturn, TypeVar
  33. from packaging.utils import canonicalize_name
  34. from airflow.exceptions import AirflowOptionalProviderFeatureException
  35. from airflow.hooks.filesystem import FSHook
  36. from airflow.hooks.package_index import PackageIndexHook
  37. from airflow.typing_compat import ParamSpec
  38. from airflow.utils import yaml
  39. from airflow.utils.entry_points import entry_points_with_dist
  40. from airflow.utils.log.logging_mixin import LoggingMixin
  41. from airflow.utils.module_loading import import_string
  42. from airflow.utils.singleton import Singleton
  43. log = logging.getLogger(__name__)
  44. if sys.version_info >= (3, 9):
  45. from importlib.resources import files as resource_files
  46. else:
  47. from importlib_resources import files as resource_files
  48. PS = ParamSpec("PS")
  49. RT = TypeVar("RT")
  50. MIN_PROVIDER_VERSIONS = {
  51. "apache-airflow-providers-celery": "2.1.0",
  52. }
  53. def _ensure_prefix_for_placeholders(field_behaviors: dict[str, Any], conn_type: str):
  54. """
  55. Verify the correct placeholder prefix.
  56. If the given field_behaviors dict contains a placeholder's node, and there
  57. are placeholders for extra fields (i.e. anything other than the built-in conn
  58. attrs), and if those extra fields are unprefixed, then add the prefix.
  59. The reason we need to do this is, all custom conn fields live in the same dictionary,
  60. so we need to namespace them with a prefix internally. But for user convenience,
  61. and consistency between the `get_ui_field_behaviour` method and the extra dict itself,
  62. we allow users to supply the unprefixed name.
  63. """
  64. conn_attrs = {"host", "schema", "login", "password", "port", "extra"}
  65. def ensure_prefix(field):
  66. if field not in conn_attrs and not field.startswith("extra__"):
  67. return f"extra__{conn_type}__{field}"
  68. else:
  69. return field
  70. if "placeholders" in field_behaviors:
  71. placeholders = field_behaviors["placeholders"]
  72. field_behaviors["placeholders"] = {ensure_prefix(k): v for k, v in placeholders.items()}
  73. return field_behaviors
  74. if TYPE_CHECKING:
  75. from urllib.parse import SplitResult
  76. from airflow.datasets import Dataset
  77. from airflow.decorators.base import TaskDecorator
  78. from airflow.hooks.base import BaseHook
  79. from airflow.typing_compat import Literal
  80. class LazyDictWithCache(MutableMapping):
  81. """
  82. Lazy-loaded cached dictionary.
  83. Dictionary, which in case you set callable, executes the passed callable with `key` attribute
  84. at first use - and returns and caches the result.
  85. """
  86. __slots__ = ["_resolved", "_raw_dict"]
  87. def __init__(self, *args, **kw):
  88. self._resolved = set()
  89. self._raw_dict = dict(*args, **kw)
  90. def __setitem__(self, key, value):
  91. self._raw_dict.__setitem__(key, value)
  92. def __getitem__(self, key):
  93. value = self._raw_dict.__getitem__(key)
  94. if key not in self._resolved and callable(value):
  95. # exchange callable with result of calling it -- but only once! allow resolver to return a
  96. # callable itself
  97. value = value()
  98. self._resolved.add(key)
  99. self._raw_dict.__setitem__(key, value)
  100. return value
  101. def __delitem__(self, key):
  102. try:
  103. self._resolved.remove(key)
  104. except KeyError:
  105. pass
  106. self._raw_dict.__delitem__(key)
  107. def __iter__(self):
  108. return iter(self._raw_dict)
  109. def __len__(self):
  110. return len(self._raw_dict)
  111. def __contains__(self, key):
  112. return key in self._raw_dict
  113. def clear(self):
  114. self._resolved.clear()
  115. self._raw_dict.clear()
  116. def _read_schema_from_resources_or_local_file(filename: str) -> dict:
  117. try:
  118. with resource_files("airflow").joinpath(filename).open("rb") as f:
  119. schema = json.load(f)
  120. except (TypeError, FileNotFoundError):
  121. import pathlib
  122. with (pathlib.Path(__file__).parent / filename).open("rb") as f:
  123. schema = json.load(f)
  124. return schema
  125. def _create_provider_info_schema_validator():
  126. """Create JSON schema validator from the provider_info.schema.json."""
  127. import jsonschema
  128. schema = _read_schema_from_resources_or_local_file("provider_info.schema.json")
  129. cls = jsonschema.validators.validator_for(schema)
  130. validator = cls(schema)
  131. return validator
  132. def _create_customized_form_field_behaviours_schema_validator():
  133. """Create JSON schema validator from the customized_form_field_behaviours.schema.json."""
  134. import jsonschema
  135. schema = _read_schema_from_resources_or_local_file("customized_form_field_behaviours.schema.json")
  136. cls = jsonschema.validators.validator_for(schema)
  137. validator = cls(schema)
  138. return validator
  139. def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool:
  140. if provider_package.startswith("apache-airflow"):
  141. provider_path = provider_package[len("apache-") :].replace("-", ".")
  142. if not class_name.startswith(provider_path):
  143. log.warning(
  144. "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
  145. class_name,
  146. provider_package,
  147. provider_path,
  148. )
  149. return False
  150. return True
  151. @dataclass
  152. class ProviderInfo:
  153. """
  154. Provider information.
  155. :param version: version string
  156. :param data: dictionary with information about the provider
  157. :param source_or_package: whether the provider is source files or PyPI package. When installed from
  158. sources we suppress provider import errors.
  159. """
  160. version: str
  161. data: dict
  162. package_or_source: Literal["source"] | Literal["package"]
  163. def __post_init__(self):
  164. if self.package_or_source not in ("source", "package"):
  165. raise ValueError(
  166. f"Received {self.package_or_source!r} for `package_or_source`. "
  167. "Must be either 'package' or 'source'."
  168. )
  169. self.is_source = self.package_or_source == "source"
  170. class HookClassProvider(NamedTuple):
  171. """Hook class and Provider it comes from."""
  172. hook_class_name: str
  173. package_name: str
  174. class TriggerInfo(NamedTuple):
  175. """Trigger class and provider it comes from."""
  176. trigger_class_name: str
  177. package_name: str
  178. integration_name: str
  179. class NotificationInfo(NamedTuple):
  180. """Notification class and provider it comes from."""
  181. notification_class_name: str
  182. package_name: str
  183. class PluginInfo(NamedTuple):
  184. """Plugin class, name and provider it comes from."""
  185. name: str
  186. plugin_class: str
  187. provider_name: str
  188. class HookInfo(NamedTuple):
  189. """Hook information."""
  190. hook_class_name: str
  191. connection_id_attribute_name: str
  192. package_name: str
  193. hook_name: str
  194. connection_type: str
  195. connection_testable: bool
  196. class ConnectionFormWidgetInfo(NamedTuple):
  197. """Connection Form Widget information."""
  198. hook_class_name: str
  199. package_name: str
  200. field: Any
  201. field_name: str
  202. is_sensitive: bool
  203. def log_debug_import_from_sources(class_name, e, provider_package):
  204. """Log debug imports from sources."""
  205. log.debug(
  206. "Optional feature disabled on exception when importing '%s' from '%s' package",
  207. class_name,
  208. provider_package,
  209. exc_info=e,
  210. )
  211. def log_optional_feature_disabled(class_name, e, provider_package):
  212. """Log optional feature disabled."""
  213. log.debug(
  214. "Optional feature disabled on exception when importing '%s' from '%s' package",
  215. class_name,
  216. provider_package,
  217. exc_info=e,
  218. )
  219. log.info(
  220. "Optional provider feature disabled when importing '%s' from '%s' package",
  221. class_name,
  222. provider_package,
  223. )
  224. def log_import_warning(class_name, e, provider_package):
  225. """Log import warning."""
  226. log.warning(
  227. "Exception when importing '%s' from '%s' package",
  228. class_name,
  229. provider_package,
  230. exc_info=e,
  231. )
# This is a temporary measure until all community providers add AirflowOptionalProviderFeatureException
# where they have optional features. We are going to add tests in our CI to catch all such cases and
# fix them, but until then all "known unhandled optional feature errors" from community providers
# should be added here.
  236. KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS = [("apache-airflow-providers-google", "No module named 'paramiko'")]
  237. def _correctness_check(provider_package: str, class_name: str, provider_info: ProviderInfo) -> Any:
  238. """
  239. Perform coherence check on provider classes.
  240. For apache-airflow providers - it checks if it starts with appropriate package. For all providers
  241. it tries to import the provider - checking that there are no exceptions during importing.
  242. It logs appropriate warning in case it detects any problems.
  243. :param provider_package: name of the provider package
  244. :param class_name: name of the class to import
  245. :return the class if the class is OK, None otherwise.
  246. """
  247. if not _check_builtin_provider_prefix(provider_package, class_name):
  248. return None
  249. try:
  250. imported_class = import_string(class_name)
  251. except AirflowOptionalProviderFeatureException as e:
  252. # When the provider class raises AirflowOptionalProviderFeatureException
  253. # this is an expected case when only some classes in provider are
  254. # available. We just log debug level here and print info message in logs so that
  255. # the user is aware of it
  256. log_optional_feature_disabled(class_name, e, provider_package)
  257. return None
  258. except ImportError as e:
  259. if provider_info.is_source:
  260. # When we have providers from sources, then we just turn all import logs to debug logs
  261. # As this is pretty expected that you have a number of dependencies not installed
  262. # (we always have all providers from sources until we split providers to separate repo)
  263. log_debug_import_from_sources(class_name, e, provider_package)
  264. return None
  265. if "No module named 'airflow.providers." in e.msg:
  266. # handle cases where another provider is missing. This can only happen if
  267. # there is an optional feature, so we log debug and print information about it
  268. log_optional_feature_disabled(class_name, e, provider_package)
  269. return None
  270. for known_error in KNOWN_UNHANDLED_OPTIONAL_FEATURE_ERRORS:
  271. # Until we convert all providers to use AirflowOptionalProviderFeatureException
  272. # we assume any problem with importing another "provider" is because this is an
  273. # optional feature, so we log debug and print information about it
  274. if known_error[0] == provider_package and known_error[1] in e.msg:
  275. log_optional_feature_disabled(class_name, e, provider_package)
  276. return None
  277. # But when we have no idea - we print warning to logs
  278. log_import_warning(class_name, e, provider_package)
  279. return None
  280. except Exception as e:
  281. log_import_warning(class_name, e, provider_package)
  282. return None
  283. return imported_class
  284. # We want to have better control over initialization of parameters and be able to debug and test it
  285. # So we add our own decorator
  286. def provider_info_cache(cache_name: str) -> Callable[[Callable[PS, NoReturn]], Callable[PS, None]]:
  287. """
  288. Decorate and cache provider info.
  289. Decorator factory that create decorator that caches initialization of provider's parameters
  290. :param cache_name: Name of the cache
  291. """
  292. def provider_info_cache_decorator(func: Callable[PS, NoReturn]) -> Callable[PS, None]:
  293. @wraps(func)
  294. def wrapped_function(*args: PS.args, **kwargs: PS.kwargs) -> None:
  295. providers_manager_instance = args[0]
  296. if TYPE_CHECKING:
  297. assert isinstance(providers_manager_instance, ProvidersManager)
  298. if cache_name in providers_manager_instance._initialized_cache:
  299. return
  300. start_time = perf_counter()
  301. log.debug("Initializing Providers Manager[%s]", cache_name)
  302. func(*args, **kwargs)
  303. providers_manager_instance._initialized_cache[cache_name] = True
  304. log.debug(
  305. "Initialization of Providers Manager[%s] took %.2f seconds",
  306. cache_name,
  307. perf_counter() - start_time,
  308. )
  309. return wrapped_function
  310. return provider_info_cache_decorator
  311. class ProvidersManager(LoggingMixin, metaclass=Singleton):
  312. """
  313. Manages all provider packages.
  314. This is a Singleton class. The first time it is
  315. instantiated, it discovers all available providers in installed packages and
  316. local source folders (if airflow is run from sources).
  317. """
  318. resource_version = "0"
  319. _initialized: bool = False
  320. _initialization_stack_trace = None
  321. @staticmethod
  322. def initialized() -> bool:
  323. return ProvidersManager._initialized
  324. @staticmethod
  325. def initialization_stack_trace() -> str | None:
  326. return ProvidersManager._initialization_stack_trace
  327. def __init__(self):
  328. """Initialize the manager."""
  329. super().__init__()
  330. ProvidersManager._initialized = True
  331. ProvidersManager._initialization_stack_trace = "".join(traceback.format_stack(inspect.currentframe()))
  332. self._initialized_cache: dict[str, bool] = {}
  333. # Keeps dict of providers keyed by module name
  334. self._provider_dict: dict[str, ProviderInfo] = {}
  335. # Keeps dict of hooks keyed by connection type
  336. self._hooks_dict: dict[str, HookInfo] = {}
  337. self._fs_set: set[str] = set()
  338. self._dataset_uri_handlers: dict[str, Callable[[SplitResult], SplitResult]] = {}
  339. self._dataset_factories: dict[str, Callable[..., Dataset]] = {}
  340. self._dataset_to_openlineage_converters: dict[str, Callable] = {}
  341. self._taskflow_decorators: dict[str, Callable] = LazyDictWithCache() # type: ignore[assignment]
  342. # keeps mapping between connection_types and hook class, package they come from
  343. self._hook_provider_dict: dict[str, HookClassProvider] = {}
  344. # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
  345. self._hooks_lazy_dict: LazyDictWithCache[str, HookInfo | Callable] = LazyDictWithCache()
  346. # Keeps methods that should be used to add custom widgets tuple of keyed by name of the extra field
  347. self._connection_form_widgets: dict[str, ConnectionFormWidgetInfo] = {}
  348. # Customizations for javascript fields are kept here
  349. self._field_behaviours: dict[str, dict] = {}
  350. self._extra_link_class_name_set: set[str] = set()
  351. self._logging_class_name_set: set[str] = set()
  352. self._auth_manager_class_name_set: set[str] = set()
  353. self._secrets_backend_class_name_set: set[str] = set()
  354. self._executor_class_name_set: set[str] = set()
  355. self._provider_configs: dict[str, dict[str, Any]] = {}
  356. self._api_auth_backend_module_names: set[str] = set()
  357. self._trigger_info_set: set[TriggerInfo] = set()
  358. self._notification_info_set: set[NotificationInfo] = set()
  359. self._provider_schema_validator = _create_provider_info_schema_validator()
  360. self._customized_form_fields_schema_validator = (
  361. _create_customized_form_field_behaviours_schema_validator()
  362. )
  363. # Set of plugins contained in providers
  364. self._plugins_set: set[PluginInfo] = set()
  365. self._init_airflow_core_hooks()
  366. def _init_airflow_core_hooks(self):
  367. """Initialize the hooks dict with default hooks from Airflow core."""
  368. core_dummy_hooks = {
  369. "generic": "Generic",
  370. "email": "Email",
  371. }
  372. for key, display in core_dummy_hooks.items():
  373. self._hooks_lazy_dict[key] = HookInfo(
  374. hook_class_name=None,
  375. connection_id_attribute_name=None,
  376. package_name=None,
  377. hook_name=display,
  378. connection_type=None,
  379. connection_testable=False,
  380. )
  381. for cls in [FSHook, PackageIndexHook]:
  382. package_name = cls.__module__
  383. hook_class_name = f"{cls.__module__}.{cls.__name__}"
  384. hook_info = self._import_hook(
  385. connection_type=None,
  386. provider_info=None,
  387. hook_class_name=hook_class_name,
  388. package_name=package_name,
  389. )
  390. self._hook_provider_dict[hook_info.connection_type] = HookClassProvider(
  391. hook_class_name=hook_class_name, package_name=package_name
  392. )
  393. self._hooks_lazy_dict[hook_info.connection_type] = hook_info
  394. @provider_info_cache("list")
  395. def initialize_providers_list(self):
  396. """Lazy initialization of providers list."""
  397. # Local source folders are loaded first. They should take precedence over the package ones for
  398. # Development purpose. In production provider.yaml files are not present in the 'airflow" directory
  399. # So there is no risk we are going to override package provider accidentally. This can only happen
  400. # in case of local development
  401. self._discover_all_airflow_builtin_providers_from_local_sources()
  402. self._discover_all_providers_from_packages()
  403. self._verify_all_providers_all_compatible()
  404. self._provider_dict = dict(sorted(self._provider_dict.items()))
  405. def _verify_all_providers_all_compatible(self):
  406. from packaging import version as packaging_version
  407. for provider_id, info in self._provider_dict.items():
  408. min_version = MIN_PROVIDER_VERSIONS.get(provider_id)
  409. if min_version:
  410. if packaging_version.parse(min_version) > packaging_version.parse(info.version):
  411. log.warning(
  412. "The package %s is not compatible with this version of Airflow. "
  413. "The package has version %s but the minimum supported version "
  414. "of the package is %s",
  415. provider_id,
  416. info.version,
  417. min_version,
  418. )
  419. @provider_info_cache("hooks")
  420. def initialize_providers_hooks(self):
  421. """Lazy initialization of providers hooks."""
  422. self.initialize_providers_list()
  423. self._discover_hooks()
  424. self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))
  425. @provider_info_cache("filesystems")
  426. def initialize_providers_filesystems(self):
  427. """Lazy initialization of providers filesystems."""
  428. self.initialize_providers_list()
  429. self._discover_filesystems()
  430. @provider_info_cache("dataset_uris")
  431. def initialize_providers_dataset_uri_resources(self):
  432. """Lazy initialization of provider dataset URI handlers, factories, converters etc."""
  433. self.initialize_providers_list()
  434. self._discover_dataset_uri_resources()
  435. @provider_info_cache("taskflow_decorators")
  436. def initialize_providers_taskflow_decorator(self):
  437. """Lazy initialization of providers hooks."""
  438. self.initialize_providers_list()
  439. self._discover_taskflow_decorators()
  440. @provider_info_cache("extra_links")
  441. def initialize_providers_extra_links(self):
  442. """Lazy initialization of providers extra links."""
  443. self.initialize_providers_list()
  444. self._discover_extra_links()
  445. @provider_info_cache("logging")
  446. def initialize_providers_logging(self):
  447. """Lazy initialization of providers logging information."""
  448. self.initialize_providers_list()
  449. self._discover_logging()
  450. @provider_info_cache("secrets_backends")
  451. def initialize_providers_secrets_backends(self):
  452. """Lazy initialization of providers secrets_backends information."""
  453. self.initialize_providers_list()
  454. self._discover_secrets_backends()
  455. @provider_info_cache("executors")
  456. def initialize_providers_executors(self):
  457. """Lazy initialization of providers executors information."""
  458. self.initialize_providers_list()
  459. self._discover_executors()
  460. @provider_info_cache("notifications")
  461. def initialize_providers_notifications(self):
  462. """Lazy initialization of providers notifications information."""
  463. self.initialize_providers_list()
  464. self._discover_notifications()
  465. @provider_info_cache("auth_managers")
  466. def initialize_providers_auth_managers(self):
  467. """Lazy initialization of providers notifications information."""
  468. self.initialize_providers_list()
  469. self._discover_auth_managers()
  470. @provider_info_cache("config")
  471. def initialize_providers_configuration(self):
  472. """Lazy initialization of providers configuration information."""
  473. self._initialize_providers_configuration()
  474. def _initialize_providers_configuration(self):
  475. """
  476. Initialize providers configuration information.
  477. Should be used if we do not want to trigger caching for ``initialize_providers_configuration`` method.
  478. In some cases we might want to make sure that the configuration is initialized, but we do not want
  479. to cache the initialization method - for example when we just want to write configuration with
  480. providers, but it is used in the context where no providers are loaded yet we will eventually
  481. restore the original configuration and we want the subsequent ``initialize_providers_configuration``
  482. method to be run in order to load the configuration for providers again.
  483. """
  484. self.initialize_providers_list()
  485. self._discover_config()
  486. # Now update conf with the new provider configuration from providers
  487. from airflow.configuration import conf
  488. conf.load_providers_configuration()
  489. @provider_info_cache("auth_backends")
  490. def initialize_providers_auth_backends(self):
  491. """Lazy initialization of providers API auth_backends information."""
  492. self.initialize_providers_list()
  493. self._discover_auth_backends()
  494. @provider_info_cache("plugins")
  495. def initialize_providers_plugins(self):
  496. self.initialize_providers_list()
  497. self._discover_plugins()
  498. def _discover_all_providers_from_packages(self) -> None:
  499. """
  500. Discover all providers by scanning packages installed.
  501. The list of providers should be returned via the 'apache_airflow_provider'
  502. entrypoint as a dictionary conforming to the 'airflow/provider_info.schema.json'
  503. schema. Note that the schema is different at runtime than provider.yaml.schema.json.
  504. The development version of provider schema is more strict and changes together with
  505. the code. The runtime version is more relaxed (allows for additional properties)
  506. and verifies only the subset of fields that are needed at runtime.
  507. """
  508. for entry_point, dist in entry_points_with_dist("apache_airflow_provider"):
  509. package_name = canonicalize_name(dist.metadata["name"])
  510. if package_name in self._provider_dict:
  511. continue
  512. log.debug("Loading %s from package %s", entry_point, package_name)
  513. version = dist.version
  514. provider_info = entry_point.load()()
  515. self._provider_schema_validator.validate(provider_info)
  516. provider_info_package_name = provider_info["package-name"]
  517. if package_name != provider_info_package_name:
  518. raise ValueError(
  519. f"The package '{package_name}' from setuptools and "
  520. f"{provider_info_package_name} do not match. Please make sure they are aligned"
  521. )
  522. if package_name not in self._provider_dict:
  523. self._provider_dict[package_name] = ProviderInfo(version, provider_info, "package")
  524. else:
  525. log.warning(
  526. "The provider for package '%s' could not be registered from because providers for that "
  527. "package name have already been registered",
  528. package_name,
  529. )
  530. def _discover_all_airflow_builtin_providers_from_local_sources(self) -> None:
  531. """
  532. Find all built-in airflow providers if airflow is run from the local sources.
  533. It finds `provider.yaml` files for all such providers and registers the providers using those.
  534. This 'provider.yaml' scanning takes precedence over scanning packages installed
  535. in case you have both sources and packages installed, the providers will be loaded from
  536. the "airflow" sources rather than from the packages.
  537. """
  538. try:
  539. import airflow.providers
  540. except ImportError:
  541. log.info("You have no providers installed.")
  542. return
  543. seen = set()
  544. for path in airflow.providers.__path__: # type: ignore[attr-defined]
  545. try:
  546. # The same path can appear in the __path__ twice, under non-normalized paths (ie.
  547. # /path/to/repo/airflow/providers and /path/to/repo/./airflow/providers)
  548. path = os.path.realpath(path)
  549. if path not in seen:
  550. seen.add(path)
  551. self._add_provider_info_from_local_source_files_on_path(path)
  552. except Exception as e:
  553. log.warning("Error when loading 'provider.yaml' files from %s airflow sources: %s", path, e)
  554. def _add_provider_info_from_local_source_files_on_path(self, path) -> None:
  555. """
  556. Find all the provider.yaml files in the directory specified.
  557. :param path: path where to look for provider.yaml files
  558. """
  559. root_path = path
  560. for folder, subdirs, files in os.walk(path, topdown=True):
  561. for filename in fnmatch.filter(files, "provider.yaml"):
  562. try:
  563. package_name = "apache-airflow-providers" + folder[len(root_path) :].replace(os.sep, "-")
  564. self._add_provider_info_from_local_source_file(
  565. os.path.join(folder, filename), package_name
  566. )
  567. subdirs[:] = []
  568. except Exception as e:
  569. log.warning("Error when loading 'provider.yaml' file from %s %e", folder, e)
  def _add_provider_info_from_local_source_file(self, path, package_name) -> None:
      """
      Load one provider.yaml file and register the provider it describes.

      The file content is validated with the provider schema validator and stored in the
      provider dictionary under ``package_name`` with the first entry of ``versions`` as its
      version. A package name that is already present is left untouched and a warning is
      logged. Every exception is logged as a warning and not re-raised.

      :param path: full file path of the provider.yaml file
      :param package_name: name of the package
      """
      try:
          log.debug("Loading %s from %s", package_name, path)
          with open(path) as yaml_stream:
              parsed_info = yaml.safe_load(yaml_stream)
          self._provider_schema_validator.validate(parsed_info)
          first_version = parsed_info["versions"][0]
          if package_name in self._provider_dict:
              log.warning(
                  "The providers for package '%s' could not be registered because providers for that "
                  "package name have already been registered",
                  package_name,
              )
          else:
              self._provider_dict[package_name] = ProviderInfo(first_version, parsed_info, "source")
      except Exception as e:
          log.warning("Error when loading '%s'", path, exc_info=e)
  def _discover_hooks_from_connection_types(
      self,
      hook_class_names_registered: set[str],
      already_registered_warning_connection_types: set[str],
      package_name: str,
      provider: ProviderInfo,
  ):
      """
      Discover hooks from the "connection-types" property.

      Each "connection-types" entry carries both the connection type and the hook class name,
      so the hook can be registered without importing its class; the import is deferred until
      the entry is accessed in the lazy hooks dictionary.

      :param hook_class_names_registered: set of registered hook class names for this provider;
          every hook class name seen here is added to it
      :param already_registered_warning_connection_types: set collecting connection types that
          were already registered by a different package
      :param package_name: name of the provider package
      :param provider: provider information whose data holds the "connection-types" property
      :return: True when the provider declares a non-empty "connection-types", False otherwise
      """
      connection_types = provider.data.get("connection-types")
      if not connection_types:
          return False
      for entry in connection_types:
          conn_type = entry["connection-type"]
          class_name = entry["hook-class-name"]
          hook_class_names_registered.add(class_name)
          existing = self._hook_provider_dict.get(conn_type)
          if not existing:
              self._hook_provider_dict[conn_type] = HookClassProvider(
                  hook_class_name=class_name, package_name=package_name
              )
              # Store a partial instead of the hook so the import happens at access time
              self._hooks_lazy_dict[conn_type] = functools.partial(
                  self._import_hook,
                  connection_type=conn_type,
                  provider_info=provider,
              )
              continue
          if existing.package_name != package_name:
              already_registered_warning_connection_types.add(conn_type)
          else:
              log.warning(
                  "The connection type '%s' is already registered in the"
                  " package '%s' with different class names: '%s' and '%s'. ",
                  conn_type,
                  package_name,
                  existing.hook_class_name,
                  class_name,
              )
      return True
  def _discover_hooks_from_hook_class_names(
      self,
      hook_class_names_registered: set[str],
      already_registered_warning_connection_types: set[str],
      package_name: str,
      provider: ProviderInfo,
      provider_uses_connection_types: bool,
  ):
      """
      Discover hooks from the deprecated "hook-class-names" property.

      The property lists hook class names only, so each class has to be imported right away
      to learn its connection type. Classes already registered through "connection-types"
      are skipped. A DeprecationWarning is issued when the provider declares
      "hook-class-names" without "connection-types". Finally one warning is logged for every
      connection type collected as already registered.

      :param hook_class_names_registered: hook class names already handled for this provider
      :param already_registered_warning_connection_types: connection types that were already
          registered by another package; extended here and reported when discovery finishes
      :param package_name: name of the provider package
      :param provider: class that keeps information about version and details of the provider
      :param provider_uses_connection_types: determines whether the provider uses "connection-types" new
          form of passing connection types
      """
      hook_class_names = provider.data.get("hook-class-names")
      if hook_class_names:
          for hook_class_name in hook_class_names:
              if hook_class_name in hook_class_names_registered:
                  # Already scheduled for lazy import by connection-types discovery
                  continue
              hook_info = self._import_hook(
                  connection_type=None,
                  provider_info=provider,
                  hook_class_name=hook_class_name,
                  package_name=package_name,
              )
              if not hook_info:
                  # Import failed; the reason was logged at import time
                  continue
              conn_type = hook_info.connection_type
              existing = self._hook_provider_dict.get(conn_type)
              if not existing:
                  self._hook_provider_dict[conn_type] = HookClassProvider(
                      hook_class_name=hook_class_name, package_name=package_name
                  )
                  self._hooks_lazy_dict[conn_type] = hook_info
              elif existing.package_name != package_name:
                  already_registered_warning_connection_types.add(conn_type)
              elif existing.hook_class_name != hook_class_name:
                  log.warning(
                      "The hook connection type '%s' is registered twice in the"
                      " package '%s' with different class names: '%s' and '%s'. "
                      " Please fix it!",
                      conn_type,
                      package_name,
                      existing.hook_class_name,
                      hook_class_name,
                  )
          if not provider_uses_connection_types:
              warnings.warn(
                  f"The provider {package_name} uses `hook-class-names` "
                  "property in provider-info and has no `connection-types` one. "
                  "The 'hook-class-names' property has been deprecated in favour "
                  "of 'connection-types' in Airflow 2.2. Use **both** in case you want to "
                  "have backwards compatibility with Airflow < 2.2",
                  DeprecationWarning,
                  stacklevel=1,
              )
      for duplicated_type in already_registered_warning_connection_types:
          log.warning(
              "The connection_type '%s' has been already registered by provider '%s.'",
              duplicated_type,
              self._hook_provider_dict[duplicated_type].package_name,
          )
  718. def _discover_hooks(self) -> None:
  719. """Retrieve all connections defined in the providers via Hooks."""
  720. for package_name, provider in self._provider_dict.items():
  721. duplicated_connection_types: set[str] = set()
  722. hook_class_names_registered: set[str] = set()
  723. provider_uses_connection_types = self._discover_hooks_from_connection_types(
  724. hook_class_names_registered, duplicated_connection_types, package_name, provider
  725. )
  726. self._discover_hooks_from_hook_class_names(
  727. hook_class_names_registered,
  728. duplicated_connection_types,
  729. package_name,
  730. provider,
  731. provider_uses_connection_types,
  732. )
  733. self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))
  @provider_info_cache("import_all_hooks")
  def _import_info_from_all_hooks(self):
      """Touch every lazily registered hook so it gets imported, then sort the field behaviours."""
      # Iterating the values is what triggers the import of each hook
      for _ in self._hooks_lazy_dict.values():
          pass
      ordered_behaviours = sorted(self._field_behaviours.items())
      self._field_behaviours = dict(ordered_behaviours)
      # Connection form widgets are deliberately NOT sorted here. They are consumed in two places:
      # 1. The UI Connections view, which expects the order in which the Hook defines them.
      # 2. The `airflow providers widgets` cli command, which expects alphabetical order.
      # The original order cannot be recovered once sorted, so the sorting lives in the cli part
      # rather than as:
      # self._connection_form_widgets = dict(sorted(self._connection_form_widgets.items()))
  def _discover_filesystems(self) -> None:
      """
      Retrieve all filesystems defined in the providers.

      A module listed under a provider's "filesystems" is recorded only when its ``get_fs``
      attribute passes the correctness check.
      """
      for provider_package, provider in self._provider_dict.items():
          for fs_module_name in provider.data.get("filesystems", []):
              if _correctness_check(provider_package, f"{fs_module_name}.get_fs", provider):
                  self._fs_set.add(fs_module_name)
      # No sorting here: a set keeps no order, so sorting before rebuilding the set had no effect.
      # The `filesystem_module_names` property sorts on read.
  def _discover_dataset_uri_resources(self) -> None:
      """
      Register dataset URI handlers, factories and OpenLineage converters of all providers.

      Only "dataset-uris" entries that define both "schemes" and "handler" are processed.
      A handler explicitly set to null falls back to ``normalize_noop``; a missing factory
      or converter registers nothing.
      """
      from airflow.datasets import normalize_noop

      def _register(
          provider_package_name: str,
          schemes_list: list[str],
          resource_path: str | None,
          resource_registry: dict,
          default_resource: Any = None,
      ):
          """Map every scheme to the resolved resource, if the resource resolves to something truthy."""
          if resource_path is None:
              resolved = default_resource
          else:
              # `provider` is the loop variable of the enclosing `for` below
              resolved = _correctness_check(provider_package_name, resource_path, provider)
          if not resolved:
              return
          for scheme in schemes_list:
              resource_registry[scheme] = resolved

      for provider_name, provider in self._provider_dict.items():
          for uri_info in provider.data.get("dataset-uris", []):
              # Both keys must be present; "handler" may be present with a null value
              if not ("schemes" in uri_info and "handler" in uri_info):
                  continue
              schemes = uri_info["schemes"]
              _register(
                  provider_name,
                  schemes,
                  uri_info["handler"],
                  self._dataset_uri_handlers,
                  normalize_noop,
              )
              _register(
                  provider_name,
                  schemes,
                  uri_info.get("factory"),
                  self._dataset_factories,
              )
              _register(
                  provider_name,
                  schemes,
                  uri_info.get("to_openlineage_converter"),
                  self._dataset_to_openlineage_converters,
              )
  796. def _discover_taskflow_decorators(self) -> None:
  797. for name, info in self._provider_dict.items():
  798. for taskflow_decorator in info.data.get("task-decorators", []):
  799. self._add_taskflow_decorator(
  800. taskflow_decorator["name"], taskflow_decorator["class-name"], name
  801. )
  def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
      """
      Register a taskflow decorator under ``name`` as a deferred import.

      Nothing is registered when the builtin-provider prefix check fails, or when the name
      is already taken; in the latter case a warning naming the existing decorator is logged.

      :param name: name under which the decorator is registered
      :param decorator_class_name: dotted path imported lazily via ``import_string``
      :param provider_package: name of the provider package declaring the decorator
      """
      if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
          return
      if name not in self._taskflow_decorators:
          self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
          return
      try:
          registered = self._taskflow_decorators[name]
          other_name = f"{registered.__module__}.{registered.__name__}"
      except Exception:
          # Resolving the entry failed, so read the dotted path stored in the functools.partial
          other_name = self._taskflow_decorators._raw_dict[name].args[0]  # type: ignore[attr-defined]
      log.warning(
          "The taskflow decorator '%s' has been already registered (by %s).",
          name,
          other_name,
      )
  819. @staticmethod
  820. def _get_attr(obj: Any, attr_name: str):
  821. """Retrieve attributes of an object, or warn if not found."""
  822. if not hasattr(obj, attr_name):
  823. log.warning("The object '%s' is missing %s attribute and cannot be registered", obj, attr_name)
  824. return None
  825. return getattr(obj, attr_name)
  def _import_hook(
      self,
      connection_type: str | None,
      provider_info: ProviderInfo,
      hook_class_name: str | None = None,
      package_name: str | None = None,
  ) -> HookInfo | None:
      """
      Import hook and retrieve hook information.

      Exactly one of connection_type (for lazy loading) or hook_class_name must be set.
      package_name is only needed when hook_class_name is passed; for lazy loading both
      package_name and hook_class_name are read from ``_hook_provider_dict``.

      While importing, the connection form widgets and UI field behaviours declared directly
      on the hook class (not inherited ones) are registered.

      :param connection_type: type of the connection
      :param provider_info: provider information passed to the correctness check
      :param hook_class_name: name of the hook class
      :param package_name: provider package - only needed in case connection_type is missing
      :return: hook information, or None when the class fails the correctness check, uses a
          widget field class that is not supported, raises a non-import error while widgets
          or field behaviours are collected, or lacks one of ``conn_type``,
          ``conn_name_attr`` or ``hook_name``
      :raises ValueError: when neither or both of connection_type and hook_class_name are
          set, or when hook_class_name is used without package_name
      """
      from wtforms import BooleanField, IntegerField, PasswordField, StringField

      if connection_type is None and hook_class_name is None:
          raise ValueError("Either connection_type or hook_class_name must be set")
      if connection_type is not None and hook_class_name is not None:
          raise ValueError(
              f"Both connection_type ({connection_type} and "
              f"hook_class_name {hook_class_name} are set. Only one should be set!"
          )
      if connection_type is not None:
          class_provider = self._hook_provider_dict[connection_type]
          package_name = class_provider.package_name
          hook_class_name = class_provider.hook_class_name
      else:
          if not hook_class_name:
              raise ValueError("Either connection_type or hook_class_name must be set")
          if not package_name:
              raise ValueError(
                  f"Provider package name is not set when hook_class_name ({hook_class_name}) is used"
              )
      allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
      hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
      if hook_class is None:
          return None
      try:
          module, class_name = hook_class_name.rsplit(".", maxsplit=1)
          # Do not use attr here. We want to check only direct class fields not those
          # inherited from parent hook. This way we add form fields only once for the whole
          # hierarchy and we add it only from the parent hook that provides those!
          if "get_connection_form_widgets" in hook_class.__dict__:
              widgets = hook_class.get_connection_form_widgets()
              if widgets:
                  for widget in widgets.values():
                      if widget.field_class not in allowed_field_classes:
                          log.warning(
                              "The hook_class '%s' uses field of unsupported class '%s'. "
                              "Only '%s' field classes are supported",
                              hook_class_name,
                              widget.field_class,
                              allowed_field_classes,
                          )
                          return None
                  self._add_widgets(package_name, hook_class, widgets)
          if "get_ui_field_behaviour" in hook_class.__dict__:
              field_behaviours = hook_class.get_ui_field_behaviour()
              if field_behaviours:
                  self._add_customized_fields(package_name, hook_class, field_behaviours)
      except ImportError as e:
          # str(e) instead of e.msg: msg is None when ImportError is raised without a message,
          # which would make the membership test itself raise TypeError.
          if "No module named 'flask_appbuilder'" in str(e):
              log.warning(
                  "The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
                  "the 'flask_appbuilder' package is not installed, however it is not required for "
                  "Airflow components to work",
                  hook_class_name,
              )
          else:
              # Any other import failure is still treated as non-fatal (the hook is returned
              # below), but it is reported instead of being dropped without a trace.
              log.warning(
                  "The hook_class '%s' is not fully initialized (UI widgets may be missing), because "
                  "of an import error: %s",
                  hook_class_name,
                  e,
              )
      except Exception as e:
          log.warning(
              "Exception when importing '%s' from '%s' package: %s",
              hook_class_name,
              package_name,
              e,
          )
          return None
      hook_connection_type = self._get_attr(hook_class, "conn_type")
      if connection_type:
          if hook_connection_type != connection_type:
              log.warning(
                  "Inconsistency! The hook class '%s' declares connection type '%s'"
                  " but it is added by provider '%s' as connection_type '%s' in provider info. "
                  "This should be fixed!",
                  hook_class,
                  hook_connection_type,
                  package_name,
                  connection_type,
              )
      # From here on, the connection type declared by the hook class itself is used
      connection_type = hook_connection_type
      connection_id_attribute_name: str = self._get_attr(hook_class, "conn_name_attr")
      hook_name: str = self._get_attr(hook_class, "hook_name")
      if not connection_type or not connection_id_attribute_name or not hook_name:
          log.warning(
              "The hook misses one of the key attributes: "
              "conn_type: %s, conn_id_attribute_name: %s, hook_name: %s",
              connection_type,
              connection_id_attribute_name,
              hook_name,
          )
          return None
      return HookInfo(
          hook_class_name=hook_class_name,
          connection_id_attribute_name=connection_id_attribute_name,
          package_name=package_name,
          hook_name=hook_name,
          connection_type=connection_type,
          connection_testable=hasattr(hook_class, "test_connection"),
      )
  def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
      """
      Register connection form widgets declared by a hook class.

      Field identifiers that do not already start with ``extra__`` are stored under
      ``extra__<conn_type>__<identifier>``. A field whose prefixed name is already present
      is skipped with a warning.

      :param package_name: name of the provider package
      :param hook_class: hook class declaring the widgets; its ``conn_type`` is read
      :param widgets: mapping of field identifier to field
      """
      conn_type = hook_class.conn_type  # type: ignore
      for field_identifier, field in widgets.items():
          prefixed_field_name = (
              field_identifier
              if field_identifier.startswith("extra__")
              else f"extra__{conn_type}__{field_identifier}"
          )
          if prefixed_field_name in self._connection_form_widgets:
              # With inherited hooks this can legitimately happen several times
              log.warning(
                  "The field %s from class %s has already been added by another provider. Ignoring it.",
                  field_identifier,
                  hook_class.__name__,
              )
              continue
          self._connection_form_widgets[prefixed_field_name] = ConnectionFormWidgetInfo(
              hook_class.__name__,
              package_name,
              field,
              field_identifier,
              hasattr(field.field_class.widget, "input_type")
              and field.field_class.widget.input_type == "password",
          )
  def _add_customized_fields(self, package_name: str, hook_class: type, customized_fields: dict):
      """
      Validate and store the UI field behaviours of a hook class under its connection type.

      A connection type that already has field behaviours is left untouched and a warning is
      logged. Every exception is logged as a warning and not re-raised.

      :param package_name: name of the provider package
      :param hook_class: hook class declaring the behaviours; its ``conn_type`` is read
      :param customized_fields: field behaviour dictionary returned by the hook
      """
      try:
          conn_type = hook_class.conn_type
          self._customized_form_fields_schema_validator.validate(customized_fields)
          if conn_type:
              customized_fields = _ensure_prefix_for_placeholders(customized_fields, conn_type)
          if conn_type not in self._field_behaviours:
              self._field_behaviours[conn_type] = customized_fields
              return
          log.warning(
              "The connection_type %s from package %s and class %s has already been added "
              "by another provider. Ignoring it.",
              conn_type,
              package_name,
              hook_class.__name__,
          )
      except Exception as e:
          log.warning(
              "Error when loading customized fields from package '%s' hook class '%s': %s",
              package_name,
              hook_class.__name__,
              e,
          )
  def _discover_auth_managers(self) -> None:
      """Collect the "auth-managers" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("auth-managers") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._auth_manager_class_name_set.add(class_name)
  def _discover_notifications(self) -> None:
      """Collect the "notifications" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("notifications") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._notification_info_set.add(class_name)
  def _discover_extra_links(self) -> None:
      """Collect the "extra-links" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("extra-links") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._extra_link_class_name_set.add(class_name)
  def _discover_logging(self) -> None:
      """Collect the "logging" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("logging") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._logging_class_name_set.add(class_name)
  def _discover_secrets_backends(self) -> None:
      """Collect the "secrets-backends" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("secrets-backends") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._secrets_backend_class_name_set.add(class_name)
  def _discover_auth_backends(self) -> None:
      """
      Collect the "auth-backends" module names of all providers.

      A module is recorded only when its ``init_app`` attribute passes the correctness check.
      """
      for package_name, provider in self._provider_dict.items():
          for module_name in provider.data.get("auth-backends") or ():
              init_app_path = module_name + ".init_app"
              if _correctness_check(package_name, init_app_path, provider):
                  self._api_auth_backend_module_names.add(module_name)
  def _discover_executors(self) -> None:
      """Collect the "executors" class names of all providers that pass the correctness check."""
      for package_name, provider in self._provider_dict.items():
          for class_name in provider.data.get("executors") or ():
              if _correctness_check(package_name, class_name, provider):
                  self._executor_class_name_set.add(class_name)
  1032. def _discover_config(self) -> None:
  1033. """Retrieve all configs defined in the providers."""
  1034. for provider_package, provider in self._provider_dict.items():
  1035. if provider.data.get("config"):
  1036. self._provider_configs[provider_package] = provider.data.get("config") # type: ignore[assignment]
  def _discover_plugins(self) -> None:
      """
      Collect the plugins declared by all providers.

      A plugin whose "plugin-class" fails the correctness check is skipped with a warning.
      """
      for package_name, provider in self._provider_dict.items():
          for plugin_entry in provider.data.get("plugins", ()):
              plugin_class = plugin_entry["plugin-class"]
              if _correctness_check(package_name, plugin_class, provider):
                  plugin_info = PluginInfo(
                      name=plugin_entry["name"],
                      plugin_class=plugin_entry["plugin-class"],
                      provider_name=package_name,
                  )
                  self._plugins_set.add(plugin_info)
              else:
                  log.warning("Plugin not loaded due to above correctness check problem.")
  @provider_info_cache("triggers")
  def initialize_providers_triggers(self):
      """
      Initialize providers triggers.

      Makes sure the providers list is initialized, then records one ``TriggerInfo`` per
      entry in the "python-modules" list of each provider's "triggers" items.
      """
      self.initialize_providers_list()
      for provider_package, provider in self._provider_dict.items():
          for trigger in provider.data.get("triggers", []):
              # Default to an empty list so an entry without "python-modules" is skipped
              # instead of raising TypeError on iterating None.
              for trigger_class_name in trigger.get("python-modules", []):
                  self._trigger_info_set.add(
                      TriggerInfo(
                          package_name=provider_package,
                          trigger_class_name=trigger_class_name,
                          integration_name=trigger.get("integration-name", ""),
                      )
                  )
  @property
  def auth_managers(self) -> list[str]:
      """Returns sorted list of auth manager class names available in providers."""
      self.initialize_providers_auth_managers()
      return sorted(self._auth_manager_class_name_set)
  @property
  def notification(self) -> list[NotificationInfo]:
      """Initialize provider notifications and return the collected entries in sorted order."""
      self.initialize_providers_notifications()
      ordered = list(self._notification_info_set)
      ordered.sort()
      return ordered
  @property
  def trigger(self) -> list[TriggerInfo]:
      """Initialize provider triggers and return them ordered by package name."""
      self.initialize_providers_triggers()
      ordered = list(self._trigger_info_set)
      ordered.sort(key=lambda info: info.package_name)
      return ordered
  @property
  def providers(self) -> dict[str, ProviderInfo]:
      """Initialize the providers list and return the package-name to provider-info dictionary."""
      self.initialize_providers_list()
      provider_dict = self._provider_dict
      return provider_dict
  @property
  def hooks(self) -> MutableMapping[str, HookInfo | None]:
      """
      Initialize provider hooks and return the connection_type-to-hook mapping.

      A value may be None when a discovered hook cannot be imported.
      """
      self.initialize_providers_hooks()
      # The mapping handed out here is only meant for retrieving hook information
      lazy_hooks = self._hooks_lazy_dict
      return lazy_hooks
  @property
  def plugins(self) -> list[PluginInfo]:
      """Initialize provider plugins and return them ordered by plugin class."""
      self.initialize_providers_plugins()
      ordered = list(self._plugins_set)
      ordered.sort(key=lambda info: info.plugin_class)
      return ordered
  @property
  def taskflow_decorators(self) -> dict[str, TaskDecorator]:
      """Initialize provider taskflow decorators and return the name-to-decorator mapping."""
      self.initialize_providers_taskflow_decorator()
      decorators = self._taskflow_decorators
      return decorators  # type: ignore[return-value]
  @property
  def extra_links_class_names(self) -> list[str]:
      """Initialize provider extra links and return their class names as a sorted list."""
      self.initialize_providers_extra_links()
      names = list(self._extra_link_class_name_set)
      names.sort()
      return names
  @property
  def connection_form_widgets(self) -> dict[str, ConnectionFormWidgetInfo]:
      """
      Initialize hooks, import all of them and return the connection form widgets.

      The dictionary is returned as stored, without re-sorting its keys.
      """
      self.initialize_providers_hooks()
      self._import_info_from_all_hooks()
      form_widgets = self._connection_form_widgets
      return form_widgets
  @property
  def field_behaviours(self) -> dict[str, dict]:
      """Initialize hooks, import all of them and return the field behaviours per connection type."""
      self.initialize_providers_hooks()
      self._import_info_from_all_hooks()
      behaviours = self._field_behaviours
      return behaviours
  @property
  def logging_class_names(self) -> list[str]:
      """Initialize provider logging and return the log task handler class names as a sorted list."""
      self.initialize_providers_logging()
      names = list(self._logging_class_name_set)
      names.sort()
      return names
  @property
  def secrets_backend_class_names(self) -> list[str]:
      """Initialize provider secrets backends and return their class names as a sorted list."""
      self.initialize_providers_secrets_backends()
      names = list(self._secrets_backend_class_name_set)
      names.sort()
      return names
  @property
  def auth_backend_module_names(self) -> list[str]:
      """Initialize provider API auth backends and return their module names as a sorted list."""
      self.initialize_providers_auth_backends()
      names = list(self._api_auth_backend_module_names)
      names.sort()
      return names
  @property
  def executor_class_names(self) -> list[str]:
      """Initialize provider executors and return their class names as a sorted list."""
      self.initialize_providers_executors()
      names = list(self._executor_class_name_set)
      names.sort()
      return names
  @property
  def filesystem_module_names(self) -> list[str]:
      """Initialize provider filesystems and return their module names as a sorted list."""
      self.initialize_providers_filesystems()
      names = list(self._fs_set)
      names.sort()
      return names
  @property
  def dataset_factories(self) -> dict[str, Callable[..., Dataset]]:
      """Initialize dataset URI resources and return the scheme-to-factory mapping."""
      self.initialize_providers_dataset_uri_resources()
      factories = self._dataset_factories
      return factories
  @property
  def dataset_uri_handlers(self) -> dict[str, Callable[[SplitResult], SplitResult]]:
      """Initialize dataset URI resources and return the scheme-to-handler mapping."""
      self.initialize_providers_dataset_uri_resources()
      handlers = self._dataset_uri_handlers
      return handlers
  @property
  def dataset_to_openlineage_converters(
      self,
  ) -> dict[str, Callable]:
      """Initialize dataset URI resources and return the scheme-to-OpenLineage-converter mapping."""
      self.initialize_providers_dataset_uri_resources()
      converters = self._dataset_to_openlineage_converters
      return converters
  @property
  def provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
      """Initialize provider configuration and return (package name, config) pairs ordered by name."""
      self.initialize_providers_configuration()
      pairs = list(self._provider_configs.items())
      pairs.sort(key=lambda pair: pair[0])
      return pairs
  @property
  def already_initialized_provider_configs(self) -> list[tuple[str, dict[str, Any]]]:
      """Return (package name, config) pairs ordered by name, without triggering initialization."""
      pairs = list(self._provider_configs.items())
      pairs.sort(key=lambda pair: pair[0])
      return pairs
  1167. def _cleanup(self):
  1168. self._initialized_cache.clear()
  1169. self._provider_dict.clear()
  1170. self._hooks_dict.clear()
  1171. self._fs_set.clear()
  1172. self._taskflow_decorators.clear()
  1173. self._hook_provider_dict.clear()
  1174. self._hooks_lazy_dict.clear()
  1175. self._connection_form_widgets.clear()
  1176. self._field_behaviours.clear()
  1177. self._extra_link_class_name_set.clear()
  1178. self._logging_class_name_set.clear()
  1179. self._auth_manager_class_name_set.clear()
  1180. self._secrets_backend_class_name_set.clear()
  1181. self._executor_class_name_set.clear()
  1182. self._provider_configs.clear()
  1183. self._api_auth_backend_module_names.clear()
  1184. self._trigger_info_set.clear()
  1185. self._notification_info_set.clear()
  1186. self._plugins_set.clear()
  1187. self._initialized = False
  1188. self._initialization_stack_trace = None