daemon_utils.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
  17. from __future__ import annotations
  18. import signal
  19. from argparse import Namespace
  20. from typing import Callable
  21. from daemon import daemon
  22. from daemon.pidfile import TimeoutPIDLockFile
  23. from airflow import settings
  24. from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
  25. from airflow.utils.process_utils import check_if_pidfile_process_is_running
  26. def run_command_with_daemon_option(
  27. *,
  28. args: Namespace,
  29. process_name: str,
  30. callback: Callable,
  31. should_setup_logging: bool = False,
  32. umask: str = settings.DAEMON_UMASK,
  33. pid_file: str | None = None,
  34. ):
  35. """
  36. Run the command in a daemon process if daemon mode enabled or within this process if not.
  37. :param args: the set of arguments passed to the original CLI command
  38. :param process_name: process name used in naming log and PID files for the daemon
  39. :param callback: the actual command to run with or without daemon context
  40. :param should_setup_logging: if true, then a log file handler for the daemon process will be created
  41. :param umask: file access creation mask ("umask") to set for the process on daemon start
  42. :param pid_file: if specified, this file path us used to store daemon process PID.
  43. If not specified, a file path is generated with the default pattern.
  44. """
  45. if args.daemon:
  46. pid = pid_file or args.pid if pid_file is not None or args.pid is not None else None
  47. pid, stdout, stderr, log_file = setup_locations(
  48. process=process_name, pid=pid, stdout=args.stdout, stderr=args.stderr, log=args.log_file
  49. )
  50. # Check if the process is already running; if not but a pidfile exists, clean it up
  51. check_if_pidfile_process_is_running(pid_file=pid, process_name=process_name)
  52. if should_setup_logging:
  53. files_preserve = [setup_logging(log_file)]
  54. else:
  55. files_preserve = None
  56. with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
  57. stdout_handle.truncate(0)
  58. stderr_handle.truncate(0)
  59. ctx = daemon.DaemonContext(
  60. pidfile=TimeoutPIDLockFile(pid, -1),
  61. files_preserve=files_preserve,
  62. stdout=stdout_handle,
  63. stderr=stderr_handle,
  64. umask=int(umask, 8),
  65. )
  66. with ctx:
  67. # in daemon context stats client needs to be reinitialized.
  68. from airflow.stats import Stats
  69. Stats.instance = None
  70. callback()
  71. else:
  72. signal.signal(signal.SIGINT, sigint_handler)
  73. signal.signal(signal.SIGTERM, sigint_handler)
  74. signal.signal(signal.SIGQUIT, sigquit_handler)
  75. callback()