standalone_command.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
import os
import socket
import subprocess
import threading
import time
from collections import deque
from typing import TYPE_CHECKING

from termcolor import colored

from airflow.configuration import conf
from airflow.executors import executor_constants
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import most_recent_job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.utils import db
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

if TYPE_CHECKING:
    from airflow.jobs.base_job_runner import BaseJobRunner
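
# The parent process launches the scheduler, webserver and triggerer as
# "airflow <subcommand>" subprocesses; each one is wrapped in a SubCommand
# thread that streams its output into a shared queue for the main loop to print.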

class StandaloneCommand:
    """
    Runs all components of Airflow under a single parent process.

    Useful for local development.
    """

    @classmethod
    def entrypoint(cls, args):
        """CLI entrypoint, called by the main CLI system."""
        StandaloneCommand().run()

    def __init__(self):
        self.subcommands = {}
        self.output_queue = deque()
        self.user_info = {}
        self.ready_time = None
        self.ready_delay = 3
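        # ready_time is set when all components first report ready; the banner
        # prints once ready_delay seconds have passed after that.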

    @providers_configuration_loaded
    def run(self):
        self.print_output("standalone", "Starting Airflow Standalone")
        # Silence built-in logging at INFO
        logging.getLogger("").setLevel(logging.WARNING)
        # Startup checks and prep
        env = self.calculate_env()
        self.initialize_database()
        # Set up commands to run
        self.subcommands["scheduler"] = SubCommand(
            self,
            name="scheduler",
            command=["scheduler"],
            env=env,
        )
        self.subcommands["webserver"] = SubCommand(
            self,
            name="webserver",
            command=["webserver"],
            env=env,
        )
        self.subcommands["triggerer"] = SubCommand(
            self,
            name="triggerer",
            command=["triggerer"],
            env=env,
        )
        self.web_server_port = conf.getint("webserver", "WEB_SERVER_PORT", fallback=8080)
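        # The webserver port is probed later by is_ready() to detect when the UI is up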
        # Run subcommand threads
        for command in self.subcommands.values():
            command.start()
        # Run output loop
        shown_ready = False
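        # Loop until Ctrl-C: drain subprocess output, then show the login banner
        # once, ready_delay seconds after every component first reports ready.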
        try:
            while True:
                # Print all the current lines onto the screen
                self.update_output()
                # Print info banner when all components are ready and the
                # delay has passed
                if not self.ready_time and self.is_ready():
                    self.ready_time = time.monotonic()
                if (
                    not shown_ready
                    and self.ready_time
                    and time.monotonic() - self.ready_time > self.ready_delay
                ):
                    self.print_ready()
                    shown_ready = True
                # Ensure we idle-sleep rather than fast-looping
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass
        # Stop subcommand threads
        self.print_output("standalone", "Shutting down components")
        for command in self.subcommands.values():
            command.stop()
        for command in self.subcommands.values():
            command.join()
        self.print_output("standalone", "Complete")

    def update_output(self):
        """Drains the output queue and prints its contents to the screen."""
        while self.output_queue:
            # Extract info
            name, line = self.output_queue.popleft()
            # Make line printable
            line_str = line.decode("utf8").strip()
            self.print_output(name, line_str)

    def print_output(self, name: str, output):
        """
        Print an output line with name and colouring.

        You can pass multiple lines to output if you wish; it will be split for you.
        """
        color = {
            "webserver": "green",
            "scheduler": "blue",
            "triggerer": "cyan",
            "standalone": "white",
        }.get(name, "white")
        colorised_name = colored(f"{name:10}", color)
        for line in output.splitlines():
            print(f"{colorised_name} | {line.strip()}")

    def print_error(self, name: str, output):
        """
        Print an error message to the console.

        This is the same as print_output, but with the text coloured red.
        """
        self.print_output(name, colored(output, "red"))

    def calculate_env(self):
        """
        Works out the environment variables needed to run subprocesses.

        We override some settings as part of being standalone.
        """
        env = dict(os.environ)
        # Make sure we're using a local executor flavour
        executor_class, _ = ExecutorLoader.import_default_executor_cls()
        if not executor_class.is_local:
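            # SQLite doesn't support the concurrent access LocalExecutor's parallel
            # workers need, so fall back to the single-process SequentialExecutor there.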
            if "sqlite" in conf.get("database", "sql_alchemy_conn"):
                self.print_output("standalone", "Forcing executor to SequentialExecutor")
                env["AIRFLOW__CORE__EXECUTOR"] = executor_constants.SEQUENTIAL_EXECUTOR
            else:
                self.print_output("standalone", "Forcing executor to LocalExecutor")
                env["AIRFLOW__CORE__EXECUTOR"] = executor_constants.LOCAL_EXECUTOR
        return env

    def initialize_database(self):
        """Make sure all the tables are created."""
        # Set up DB tables
        self.print_output("standalone", "Checking database is initialized")
        db.initdb()
        self.print_output("standalone", "Database ready")

        # Then create a "default" admin user if necessary
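        # Imported locally so the FAB auth manager provider is only loaded at this point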
        from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder

        with get_application_builder() as appbuilder:
            user_name, password = appbuilder.sm.create_admin_standalone()
        # Store what we know about the user for printing later in startup
        self.user_info = {"username": user_name, "password": password}

    def is_ready(self):
        """
        Detect when all Airflow components are ready to serve.

        Considers the webserver ready once its port accepts connections, and the
        scheduler and triggerer ready once their jobs are heartbeating.
        """
        return (
            self.port_open(self.web_server_port)
            and self.job_running(SchedulerJobRunner)
            and self.job_running(TriggererJobRunner)
        )

    def port_open(self, port):
        """
        Check if the given port is listening on the local machine.

        Used to tell if webserver is alive.
        """
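        # A plain TCP connect is enough here; no HTTP request is made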
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            sock.connect(("127.0.0.1", port))
            sock.close()
        except (OSError, ValueError):
            # Any exception means the socket is not available
            return False
        return True

    def job_running(self, job_runner_class: type[BaseJobRunner]):
        """
        Check if the given job type is running and heartbeating correctly.

        Used to tell if the scheduler or triggerer is alive.
        """
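        # most_recent_job returns the latest job record of this type; is_alive()
        # checks that its heartbeat is recent enough.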
        recent = most_recent_job(job_runner_class.job_type)
        if not recent:
            return False
        return recent.is_alive()

    def print_ready(self):
        """
        Print the banner shown when Airflow is ready to go.

        Includes the login details, if an admin password was generated.
        """
        self.print_output("standalone", "")
        self.print_output("standalone", "Airflow is ready")
        if self.user_info["password"]:
            self.print_output(
                "standalone",
                f"Login with username: {self.user_info['username']} password: {self.user_info['password']}",
            )
        self.print_output(
            "standalone",
            "Airflow Standalone is for development purposes only. Do not use this in production!",
        )
        self.print_output("standalone", "")


class SubCommand(threading.Thread):
    """
    Execute a subcommand on another thread.

    Thread that launches a process and then streams its output back to the main
    command. We use threads to avoid using select() and raw filehandles, and the
    complex line-buffering logic that would bring.
    """

    def __init__(self, parent, name: str, command: list[str], env: dict[str, str]):
        super().__init__()
        self.parent = parent
        self.name = name
        self.command = command
        self.env = env

    def run(self):
        """Run the actual process and capture its output to a queue."""
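        # Iterating over stdout blocks this thread on each line; deque.append()
        # is thread-safe, so no lock is needed when handing lines to the parent.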
        self.process = subprocess.Popen(
            ["airflow", *self.command],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=self.env,
        )
        for line in self.process.stdout:
            self.parent.output_queue.append((self.name, line))

    def stop(self):
        """Call to stop this process (and thus this thread)."""
        self.process.terminate()


# Alias for use in the CLI parser
standalone = StandaloneCommand.entrypoint
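# Run from the command line as: airflow standalone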