cli_parser.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. #!/usr/bin/env python
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements. See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership. The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License. You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing,
  14. # software distributed under the License is distributed on an
  15. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  16. # KIND, either express or implied. See the License for the
  17. # specific language governing permissions and limitations
  18. # under the License.
  19. """
  20. Produce a CLI parser object from Airflow CLI command configuration.
  21. .. seealso:: :mod:`airflow.cli.cli_config`
  22. """
  23. from __future__ import annotations
  24. import argparse
  25. import logging
  26. import sys
  27. from argparse import Action
  28. from collections import Counter
  29. from functools import lru_cache
  30. from typing import TYPE_CHECKING, Iterable
  31. import lazy_object_proxy
  32. from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter
  33. from airflow.cli.cli_config import (
  34. DAG_CLI_DICT,
  35. ActionCommand,
  36. DefaultHelpParser,
  37. GroupCommand,
  38. core_commands,
  39. )
  40. from airflow.cli.utils import CliConflictError
  41. from airflow.exceptions import AirflowException
  42. from airflow.executors.executor_loader import ExecutorLoader
  43. from airflow.utils.helpers import partition
  44. from airflow.www.extensions.init_auth_manager import get_auth_manager_cls
  45. if TYPE_CHECKING:
  46. from airflow.cli.cli_config import (
  47. Arg,
  48. CLICommand,
  49. )
  50. airflow_commands = core_commands.copy() # make a copy to prevent bad interactions in tests
  51. log = logging.getLogger(__name__)
  52. for executor_name in ExecutorLoader.get_executor_names():
  53. try:
  54. executor, _ = ExecutorLoader.import_executor_cls(executor_name)
  55. airflow_commands.extend(executor.get_cli_commands())
  56. except Exception:
  57. log.exception("Failed to load CLI commands from executor: %s", executor_name)
  58. log.error(
  59. "Ensure all dependencies are met and try again. If using a Celery based executor install "
  60. "a 3.3.0+ version of the Celery provider. If using a Kubernetes executor, install a "
  61. "7.4.0+ version of the CNCF provider"
  62. )
  63. # Do not re-raise the exception since we want the CLI to still function for
  64. # other commands.
  65. try:
  66. auth_mgr = get_auth_manager_cls()
  67. airflow_commands.extend(auth_mgr.get_cli_commands())
  68. except Exception as e:
  69. log.warning("cannot load CLI commands from auth manager: %s", e)
  70. log.warning("Authentication manager is not configured and webserver will not be able to start.")
  71. # do not re-raise for the same reason as above
  72. if len(sys.argv) > 1 and sys.argv[1] == "webserver":
  73. log.exception(e)
  74. sys.exit(1)
  75. ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands}
  76. # Check if sub-commands are defined twice, which could be an issue.
  77. if len(ALL_COMMANDS_DICT) < len(airflow_commands):
  78. dup = {k for k, v in Counter([c.name for c in airflow_commands]).items() if v > 1}
  79. raise CliConflictError(
  80. f"The following CLI {len(dup)} command(s) are defined more than once: {sorted(dup)}\n"
  81. f"This can be due to an Executor or Auth Manager redefining core airflow CLI commands."
  82. )
  83. class AirflowHelpFormatter(RichHelpFormatter):
  84. """
  85. Custom help formatter to display help message.
  86. It displays simple commands and groups of commands in separate sections.
  87. """
  88. def _iter_indented_subactions(self, action: Action):
  89. if isinstance(action, argparse._SubParsersAction):
  90. self._indent()
  91. subactions = action._get_subactions()
  92. action_subcommands, group_subcommands = partition(
  93. lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand), subactions
  94. )
  95. yield Action([], f"\n{' ':{self._current_indent}}Groups", nargs=0)
  96. self._indent()
  97. yield from group_subcommands
  98. self._dedent()
  99. yield Action([], f"\n{' ':{self._current_indent}}Commands:", nargs=0)
  100. self._indent()
  101. yield from action_subcommands
  102. self._dedent()
  103. self._dedent()
  104. else:
  105. yield from super()._iter_indented_subactions(action)
  106. class LazyRichHelpFormatter(RawTextRichHelpFormatter):
  107. """
  108. Custom help formatter to display help message.
  109. It resolves lazy help string before printing it using rich.
  110. """
  111. def add_argument(self, action: Action) -> None:
  112. if isinstance(action.help, lazy_object_proxy.Proxy):
  113. action.help = str(action.help)
  114. return super().add_argument(action)
  115. @lru_cache(maxsize=None)
  116. def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser:
  117. """Create and returns command line argument parser."""
  118. parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
  119. subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
  120. subparsers.required = True
  121. command_dict = DAG_CLI_DICT if dag_parser else ALL_COMMANDS_DICT
  122. for _, sub in sorted(command_dict.items()):
  123. _add_command(subparsers, sub)
  124. return parser
  125. def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]:
  126. """Sort subcommand optional args, keep positional args."""
  127. def get_long_option(arg: Arg):
  128. """Get long option from Arg.flags."""
  129. return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1]
  130. positional, optional = partition(lambda x: x.flags[0].startswith("-"), args)
  131. yield from positional
  132. yield from sorted(optional, key=lambda x: get_long_option(x).lower())
  133. def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) -> None:
  134. if isinstance(sub, ActionCommand) and sub.hide:
  135. sub_proc = subparsers.add_parser(sub.name, epilog=sub.epilog)
  136. else:
  137. sub_proc = subparsers.add_parser(
  138. sub.name, help=sub.help, description=sub.description or sub.help, epilog=sub.epilog
  139. )
  140. sub_proc.formatter_class = LazyRichHelpFormatter
  141. if isinstance(sub, GroupCommand):
  142. _add_group_command(sub, sub_proc)
  143. elif isinstance(sub, ActionCommand):
  144. _add_action_command(sub, sub_proc)
  145. else:
  146. raise AirflowException("Invalid command definition.")
  147. def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None:
  148. for arg in _sort_args(sub.args):
  149. arg.add_to_parser(sub_proc)
  150. sub_proc.set_defaults(func=sub.func)
  151. def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser) -> None:
  152. subcommands = sub.subcommands
  153. sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND")
  154. sub_subparsers.required = True
  155. for command in sorted(subcommands, key=lambda x: x.name):
  156. _add_command(sub_subparsers, command)