123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import inspect
- from typing import Any
- from airflow import plugins_manager
- from airflow.cli.simple_table import AirflowConsole
- from airflow.plugins_manager import PluginsDirectorySource, get_plugin_info
- from airflow.utils.cli import suppress_logs_and_warning
- from airflow.utils.providers_configuration_loader import providers_configuration_loaded
def _get_name(class_like_object) -> str:
    """Return a printable name for *class_like_object*.

    Strings and ``PluginsDirectorySource`` instances are rendered via ``str()``.
    A class yields its own ``__name__``; any other object yields the
    ``__name__`` of its ``__class__``.
    """
    if isinstance(class_like_object, (str, PluginsDirectorySource)):
        return str(class_like_object)
    # A class names itself; an instance is named after the class it belongs to.
    owner = class_like_object if inspect.isclass(class_like_object) else class_like_object.__class__
    return owner.__name__
def _join_plugins_names(value: list[Any] | Any) -> str:
    """Return the comma-separated names of *value*.

    A ``list`` is treated as a collection of items; anything else is treated
    as a single item. Each item is converted with ``_get_name`` and the
    results are joined with ``","`` (no surrounding spaces).
    """
    # Only ``list`` is unpacked; other containers count as one item.
    items = value if isinstance(value, list) else [value]
    return ",".join(map(_get_name, items))
@suppress_logs_and_warning
@providers_configuration_loaded
def dump_plugins(args):
    """Print information about the loaded plugins in the requested format.

    Prints ``"No plugins loaded"`` and returns when ``plugins_manager.plugins``
    is empty. Otherwise the rows returned by ``get_plugin_info()`` are handed
    to ``AirflowConsole().print_as`` using ``args.output`` as the format. For
    the ``"table"`` format, columns that are empty in every row are removed
    first.

    :param args: parsed CLI arguments; only ``args.output`` is read here.
    """
    # NOTE(review): get_plugin_info() is deliberately called before inspecting
    # plugins_manager.plugins -- presumably it triggers plugin loading; confirm
    # before reordering.
    rows: list[dict[str, str]] = get_plugin_info()
    if not plugins_manager.plugins:
        print("No plugins loaded")
        return

    if args.output == "table":
        # rows[0] is expected to exist because at least one plugin is loaded
        # at this point. Collect the columns that are falsy in every row...
        empty_columns = [column for column in rows[0] if not any(row[column] for row in rows)]
        # ...and strip them from each row so the table has no blank columns.
        for row in rows:
            for column in empty_columns:
                del row[column]

    AirflowConsole().print_as(rows, output=args.output)
|