# process_utils.py (12 KB)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
  18. """Utilities for running or stopping processes."""
  19. from __future__ import annotations
  20. import errno
  21. import logging
  22. import os
  23. import select
  24. import shlex
  25. import signal
  26. import subprocess
  27. import sys
  28. from airflow.utils.platform import IS_WINDOWS
  29. if not IS_WINDOWS:
  30. import pty
  31. import termios
  32. import tty
  33. from contextlib import contextmanager
  34. from typing import Generator
  35. import psutil
  36. from lockfile.pidlockfile import PIDLockFile
  37. from airflow.configuration import conf
  38. from airflow.exceptions import AirflowException
  39. log = logging.getLogger(__name__)
  40. # When killing processes, time to wait after issuing a SIGTERM before issuing a
  41. # SIGKILL.
  42. DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = conf.getint("core", "KILLED_TASK_CLEANUP_TIME")
  43. def reap_process_group(
  44. process_group_id: int,
  45. logger,
  46. sig: signal.Signals = signal.SIGTERM,
  47. timeout: int = DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM,
  48. ) -> dict[int, int]:
  49. """
  50. Send sig (SIGTERM) to the process group of pid.
  51. Tries really hard to terminate all processes in the group (including grandchildren). Will send
  52. sig (SIGTERM) to the process group of pid. If any process is alive after timeout
  53. a SIGKILL will be send.
  54. :param process_group_id: process group id to kill.
  55. The process that wants to create the group should run
  56. `airflow.utils.process_utils.set_new_process_group()` as the first command
  57. it executes which will set group id = process_id. Effectively the process that is the
  58. "root" of the group has pid = gid and all other processes in the group have different
  59. pids but the same gid (equal the pid of the root process)
  60. :param logger: log handler
  61. :param sig: signal type
  62. :param timeout: how much time a process has to terminate
  63. """
  64. returncodes = {}
  65. def on_terminate(p):
  66. logger.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
  67. returncodes[p.pid] = p.returncode
  68. def signal_procs(sig):
  69. if IS_WINDOWS:
  70. return
  71. try:
  72. logger.info("Sending the signal %s to group %s", sig, process_group_id)
  73. os.killpg(process_group_id, sig)
  74. except OSError as err_killpg:
  75. # If operation not permitted error is thrown due to run_as_user,
  76. # use sudo -n(--non-interactive) to kill the process
  77. if err_killpg.errno == errno.EPERM:
  78. subprocess.check_call(
  79. ["sudo", "-n", "kill", "-" + str(int(sig))]
  80. + [str(p.pid) for p in all_processes_in_the_group]
  81. )
  82. elif err_killpg.errno == errno.ESRCH:
  83. # There is a rare condition that the process has not managed yet to change its process
  84. # group. In this case os.killpg fails with ESRCH error
  85. # So we additionally send a kill signal to the process itself.
  86. logger.info(
  87. "Sending the signal %s to process %s as process group is missing.", sig, process_group_id
  88. )
  89. try:
  90. os.kill(process_group_id, sig)
  91. except OSError as err_kill:
  92. if err_kill.errno == errno.EPERM:
  93. subprocess.check_call(["sudo", "-n", "kill", "-" + str(process_group_id)])
  94. else:
  95. raise
  96. else:
  97. raise
  98. if not IS_WINDOWS and process_group_id == os.getpgid(0):
  99. raise RuntimeError("I refuse to kill myself")
  100. try:
  101. parent = psutil.Process(process_group_id)
  102. all_processes_in_the_group = parent.children(recursive=True)
  103. all_processes_in_the_group.append(parent)
  104. except psutil.NoSuchProcess:
  105. # The process already exited, but maybe its children haven't.
  106. all_processes_in_the_group = []
  107. for proc in psutil.process_iter():
  108. try:
  109. if os.getpgid(proc.pid) == process_group_id and proc.pid != 0:
  110. all_processes_in_the_group.append(proc)
  111. except OSError:
  112. pass
  113. logger.info(
  114. "Sending %s to group %s. PIDs of all processes in the group: %s",
  115. sig,
  116. process_group_id,
  117. [p.pid for p in all_processes_in_the_group],
  118. )
  119. try:
  120. signal_procs(sig)
  121. except OSError as err:
  122. # No such process, which means there is no such process group - our job
  123. # is done
  124. if err.errno == errno.ESRCH:
  125. return returncodes
  126. _, alive = psutil.wait_procs(all_processes_in_the_group, timeout=timeout, callback=on_terminate)
  127. if alive:
  128. for proc in alive:
  129. logger.warning("process %s did not respond to SIGTERM. Trying SIGKILL", proc)
  130. try:
  131. signal_procs(signal.SIGKILL)
  132. except OSError as err:
  133. if err.errno != errno.ESRCH:
  134. raise
  135. _, alive = psutil.wait_procs(alive, timeout=timeout, callback=on_terminate)
  136. if alive:
  137. for proc in alive:
  138. logger.error("Process %s (%s) could not be killed. Giving up.", proc, proc.pid)
  139. return returncodes
  140. def execute_in_subprocess(cmd: list[str], cwd: str | None = None, env: dict | None = None) -> None:
  141. """
  142. Execute a process and stream output to logger.
  143. :param cmd: command and arguments to run
  144. :param cwd: Current working directory passed to the Popen constructor
  145. :param env: Additional environment variables to set for the subprocess. If None,
  146. the subprocess will inherit the current environment variables of the parent process
  147. (``os.environ``)
  148. """
  149. execute_in_subprocess_with_kwargs(cmd, cwd=cwd, env=env)
  150. def execute_in_subprocess_with_kwargs(cmd: list[str], **kwargs) -> None:
  151. """
  152. Execute a process and stream output to logger.
  153. :param cmd: command and arguments to run
  154. All other keyword args will be passed directly to subprocess.Popen
  155. """
  156. log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
  157. with subprocess.Popen(
  158. cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=0, close_fds=True, **kwargs
  159. ) as proc:
  160. log.info("Output:")
  161. if proc.stdout:
  162. with proc.stdout:
  163. for line in iter(proc.stdout.readline, b""):
  164. log.info("%s", line.decode().rstrip())
  165. exit_code = proc.wait()
  166. if exit_code != 0:
  167. raise subprocess.CalledProcessError(exit_code, cmd)
  168. def execute_interactive(cmd: list[str], **kwargs) -> None:
  169. """
  170. Run the new command as a subprocess.
  171. Runs the new command as a subprocess and ensures that the terminal's state is restored to its original
  172. state after the process is completed e.g. if the subprocess hides the cursor, it will be restored after
  173. the process is completed.
  174. """
  175. log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
  176. old_tty = termios.tcgetattr(sys.stdin)
  177. old_sigint_handler = signal.getsignal(signal.SIGINT)
  178. tty.setcbreak(sys.stdin.fileno())
  179. # open pseudo-terminal to interact with subprocess
  180. primary_fd, secondary_fd = pty.openpty()
  181. try:
  182. with subprocess.Popen(
  183. cmd,
  184. stdin=secondary_fd,
  185. stdout=secondary_fd,
  186. stderr=secondary_fd,
  187. universal_newlines=True,
  188. **kwargs,
  189. ) as proc:
  190. # ignore SIGINT in the parent process
  191. signal.signal(signal.SIGINT, signal.SIG_IGN)
  192. while proc.poll() is None:
  193. readable_fbs, _, _ = select.select([sys.stdin, primary_fd], [], [], 0)
  194. if sys.stdin in readable_fbs:
  195. input_data = os.read(sys.stdin.fileno(), 10240)
  196. os.write(primary_fd, input_data)
  197. if primary_fd in readable_fbs:
  198. output_data = os.read(primary_fd, 10240)
  199. if output_data:
  200. os.write(sys.stdout.fileno(), output_data)
  201. finally:
  202. # restore tty settings back
  203. signal.signal(signal.SIGINT, old_sigint_handler)
  204. termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)
  205. def kill_child_processes_by_pids(pids_to_kill: list[int], timeout: int = 5) -> None:
  206. """
  207. Kills child processes for the current process.
  208. First, it sends the SIGTERM signal, and after the time specified by the `timeout` parameter, sends
  209. the SIGKILL signal, if the process is still alive.
  210. :param pids_to_kill: List of PID to be killed.
  211. :param timeout: The time to wait before sending the SIGKILL signal.
  212. """
  213. this_process = psutil.Process(os.getpid())
  214. # Only check child processes to ensure that we don't have a case
  215. # where we kill the wrong process because a child process died
  216. # but the PID got reused.
  217. child_processes = [
  218. x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill
  219. ]
  220. # First try SIGTERM
  221. for child in child_processes:
  222. log.info("Terminating child PID: %s", child.pid)
  223. child.terminate()
  224. log.info("Waiting up to %s seconds for processes to exit...", timeout)
  225. try:
  226. psutil.wait_procs(
  227. child_processes, timeout=timeout, callback=lambda x: log.info("Terminated PID %s", x.pid)
  228. )
  229. except psutil.TimeoutExpired:
  230. log.debug("Ran out of time while waiting for processes to exit")
  231. # Then SIGKILL
  232. child_processes = [
  233. x for x in this_process.children(recursive=True) if x.is_running() and x.pid in pids_to_kill
  234. ]
  235. if child_processes:
  236. log.info("SIGKILL processes that did not terminate gracefully")
  237. for child in child_processes:
  238. log.info("Killing child PID: %s", child.pid)
  239. child.kill()
  240. child.wait()
  241. @contextmanager
  242. def patch_environ(new_env_variables: dict[str, str]) -> Generator[None, None, None]:
  243. """
  244. Set environment variables in context.
  245. After leaving the context, it restores its original state.
  246. :param new_env_variables: Environment variables to set
  247. """
  248. current_env_state = {key: os.environ.get(key) for key in new_env_variables}
  249. os.environ.update(new_env_variables)
  250. try:
  251. yield
  252. finally:
  253. for key, old_value in current_env_state.items():
  254. if old_value is None:
  255. if key in os.environ:
  256. del os.environ[key]
  257. else:
  258. os.environ[key] = old_value
  259. def check_if_pidfile_process_is_running(pid_file: str, process_name: str):
  260. """
  261. Check if a pidfile already exists and process is still running.
  262. If process is dead then pidfile is removed.
  263. :param pid_file: path to the pidfile
  264. :param process_name: name used in exception if process is up and
  265. running
  266. """
  267. pid_lock_file = PIDLockFile(path=pid_file)
  268. # If file exists
  269. if pid_lock_file.is_locked():
  270. # Read the pid
  271. pid = pid_lock_file.read_pid()
  272. if pid is None:
  273. return
  274. try:
  275. # Check if process is still running
  276. proc = psutil.Process(pid)
  277. if proc.is_running():
  278. raise AirflowException(f"The {process_name} is already running under PID {pid}.")
  279. except psutil.NoSuchProcess:
  280. # If process is dead remove the pidfile
  281. pid_lock_file.break_lock()
  282. def set_new_process_group() -> None:
  283. """
  284. Try to set current process to a new process group.
  285. That makes it easy to kill all sub-process of this at the OS-level,
  286. rather than having to iterate the child processes.
  287. If current process was spawned by system call ``exec()``, the current
  288. process group is kept.
  289. """
  290. if os.getpid() == os.getsid(0):
  291. # If PID = SID than process a session leader, and it is not possible to change process group
  292. return
  293. os.setpgid(0, 0)