123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- #!/usr/bin/env python
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """
- Produce a CLI parser object from Airflow CLI command configuration.
- .. seealso:: :mod:`airflow.cli.cli_config`
- """
- from __future__ import annotations
- import argparse
- import logging
- import sys
- from argparse import Action
- from collections import Counter
- from functools import lru_cache
- from typing import TYPE_CHECKING, Iterable
- import lazy_object_proxy
- from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
- from airflow.cli.cli_config import (
- DAG_CLI_DICT,
- ActionCommand,
- DefaultHelpParser,
- GroupCommand,
- core_commands,
- )
- from airflow.cli.utils import CliConflictError
- from airflow.exceptions import AirflowException
- from airflow.executors.executor_loader import ExecutorLoader
- from airflow.utils.helpers import partition
- from airflow.www.extensions.init_auth_manager import get_auth_manager_cls
- if TYPE_CHECKING:
- from airflow.cli.cli_config import (
- Arg,
- CLICommand,
- )
log = logging.getLogger(__name__)

# Work on a copy of the core command list so that extending it below (or
# touching it in tests) never modifies ``core_commands`` itself.
airflow_commands = core_commands.copy()

# Give every executor reported by the loader a chance to contribute CLI commands.
for executor_name in ExecutorLoader.get_executor_names():
    try:
        executor, _ = ExecutorLoader.import_executor_cls(executor_name)
        airflow_commands.extend(executor.get_cli_commands())
    except Exception:
        log.exception("Failed to load CLI commands from executor: %s", executor_name)
        log.error(
            "Ensure all dependencies are met and try again. If using a Celery based executor install "
            "a 3.3.0+ version of the Celery provider. If using a Kubernetes executor, install a "
            "7.4.0+ version of the CNCF provider"
        )
        # Deliberately swallowed: a broken executor must not prevent the rest
        # of the CLI from working.
# Let the configured auth manager contribute its CLI commands as well.
try:
    auth_mgr = get_auth_manager_cls()
    airflow_commands.extend(auth_mgr.get_cli_commands())
except Exception as e:
    log.warning("cannot load CLI commands from auth manager: %s", e)
    log.warning("Authentication manager is not configured and webserver will not be able to start.")
    # The failure is not re-raised so that the other commands stay usable;
    # only an explicit ``webserver`` invocation is aborted.
    if sys.argv[1:2] == ["webserver"]:
        log.exception(e)
        sys.exit(1)
# Index every command by name; duplicate names collapse to a single entry here.
ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}

# A shorter dict than the source list means at least one name was defined
# more than once, which is reported as a conflict.
if len(airflow_commands) > len(ALL_COMMANDS_DICT):
    name_counts = Counter(cmd.name for cmd in airflow_commands)
    dup = {name for name, count in name_counts.items() if count > 1}
    raise CliConflictError(
        f"The following CLI {len(dup)} command(s) are defined more than once: {sorted(dup)}\n"
        "This can be due to an Executor or Auth Manager redefining core airflow CLI commands."
    )
class AirflowHelpFormatter(RichHelpFormatter):
    """
    Help formatter that splits sub-commands into two sections.

    Command groups are listed first, followed by the simple commands.
    """

    def _iter_indented_subactions(self, action: Action):
        """Yield the sub-actions of ``action``, grouped when it is a sub-parsers action."""
        if not isinstance(action, argparse._SubParsersAction):
            # Anything else is rendered exactly as the parent formatter does.
            yield from super()._iter_indented_subactions(action)
            return

        self._indent()
        # Split the sub-commands by whether their registered command is a GroupCommand.
        action_subcommands, group_subcommands = partition(
            lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand),
            action._get_subactions(),
        )

        # Each section is a heading pseudo-action followed by its members,
        # indented one level deeper than the heading.
        for heading, members in (("Groups", group_subcommands), ("Commands:", action_subcommands)):
            yield Action([], f"\n{' ':{self._current_indent}}{heading}", nargs=0)
            self._indent()
            yield from members
            self._dedent()

        self._dedent()
