celery_command.py

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# DO NOT MODIFY THIS FILE unless it is a serious bugfix - all new celery commands should be added in the celery provider.
# This file is kept for backward compatibility only.
"""Celery command."""
from __future__ import annotations

import logging
import sys
import warnings
from contextlib import contextmanager
from multiprocessing import Process

import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency  # type: ignore[attr-defined]
from celery.app.defaults import DEFAULT_TASK_LOG_FMT
from celery.signals import after_setup_logger
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile

from airflow import settings
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.configuration import conf
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.serve_logs import serve_logs

WORKER_PROCESS_NAME = "worker"

warnings.warn(
    "Use the celery command from the providers package; use celery provider >= 3.6.1.",
    DeprecationWarning,
    stacklevel=2,
)
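
# The maintained versions of these commands live in the Celery provider
# package (>= 3.6.1). The provider-side module path is assumed to be
# airflow.providers.celery.cli.celery_command; verify against the installed
# provider version.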


@cli_utils.action_cli
@providers_configuration_loaded
def flower(args):
    """Start Flower, Celery monitoring tool."""
    # This needs to be imported locally to not trigger Providers Manager initialization
    from airflow.providers.celery.executors.celery_executor import app as celery_app

    options = [
        "flower",
        conf.get("celery", "BROKER_URL"),
        f"--address={args.hostname}",
        f"--port={args.port}",
    ]

    if args.broker_api:
        options.append(f"--broker-api={args.broker_api}")

    if args.url_prefix:
        options.append(f"--url-prefix={args.url_prefix}")

    if args.basic_auth:
        options.append(f"--basic-auth={args.basic_auth}")

    if args.flower_conf:
        options.append(f"--conf={args.flower_conf}")

    run_command_with_daemon_option(
        args=args, process_name="flower", callback=lambda: celery_app.start(options)
    )
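
# Illustrative invocation (flag names mirror the args consumed above; the
# port and credentials are examples only):
#
#   airflow celery flower --port 5555 --basic-auth admin:password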


@contextmanager
def _serve_logs(skip_serve_logs: bool = False):
    """Start serve_logs sub-process."""
    sub_proc = None
    if skip_serve_logs is False:
        sub_proc = Process(target=serve_logs)
        sub_proc.start()
    try:
        yield
    finally:
        if sub_proc:
            sub_proc.terminate()
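
# Minimal usage sketch for the context manager above: the log-serving
# sub-process runs for the lifetime of the block and is terminated on exit.
# The real call site is run_celery_worker further down.
#
#   with _serve_logs(skip_serve_logs=False):
#       ...  # run the worker; task logs are served alongside it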


@after_setup_logger.connect()
@providers_configuration_loaded
def logger_setup_handler(logger, **kwargs):
    """
    Reconfigure the logger.

    * remove any previously configured handlers
    * logs of severity ERROR and above go to stderr
    * logs of severity lower than ERROR go to stdout
    """
    if conf.getboolean("logging", "celery_stdout_stderr_separation", fallback=False):
        celery_formatter = logging.Formatter(DEFAULT_TASK_LOG_FMT)

        class NoErrorOrAboveFilter(logging.Filter):
            """Allow only logs with level *lower* than ERROR to be reported."""

            def filter(self, record):
                return record.levelno < logging.ERROR

        below_error_handler = logging.StreamHandler(sys.stdout)
        below_error_handler.addFilter(NoErrorOrAboveFilter())
        below_error_handler.setFormatter(celery_formatter)

        from_error_handler = logging.StreamHandler(sys.stderr)
        from_error_handler.setLevel(logging.ERROR)
        from_error_handler.setFormatter(celery_formatter)

        logger.handlers[:] = [below_error_handler, from_error_handler]
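
# The split-stream behaviour above is opt-in via airflow.cfg (option name
# taken from the conf.getboolean call above; the value shown is an example):
#
#   [logging]
#   celery_stdout_stderr_separation = True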


@cli_utils.action_cli
@providers_configuration_loaded
def worker(args):
    """Start Airflow Celery worker."""
    # This needs to be imported locally to not trigger Providers Manager initialization
    from airflow.providers.celery.executors.celery_executor import app as celery_app

    # Disable connection pool so that celery worker does not hold an unnecessary db connection
    settings.reconfigure_orm(disable_connection_pool=True)
    if not settings.validate_session():
        raise SystemExit("Worker exiting, database connection precheck failed.")

    autoscale = args.autoscale
    skip_serve_logs = args.skip_serve_logs

    if autoscale is None and conf.has_option("celery", "worker_autoscale"):
        autoscale = conf.get("celery", "worker_autoscale")
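
    # worker_autoscale is a "max,min" pair passed straight through to Celery's
    # --autoscale flag below; e.g. "16,12" (illustrative) lets Celery scale
    # between 12 and 16 worker processes.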

    if hasattr(celery_app.backend, "ResultSession"):
        # Pre-create the database tables now, otherwise SQLA via Celery has a
        # race condition where one of the subprocesses can die with "Table
        # already exists" error, because SQLA checks for which tables exist,
        # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
        # EXISTS
        try:
            session = celery_app.backend.ResultSession()
            session.close()
        except sqlalchemy.exc.IntegrityError:
            # At least on Postgres, trying to create a table that already
            # exists gives a unique constraint violation on the
            # "pg_type_typname_nsp_index" index. If this happens we can ignore
            # it; we raced to create the tables and lost.
            pass

    # backwards-compatible: https://github.com/apache/airflow/pull/21506#pullrequestreview-879893763
    celery_log_level = conf.get("logging", "CELERY_LOGGING_LEVEL")
    if not celery_log_level:
        celery_log_level = conf.get("logging", "LOGGING_LEVEL")

    # Set up the Celery worker options
    options = [
        "worker",
        "-O",
        "fair",
        "--queues",
        args.queues,
        "--concurrency",
        args.concurrency,
        "--hostname",
        args.celery_hostname,
        "--loglevel",
        celery_log_level,
    ]
    if autoscale:
        options.extend(["--autoscale", autoscale])
    if args.without_mingle:
        options.append("--without-mingle")
    if args.without_gossip:
        options.append("--without-gossip")

    if conf.has_option("celery", "pool"):
        pool = conf.get("celery", "pool")
        options.extend(["--pool", pool])
        # Celery pools of type eventlet and gevent use greenlets, which
        # requires monkey patching the app:
        # https://eventlet.net/doc/patching.html#monkey-patch
        # Otherwise task instances hang on the workers and are never
        # executed.
        maybe_patch_concurrency(["-P", pool])

    worker_pid_file_path, stdout, stderr, log_file = setup_locations(
        process=WORKER_PROCESS_NAME,
        stdout=args.stdout,
        stderr=args.stderr,
        log=args.log_file,
        pid=args.pid,
    )

    def run_celery_worker():
        with _serve_logs(skip_serve_logs):
            celery_app.worker_main(options)

    if args.umask:
        umask = args.umask
    else:
        umask = conf.get("celery", "worker_umask", fallback=settings.DAEMON_UMASK)

    run_command_with_daemon_option(
        args=args,
        process_name=WORKER_PROCESS_NAME,
        callback=run_celery_worker,
        should_setup_logging=True,
        umask=umask,
        pid_file=worker_pid_file_path,
    )
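
# Illustrative invocation (flag names mirror the args consumed above; the
# queue names and concurrency are examples only):
#
#   airflow celery worker --queues default,high_priority --concurrency 16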


@cli_utils.action_cli
@providers_configuration_loaded
def stop_worker(args):
    """Send SIGTERM to Celery worker."""
    # Read PID from file
    if args.pid:
        pid_file_path = args.pid
    else:
        pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME)
    pid = read_pid_from_pidfile(pid_file_path)

    # Send SIGTERM
    if pid:
        worker_process = psutil.Process(pid)
        worker_process.terminate()

    # Remove pid file
    remove_existing_pidfile(pid_file_path)
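
# Illustrative invocation; when --pid is not given, the PID file falls back to
# the default worker location resolved by setup_locations above:
#
#   airflow celery stop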