__main__.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. #!/usr/bin/env python
  2. # PYTHON_ARGCOMPLETE_OK
  3. #
  4. # Licensed to the Apache Software Foundation (ASF) under one
  5. # or more contributor license agreements. See the NOTICE file
  6. # distributed with this work for additional information
  7. # regarding copyright ownership. The ASF licenses this file
  8. # to you under the Apache License, Version 2.0 (the
  9. # "License"); you may not use this file except in compliance
  10. # with the License. You may obtain a copy of the License at
  11. #
  12. # http://www.apache.org/licenses/LICENSE-2.0
  13. #
  14. # Unless required by applicable law or agreed to in writing,
  15. # software distributed under the License is distributed on an
  16. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17. # KIND, either express or implied. See the License for the
  18. # specific language governing permissions and limitations
  19. # under the License.
  20. """Main executable module."""
  21. from __future__ import annotations
  22. import os
  23. from argparse import Namespace
  24. import argcomplete
  25. # The configuration module initializes and validates the conf object as a side effect the first
  26. # time it is imported. If it is not imported before importing the settings module, the conf
  27. # object will then be initted/validated as a side effect of it being imported in settings,
  28. # however this can cause issues since those modules are very tightly coupled and can
  29. # very easily cause import cycles in the conf init/validate code (since downstream code from
  30. # those functions likely import settings).
  31. # Therefore importing configuration early (as the first airflow import) avoids
  32. # any possible import cycles with settings downstream.
  33. from airflow import configuration
  34. from airflow.cli import cli_parser
  35. from airflow.configuration import AirflowConfigParser, write_webserver_configuration_if_needed
  36. from airflow.exceptions import AirflowException
  37. def main():
  38. conf = configuration.conf
  39. if conf.get("core", "security") == "kerberos":
  40. os.environ["KRB5CCNAME"] = conf.get("kerberos", "ccache")
  41. os.environ["KRB5_KTNAME"] = conf.get("kerberos", "keytab")
  42. parser = cli_parser.get_parser()
  43. argcomplete.autocomplete(parser)
  44. args = parser.parse_args()
  45. if args.subcommand not in ["lazy_loaded", "version"]:
  46. # Here we ensure that the default configuration is written if needed before running any command
  47. # that might need it. This used to be done during configuration initialization but having it
  48. # in main ensures that it is not done during tests and other ways airflow imports are used
  49. from airflow.configuration import write_default_airflow_configuration_if_needed
  50. conf = write_default_airflow_configuration_if_needed()
  51. if args.subcommand in ["webserver", "internal-api", "worker"]:
  52. write_webserver_configuration_if_needed(conf)
  53. configure_internal_api(args, conf)
  54. args.func(args)
  55. def configure_internal_api(args: Namespace, conf: AirflowConfigParser):
  56. if conf.getboolean("core", "database_access_isolation", fallback=False):
  57. if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
  58. # Untrusted components
  59. if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
  60. # make sure that the DB is not available for the components that should not access it
  61. os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
  62. conf.set("database", "sql_alchemy_conn", "none://")
  63. from airflow.api_internal.internal_api_call import InternalApiConfig
  64. InternalApiConfig.set_use_internal_api(args.subcommand)
  65. else:
  66. # Trusted components (this setting is mostly for Breeze where db_isolation and DB are both set
  67. db_connection_url = conf.get("database", "sql_alchemy_conn")
  68. if not db_connection_url or db_connection_url == "none://":
  69. raise AirflowException(
  70. f"Running trusted components {args.subcommand} in db isolation mode "
  71. f"requires connection to be configured via database/sql_alchemy_conn."
  72. )
  73. from airflow.api_internal.internal_api_call import InternalApiConfig
  74. InternalApiConfig.set_use_database_access(args.subcommand)
  75. if __name__ == "__main__":
  76. main()