class LazyRichHelpFormatter(RawTextRichHelpFormatter):
    """
    Help formatter that supports lazily evaluated help strings.

    A help text wrapped in a ``lazy_object_proxy.Proxy`` is converted to a
    plain string before the action is handed to the rich formatter.
    """

    def add_argument(self, action: Action) -> None:
        """Resolve a lazy help string on ``action``, then delegate to the parent class."""
        help_text = action.help
        if isinstance(help_text, lazy_object_proxy.Proxy):
            action.help = str(help_text)
        return super().add_argument(action)
@lru_cache(maxsize=None)
def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
    """
    Create and return the ``airflow`` command line argument parser.

    :param dag_parser: when true, register the commands from ``DAG_CLI_DICT``
        instead of those from ``ALL_COMMANDS_DICT``.
    :return: the configured parser; the result is cached per ``dag_parser`` value.
    """
    if dag_parser:
        command_dict = DAG_CLI_DICT
    else:
        command_dict = ALL_COMMANDS_DICT

    parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
    subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
    subparsers.required = True

    # Register the commands sorted by name.
    for name in sorted(command_dict):
        _add_command(subparsers, command_dict[name])
    return parser
def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
    """
    Yield positional args in their given order, then optional args sorted by option name.

    An arg counts as optional when its first flag starts with ``-``. Optional
    args are ordered case-insensitively by their second flag, or by their only
    flag when just one is defined.
    """

    def is_optional(arg: Arg) -> bool:
        """Tell whether the first flag of ``arg`` starts with a dash."""
        return arg.flags[0].startswith("-")

    def sort_key(arg: Arg):
        """Return the lower-cased long option of ``arg``."""
        flags = arg.flags
        long_option = flags[0] if len(flags) == 1 else flags[1]
        return long_option.lower()

    positional, optional = partition(is_optional, args)
    yield from positional
    yield from sorted(optional, key=sort_key)
def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) -> None:
    """
    Register ``sub`` as a sub-parser of ``subparsers`` and populate it.

    :param subparsers: the sub-parsers action the command is added to.
    :param sub: the command definition, either a group or an action command.
    :raises AirflowException: if ``sub`` is neither a ``GroupCommand`` nor an
        ``ActionCommand``.
    """
    parser_kwargs = {"epilog": sub.epilog}
    is_hidden = isinstance(sub, ActionCommand) and sub.hide
    if not is_hidden:
        # Hidden action commands are registered without help or description.
        parser_kwargs["help"] = sub.help
        parser_kwargs["description"] = sub.description or sub.help

    sub_proc = subparsers.add_parser(sub.name, **parser_kwargs)
    sub_proc.formatter_class = LazyRichHelpFormatter

    if isinstance(sub, GroupCommand):
        _add_group_command(sub, sub_proc)
        return
    if isinstance(sub, ActionCommand):
        _add_action_command(sub, sub_proc)
        return
    raise AirflowException("Invalid command definition.")
def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None:
    """
    Populate ``sub_proc`` with the arguments and handler of an action command.

    :param sub: the action command supplying ``args`` and ``func``.
    :param sub_proc: the parser created for this command.
    """
    ordered_args = _sort_args(sub.args)
    for cli_arg in ordered_args:
        cli_arg.add_to_parser(sub_proc)

    # Store the handler as the parser's ``func`` default.
    sub_proc.set_defaults(func=sub.func)
def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser) -> None:
    """
    Populate ``sub_proc`` with the sub-commands of a command group.

    :param sub: the group command whose ``subcommands`` are registered.
    :param sub_proc: the parser created for the group.
    """

    def command_name(command):
        """Return the name used to order the sub-commands."""
        return command.name

    members = sub.subcommands
    sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
    # A group must always be invoked with one of its sub-commands.
    sub_subparsers.required = True

    for member in sorted(members, key=command_name):
        _add_command(sub_subparsers, member)
|