123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """Providers sub-commands."""
- from __future__ import annotations
- import sys
- import re2
- from airflow.cli.simple_table import AirflowConsole
- from airflow.providers_manager import ProvidersManager
- from airflow.utils.cli import suppress_logs_and_warning
- from airflow.utils.providers_configuration_loader import providers_configuration_loaded
- ERROR_IMPORTING_HOOK = "Error when importing hook!"
- def _remove_rst_syntax(value: str) -> str:
- return re2.sub("[`_<>]", "", value.strip(" \n."))
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def provider_get(args):
- """Get a provider info."""
- providers = ProvidersManager().providers
- if args.provider_name in providers:
- provider_version = providers[args.provider_name].version
- provider_info = providers[args.provider_name].data
- if args.full:
- provider_info["description"] = _remove_rst_syntax(provider_info["description"])
- AirflowConsole().print_as(
- data=[provider_info],
- output=args.output,
- )
- else:
- AirflowConsole().print_as(
- data=[{"Provider": args.provider_name, "Version": provider_version}], output=args.output
- )
- else:
- raise SystemExit(f"No such provider installed: {args.provider_name}")
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def providers_list(args):
- """List all providers at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().providers.values()),
- output=args.output,
- mapper=lambda x: {
- "package_name": x.data["package-name"],
- "description": _remove_rst_syntax(x.data["description"]),
- "version": x.version,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def hooks_list(args):
- """List all hooks at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().hooks.items()),
- output=args.output,
- mapper=lambda x: {
- "connection_type": x[0],
- "class": x[1].hook_class_name if x[1] else ERROR_IMPORTING_HOOK,
- "conn_id_attribute_name": x[1].connection_id_attribute_name if x[1] else ERROR_IMPORTING_HOOK,
- "package_name": x[1].package_name if x[1] else ERROR_IMPORTING_HOOK,
- "hook_name": x[1].hook_name if x[1] else ERROR_IMPORTING_HOOK,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def triggers_list(args):
- AirflowConsole().print_as(
- data=ProvidersManager().trigger,
- output=args.output,
- mapper=lambda x: {
- "package_name": x.package_name,
- "class": x.trigger_class_name,
- "integration_name": x.integration_name,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def notifications_list(args):
- AirflowConsole().print_as(
- data=ProvidersManager().notification,
- output=args.output,
- mapper=lambda x: {
- "notification_class_name": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def connection_form_widget_list(args):
- """List all custom connection form fields at the command line."""
- AirflowConsole().print_as(
- data=sorted(ProvidersManager().connection_form_widgets.items()),
- output=args.output,
- mapper=lambda x: {
- "connection_parameter_name": x[0],
- "class": x[1].hook_class_name,
- "package_name": x[1].package_name,
- "field_type": x[1].field.field_class.__name__,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def connection_field_behaviours(args):
- """List field behaviours."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().field_behaviours),
- output=args.output,
- mapper=lambda x: {
- "field_behaviours": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def extra_links_list(args):
- """List all extra links at the command line."""
- AirflowConsole().print_as(
- data=ProvidersManager().extra_links_class_names,
- output=args.output,
- mapper=lambda x: {
- "extra_link_class_name": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def logging_list(args):
- """List all log task handlers at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().logging_class_names),
- output=args.output,
- mapper=lambda x: {
- "logging_class_name": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def secrets_backends_list(args):
- """List all secrets backends at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().secrets_backend_class_names),
- output=args.output,
- mapper=lambda x: {
- "secrets_backend_class_name": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def auth_backend_list(args):
- """List all API auth backend modules at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().auth_backend_module_names),
- output=args.output,
- mapper=lambda x: {
- "api_auth_backend_module": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def auth_managers_list(args):
- """List all auth managers at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().auth_managers),
- output=args.output,
- mapper=lambda x: {
- "auth_managers_module": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def executors_list(args):
- """List all executors at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().executor_class_names),
- output=args.output,
- mapper=lambda x: {
- "executor_class_names": x,
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def config_list(args):
- """List all configurations at the command line."""
- AirflowConsole().print_as(
- data=list(ProvidersManager().provider_configs),
- output=args.output,
- mapper=lambda x: {
- "provider_config": x,
- },
- )
- @suppress_logs_and_warning
- def lazy_loaded(args):
- """
- Informs if providers manager has been initialized too early.
- If provider is initialized, shows the stack trace and exit with error code 1.
- """
- import rich
- if ProvidersManager.initialized():
- rich.print(
- "\n[red]ProvidersManager was initialized during CLI parsing. This should not happen.\n",
- file=sys.stderr,
- )
- rich.print(
- "\n[yellow]Please make sure no Providers Manager initialization happens during CLI parsing.\n",
- file=sys.stderr,
- )
- rich.print("Stack trace where it has been initialized:\n", file=sys.stderr)
- rich.print(ProvidersManager.initialization_stack_trace(), file=sys.stderr)
- sys.exit(1)
- else:
- rich.print("[green]All ok. Providers Manager was not initialized during the CLI parsing.")
- sys.exit(0)
|