# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
- """Webserver command."""
- from __future__ import annotations
- import logging
- import os
- import signal
- import subprocess
- import sys
- import textwrap
- import time
- from contextlib import suppress
- from pathlib import Path
- from time import sleep
- from typing import NoReturn
- import psutil
- from lockfile.pidlockfile import read_pid_from_pidfile
- from airflow import settings
- from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
- from airflow.configuration import conf
- from airflow.exceptions import AirflowException, AirflowWebServerTimeout
- from airflow.utils import cli as cli_utils
- from airflow.utils.cli import setup_locations
- from airflow.utils.hashlib_wrapper import md5
- from airflow.utils.log.logging_mixin import LoggingMixin
- from airflow.utils.providers_configuration_loader import providers_configuration_loaded
- log = logging.getLogger(__name__)


class GunicornMonitor(LoggingMixin):
    """
    Runs forever.

    Monitors the child processes of @gunicorn_master_proc and restarts
    workers periodically, or when files in the plugins directory have been modified.

    Each iteration of the loop traverses one edge of this state transition
    diagram, where each state (node) represents
    [ num_ready_workers_running / num_workers_running ]. We expect most time to
    be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.
    The horizontal transition at ? happens after the new worker parses all the
    dags (so it could take a while!)

       V ────────────────────────────────────────────────────────────────────────┐
    [n / n] ──TTIN──> [ [n, n+bs) / n + bs ] ────?───> [n + bs / n + bs] ──TTOU─┘
       ^                          ^───────────────┘
       │
       │      ┌────────────────v
       └──────┴────── [ [0, n) / n ] <─── start

    We change the number of workers by sending TTIN and TTOU to the gunicorn
    master process, which increases and decreases the number of child workers
    respectively. Gunicorn guarantees that on TTOU workers are terminated
    gracefully, and that the oldest worker is terminated first.

    :param gunicorn_master_pid: PID of the main Gunicorn process
    :param num_workers_expected: Number of workers to run the Gunicorn web server
    :param master_timeout: Number of seconds the webserver waits before killing a gunicorn master
        that doesn't respond
    :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
    :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
        refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
        bringing up new ones and killing old ones.
    :param reload_on_plugin_change: If set to True, Airflow will track files in the plugins_folder
        directory. When it detects changes, it reloads gunicorn.
    """

    def __init__(
        self,
        gunicorn_master_pid: int,
        num_workers_expected: int,
        master_timeout: int,
        worker_refresh_interval: int,
        worker_refresh_batch_size: int,
        reload_on_plugin_change: bool,
    ):
        super().__init__()
        self.gunicorn_master_proc = psutil.Process(gunicorn_master_pid)
        self.num_workers_expected = num_workers_expected
        self.master_timeout = master_timeout
        self.worker_refresh_interval = worker_refresh_interval
        self.worker_refresh_batch_size = worker_refresh_batch_size
        self.reload_on_plugin_change = reload_on_plugin_change
        self._num_workers_running = 0
        self._num_ready_workers_running = 0
        self._last_refresh_time = time.monotonic() if worker_refresh_interval > 0 else None
        self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
        self._restart_on_next_plugin_check = False

    def _generate_plugin_state(self) -> dict[str, str]:
        """
        Get plugin states.

        Generate a dict mapping each filename in the settings.PLUGINS_FOLDER
        directory to a hash of its contents.
        """
        if not settings.PLUGINS_FOLDER:
            return {}

        all_filenames: list[str] = []
        for root, _, filenames in os.walk(settings.PLUGINS_FOLDER):
            all_filenames.extend(os.path.join(root, f) for f in filenames)

        plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
        return plugin_state

    @staticmethod
    def _get_file_hash(fname: str):
        """Calculate MD5 hash for file."""
        hash_md5 = md5()
        with open(fname, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _get_num_ready_workers_running(self) -> int:
        """Return the number of ready Gunicorn workers by looking for READY_PREFIX in the process name."""
        workers = psutil.Process(self.gunicorn_master_proc.pid).children()

        def ready_prefix_on_cmdline(proc):
            try:
                cmdline = proc.cmdline()
                if cmdline:
                    return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
            except psutil.NoSuchProcess:
                pass
            return False

        nb_ready_workers = sum(1 for proc in workers if ready_prefix_on_cmdline(proc))
        return nb_ready_workers

    def _get_num_workers_running(self) -> int:
        """Return the number of running Gunicorn worker processes."""
        workers = psutil.Process(self.gunicorn_master_proc.pid).children()
        return len(workers)

    def _wait_until_true(self, fn, timeout: int = 0) -> None:
        """Sleep until fn is true."""
        start_time = time.monotonic()
        while not fn():
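            # A timeout of 0 (the default) disables the deadline and waits indefinitely.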
            if 0 < timeout <= time.monotonic() - start_time:
                raise AirflowWebServerTimeout(f"No response from gunicorn master within {timeout} seconds")
            sleep(0.1)

    def _spawn_new_workers(self, count: int) -> None:
        """
        Send signal to spawn new workers.

        :param count: The number of workers to spawn
        """
        excess = 0
        for _ in range(count):
            # TTIN: Increment the number of processes by one
            self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
            excess += 1
            self._wait_until_true(
                lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )

    def _kill_old_workers(self, count: int) -> None:
        """
        Send signal to kill old workers.

        :param count: The number of workers to kill
        """
        for _ in range(count):
            count -= 1
            # TTOU: Decrement the number of processes by one
            self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
            self._wait_until_true(
                lambda: self.num_workers_expected + count == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )

    def _reload_gunicorn(self) -> None:
        """
        Send signal to reload the gunicorn configuration.

        When gunicorn receives this signal, it reloads its configuration, starts
        new worker processes with the new configuration, and gracefully shuts
        down the old workers.
        """
        # HUP: Reload the configuration.
        self.gunicorn_master_proc.send_signal(signal.SIGHUP)
        sleep(1)
        self._wait_until_true(
            lambda: self.num_workers_expected == self._get_num_workers_running(), timeout=self.master_timeout
        )

    def start(self) -> NoReturn:
        """Start monitoring the webserver."""
        try:
            self._wait_until_true(
                lambda: self.num_workers_expected == self._get_num_workers_running(),
                timeout=self.master_timeout,
            )
            while True:
                if not self.gunicorn_master_proc.is_running():
                    sys.exit(1)
                self._check_workers()
                # Throttle loop
                sleep(1)
        except (AirflowWebServerTimeout, OSError) as err:
            self.log.error(err)
            self.log.error("Shutting down webserver")
            try:
                self.gunicorn_master_proc.terminate()
                self.gunicorn_master_proc.wait()
            finally:
                sys.exit(1)

    def _check_workers(self) -> None:
        num_workers_running = self._get_num_workers_running()
        num_ready_workers_running = self._get_num_ready_workers_running()

        # Whenever some workers are not ready, wait until all workers are ready
        if num_ready_workers_running < num_workers_running:
            self.log.debug(
                "[%d / %d] Some workers are starting up, waiting...",
                num_ready_workers_running,
                num_workers_running,
            )
            sleep(1)
            return

        # If there are too many workers, kill a worker gracefully by asking gunicorn to reduce
        # the number of workers
        if num_workers_running > self.num_workers_expected:
            excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
            self.log.debug(
                "[%d / %d] Killing %s workers", num_ready_workers_running, num_workers_running, excess
            )
            self._kill_old_workers(excess)
            return

        # If there are too few workers, start a new worker by asking gunicorn
        # to increase the number of workers
        if num_workers_running < self.num_workers_expected:
            self.log.error(
                "[%d / %d] Some workers seem to have died and gunicorn did not restart them as expected",
                num_ready_workers_running,
                num_workers_running,
            )
            sleep(10)
            num_workers_running = self._get_num_workers_running()
            if num_workers_running < self.num_workers_expected:
                new_worker_count = min(
                    self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
                )
                # Log at info since we are trying to fix an error logged just above
                self.log.info(
                    "[%d / %d] Spawning %d workers",
                    num_ready_workers_running,
                    num_workers_running,
                    new_worker_count,
                )
                self._spawn_new_workers(new_worker_count)
            return
        # Now the number of running and expected workers should be equal.

        # If workers should be restarted periodically,
        if self.worker_refresh_interval > 0 and self._last_refresh_time:
            # and we refreshed the workers a long time ago, refresh the workers
            last_refresh_diff = time.monotonic() - self._last_refresh_time
            if self.worker_refresh_interval < last_refresh_diff:
                num_new_workers = self.worker_refresh_batch_size
                self.log.debug(
                    "[%d / %d] Starting a refresh: spawning %d workers.",
                    num_ready_workers_running,
                    num_workers_running,
                    num_new_workers,
                )
                self._spawn_new_workers(num_new_workers)
                self._last_refresh_time = time.monotonic()
                return
        # If we should watch the plugins directory,
        if self.reload_on_plugin_change:
            # compare the previous and current contents of the directory.
            new_state = self._generate_plugin_state()
            # If it changed, wait until the change has settled (no further modification
            # on the next check) before reloading.
            if new_state != self._last_plugin_state:
                self.log.debug(
                    "[%d / %d] Plugins folder changed. Gunicorn will be restarted the next time the "
                    "plugins directory is checked, if there is no further change.",
                    num_ready_workers_running,
                    num_workers_running,
                )
                self._restart_on_next_plugin_check = True
                self._last_plugin_state = new_state
            elif self._restart_on_next_plugin_check:
                self.log.debug(
                    "[%d / %d] Reloading the gunicorn configuration.",
                    num_ready_workers_running,
                    num_workers_running,
                )
                self._restart_on_next_plugin_check = False
                self._last_refresh_time = time.monotonic()
                self._reload_gunicorn()
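
# A minimal sketch of driving the monitor directly; the PID and the numeric values
# below are hypothetical, and in practice the `webserver` command below wires them
# in from CLI args and airflow.cfg:
#
#     GunicornMonitor(
#         gunicorn_master_pid=12345,
#         num_workers_expected=4,
#         master_timeout=120,
#         worker_refresh_interval=30,
#         worker_refresh_batch_size=1,
#         reload_on_plugin_change=False,
#     ).start()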


@cli_utils.action_cli
@providers_configuration_loaded
def webserver(args):
    """Start Airflow Webserver."""
    print(settings.HEADER)

    # Check for old/insecure config, and fail safe (i.e. don't launch) if the config is wildly insecure.
    if conf.get("webserver", "secret_key") == "temporary_key":
        from rich import print as rich_print

        rich_print(
            "[red][bold]ERROR:[/bold] The `secret_key` setting under the webserver config has an insecure "
            "value - Airflow has failed safe and refuses to start. Please change this value to a new, "
            "per-environment, randomly generated string, for example using this command `[cyan]openssl rand "
            "-hex 30[/cyan]`",
            file=sys.stderr,
        )
        sys.exit(1)

    access_logfile = args.access_logfile or conf.get("webserver", "access_logfile")
    error_logfile = args.error_logfile or conf.get("webserver", "error_logfile")
    access_logformat = args.access_logformat or conf.get("webserver", "access_logformat")
    num_workers = args.workers or conf.get("webserver", "workers")
    worker_timeout = args.worker_timeout or conf.get("webserver", "web_server_worker_timeout")
    ssl_cert = args.ssl_cert or conf.get("webserver", "web_server_ssl_cert")
    ssl_key = args.ssl_key or conf.get("webserver", "web_server_ssl_key")
    if not ssl_cert and ssl_key:
        raise AirflowException("An SSL certificate must also be provided for use with " + ssl_key)
    if ssl_cert and not ssl_key:
        raise AirflowException("An SSL key must also be provided for use with " + ssl_cert)

    from airflow.www.app import create_app

    if args.debug:
        print(f"Starting the web server on port {args.port} and host {args.hostname}.")
        app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
        app.run(
            debug=True,
            use_reloader=not app.config["TESTING"],
            port=args.port,
            host=args.hostname,
            ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None,
        )
    else:
        print(
            textwrap.dedent(
                f"""\
                Running the Gunicorn Server with:
                Workers: {num_workers} {args.workerclass}
                Host: {args.hostname}:{args.port}
                Timeout: {worker_timeout}
                Logfiles: {access_logfile} {error_logfile}
                Access Logformat: {access_logformat}
                ================================================================="""
            )
        )

        pid_file, _, _, _ = setup_locations("webserver", pid=args.pid)

        run_args = [
            sys.executable,
            "-m",
            "gunicorn",
            "--workers",
            str(num_workers),
            "--worker-class",
            str(args.workerclass),
            "--timeout",
            str(worker_timeout),
            "--bind",
            args.hostname + ":" + str(args.port),
            "--name",
            "airflow-webserver",
            "--pid",
            pid_file,
            "--config",
            "python:airflow.www.gunicorn_config",
        ]
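
        # With stock config values the resolved command looks roughly like this
        # (the worker count, timeout, bind address, and pid path are illustrative):
        #
        #   python -m gunicorn --workers 4 --worker-class sync --timeout 120 \
        #       --bind 0.0.0.0:8080 --name airflow-webserver \
        #       --pid $AIRFLOW_HOME/airflow-webserver.pid \
        #       --config python:airflow.www.gunicorn_config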

        if args.access_logfile:
            run_args += ["--access-logfile", str(args.access_logfile)]

        if args.error_logfile:
            run_args += ["--error-logfile", str(args.error_logfile)]

        if args.access_logformat and args.access_logformat.strip():
            run_args += ["--access-logformat", str(args.access_logformat)]

        if args.daemon:
            run_args += ["--daemon"]

        if ssl_cert:
            run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]

        run_args += ["airflow.www.app:cached_app()"]

        if conf.getboolean("webserver", "reload_on_plugin_change", fallback=False):
            log.warning(
                "Setting reload_on_plugin_change = true prevents running Gunicorn with preloading. "
                "This means the app cannot be loaded before workers are forked, and each worker has a "
                "separate copy of the app. This may cause IntegrityError during webserver startup, and "
                "should be avoided in production."
            )
        else:
            # To prevent different workers creating the web app and
            # all writing to the database at the same time, we use the --preload option.
            run_args += ["--preload"]

        def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
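            # Ask gunicorn to shut down gracefully; fall back to SIGKILL if the
            # master is still alive after the 30 second grace period below.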
- log.info("Received signal: %s. Closing gunicorn.", signum)
- gunicorn_master_proc.terminate()
- with suppress(TimeoutError):
- gunicorn_master_proc.wait(timeout=30)
- if isinstance(gunicorn_master_proc, subprocess.Popen):
- still_running = gunicorn_master_proc.poll() is not None
- else:
- still_running = gunicorn_master_proc.is_running()
- if still_running:
- gunicorn_master_proc.kill()
- sys.exit(0)

        def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
            # Register signal handlers
            signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
            signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))

            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
            GunicornMonitor(
                gunicorn_master_pid=gunicorn_master_proc.pid,
                num_workers_expected=num_workers,
                master_timeout=conf.getint("webserver", "web_server_master_timeout"),
                worker_refresh_interval=conf.getint("webserver", "worker_refresh_interval", fallback=30),
                worker_refresh_batch_size=conf.getint("webserver", "worker_refresh_batch_size", fallback=1),
                reload_on_plugin_change=conf.getboolean(
                    "webserver", "reload_on_plugin_change", fallback=False
                ),
            ).start()

        def start_and_monitor_gunicorn(args):
            if args.daemon:
                subprocess.Popen(run_args, close_fds=True)

                # Read the pid of the gunicorn master from the pid file, as it will be
                # different from the pid of the process spawned above.
                gunicorn_master_proc_pid = None
                while not gunicorn_master_proc_pid:
                    sleep(0.1)
                    gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)

                # Run Gunicorn monitor
                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
                monitor_gunicorn(gunicorn_master_proc)
            else:
                with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
                    monitor_gunicorn(gunicorn_master_proc)

        if args.daemon:
            # This lets possible errors be reported before daemonization
            os.environ["SKIP_DAGS_PARSING"] = "True"
            create_app(None)
            os.environ.pop("SKIP_DAGS_PARSING")
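
        # The monitor gets its own pid file next to the gunicorn one, e.g.
        # "airflow-webserver.pid" becomes "airflow-webserver-monitor.pid".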
        pid_file_path = Path(pid_file)
        monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
        run_command_with_daemon_option(
            args=args,
            process_name="webserver",
            callback=lambda: start_and_monitor_gunicorn(args),
            should_setup_logging=True,
            pid_file=monitor_pid_file,
        )
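
# Typical invocations (illustrative):
#   airflow webserver                 # run in the foreground on the default port (8080)
#   airflow webserver --port 8081 -D  # daemonize on a custom port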