#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
  18. """Utilities module for cli."""
  19. from __future__ import annotations
  20. import functools
  21. import logging
  22. import os
  23. import socket
  24. import sys
  25. import threading
  26. import traceback
  27. import warnings
  28. from argparse import Namespace
  29. from pathlib import Path
  30. from typing import TYPE_CHECKING, Callable, TypeVar, cast
  31. import re2
  32. from sqlalchemy import select
  33. from airflow import settings
  34. from airflow.api_internal.internal_api_call import InternalApiConfig
  35. from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
  36. from airflow.utils import cli_action_loggers, timezone
  37. from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
  38. from airflow.utils.log.secrets_masker import should_hide_value_for_key
  39. from airflow.utils.platform import getuser, is_terminal_support_colors
  40. from airflow.utils.session import NEW_SESSION, provide_session
  41. T = TypeVar("T", bound=Callable)
  42. if TYPE_CHECKING:
  43. from sqlalchemy.orm import Session
  44. from airflow.models.dag import DAG
  45. logger = logging.getLogger(__name__)


def _check_cli_args(args):
    if not args:
        raise ValueError("Args should be set")
    if not isinstance(args[0], Namespace):
        raise ValueError(
            f"1st positional argument should be argparse.Namespace instance, but is {type(args[0])}"
        )


def action_cli(func=None, check_db=True):
    def action_logging(f: T) -> T:
        """
        Decorate a function so that, in the CLI context, action logging is submitted alongside its execution.

        The action logger callbacks are invoked twice: once for pre-execution
        and once for post-execution. They are called with the keyword
        parameters below:

        sub_command : name of sub-command
        start_datetime : start datetime instance (UTC)
        end_datetime : end datetime instance (UTC)
        full_command : full command line arguments
        user : current user
        log : airflow.models.log.Log ORM instance
        dag_id : dag id (optional)
        task_id : task_id (optional)
        execution_date : execution date (optional)
        error : exception instance if there's an exception

        :param f: function instance
        :return: wrapped function
        """

        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            """
            Wrap cli functions; assume Namespace instance as first positional argument.

            :param args: Positional arguments; a Namespace instance is expected
                as the first positional argument
            :param kwargs: passthrough keyword arguments
            """
            _check_cli_args(args)
            metrics = _build_metrics(f.__name__, args[0])
            cli_action_loggers.on_pre_execution(**metrics)
            verbose = getattr(args[0], "verbose", False)
            root_logger = logging.getLogger()
            if verbose:
                root_logger.setLevel(logging.DEBUG)
                for handler in root_logger.handlers:
                    handler.setLevel(logging.DEBUG)
            try:
                # Check and run migrations if necessary
                if check_db and not InternalApiConfig.get_use_internal_api():
                    from airflow.configuration import conf
                    from airflow.utils.db import check_and_run_migrations, synchronize_log_template

                    if conf.getboolean("database", "check_migrations"):
                        check_and_run_migrations()
                    synchronize_log_template()
                return f(*args, **kwargs)
            except Exception as e:
                metrics["error"] = e
                raise
            finally:
                metrics["end_datetime"] = timezone.utcnow()
                cli_action_loggers.on_post_execution(**metrics)

        return cast(T, wrapper)

    if func:
        return action_logging(func)
    return action_logging
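
# Usage sketch (illustrative, not part of this module): a hypothetical CLI
# handler decorated with `action_cli`. The handler name and Namespace fields
# are made up for the example; real handlers live under airflow.cli.commands.
#
#   @action_cli(check_db=False)
#   def my_command(args: Namespace) -> None:
#       # args.dag_id etc. are picked up by _build_metrics for action logging
#       print(f"running for {args.dag_id}")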


def _build_metrics(func_name, namespace):
    """
    Build metrics dict from function args.

    It assumes that the function arguments come from airflow.bin.cli module's
    functions and include a Namespace instance that optionally contains
    "dag_id", "task_id", and "execution_date".

    :param func_name: name of function
    :param namespace: Namespace instance from argparse
    :return: dict with metrics
    """
    sub_commands_to_check_for_sensitive_fields = {"users", "connections"}
    sub_commands_to_check_for_sensitive_key = {"variables"}
    sensitive_fields = {"-p", "--password", "--conn-password"}
    full_command = list(sys.argv)
    sub_command = full_command[1] if len(full_command) > 1 else None
    # For sub-commands in sub_commands_to_check_for_sensitive_key, the value may hold sensitive info
    if sub_command in sub_commands_to_check_for_sensitive_key:
        key = full_command[-2] if len(full_command) > 3 else None
        if key and should_hide_value_for_key(key):
            # Mask the sensitive value since the key contains a sensitive keyword
            full_command[-1] = "*" * 8
    elif sub_command in sub_commands_to_check_for_sensitive_fields:
        for idx, command in enumerate(full_command):
            if command in sensitive_fields:
                # For cases when password is passed as "--password xyz" (with space between key and value)
                full_command[idx + 1] = "*" * 8
            else:
                # For cases when password is passed as "--password=xyz" (with '=' between key and value)
                for sensitive_field in sensitive_fields:
                    if command.startswith(f"{sensitive_field}="):
                        full_command[idx] = f'{sensitive_field}={"*" * 8}'
    metrics = {
        "sub_command": func_name,
        "start_datetime": timezone.utcnow(),
        "full_command": f"{full_command}",
        "user": getuser(),
    }
    if not isinstance(namespace, Namespace):
        raise ValueError(
            f"namespace argument should be argparse.Namespace instance, but is {type(namespace)}"
        )
    tmp_dic = vars(namespace)
    metrics["dag_id"] = tmp_dic.get("dag_id")
    metrics["task_id"] = tmp_dic.get("task_id")
    metrics["execution_date"] = tmp_dic.get("execution_date")
    metrics["host_name"] = socket.gethostname()
    return metrics
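
# Masking sketch (illustrative): given a command line such as
#   airflow connections add my_conn --conn-password secret
# the recorded argv is rewritten to
#   ['airflow', 'connections', 'add', 'my_conn', '--conn-password', '********']
# before it lands in the metrics dict, so the real secret never reaches the
# action loggers.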


def process_subdir(subdir: str | None):
    """Expand path to absolute by replacing 'DAGS_FOLDER', '~', '.', etc."""
    if subdir:
        if not settings.DAGS_FOLDER:
            raise ValueError("DAGS_FOLDER variable in settings should be filled.")
        subdir = subdir.replace("DAGS_FOLDER", settings.DAGS_FOLDER)
        subdir = os.path.abspath(os.path.expanduser(subdir))
    return subdir
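
# Example (illustrative), assuming settings.DAGS_FOLDER == "/opt/airflow/dags":
#   process_subdir("DAGS_FOLDER/team_a")  ->  "/opt/airflow/dags/team_a"
#   process_subdir("~/dags")              ->  "/home/<user>/dags"
#   process_subdir(None)                  ->  None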


def get_dag_by_file_location(dag_id: str):
    """Return DAG of a given dag_id by looking up file location."""
    from airflow.models import DagBag, DagModel

    # Benefit is that logging from other dags in dagbag will not appear
    dag_model = DagModel.get_current(dag_id)
    if dag_model is None:
        raise AirflowException(
            f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
        )
    dagbag = DagBag(dag_folder=dag_model.fileloc)
    return dagbag.dags[dag_id]


def _search_for_dag_file(val: str | None) -> str | None:
    """
    Search for the file referenced at fileloc.

    By the time we get to this function, we've already run `val` through `process_subdir`,
    loaded a DagBag from it, and come up empty. So here, if `val` is a file path, we make
    a last-ditch effort to find a dag file with the same name in our dags folder. (This
    avoids the unnecessary dag parsing that would occur if we just parsed the whole dags folder.)

    If `val` is a path to a file, it likely means that the serializing process had a dags_folder
    equal to only the dag file in question, which prevents us from determining the relative location.
    And if the paths differ between worker and dag processor / scheduler, we won't find
    the dag at the given location.
    """
    if val and Path(val).suffix in (".zip", ".py"):
        matches = list(Path(settings.DAGS_FOLDER).rglob(Path(val).name))
        if len(matches) == 1:
            return matches[0].as_posix()
    return None


def get_dag(subdir: str | None, dag_id: str, from_db: bool = False) -> DAG:
    """
    Return DAG of a given dag_id.

    First we'll try to use the given subdir. If that doesn't work, we'll try to
    find the correct path (assuming it's a file) and, failing that, use the configured
    dags folder.
    """
    from airflow.models import DagBag

    if from_db:
        dagbag = DagBag(read_dags_from_db=True)
        dag = dagbag.get_dag(dag_id)  # get_dag loads from the DB as requested
    else:
        first_path = process_subdir(subdir)
        dagbag = DagBag(first_path)
        dag = dagbag.dags.get(dag_id)  # avoids db calls made in get_dag
    if not dag:
        if from_db:
            raise AirflowException(f"Dag {dag_id!r} could not be found in DagBag read from database.")
        fallback_path = _search_for_dag_file(subdir) or settings.DAGS_FOLDER
        logger.warning("Dag %r not found in path %s; trying path %s", dag_id, first_path, fallback_path)
        dagbag = DagBag(dag_folder=fallback_path)
        dag = dagbag.get_dag(dag_id)
        if not dag:
            raise AirflowException(
                f"Dag {dag_id!r} could not be found; either it does not exist or it failed to parse."
            )
    return dag
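
# Usage sketch (illustrative): resolving a DAG the way CLI commands do;
# "example_bash_operator" is just a sample dag_id.
#
#   dag = get_dag(subdir=None, dag_id="example_bash_operator")
#   dag_from_db = get_dag(subdir=None, dag_id="example_bash_operator", from_db=True)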


def get_dags(subdir: str | None, dag_id: str, use_regex: bool = False):
    """Return DAG(s) matching a given regex or dag_id."""
    from airflow.models import DagBag

    if not use_regex:
        return [get_dag(subdir, dag_id)]
    dagbag = DagBag(process_subdir(subdir))
    matched_dags = [dag for dag in dagbag.dags.values() if re2.search(dag_id, dag.dag_id)]
    if not matched_dags:
        raise AirflowException(
            f"dag_id could not be found with regex: {dag_id}. Either the dag did not exist or "
            f"it failed to parse."
        )
    return matched_dags
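
# Example (illustrative): with use_regex=True the dag_id is treated as a
# pattern, so "^team_a_" (a made-up prefix) would match every parsed DAG whose
# dag_id starts with it; with use_regex=False this behaves like get_dag().
#
#   dags = get_dags(subdir=None, dag_id="^team_a_", use_regex=True)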


@provide_session
def get_dag_by_pickle(pickle_id: int, session: Session = NEW_SESSION) -> DAG:
    """Fetch DAG from the database using pickling."""
    from airflow.models import DagPickle

    dag_pickle = session.scalar(select(DagPickle).where(DagPickle.id == pickle_id).limit(1))
    if not dag_pickle:
        raise AirflowException(f"pickle_id could not be found in DagPickle.id list: {pickle_id}")
    pickle_dag = dag_pickle.pickle
    return pickle_dag


def setup_locations(process, pid=None, stdout=None, stderr=None, log=None):
    """Create logging paths."""
    if not stderr:
        stderr = os.path.join(settings.AIRFLOW_HOME, f"airflow-{process}.err")
    if not stdout:
        stdout = os.path.join(settings.AIRFLOW_HOME, f"airflow-{process}.out")
    if not log:
        log = os.path.join(settings.AIRFLOW_HOME, f"airflow-{process}.log")
    if not pid:
        pid = os.path.join(settings.AIRFLOW_HOME, f"airflow-{process}.pid")
    else:
        pid = os.path.abspath(pid)
    return pid, stdout, stderr, log
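
# Example (illustrative), assuming settings.AIRFLOW_HOME == "/opt/airflow":
#   setup_locations("scheduler") returns
#   ("/opt/airflow/airflow-scheduler.pid",
#    "/opt/airflow/airflow-scheduler.out",
#    "/opt/airflow/airflow-scheduler.err",
#    "/opt/airflow/airflow-scheduler.log")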


def setup_logging(filename):
    """Create log file handler for daemon process."""
    root = logging.getLogger()
    handler = NonCachingFileHandler(filename)
    formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT)
    handler.setFormatter(formatter)
    root.addHandler(handler)
    root.setLevel(settings.LOGGING_LEVEL)
    return handler.stream
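
# Usage sketch (illustrative, simplified from how daemonized commands use this
# helper): the returned stream must survive the daemon fork. The `daemon`
# package wiring and `run_scheduler` entrypoint are assumptions for the sketch.
#
#   import daemon
#   pid, stdout, stderr, log_file = setup_locations("scheduler")
#   handle = setup_logging(log_file)
#   with daemon.DaemonContext(files_preserve=[handle]):
#       run_scheduler()  # hypothetical entrypoint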


def sigint_handler(sig, frame):
    """
    Return without error on SIGINT or SIGTERM signals in interactive command mode.

    e.g. CTRL+C or kill <PID>
    """
    sys.exit(0)


def sigquit_handler(sig, frame):
    """
    Help debug deadlocks by printing stacktraces when this gets a SIGQUIT.

    e.g. kill -s QUIT <PID> or CTRL+\\
    """
    print(f"Dumping stack traces for all threads in PID {os.getpid()}")
    id_to_name = {th.ident: th.name for th in threading.enumerate()}
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append(f"\n# Thread: {id_to_name.get(thread_id, '')}({thread_id})")
        for filename, line_number, name, line in traceback.extract_stack(stack):
            code.append(f'File: "{filename}", line {line_number}, in {name}')
            if line:
                code.append(f"  {line.strip()}")
    print("\n".join(code))
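
# Usage sketch (illustrative): long-running commands install these handlers
# before entering their main loop, e.g. (SIGQUIT is Unix-only):
#
#   import signal
#   signal.signal(signal.SIGINT, sigint_handler)
#   signal.signal(signal.SIGTERM, sigint_handler)
#   signal.signal(signal.SIGQUIT, sigquit_handler)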


class ColorMode:
    """Coloring modes. With `auto`, colors are enabled only when the terminal supports them."""

    ON = "on"
    OFF = "off"
    AUTO = "auto"


def should_use_colors(args) -> bool:
    """Process arguments and decide whether to enable color in output."""
    if args.color == ColorMode.ON:
        return True
    if args.color == ColorMode.OFF:
        return False
    return is_terminal_support_colors()


def should_ignore_depends_on_past(args) -> bool:
    """Decide whether to ignore depends_on_past, honoring the deprecated `--ignore-depends-on-past` flag."""
    if args.ignore_depends_on_past:
        warnings.warn(
            "Using `--ignore-depends-on-past` is deprecated. "
            "Please use `--depends-on-past ignore` instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return True
    return args.depends_on_past == "ignore"


def suppress_logs_and_warning(f: T) -> T:
    """Suppress logging and warning messages in cli functions."""

    @functools.wraps(f)
    def _wrapper(*args, **kwargs):
        _check_cli_args(args)
        if args[0].verbose:
            f(*args, **kwargs)
        else:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                logging.disable(logging.CRITICAL)
                try:
                    f(*args, **kwargs)
                finally:
                    # logging output again depends on the effective
                    # levels of individual loggers
                    logging.disable(logging.NOTSET)

    return cast(T, _wrapper)
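
# Usage sketch (illustrative): stacking the decorators on a hypothetical
# handler so output stays quiet unless --verbose is passed.
#
#   @suppress_logs_and_warning
#   @action_cli(check_db=False)
#   def my_quiet_command(args: Namespace) -> None:
#       print("only intended output reaches the console")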