webserver_command.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Webserver command."""
  18. from __future__ import annotations
  19. import logging
  20. import os
  21. import signal
  22. import subprocess
  23. import sys
  24. import textwrap
  25. import time
  26. from contextlib import suppress
  27. from pathlib import Path
  28. from time import sleep
  29. from typing import NoReturn
  30. import psutil
  31. from lockfile.pidlockfile import read_pid_from_pidfile
  32. from airflow import settings
  33. from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
  34. from airflow.configuration import conf
  35. from airflow.exceptions import AirflowException, AirflowWebServerTimeout
  36. from airflow.utils import cli as cli_utils
  37. from airflow.utils.cli import setup_locations
  38. from airflow.utils.hashlib_wrapper import md5
  39. from airflow.utils.log.logging_mixin import LoggingMixin
  40. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
# Module-level logger; the ``webserver`` command below uses it for its warning and signal-handling messages.
log = logging.getLogger(__name__)
  42. class GunicornMonitor(LoggingMixin):
  43. """
  44. Runs forever.
  45. Monitoring the child processes of @gunicorn_master_proc and restarting
  46. workers occasionally or when files in the plug-in directory has been modified.
  47. Each iteration of the loop traverses one edge of this state transition
  48. diagram, where each state (node) represents
  49. [ num_ready_workers_running / num_workers_running ]. We expect most time to
  50. be spent in [n / n]. `bs` is the setting webserver.worker_refresh_batch_size.
  51. The horizontal transition at ? happens after the new worker parses all the
  52. dags (so it could take a while!)
  53. V ────────────────────────────────────────────────────────────────────────┐
  54. [n / n] ──TTIN──> [ [n, n+bs) / n + bs ] ────?───> [n + bs / n + bs] ──TTOU─┘
  55. ^ ^───────────────┘
  56. │ ┌────────────────v
  57. └──────┴────── [ [0, n) / n ] <─── start
  58. We change the number of workers by sending TTIN and TTOU to the gunicorn
  59. master process, which increases and decreases the number of child workers
  60. respectively. Gunicorn guarantees that on TTOU workers are terminated
  61. gracefully and that the oldest worker is terminated.
  62. :param gunicorn_master_pid: PID for the main Gunicorn process
  63. :param num_workers_expected: Number of workers to run the Gunicorn web server
  64. :param master_timeout: Number of seconds the webserver waits before killing gunicorn master that
  65. doesn't respond
  66. :param worker_refresh_interval: Number of seconds to wait before refreshing a batch of workers.
  67. :param worker_refresh_batch_size: Number of workers to refresh at a time. When set to 0, worker
  68. refresh is disabled. When nonzero, airflow periodically refreshes webserver workers by
  69. bringing up new ones and killing old ones.
  70. :param reload_on_plugin_change: If set to True, Airflow will track files in plugins_folder directory.
  71. When it detects changes, then reload the gunicorn.
  72. """
  73. def __init__(
  74. self,
  75. gunicorn_master_pid: int,
  76. num_workers_expected: int,
  77. master_timeout: int,
  78. worker_refresh_interval: int,
  79. worker_refresh_batch_size: int,
  80. reload_on_plugin_change: bool,
  81. ):
  82. super().__init__()
  83. self.gunicorn_master_proc = psutil.Process(gunicorn_master_pid)
  84. self.num_workers_expected = num_workers_expected
  85. self.master_timeout = master_timeout
  86. self.worker_refresh_interval = worker_refresh_interval
  87. self.worker_refresh_batch_size = worker_refresh_batch_size
  88. self.reload_on_plugin_change = reload_on_plugin_change
  89. self._num_workers_running = 0
  90. self._num_ready_workers_running = 0
  91. self._last_refresh_time = time.monotonic() if worker_refresh_interval > 0 else None
  92. self._last_plugin_state = self._generate_plugin_state() if reload_on_plugin_change else None
  93. self._restart_on_next_plugin_check = False
  94. def _generate_plugin_state(self) -> dict[str, float]:
  95. """
  96. Get plugin states.
  97. Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
  98. directory.
  99. """
  100. if not settings.PLUGINS_FOLDER:
  101. return {}
  102. all_filenames: list[str] = []
  103. for root, _, filenames in os.walk(settings.PLUGINS_FOLDER):
  104. all_filenames.extend(os.path.join(root, f) for f in filenames)
  105. plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
  106. return plugin_state
  107. @staticmethod
  108. def _get_file_hash(fname: str):
  109. """Calculate MD5 hash for file."""
  110. hash_md5 = md5()
  111. with open(fname, "rb") as f:
  112. for chunk in iter(lambda: f.read(4096), b""):
  113. hash_md5.update(chunk)
  114. return hash_md5.hexdigest()
  115. def _get_num_ready_workers_running(self) -> int:
  116. """Return number of ready Gunicorn workers by looking for READY_PREFIX in process name."""
  117. workers = psutil.Process(self.gunicorn_master_proc.pid).children()
  118. def ready_prefix_on_cmdline(proc):
  119. try:
  120. cmdline = proc.cmdline()
  121. if cmdline:
  122. return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0]
  123. except psutil.NoSuchProcess:
  124. pass
  125. return False
  126. nb_ready_workers = sum(1 for proc in workers if ready_prefix_on_cmdline(proc))
  127. return nb_ready_workers
  128. def _get_num_workers_running(self) -> int:
  129. """Return number of running Gunicorn workers processes."""
  130. workers = psutil.Process(self.gunicorn_master_proc.pid).children()
  131. return len(workers)
  132. def _wait_until_true(self, fn, timeout: int = 0) -> None:
  133. """Sleep until fn is true."""
  134. start_time = time.monotonic()
  135. while not fn():
  136. if 0 < timeout <= time.monotonic() - start_time:
  137. raise AirflowWebServerTimeout(f"No response from gunicorn master within {timeout} seconds")
  138. sleep(0.1)
  139. def _spawn_new_workers(self, count: int) -> None:
  140. """
  141. Send signal to kill the worker.
  142. :param count: The number of workers to spawn
  143. """
  144. excess = 0
  145. for _ in range(count):
  146. # TTIN: Increment the number of processes by one
  147. self.gunicorn_master_proc.send_signal(signal.SIGTTIN)
  148. excess += 1
  149. self._wait_until_true(
  150. lambda: self.num_workers_expected + excess == self._get_num_workers_running(),
  151. timeout=self.master_timeout,
  152. )
  153. def _kill_old_workers(self, count: int) -> None:
  154. """
  155. Send signal to kill the worker.
  156. :param count: The number of workers to kill
  157. """
  158. for _ in range(count):
  159. count -= 1
  160. # TTOU: Decrement the number of processes by one
  161. self.gunicorn_master_proc.send_signal(signal.SIGTTOU)
  162. self._wait_until_true(
  163. lambda: self.num_workers_expected + count == self._get_num_workers_running(),
  164. timeout=self.master_timeout,
  165. )
  166. def _reload_gunicorn(self) -> None:
  167. """
  168. Send signal to reload the gunicorn configuration.
  169. When gunicorn receive signals, it reloads the configuration,
  170. start the new worker processes with a new configuration and gracefully
  171. shutdown older workers.
  172. """
  173. # HUP: Reload the configuration.
  174. self.gunicorn_master_proc.send_signal(signal.SIGHUP)
  175. sleep(1)
  176. self._wait_until_true(
  177. lambda: self.num_workers_expected == self._get_num_workers_running(), timeout=self.master_timeout
  178. )
  179. def start(self) -> NoReturn:
  180. """Start monitoring the webserver."""
  181. try:
  182. self._wait_until_true(
  183. lambda: self.num_workers_expected == self._get_num_workers_running(),
  184. timeout=self.master_timeout,
  185. )
  186. while True:
  187. if not self.gunicorn_master_proc.is_running():
  188. sys.exit(1)
  189. self._check_workers()
  190. # Throttle loop
  191. sleep(1)
  192. except (AirflowWebServerTimeout, OSError) as err:
  193. self.log.error(err)
  194. self.log.error("Shutting down webserver")
  195. try:
  196. self.gunicorn_master_proc.terminate()
  197. self.gunicorn_master_proc.wait()
  198. finally:
  199. sys.exit(1)
  200. def _check_workers(self) -> None:
  201. num_workers_running = self._get_num_workers_running()
  202. num_ready_workers_running = self._get_num_ready_workers_running()
  203. # Whenever some workers are not ready, wait until all workers are ready
  204. if num_ready_workers_running < num_workers_running:
  205. self.log.debug(
  206. "[%d / %d] Some workers are starting up, waiting...",
  207. num_ready_workers_running,
  208. num_workers_running,
  209. )
  210. sleep(1)
  211. return
  212. # If there are too many workers, then kill a worker gracefully by asking gunicorn to reduce
  213. # number of workers
  214. if num_workers_running > self.num_workers_expected:
  215. excess = min(num_workers_running - self.num_workers_expected, self.worker_refresh_batch_size)
  216. self.log.debug(
  217. "[%d / %d] Killing %s workers", num_ready_workers_running, num_workers_running, excess
  218. )
  219. self._kill_old_workers(excess)
  220. return
  221. # If there are too few workers, start a new worker by asking gunicorn
  222. # to increase number of workers
  223. if num_workers_running < self.num_workers_expected:
  224. self.log.error(
  225. "[%d / %d] Some workers seem to have died and gunicorn did not restart them as expected",
  226. num_ready_workers_running,
  227. num_workers_running,
  228. )
  229. sleep(10)
  230. num_workers_running = self._get_num_workers_running()
  231. if num_workers_running < self.num_workers_expected:
  232. new_worker_count = min(
  233. self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
  234. )
  235. # log at info since we are trying fix an error logged just above
  236. self.log.info(
  237. "[%d / %d] Spawning %d workers",
  238. num_ready_workers_running,
  239. num_workers_running,
  240. new_worker_count,
  241. )
  242. self._spawn_new_workers(new_worker_count)
  243. return
  244. # Now the number of running and expected worker should be equal
  245. # If workers should be restarted periodically.
  246. if self.worker_refresh_interval > 0 and self._last_refresh_time:
  247. # and we refreshed the workers a long time ago, refresh the workers
  248. last_refresh_diff = time.monotonic() - self._last_refresh_time
  249. if self.worker_refresh_interval < last_refresh_diff:
  250. num_new_workers = self.worker_refresh_batch_size
  251. self.log.debug(
  252. "[%d / %d] Starting doing a refresh. Starting %d workers.",
  253. num_ready_workers_running,
  254. num_workers_running,
  255. num_new_workers,
  256. )
  257. self._spawn_new_workers(num_new_workers)
  258. self._last_refresh_time = time.monotonic()
  259. return
  260. # if we should check the directory with the plugin,
  261. if self.reload_on_plugin_change:
  262. # compare the previous and current contents of the directory
  263. new_state = self._generate_plugin_state()
  264. # If changed, wait until its content is fully saved.
  265. if new_state != self._last_plugin_state:
  266. self.log.debug(
  267. "[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the "
  268. "plugin directory is checked, if there is no change in it.",
  269. num_ready_workers_running,
  270. num_workers_running,
  271. )
  272. self._restart_on_next_plugin_check = True
  273. self._last_plugin_state = new_state
  274. elif self._restart_on_next_plugin_check:
  275. self.log.debug(
  276. "[%d / %d] Starts reloading the gunicorn configuration.",
  277. num_ready_workers_running,
  278. num_workers_running,
  279. )
  280. self._restart_on_next_plugin_check = False
  281. self._last_refresh_time = time.monotonic()
  282. self._reload_gunicorn()
  283. @cli_utils.action_cli
  284. @providers_configuration_loaded
  285. def webserver(args):
  286. """Start Airflow Webserver."""
  287. print(settings.HEADER)
  288. # Check for old/insecure config, and fail safe (i.e. don't launch) if the config is wildly insecure.
  289. if conf.get("webserver", "secret_key") == "temporary_key":
  290. from rich import print as rich_print
  291. rich_print(
  292. "[red][bold]ERROR:[/bold] The `secret_key` setting under the webserver config has an insecure "
  293. "value - Airflow has failed safe and refuses to start. Please change this value to a new, "
  294. "per-environment, randomly generated string, for example using this command `[cyan]openssl rand "
  295. "-hex 30[/cyan]`",
  296. file=sys.stderr,
  297. )
  298. sys.exit(1)
  299. access_logfile = args.access_logfile or conf.get("webserver", "access_logfile")
  300. error_logfile = args.error_logfile or conf.get("webserver", "error_logfile")
  301. access_logformat = args.access_logformat or conf.get("webserver", "access_logformat")
  302. num_workers = args.workers or conf.get("webserver", "workers")
  303. worker_timeout = args.worker_timeout or conf.get("webserver", "web_server_worker_timeout")
  304. ssl_cert = args.ssl_cert or conf.get("webserver", "web_server_ssl_cert")
  305. ssl_key = args.ssl_key or conf.get("webserver", "web_server_ssl_key")
  306. if not ssl_cert and ssl_key:
  307. raise AirflowException("An SSL certificate must also be provided for use with " + ssl_key)
  308. if ssl_cert and not ssl_key:
  309. raise AirflowException("An SSL key must also be provided for use with " + ssl_cert)
  310. from airflow.www.app import create_app
  311. if args.debug:
  312. print(f"Starting the web server on port {args.port} and host {args.hostname}.")
  313. app = create_app(testing=conf.getboolean("core", "unit_test_mode"))
  314. app.run(
  315. debug=True,
  316. use_reloader=not app.config["TESTING"],
  317. port=args.port,
  318. host=args.hostname,
  319. ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None,
  320. )
  321. else:
  322. print(
  323. textwrap.dedent(
  324. f"""\
  325. Running the Gunicorn Server with:
  326. Workers: {num_workers} {args.workerclass}
  327. Host: {args.hostname}:{args.port}
  328. Timeout: {worker_timeout}
  329. Logfiles: {access_logfile} {error_logfile}
  330. Access Logformat: {access_logformat}
  331. ================================================================="""
  332. )
  333. )
  334. pid_file, _, _, _ = setup_locations("webserver", pid=args.pid)
  335. run_args = [
  336. sys.executable,
  337. "-m",
  338. "gunicorn",
  339. "--workers",
  340. str(num_workers),
  341. "--worker-class",
  342. str(args.workerclass),
  343. "--timeout",
  344. str(worker_timeout),
  345. "--bind",
  346. args.hostname + ":" + str(args.port),
  347. "--name",
  348. "airflow-webserver",
  349. "--pid",
  350. pid_file,
  351. "--config",
  352. "python:airflow.www.gunicorn_config",
  353. ]
  354. if args.access_logfile:
  355. run_args += ["--access-logfile", str(args.access_logfile)]
  356. if args.error_logfile:
  357. run_args += ["--error-logfile", str(args.error_logfile)]
  358. if args.access_logformat and args.access_logformat.strip():
  359. run_args += ["--access-logformat", str(args.access_logformat)]
  360. if args.daemon:
  361. run_args += ["--daemon"]
  362. if ssl_cert:
  363. run_args += ["--certfile", ssl_cert, "--keyfile", ssl_key]
  364. run_args += ["airflow.www.app:cached_app()"]
  365. if conf.getboolean("webserver", "reload_on_plugin_change", fallback=False):
  366. log.warning(
  367. "Setting reload_on_plugin_change = true prevents running Gunicorn with preloading. "
  368. "This means the app cannot be loaded before workers are forked, and each worker has a "
  369. "separate copy of the app. This may cause IntegrityError during webserver startup, and "
  370. "should be avoided in production."
  371. )
  372. else:
  373. # To prevent different workers creating the web app and
  374. # all writing to the database at the same time, we use the --preload option.
  375. run_args += ["--preload"]
  376. def kill_proc(signum: int, gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
  377. log.info("Received signal: %s. Closing gunicorn.", signum)
  378. gunicorn_master_proc.terminate()
  379. with suppress(TimeoutError):
  380. gunicorn_master_proc.wait(timeout=30)
  381. if isinstance(gunicorn_master_proc, subprocess.Popen):
  382. still_running = gunicorn_master_proc.poll() is not None
  383. else:
  384. still_running = gunicorn_master_proc.is_running()
  385. if still_running:
  386. gunicorn_master_proc.kill()
  387. sys.exit(0)
  388. def monitor_gunicorn(gunicorn_master_proc: psutil.Process | subprocess.Popen) -> NoReturn:
  389. # Register signal handlers
  390. signal.signal(signal.SIGINT, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
  391. signal.signal(signal.SIGTERM, lambda signum, _: kill_proc(signum, gunicorn_master_proc))
  392. # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
  393. GunicornMonitor(
  394. gunicorn_master_pid=gunicorn_master_proc.pid,
  395. num_workers_expected=num_workers,
  396. master_timeout=conf.getint("webserver", "web_server_master_timeout"),
  397. worker_refresh_interval=conf.getint("webserver", "worker_refresh_interval", fallback=30),
  398. worker_refresh_batch_size=conf.getint("webserver", "worker_refresh_batch_size", fallback=1),
  399. reload_on_plugin_change=conf.getboolean(
  400. "webserver", "reload_on_plugin_change", fallback=False
  401. ),
  402. ).start()
  403. def start_and_monitor_gunicorn(args):
  404. if args.daemon:
  405. subprocess.Popen(run_args, close_fds=True)
  406. # Reading pid of gunicorn master as it will be different that
  407. # the one of process spawned above.
  408. gunicorn_master_proc_pid = None
  409. while not gunicorn_master_proc_pid:
  410. sleep(0.1)
  411. gunicorn_master_proc_pid = read_pid_from_pidfile(pid_file)
  412. # Run Gunicorn monitor
  413. gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
  414. monitor_gunicorn(gunicorn_master_proc)
  415. else:
  416. with subprocess.Popen(run_args, close_fds=True) as gunicorn_master_proc:
  417. monitor_gunicorn(gunicorn_master_proc)
  418. if args.daemon:
  419. # This makes possible errors get reported before daemonization
  420. os.environ["SKIP_DAGS_PARSING"] = "True"
  421. create_app(None)
  422. os.environ.pop("SKIP_DAGS_PARSING")
  423. pid_file_path = Path(pid_file)
  424. monitor_pid_file = str(pid_file_path.with_name(f"{pid_file_path.stem}-monitor{pid_file_path.suffix}"))
  425. run_command_with_daemon_option(
  426. args=args,
  427. process_name="webserver",
  428. callback=lambda: start_and_monitor_gunicorn(args),
  429. should_setup_logging=True,
  430. pid_file=monitor_pid_file,
  431. )