secrets_masker.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Mask sensitive information from logs."""
  18. from __future__ import annotations
  19. import collections.abc
  20. import logging
  21. import sys
  22. from enum import Enum
  23. from functools import cached_property
  24. from typing import (
  25. TYPE_CHECKING,
  26. Any,
  27. Callable,
  28. Dict,
  29. Generator,
  30. Iterable,
  31. Iterator,
  32. List,
  33. Pattern,
  34. TextIO,
  35. Tuple,
  36. TypeVar,
  37. Union,
  38. )
  39. import re2
  40. from airflow import settings
  41. from airflow.compat.functools import cache
  42. if TYPE_CHECKING:
  43. from kubernetes.client import V1EnvVar
  44. from airflow.typing_compat import TypeGuard
# Type variable covering the value shapes the masker is annotated to accept:
# plain strings, Kubernetes ``V1EnvVar`` objects (forward reference, only
# imported for type checking), dicts, tuples and lists.
Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
# A redacted result is either the same shape as the input or a plain string
# (for example the ``"***"`` placeholder).
Redacted = Union[Redactable, str]

# Module-level logger; used to report values that could not be redacted.
log = logging.getLogger(__name__)

# Built-in set of name fragments that mark a key as sensitive. Matching in
# ``should_hide_value_for_key`` is a substring test against the lower-cased
# key, so every entry here is lower-case.
DEFAULT_SENSITIVE_FIELDS = frozenset(
    {
        "access_token",
        "api_key",
        "apikey",
        "authorization",
        "passphrase",
        "passwd",
        "password",
        "private_key",
        "secret",
        "token",
        "keyfile_dict",
        "service_account",
    }
)
"""Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""

