1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import signal
- from argparse import Namespace
- from typing import Callable
- from daemon import daemon
- from daemon.pidfile import TimeoutPIDLockFile
- from airflow import settings
- from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
- from airflow.utils.process_utils import check_if_pidfile_process_is_running
- def run_command_with_daemon_option(
- *,
- args: Namespace,
- process_name: str,
- callback: Callable,
- should_setup_logging: bool = False,
- umask: str = settings.DAEMON_UMASK,
- pid_file: str | None = None,
- ):
- """
- Run the command in a daemon process if daemon mode enabled or within this process if not.
- :param args: the set of arguments passed to the original CLI command
- :param process_name: process name used in naming log and PID files for the daemon
- :param callback: the actual command to run with or without daemon context
- :param should_setup_logging: if true, then a log file handler for the daemon process will be created
- :param umask: file access creation mask ("umask") to set for the process on daemon start
- :param pid_file: if specified, this file path us used to store daemon process PID.
- If not specified, a file path is generated with the default pattern.
- """
- if args.daemon:
- pid = pid_file or args.pid if pid_file is not None or args.pid is not None else None
- pid, stdout, stderr, log_file = setup_locations(
- process=process_name, pid=pid, stdout=args.stdout, stderr=args.stderr, log=args.log_file
- )
- # Check if the process is already running; if not but a pidfile exists, clean it up
- check_if_pidfile_process_is_running(pid_file=pid, process_name=process_name)
- if should_setup_logging:
- files_preserve = [setup_logging(log_file)]
- else:
- files_preserve = None
- with open(stdout, "a") as stdout_handle, open(stderr, "a") as stderr_handle:
- stdout_handle.truncate(0)
- stderr_handle.truncate(0)
- ctx = daemon.DaemonContext(
- pidfile=TimeoutPIDLockFile(pid, -1),
- files_preserve=files_preserve,
- stdout=stdout_handle,
- stderr=stderr_handle,
- umask=int(umask, 8),
- )
- with ctx:
- # in daemon context stats client needs to be reinitialized.
- from airflow.stats import Stats
- Stats.instance = None
- callback()
- else:
- signal.signal(signal.SIGINT, sigint_handler)
- signal.signal(signal.SIGTERM, sigint_handler)
- signal.signal(signal.SIGQUIT, sigquit_handler)
- callback()
|