#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import atexit
import functools
import json
import logging
import os
import sys
import traceback
import warnings
from importlib import metadata
from typing import TYPE_CHECKING, Any, Callable

import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

from airflow import __version__ as airflow_version, policies
from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # noqa: F401
from airflow.exceptions import AirflowInternalRuntimeError, RemovedInAirflow3Warning
from airflow.executors import executor_constants
from airflow.logging_config import configure_logging
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1
from airflow.utils.state import State
from airflow.utils.timezone import local_timezone, parse_timezone, utc

if TYPE_CHECKING:
    from sqlalchemy.engine import Engine
    from sqlalchemy.orm import Session as SASession

    from airflow.www.utils import UIAlert

log = logging.getLogger(__name__)

try:
    if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
        TIMEZONE = parse_timezone(tz)
    else:
        TIMEZONE = local_timezone()
except Exception:
    TIMEZONE = utc

log.info("Configured default timezone %s", TIMEZONE)

if conf.has_option("database", "sql_alchemy_session_maker"):
    log.info(
        '[Warning] Found config "sql_alchemy_session_maker", make sure you know what you are doing.\n'
        "[Warning] Improper configuration of sql_alchemy_session_maker can lead to serious issues, "
        "including data corruption, unrecoverable application crashes.\n"
        "[Warning] Please review the SQLAlchemy documentation for detailed guidance on "
        "proper configuration and best practices."
    )
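
# Illustrative sketch only (added comment, not part of the upstream file): a custom
# ``sql_alchemy_session_maker`` is resolved via ``conf.getimport`` in ``configure_orm`` below,
# so it should be an importable callable that accepts an Engine and returns a sessionmaker,
# mirroring the default ``_session_maker``. The module/function names here are hypothetical:
#
#   # my_company/airflow_sessions.py
#   from sqlalchemy.orm import sessionmaker
#
#   def custom_session_maker(engine):
#       return sessionmaker(autocommit=False, autoflush=False, bind=engine, expire_on_commit=False)
#
#   # airflow.cfg
#   [database]
#   sql_alchemy_session_maker = my_company.airflow_sessions.custom_session_maker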

HEADER = "\n".join(
    [
        r"  ____________       _____________",
        r" ____    |__( )_________  __/__  /________      __",
        r"____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /",
        r"___  ___ |  / _  / _  __/ _  / / /_/ /_ |/ |/ /",
        r" _/_/  |_/_/  /_/  /_/  /_/  \____/____/|__/",
    ]
)

LOGGING_LEVEL = logging.INFO

# the prefix to append to gunicorn worker processes after init
GUNICORN_WORKER_READY_PREFIX = "[ready] "

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
PLUGINS_FOLDER: str | None = None
LOGGING_CLASS_PATH: str | None = None
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine
Session: Callable[..., SASession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json

# Dictionary containing State and colors associated to each state to
# display on the Webserver
STATE_COLORS = {
    "deferred": "mediumpurple",
    "failed": "red",
    "queued": "gray",
    "removed": "lightgrey",
    "restarting": "violet",
    "running": "lime",
    "scheduled": "tan",
    "skipped": "hotpink",
    "success": "green",
    "up_for_reschedule": "turquoise",
    "up_for_retry": "gold",
    "upstream_failed": "orange",
    "shutdown": "blue",
}
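
# Note (added for illustration): these defaults can be overridden by defining a STATE_COLORS
# dict in airflow_local_settings.py; import_local_settings() copies it into this module before
# initialize() applies it via State.state_color.update(STATE_COLORS), e.g.:
#
#   STATE_COLORS = {"queued": "darkgray", "running": "#01FF70"}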


@functools.lru_cache(maxsize=None)
def _get_rich_console(file):
    # Delay imports until we need it
    import rich.console

    return rich.console.Console(file=file)


def custom_show_warning(message, category, filename, lineno, file=None, line=None):
    """Print rich and visible warnings."""
    # Delay imports until we need it
    import re2
    from rich.markup import escape

    re2_escape_regex = re2.compile(r"(\\*)(\[[a-z#/@][^[]*?])").sub
    msg = f"[bold]{line}" if line else f"[bold][yellow]{filename}:{lineno}"
    msg += f" {category.__name__}[/bold]: {escape(str(message), _escape=re2_escape_regex)}[/yellow]"
    write_console = _get_rich_console(file or sys.stderr)
    write_console.print(msg, soft_wrap=True)


def replace_showwarning(replacement):
    """
    Replace ``warnings.showwarning``, returning the original.

    This is useful since we want to "reset" the ``showwarning`` hook on exit to
    avoid lazy-loading issues. If a warning is emitted after Python cleaned up
    the import system, we would no longer be able to import ``rich``.
    """
    original = warnings.showwarning
    warnings.showwarning = replacement
    return original


original_show_warning = replace_showwarning(custom_show_warning)
atexit.register(functools.partial(replace_showwarning, original_show_warning))

POLICY_PLUGIN_MANAGER: Any = None  # type: ignore


def task_policy(task):
    return POLICY_PLUGIN_MANAGER.hook.task_policy(task=task)


def dag_policy(dag):
    return POLICY_PLUGIN_MANAGER.hook.dag_policy(dag=dag)


def task_instance_mutation_hook(task_instance):
    return POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook(task_instance=task_instance)


task_instance_mutation_hook.is_noop = True  # type: ignore


def pod_mutation_hook(pod):
    return POLICY_PLUGIN_MANAGER.hook.pod_mutation_hook(pod=pod)


def get_airflow_context_vars(context):
    return POLICY_PLUGIN_MANAGER.hook.get_airflow_context_vars(context=context)


def get_dagbag_import_timeout(dag_file_path: str):
    return POLICY_PLUGIN_MANAGER.hook.get_dagbag_import_timeout(dag_file_path=dag_file_path)


def configure_policy_plugin_manager():
    global POLICY_PLUGIN_MANAGER

    POLICY_PLUGIN_MANAGER = pluggy.PluginManager(policies.local_settings_hookspec.project_name)
    POLICY_PLUGIN_MANAGER.add_hookspecs(policies)
    POLICY_PLUGIN_MANAGER.register(policies.DefaultPolicy)


def load_policy_plugins(pm: pluggy.PluginManager):
    # We can't log duration etc here, as logging hasn't yet been configured!
    pm.load_setuptools_entrypoints("airflow.policy")
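

# Illustrative sketch only (added comment, not part of the upstream file): cluster policies are
# normally supplied by airflow_local_settings.py (see import_local_settings() later in this module)
# or by a package exposing the "airflow.policy" entrypoint loaded by load_policy_plugins() above.
# A minimal task_policy could look like this (example rule, not an Airflow default):
#
#   def task_policy(task):
#       # enforce at least one retry on every task
#       if task.retries < 1:
#           task.retries = 1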


def configure_vars():
    """Configure Global Variables from airflow.cfg."""
    global SQL_ALCHEMY_CONN
    global DAGS_FOLDER
    global PLUGINS_FOLDER
    global DONOT_MODIFY_HANDLERS
    SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")

    DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

    PLUGINS_FOLDER = conf.get("core", "plugins_folder", fallback=os.path.join(AIRFLOW_HOME, "plugins"))

    # If donot_modify_handlers=True, we do not modify logging handlers in task_run command
    # If the flag is set to False, we remove all handlers from the root logger
    # and add all handlers from 'airflow.task' logger to the root Logger. This is done
    # to get all the logs from the print & log statements in the DAG files before a task is run
    # The handlers are restored after the task completes execution.
    DONOT_MODIFY_HANDLERS = conf.getboolean("logging", "donot_modify_handlers", fallback=False)


def _run_openlineage_runtime_check():
    """
    Ensure compatibility of OpenLineage provider package and Airflow version.

    Airflow 2.10.0 introduced some core changes (#39336) that made versions <= 1.8.0 of OpenLineage
    provider incompatible with future Airflow versions (>= 2.10.0).
    """
    ol_package = "apache-airflow-providers-openlineage"
    try:
        ol_version = metadata.version(ol_package)
    except metadata.PackageNotFoundError:
        return

    if ol_version and Version(ol_version) < Version("1.8.0.dev0"):
        raise RuntimeError(
            f"You have installed `{ol_package}` == `{ol_version}` that is not compatible with "
            f"`apache-airflow` == `{airflow_version}`. "
            f"For `apache-airflow` >= `2.10.0` you must use `{ol_package}` >= `1.8.0`."
        )


def run_providers_custom_runtime_checks():
    _run_openlineage_runtime_check()


class SkipDBTestsSession:
    """
    This fake session is used to skip DB tests when `_AIRFLOW_SKIP_DB_TESTS` is set.

    :meta private:
    """

    def __init__(self):
        raise AirflowInternalRuntimeError(
            "Your test accessed the DB but `_AIRFLOW_SKIP_DB_TESTS` is set.\n"
            "Either make sure your test does not use database or mark the test with `@pytest.mark.db_test`\n"
            "See https://github.com/apache/airflow/blob/main/contributing-docs/testing/unit_tests.rst#"
            "best-practices-for-db-tests on how "
            "to deal with it and consult examples."
        )

    def remove(*args, **kwargs):
        pass

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


def get_cleaned_traceback(stack_summary: traceback.StackSummary) -> str:
    cleaned_traceback = [
        frame
        for frame in stack_summary[:-2]
        if "/_pytest" not in frame.filename and "/pluggy" not in frame.filename
    ]
    return "".join(traceback.format_list(cleaned_traceback))


class TracebackSession:
    """
    Session that throws error when you try to use it.

    Also stores stack at instantiation call site.

    :meta private:
    """

    def __init__(self):
        self.traceback = traceback.extract_stack()

    def __getattr__(self, item):
        raise RuntimeError(
            "TracebackSession object was used but internal API is enabled. "
            "You'll need to ensure you are making only RPC calls with this object. "
            "The stack list below will show where the TracebackSession object was created."
            + get_cleaned_traceback(self.traceback)
        )

    def remove(*args, **kwargs):
        pass


AIRFLOW_PATH = os.path.dirname(os.path.dirname(__file__))
AIRFLOW_TESTS_PATH = os.path.join(AIRFLOW_PATH, "tests")
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")
AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py")
AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")


class TracebackSessionForTests:
    """
    Session that throws an error when you try to create a session outside of the test code.

    When we run our tests in "db isolation" mode, we expect that "airflow" code never creates a
    session on its own and the internal_api server is used for all calls, but the test code may
    still use a session to set up and tear down data in the DB so that the internal API server
    can access it.

    :meta private:
    """

    db_session_class = None
    allow_db_access = False
    """For pytests to create/prepare stuff where explicit DB access is needed."""

    def __init__(self):
        self.current_db_session = TracebackSessionForTests.db_session_class()
        self.created_traceback = traceback.extract_stack()

    def __getattr__(self, item):
        test_code, frame_summary = self.is_called_from_test_code()
        if self.allow_db_access or test_code:
            return getattr(self.current_db_session, item)
        raise RuntimeError(
            "TracebackSessionForTests object was used but internal API is enabled. "
            "Only test code is allowed to use this object.\n"
            f"Called from:\n {frame_summary.filename}: {frame_summary.lineno}\n"
            f" {frame_summary.line}\n\n"
            "You'll need to ensure you are making only RPC calls with this object. "
            "The stack list below will show where the TracebackSession object was called:\n"
            + get_cleaned_traceback(self.traceback)
            + "\n\nThe stack list below will show where the TracebackSession object was created:\n"
            + get_cleaned_traceback(self.created_traceback)
        )

    def remove(*args, **kwargs):
        pass

    @staticmethod
    def set_allow_db_access(session, flag: bool):
        """Temporarily allow DB access, e.g. so pytests can prepare test data."""
        if isinstance(session, TracebackSessionForTests):
            session.allow_db_access = flag

    def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]:
        """
        Check if the traceback session was used from the test code.

        This is done by checking if the first "airflow" filename in the traceback
        is "airflow/tests" or "regular airflow".

        :meta private:

        :return: True if the object was created from test code, False otherwise.
        """
        self.traceback = traceback.extract_stack()
        if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback):
            # This is a fixture call
            return True, None
        airflow_frames = [
            tb
            for tb in self.traceback
            if tb.filename.startswith(AIRFLOW_PATH)
            and not tb.filename == AIRFLOW_SETTINGS_PATH
            and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
        ]
        if any(
            filename.endswith("conftest.py")
            or filename.endswith("tests/test_utils/db.py")
            or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method"))
            for filename, _, name, _ in airflow_frames
        ):
            # This is a fixture call or testing utilities
            return True, None
        if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
            # Let's look at what we are calling directly from the test code
            current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name
            if (current_filename, current_method_name) in (
                (AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
                (AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
            ):
                # This is the BaseOperator.run method (or DAG.create_dagrun) called directly from the
                # test code, which is the usual pattern where we create a session in the test code to
                # create dag_runs for tests. If the `run` code were executed inside real "airflow" code,
                # the stack trace would be longer and it would not be called directly from the test code.
                # Also, if any run_task() method called later from the task code attempts to execute a DB
                # method, the stack trace will be longer and we will catch it as an "illegal" call.
                return True, None
            if current_filename == AIRFLOW_DB_UTILS_PATH:
                # This is a util method called directly from the test code
                return True, None
        for tb in airflow_frames[::-1]:
            if tb.filename.startswith(AIRFLOW_PATH):
                if tb.filename.startswith(AIRFLOW_TESTS_PATH):
                    # this is a session created directly in the test code
                    return True, None
                else:
                    return False, tb
        # if it is from elsewhere.... Why???? We should return False in order to crash to find out
        # The traceback line will be always 3rd (two bottom ones are Airflow)
        return False, self.traceback[-2]

    def get_bind(
        self,
        mapper=None,
        clause=None,
        bind=None,
        _sa_skip_events=None,
        _sa_skip_for_implicit_returning=False,
    ):
        pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
    """Determine whether the database connection URI specifies a relative path."""
    # Check for non-empty connection string:
    if not sqla_conn_str:
        return False
    # Check for the right URI scheme:
    if not sqla_conn_str.startswith("sqlite"):
        return False
    # In-memory is not useful for production, but useful for writing tests against Airflow for extensions
    if sqla_conn_str == "sqlite://":
        return False
    # Check for absolute path:
    if sqla_conn_str.startswith(abs_prefix := "sqlite:///") and os.path.isabs(
        sqla_conn_str[len(abs_prefix) :]
    ):
        return False
    return True
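

# Quick illustration of the helper above (added for clarity; not executed here):
#   _is_sqlite_db_path_relative("sqlite:///relative/airflow.db")  -> True   (rejected by configure_orm)
#   _is_sqlite_db_path_relative("sqlite:////tmp/airflow.db")      -> False  (absolute path)
#   _is_sqlite_db_path_relative("sqlite://")                      -> False  (in-memory database)
#   _is_sqlite_db_path_relative("postgresql://host/db")           -> False  (not sqlite)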


def configure_orm(disable_connection_pool=False, pool_class=None):
    """Configure ORM using SQLAlchemy."""
    from airflow.utils.log.secrets_masker import mask_secret

    if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
        from airflow.exceptions import AirflowConfigException

        raise AirflowConfigException(
            f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
            "Please use absolute path such as `sqlite:////tmp/airflow.db`."
        )

    global Session
    global engine
    if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
        # Skip DB initialization in unit tests, if DB tests are skipped
        Session = SkipDBTestsSession
        engine = None
        return
    if conf.get("database", "sql_alchemy_conn") == "none://":
        from airflow.api_internal.internal_api_call import InternalApiConfig

        InternalApiConfig.set_use_internal_api("ORM reconfigured in forked process.")
        return

    log.debug("Setting up DB connection pool (PID %s)", os.getpid())
    engine_args = prepare_engine_args(disable_connection_pool, pool_class)

    if conf.has_option("database", "sql_alchemy_connect_args"):
        connect_args = conf.getimport("database", "sql_alchemy_connect_args")
    else:
        connect_args = {}

    engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

    mask_secret(engine.url.password)
    setup_event_handlers(engine)

    if conf.has_option("database", "sql_alchemy_session_maker"):
        _session_maker = conf.getimport("database", "sql_alchemy_session_maker")
    else:

        def _session_maker(_engine):
            return sessionmaker(
                autocommit=False,
                autoflush=False,
                bind=_engine,
                expire_on_commit=False,
            )

    Session = scoped_session(_session_maker(engine))


def force_traceback_session_for_untrusted_components(allow_tests_to_use_db=False):
    log.info("Forcing TracebackSession for untrusted components.")
    global Session
    global engine
    if allow_tests_to_use_db:
        old_session_class = Session
        Session = TracebackSessionForTests
        TracebackSessionForTests.db_session_class = old_session_class
    else:
        try:
            dispose_orm()
        except NameError:
            # This exception might be thrown in case the ORM has not been initialized yet.
            pass
        else:
            Session = TracebackSession
            engine = None


DEFAULT_ENGINE_ARGS = {
    "postgresql": {
        "executemany_mode": "values_plus_batch",
        "executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
        "executemany_batch_page_size": 2000,
    },
}
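
# Illustrative only (added comment, not upstream): ``sql_alchemy_engine_args`` is parsed as JSON
# by ``conf.getjson`` below and passed straight through to ``create_engine``, so engine kwargs can
# be supplied from airflow.cfg; the exact keys here are just an example:
#
#   [database]
#   sql_alchemy_engine_args = {"pool_recycle": 1800}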


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
    """Prepare SQLAlchemy engine args."""
    default_args = {}
    for dialect, default in DEFAULT_ENGINE_ARGS.items():
        if SQL_ALCHEMY_CONN.startswith(dialect):
            default_args = default.copy()
            break

    engine_args: dict = conf.getjson("database", "sql_alchemy_engine_args", fallback=default_args)  # type: ignore

    if pool_class:
        # Don't use separate settings for size etc, only those from sql_alchemy_engine_args
        engine_args["poolclass"] = pool_class
    elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
        engine_args["poolclass"] = NullPool
        log.debug("settings.prepare_engine_args(): Using NullPool")
    elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
        # Pool size engine args not supported by sqlite.
        # If no config value is defined for the pool size, select a reasonable value.
        # 0 means no limit, which could lead to exceeding the Database connection limit.
        pool_size = conf.getint("database", "SQL_ALCHEMY_POOL_SIZE", fallback=5)

        # The maximum overflow size of the pool.
        # When the number of checked-out connections reaches the size set in pool_size,
        # additional connections will be returned up to this limit.
        # When those additional connections are returned to the pool, they are disconnected and discarded.
        # It follows then that the total number of simultaneous connections
        # the pool will allow is pool_size + max_overflow,
        # and the total number of "sleeping" connections the pool will allow is pool_size.
        # max_overflow can be set to -1 to indicate no overflow limit;
        # no limit will be placed on the total number
        # of concurrent connections. Defaults to 10.
        max_overflow = conf.getint("database", "SQL_ALCHEMY_MAX_OVERFLOW", fallback=10)

        # The DB server already has a value for wait_timeout (number of seconds after
        # which an idle sleeping connection should be killed). Since other DBs may
        # co-exist on the same server, SQLAlchemy should set its
        # pool_recycle to an equal or smaller value.
        pool_recycle = conf.getint("database", "SQL_ALCHEMY_POOL_RECYCLE", fallback=1800)

        # Check connection at the start of each connection pool checkout.
        # Typically, this is a simple statement like "SELECT 1", but may also make use
        # of some DBAPI-specific method to test the connection for liveness.
        # More information here:
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#disconnect-handling-pessimistic
        pool_pre_ping = conf.getboolean("database", "SQL_ALCHEMY_POOL_PRE_PING", fallback=True)

        log.debug(
            "settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
            "pool_recycle=%d, pid=%d",
            pool_size,
            max_overflow,
            pool_recycle,
            os.getpid(),
        )
        engine_args["pool_size"] = pool_size
        engine_args["pool_recycle"] = pool_recycle
        engine_args["pool_pre_ping"] = pool_pre_ping
        engine_args["max_overflow"] = max_overflow

    # The default isolation level for MySQL (REPEATABLE READ) can introduce inconsistencies when
    # running multiple schedulers, as repeated queries on the same session may read from stale snapshots.
    # 'READ COMMITTED' is the default value for PostgreSQL.
    # More information here:
    # https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
    if SQL_ALCHEMY_CONN.startswith("mysql"):
        engine_args["isolation_level"] = "READ COMMITTED"

    if is_sqlalchemy_v1():
        # Allow the user to specify an encoding for their DB otherwise default
        # to utf-8 so jobs & users with non-latin1 characters can still use us.
        # This parameter was removed in SQLAlchemy 2.x.
        engine_args["encoding"] = conf.get("database", "SQL_ENGINE_ENCODING", fallback="utf-8")

    return engine_args


def dispose_orm():
    """Properly close pooled database connections."""
    log.debug("Disposing DB connection pool (PID %s)", os.getpid())
    global engine
    global Session

    if Session is not None:  # type: ignore[truthy-function]
        Session.remove()
        Session = None
    if engine:
        engine.dispose()
        engine = None


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
    """Properly close database connections and re-configure ORM."""
    dispose_orm()
    configure_orm(disable_connection_pool=disable_connection_pool, pool_class=pool_class)


def configure_adapters():
    """Register Adapters and DB Converters."""
    from pendulum import DateTime as Pendulum

    if SQL_ALCHEMY_CONN.startswith("sqlite"):
        from sqlite3 import register_adapter

        register_adapter(Pendulum, lambda val: val.isoformat(" "))

    if SQL_ALCHEMY_CONN.startswith("mysql"):
        try:
            import MySQLdb.converters

            MySQLdb.converters.conversions[Pendulum] = MySQLdb.converters.DateTime2literal
        except ImportError:
            pass
        try:
            import pymysql.converters

            pymysql.converters.conversions[Pendulum] = pymysql.converters.escape_datetime
        except ImportError:
            pass


def validate_session():
    """Validate ORM Session."""
    global engine

    worker_precheck = conf.getboolean("celery", "worker_precheck")
    if not worker_precheck:
        return True
    else:
        check_session = sessionmaker(bind=engine)
        session = check_session()
        try:
            session.execute(text("select 1"))
            conn_status = True
        except exc.DBAPIError as err:
            log.error(err)
            conn_status = False
        session.close()
        return conn_status


def configure_action_logging() -> None:
    """Any additional configuration (register callback) for airflow.utils.action_loggers module."""


def prepare_syspath_for_config_and_plugins():
    """Update sys.path for the config and plugins directories."""
    # Add ./config/ for loading custom log parsers etc, or
    # airflow_local_settings etc.
    config_path = os.path.join(AIRFLOW_HOME, "config")
    if config_path not in sys.path:
        sys.path.append(config_path)

    if PLUGINS_FOLDER not in sys.path:
        sys.path.append(PLUGINS_FOLDER)


def prepare_syspath_for_dags_folder():
    """Update sys.path to include the DAGs folder."""
    if DAGS_FOLDER not in sys.path:
        sys.path.append(DAGS_FOLDER)


def get_session_lifetime_config():
    """Get session timeout configs and handle outdated configs gracefully."""
    session_lifetime_minutes = conf.get("webserver", "session_lifetime_minutes", fallback=None)
    session_lifetime_days = conf.get("webserver", "session_lifetime_days", fallback=None)
    uses_deprecated_lifetime_configs = session_lifetime_days or conf.get(
        "webserver", "force_log_out_after", fallback=None
    )

    minutes_per_day = 24 * 60
    default_lifetime_minutes = "43200"
    if uses_deprecated_lifetime_configs and session_lifetime_minutes == default_lifetime_minutes:
        warnings.warn(
            "`session_lifetime_days` option from `[webserver]` section has been "
            "renamed to `session_lifetime_minutes`. The new option allows to configure "
            "session lifetime in minutes. The `force_log_out_after` option has been removed "
            "from `[webserver]` section. Please update your configuration.",
            category=RemovedInAirflow3Warning,
            stacklevel=2,
        )
        if session_lifetime_days:
            session_lifetime_minutes = minutes_per_day * int(session_lifetime_days)

    if not session_lifetime_minutes:
        session_lifetime_days = 30
        session_lifetime_minutes = minutes_per_day * session_lifetime_days

    log.debug("User session lifetime is set to %s minutes.", session_lifetime_minutes)

    return int(session_lifetime_minutes)


def import_local_settings():
    """Import airflow_local_settings.py files to allow overriding any configs in settings.py file."""
    try:
        import airflow_local_settings
    except ModuleNotFoundError as e:
        if e.name == "airflow_local_settings":
            log.debug("No airflow_local_settings to import.", exc_info=True)
        else:
            log.critical(
                "Failed to import airflow_local_settings due to a transitive module not found error.",
                exc_info=True,
            )
            raise
    except ImportError:
        log.critical("Failed to import airflow_local_settings.", exc_info=True)
        raise
    else:
        if hasattr(airflow_local_settings, "__all__"):
            names = set(airflow_local_settings.__all__)
        else:
            names = {n for n in airflow_local_settings.__dict__ if not n.startswith("__")}

        if "policy" in names and "task_policy" not in names:
            warnings.warn(
                "Using `policy` in airflow_local_settings.py is deprecated. "
                "Please rename your `policy` to `task_policy`.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            setattr(airflow_local_settings, "task_policy", airflow_local_settings.policy)
            names.remove("policy")

        plugin_functions = policies.make_plugin_from_local_settings(
            POLICY_PLUGIN_MANAGER, airflow_local_settings, names
        )

        # If we have already handled a function by adding it to the plugin,
        # then don't clobber the global function
        for name in names - plugin_functions:
            globals()[name] = getattr(airflow_local_settings, name)

        if POLICY_PLUGIN_MANAGER.hook.task_instance_mutation_hook.get_hookimpls():
            task_instance_mutation_hook.is_noop = False

        log.info("Loaded airflow_local_settings from %s .", airflow_local_settings.__file__)


def initialize():
    """Initialize Airflow with all the settings from this file."""
    configure_vars()
    prepare_syspath_for_config_and_plugins()
    configure_policy_plugin_manager()
    # Load policy plugins _before_ importing airflow_local_settings, as Pluggy uses LIFO and we want
    # anything in airflow_local_settings to take precedence
    load_policy_plugins(POLICY_PLUGIN_MANAGER)
    import_local_settings()
    prepare_syspath_for_dags_folder()

    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    State.state_color.update(STATE_COLORS)

    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # mask the sensitive_config_values
    conf.mask_secrets()

    # Run any custom runtime checks that need to be executed for providers
    run_providers_custom_runtime_checks()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)


# Const stuff
KILOBYTE = 1024
MEGABYTE = KILOBYTE * KILOBYTE
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}

# Updating serialized DAGs can not be faster than a minimum interval to reduce database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs are compressed before writing to the DB.
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)
# Fetching serialized DAG can not be faster than a minimum interval to reduce database
# read rate. This config controls when your DAGs are updated in the Webserver
MIN_SERIALIZED_DAG_FETCH_INTERVAL = conf.getint("core", "min_serialized_dag_fetch_interval", fallback=10)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
    "core",
    "execute_tasks_new_python_interpreter",
    fallback=False,
)

ALLOW_FUTURE_EXEC_DATES = conf.getboolean("scheduler", "allow_trigger_in_future", fallback=False)

# Whether or not to check each dagrun against defined SLAs
CHECK_SLAS = conf.getboolean("core", "check_slas", fallback=True)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)

# By default Airflow providers are lazily-discovered (discovery and imports happen only when required).
# Set it to False, if you want to discover providers whenever 'airflow' is invoked via cli or
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
    executor_constants.KUBERNETES_EXECUTOR,
    executor_constants.CELERY_KUBERNETES_EXECUTOR,
    executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# By default this is off, but is automatically configured on when running task
# instances
MASK_SECRETS_IN_LOGS = False

# Display alerts on the dashboard
# Useful for warning about setup issues or announcing changes to end users
# List of UIAlerts, which allows for specifying the message, category, and roles the
# message should be shown to. For example:
#   from airflow.www.utils import UIAlert
#
#   DASHBOARD_UIALERTS = [
#       UIAlert("Welcome to Airflow"),  # All users
#       UIAlert("Airflow update happening next week", roles=["User"]),  # Only users with the User role
#       # A flash message with html:
#       UIAlert('Visit <a href="http://airflow.apache.org">airflow.apache.org</a>', html=True),
#   ]
#
DASHBOARD_UIALERTS: list[UIAlert] = []

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")

# AIP-44: internal_api (experimental)
# This feature is not complete yet, so we disable it by default.
_ENABLE_AIP_44: bool = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {
    "true",
    "t",
    "yes",
    "y",
    "1",
}