# Secret values that ``SecretsMasker.add_mask`` refuses to register while the
# ``core.unit_test_mode`` config flag is on.
SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}
  66. @cache
  67. def get_sensitive_variables_fields():
  68. """Get comma-separated sensitive Variable Fields from airflow.cfg."""
  69. from airflow.configuration import conf
  70. sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
  71. sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
  72. if sensitive_variable_fields:
  73. sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})
  74. return sensitive_fields
  75. def should_hide_value_for_key(name):
  76. """
  77. Return if the value for this given name should be hidden.
  78. Name might be a Variable name, or key in conn.extra_dejson, for example.
  79. """
  80. from airflow import settings
  81. if isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
  82. name = name.strip().lower()
  83. return any(s in name for s in get_sensitive_variables_fields())
  84. return False
  85. def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
  86. """
  87. Mask a secret from appearing in the task logs.
  88. If ``name`` is provided, then it will only be masked if the name matches
  89. one of the configured "sensitive" names.
  90. If ``secret`` is a dict or a iterable (excluding str) then it will be
  91. recursively walked and keys with sensitive names will be hidden.
  92. """
  93. # Filtering all log messages is not a free process, so we only do it when
  94. # running tasks
  95. if not secret:
  96. return
  97. _secrets_masker().add_mask(secret, name)
  98. def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
  99. """Redact any secrets found in ``value``."""
  100. return _secrets_masker().redact(value, name, max_depth)
  101. @cache
  102. def _secrets_masker() -> SecretsMasker:
  103. for flt in logging.getLogger("airflow.task").filters:
  104. if isinstance(flt, SecretsMasker):
  105. return flt
  106. raise RuntimeError(
  107. "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
  108. "sure you configure it taking airflow configuration as a base as explained at "
  109. "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
  110. "#advanced-configuration"
  111. )
  112. @cache
  113. def _get_v1_env_var_type() -> type:
  114. try:
  115. from kubernetes.client import V1EnvVar
  116. except ImportError:
  117. return type("V1EnvVar", (), {})
  118. return V1EnvVar
  119. def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
  120. return isinstance(v, _get_v1_env_var_type())
  121. class SecretsMasker(logging.Filter):
  122. """Redact secrets from logs."""
  123. replacer: Pattern | None = None
  124. patterns: set[str]
  125. ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
  126. MAX_RECURSION_DEPTH = 5
  127. def __init__(self):
  128. super().__init__()
  129. self.patterns = set()
  130. @cached_property
  131. def _record_attrs_to_ignore(self) -> Iterable[str]:
  132. # Doing log.info(..., extra={'foo': 2}) sets extra properties on
  133. # record, i.e. record.foo. And we need to filter those too. Fun
  134. #
  135. # Create a record, and look at what attributes are on it, and ignore
  136. # all the default ones!
  137. record = logging.getLogRecordFactory()(
  138. # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
  139. "x",
  140. logging.INFO,
  141. __file__,
  142. 1,
  143. "",
  144. (),
  145. exc_info=None,
  146. func="funcname",
  147. )
  148. return frozenset(record.__dict__).difference({"msg", "args"})
  149. def _redact_exception_with_context(self, exception):
  150. # Exception class may not be modifiable (e.g. declared by an
  151. # extension module such as JDBC).
  152. try:
  153. exception.args = (self.redact(v) for v in exception.args)
  154. except AttributeError:
  155. pass
  156. if exception.__context__:
  157. self._redact_exception_with_context(exception.__context__)
  158. if exception.__cause__ and exception.__cause__ is not exception.__context__:
  159. self._redact_exception_with_context(exception.__cause__)
  160. def filter(self, record) -> bool:
  161. if settings.MASK_SECRETS_IN_LOGS is not True:
  162. return True
  163. if self.ALREADY_FILTERED_FLAG in record.__dict__:
  164. # Filters are attached to multiple handlers and logs, keep a
  165. # "private" flag that stops us needing to process it more than once
  166. return True
  167. if self.replacer:
  168. for k, v in record.__dict__.items():
  169. if k not in self._record_attrs_to_ignore:
  170. record.__dict__[k] = self.redact(v)
  171. if record.exc_info and record.exc_info[1] is not None:
  172. exc = record.exc_info[1]
  173. self._redact_exception_with_context(exc)
  174. record.__dict__[self.ALREADY_FILTERED_FLAG] = True
  175. return True
  176. # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
  177. # this function directly. New versions of that provider, and this class itself call it with a value
  178. def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
  179. if depth > max_depth or isinstance(item, str):
  180. return "***"
  181. if isinstance(item, dict):
  182. return {
  183. dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
  184. }
  185. elif isinstance(item, (tuple, set)):
  186. # Turn set in to tuple!
  187. return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
  188. elif isinstance(item, list):
  189. return list(self._redact_all(subval, depth + 1, max_depth) for subval in item)
  190. else:
  191. return item
  192. def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
  193. # Avoid spending too much effort on redacting on deeply nested
  194. # structures. This also avoid infinite recursion if a structure has
  195. # reference to self.
  196. if depth > max_depth:
  197. return item
  198. try:
  199. if name and should_hide_value_for_key(name):
  200. return self._redact_all(item, depth, max_depth)
  201. if isinstance(item, dict):
  202. to_return = {
  203. dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
  204. for dict_key, subval in item.items()
  205. }
  206. return to_return
  207. elif isinstance(item, Enum):
  208. return self._redact(item=item.value, name=name, depth=depth, max_depth=max_depth)
  209. elif _is_v1_env_var(item):
  210. tmp: dict = item.to_dict()
  211. if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
  212. tmp["value"] = "***"
  213. else:
  214. return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
  215. return tmp
  216. elif isinstance(item, str):
  217. if self.replacer:
  218. # We can't replace specific values, but the key-based redacting
  219. # can still happen, so we can't short-circuit, we need to walk
  220. # the structure.
  221. return self.replacer.sub("***", str(item))
  222. return item
  223. elif isinstance(item, (tuple, set)):
  224. # Turn set in to tuple!
  225. return tuple(
  226. self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
  227. )
  228. elif isinstance(item, list):
  229. return [
  230. self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
  231. ]
  232. else:
  233. return item
  234. # I think this should never happen, but it does not hurt to leave it just in case
  235. # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
  236. # but it caused infinite recursion, to avoid this we mark the log as already filtered.
  237. except Exception as exc:
  238. log.warning(
  239. "Unable to redact value of type %s, please report this via "
  240. "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
  241. item,
  242. type(exc).__name__,
  243. exc,
  244. extra={self.ALREADY_FILTERED_FLAG: True},
  245. )
  246. return item
  247. def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
  248. """
  249. Redact an any secrets found in ``item``, if it is a string.
  250. If ``name`` is given, and it's a "sensitive" name (see
  251. :func:`should_hide_value_for_key`) then all string values in the item
  252. is redacted.
  253. """
  254. return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)
  255. @cached_property
  256. def _mask_adapter(self) -> None | Callable:
  257. """
  258. Pulls the secret mask adapter from config.
  259. This lives in a function here to be cached and only hit the config once.
  260. """
  261. from airflow.configuration import conf
  262. return conf.getimport("logging", "secret_mask_adapter", fallback=None)
  263. @cached_property
  264. def _test_mode(self) -> bool:
  265. """
  266. Pulls the unit test mode flag from config.
  267. This lives in a function here to be cached and only hit the config once.
  268. """
  269. from airflow.configuration import conf
  270. return conf.getboolean("core", "unit_test_mode")
  271. def _adaptations(self, secret: str) -> Generator[str, None, None]:
  272. """Yield the secret along with any adaptations to the secret that should be masked."""
  273. yield secret
  274. if self._mask_adapter:
  275. # This can return an iterable of secrets to mask OR a single secret as a string
  276. secret_or_secrets = self._mask_adapter(secret)
  277. if not isinstance(secret_or_secrets, str):
  278. # if its not a string, it must be an iterable
  279. yield from secret_or_secrets
  280. else:
  281. yield secret_or_secrets
  282. def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
  283. """Add a new secret to be masked to this filter instance."""
  284. if isinstance(secret, dict):
  285. for k, v in secret.items():
  286. self.add_mask(v, k)
  287. elif isinstance(secret, str):
  288. if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
  289. return
  290. new_mask = False
  291. for s in self._adaptations(secret):
  292. if s:
  293. pattern = re2.escape(s)
  294. if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
  295. self.patterns.add(pattern)
  296. new_mask = True
  297. if new_mask:
  298. self.replacer = re2.compile("|".join(self.patterns))
  299. elif isinstance(secret, collections.abc.Iterable):
  300. for v in secret:
  301. self.add_mask(v, name)
  302. class RedactedIO(TextIO):
  303. """
  304. IO class that redacts values going into stdout.
  305. Expected usage::
  306. with contextlib.redirect_stdout(RedactedIO()):
  307. ... # Writes to stdout will be redacted.
  308. """
  309. def __init__(self):
  310. self.target = sys.stdout
  311. def __enter__(self) -> TextIO:
  312. return self.target.__enter__()
  313. def __exit__(self, t, v, b) -> None:
  314. return self.target.__exit__(t, v, b)
  315. def __iter__(self) -> Iterator[str]:
  316. return iter(self.target)
  317. def __next__(self) -> str:
  318. return next(self.target)
  319. def close(self) -> None:
  320. return self.target.close()
  321. def fileno(self) -> int:
  322. return self.target.fileno()
  323. def flush(self) -> None:
  324. return self.target.flush()
  325. def isatty(self) -> bool:
  326. return self.target.isatty()
  327. def read(self, n: int = -1) -> str:
  328. return self.target.read(n)
  329. def readable(self) -> bool:
  330. return self.target.readable()
  331. def readline(self, n: int = -1) -> str:
  332. return self.target.readline(n)
  333. def readlines(self, n: int = -1) -> list[str]:
  334. return self.target.readlines(n)
  335. def seek(self, offset: int, whence: int = 0) -> int:
  336. return self.target.seek(offset, whence)
  337. def seekable(self) -> bool:
  338. return self.target.seekable()
  339. def tell(self) -> int:
  340. return self.target.tell()
  341. def truncate(self, s: int | None = None) -> int:
  342. return self.target.truncate(s)
  343. def writable(self) -> bool:
  344. return self.target.writable()
  345. def write(self, s: str) -> int:
  346. s = redact(s)
  347. return self.target.write(s)
  348. def writelines(self, lines) -> None:
  349. self.target.writelines(lines)