123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """Mask sensitive information from logs."""
- from __future__ import annotations
- import collections.abc
- import logging
- import sys
- from enum import Enum
- from functools import cached_property
- from typing import (
- TYPE_CHECKING,
- Any,
- Callable,
- Dict,
- Generator,
- Iterable,
- Iterator,
- List,
- Pattern,
- TextIO,
- Tuple,
- TypeVar,
- Union,
- )
- import re2
- from airflow import settings
- from airflow.compat.functools import cache
- if TYPE_CHECKING:
- from kubernetes.client import V1EnvVar
- from airflow.typing_compat import TypeGuard
- Redactable = TypeVar("Redactable", str, "V1EnvVar", Dict[Any, Any], Tuple[Any, ...], List[Any])
- Redacted = Union[Redactable, str]
- log = logging.getLogger(__name__)
- DEFAULT_SENSITIVE_FIELDS = frozenset(
- {
- "access_token",
- "api_key",
- "apikey",
- "authorization",
- "passphrase",
- "passwd",
- "password",
- "private_key",
- "secret",
- "token",
- "keyfile_dict",
- "service_account",
- }
- )
- """Names of fields (Connection extra, Variable key name etc.) that are deemed sensitive"""
- SECRETS_TO_SKIP_MASKING_FOR_TESTS = {"airflow"}
- @cache
- def get_sensitive_variables_fields():
- """Get comma-separated sensitive Variable Fields from airflow.cfg."""
- from airflow.configuration import conf
- sensitive_fields = DEFAULT_SENSITIVE_FIELDS.copy()
- sensitive_variable_fields = conf.get("core", "sensitive_var_conn_names")
- if sensitive_variable_fields:
- sensitive_fields |= frozenset({field.strip() for field in sensitive_variable_fields.split(",")})
- return sensitive_fields
- def should_hide_value_for_key(name):
- """
- Return if the value for this given name should be hidden.
- Name might be a Variable name, or key in conn.extra_dejson, for example.
- """
- from airflow import settings
- if isinstance(name, str) and settings.HIDE_SENSITIVE_VAR_CONN_FIELDS:
- name = name.strip().lower()
- return any(s in name for s in get_sensitive_variables_fields())
- return False
- def mask_secret(secret: str | dict | Iterable, name: str | None = None) -> None:
- """
- Mask a secret from appearing in the task logs.
- If ``name`` is provided, then it will only be masked if the name matches
- one of the configured "sensitive" names.
- If ``secret`` is a dict or a iterable (excluding str) then it will be
- recursively walked and keys with sensitive names will be hidden.
- """
- # Filtering all log messages is not a free process, so we only do it when
- # running tasks
- if not secret:
- return
- _secrets_masker().add_mask(secret, name)
- def redact(value: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
- """Redact any secrets found in ``value``."""
- return _secrets_masker().redact(value, name, max_depth)
- @cache
- def _secrets_masker() -> SecretsMasker:
- for flt in logging.getLogger("airflow.task").filters:
- if isinstance(flt, SecretsMasker):
- return flt
- raise RuntimeError(
- "Logging Configuration Error! No SecretsMasker found! If you have custom logging, please make "
- "sure you configure it taking airflow configuration as a base as explained at "
- "https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/logging-tasks.html"
- "#advanced-configuration"
- )
- @cache
- def _get_v1_env_var_type() -> type:
- try:
- from kubernetes.client import V1EnvVar
- except ImportError:
- return type("V1EnvVar", (), {})
- return V1EnvVar
- def _is_v1_env_var(v: Any) -> TypeGuard[V1EnvVar]:
- return isinstance(v, _get_v1_env_var_type())
- class SecretsMasker(logging.Filter):
- """Redact secrets from logs."""
- replacer: Pattern | None = None
- patterns: set[str]
- ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
- MAX_RECURSION_DEPTH = 5
- def __init__(self):
- super().__init__()
- self.patterns = set()
- @cached_property
- def _record_attrs_to_ignore(self) -> Iterable[str]:
- # Doing log.info(..., extra={'foo': 2}) sets extra properties on
- # record, i.e. record.foo. And we need to filter those too. Fun
- #
- # Create a record, and look at what attributes are on it, and ignore
- # all the default ones!
- record = logging.getLogRecordFactory()(
- # name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None,
- "x",
- logging.INFO,
- __file__,
- 1,
- "",
- (),
- exc_info=None,
- func="funcname",
- )
- return frozenset(record.__dict__).difference({"msg", "args"})
- def _redact_exception_with_context(self, exception):
- # Exception class may not be modifiable (e.g. declared by an
- # extension module such as JDBC).
- try:
- exception.args = (self.redact(v) for v in exception.args)
- except AttributeError:
- pass
- if exception.__context__:
- self._redact_exception_with_context(exception.__context__)
- if exception.__cause__ and exception.__cause__ is not exception.__context__:
- self._redact_exception_with_context(exception.__cause__)
- def filter(self, record) -> bool:
- if settings.MASK_SECRETS_IN_LOGS is not True:
- return True
- if self.ALREADY_FILTERED_FLAG in record.__dict__:
- # Filters are attached to multiple handlers and logs, keep a
- # "private" flag that stops us needing to process it more than once
- return True
- if self.replacer:
- for k, v in record.__dict__.items():
- if k not in self._record_attrs_to_ignore:
- record.__dict__[k] = self.redact(v)
- if record.exc_info and record.exc_info[1] is not None:
- exc = record.exc_info[1]
- self._redact_exception_with_context(exc)
- record.__dict__[self.ALREADY_FILTERED_FLAG] = True
- return True
- # Default on `max_depth` is to support versions of the OpenLineage plugin (not the provider) which called
- # this function directly. New versions of that provider, and this class itself call it with a value
- def _redact_all(self, item: Redactable, depth: int, max_depth: int = MAX_RECURSION_DEPTH) -> Redacted:
- if depth > max_depth or isinstance(item, str):
- return "***"
- if isinstance(item, dict):
- return {
- dict_key: self._redact_all(subval, depth + 1, max_depth) for dict_key, subval in item.items()
- }
- elif isinstance(item, (tuple, set)):
- # Turn set in to tuple!
- return tuple(self._redact_all(subval, depth + 1, max_depth) for subval in item)
- elif isinstance(item, list):
- return list(self._redact_all(subval, depth + 1, max_depth) for subval in item)
- else:
- return item
- def _redact(self, item: Redactable, name: str | None, depth: int, max_depth: int) -> Redacted:
- # Avoid spending too much effort on redacting on deeply nested
- # structures. This also avoid infinite recursion if a structure has
- # reference to self.
- if depth > max_depth:
- return item
- try:
- if name and should_hide_value_for_key(name):
- return self._redact_all(item, depth, max_depth)
- if isinstance(item, dict):
- to_return = {
- dict_key: self._redact(subval, name=dict_key, depth=(depth + 1), max_depth=max_depth)
- for dict_key, subval in item.items()
- }
- return to_return
- elif isinstance(item, Enum):
- return self._redact(item=item.value, name=name, depth=depth, max_depth=max_depth)
- elif _is_v1_env_var(item):
- tmp: dict = item.to_dict()
- if should_hide_value_for_key(tmp.get("name", "")) and "value" in tmp:
- tmp["value"] = "***"
- else:
- return self._redact(item=tmp, name=name, depth=depth, max_depth=max_depth)
- return tmp
- elif isinstance(item, str):
- if self.replacer:
- # We can't replace specific values, but the key-based redacting
- # can still happen, so we can't short-circuit, we need to walk
- # the structure.
- return self.replacer.sub("***", str(item))
- return item
- elif isinstance(item, (tuple, set)):
- # Turn set in to tuple!
- return tuple(
- self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
- )
- elif isinstance(item, list):
- return [
- self._redact(subval, name=None, depth=(depth + 1), max_depth=max_depth) for subval in item
- ]
- else:
- return item
- # I think this should never happen, but it does not hurt to leave it just in case
- # Well. It happened (see https://github.com/apache/airflow/issues/19816#issuecomment-983311373)
- # but it caused infinite recursion, to avoid this we mark the log as already filtered.
- except Exception as exc:
- log.warning(
- "Unable to redact value of type %s, please report this via "
- "<https://github.com/apache/airflow/issues>. Error was: %s: %s",
- item,
- type(exc).__name__,
- exc,
- extra={self.ALREADY_FILTERED_FLAG: True},
- )
- return item
- def redact(self, item: Redactable, name: str | None = None, max_depth: int | None = None) -> Redacted:
- """
- Redact an any secrets found in ``item``, if it is a string.
- If ``name`` is given, and it's a "sensitive" name (see
- :func:`should_hide_value_for_key`) then all string values in the item
- is redacted.
- """
- return self._redact(item, name, depth=0, max_depth=max_depth or self.MAX_RECURSION_DEPTH)
- @cached_property
- def _mask_adapter(self) -> None | Callable:
- """
- Pulls the secret mask adapter from config.
- This lives in a function here to be cached and only hit the config once.
- """
- from airflow.configuration import conf
- return conf.getimport("logging", "secret_mask_adapter", fallback=None)
- @cached_property
- def _test_mode(self) -> bool:
- """
- Pulls the unit test mode flag from config.
- This lives in a function here to be cached and only hit the config once.
- """
- from airflow.configuration import conf
- return conf.getboolean("core", "unit_test_mode")
- def _adaptations(self, secret: str) -> Generator[str, None, None]:
- """Yield the secret along with any adaptations to the secret that should be masked."""
- yield secret
- if self._mask_adapter:
- # This can return an iterable of secrets to mask OR a single secret as a string
- secret_or_secrets = self._mask_adapter(secret)
- if not isinstance(secret_or_secrets, str):
- # if its not a string, it must be an iterable
- yield from secret_or_secrets
- else:
- yield secret_or_secrets
- def add_mask(self, secret: str | dict | Iterable, name: str | None = None):
- """Add a new secret to be masked to this filter instance."""
- if isinstance(secret, dict):
- for k, v in secret.items():
- self.add_mask(v, k)
- elif isinstance(secret, str):
- if not secret or (self._test_mode and secret in SECRETS_TO_SKIP_MASKING_FOR_TESTS):
- return
- new_mask = False
- for s in self._adaptations(secret):
- if s:
- pattern = re2.escape(s)
- if pattern not in self.patterns and (not name or should_hide_value_for_key(name)):
- self.patterns.add(pattern)
- new_mask = True
- if new_mask:
- self.replacer = re2.compile("|".join(self.patterns))
- elif isinstance(secret, collections.abc.Iterable):
- for v in secret:
- self.add_mask(v, name)
- class RedactedIO(TextIO):
- """
- IO class that redacts values going into stdout.
- Expected usage::
- with contextlib.redirect_stdout(RedactedIO()):
- ... # Writes to stdout will be redacted.
- """
- def __init__(self):
- self.target = sys.stdout
- def __enter__(self) -> TextIO:
- return self.target.__enter__()
- def __exit__(self, t, v, b) -> None:
- return self.target.__exit__(t, v, b)
- def __iter__(self) -> Iterator[str]:
- return iter(self.target)
- def __next__(self) -> str:
- return next(self.target)
- def close(self) -> None:
- return self.target.close()
- def fileno(self) -> int:
- return self.target.fileno()
- def flush(self) -> None:
- return self.target.flush()
- def isatty(self) -> bool:
- return self.target.isatty()
- def read(self, n: int = -1) -> str:
- return self.target.read(n)
- def readable(self) -> bool:
- return self.target.readable()
- def readline(self, n: int = -1) -> str:
- return self.target.readline(n)
- def readlines(self, n: int = -1) -> list[str]:
- return self.target.readlines(n)
- def seek(self, offset: int, whence: int = 0) -> int:
- return self.target.seek(offset, whence)
- def seekable(self) -> bool:
- return self.target.seekable()
- def tell(self) -> int:
- return self.target.tell()
- def truncate(self, s: int | None = None) -> int:
- return self.target.truncate(s)
- def writable(self) -> bool:
- return self.target.writable()
- def write(self, s: str) -> int:
- s = redact(s)
- return self.target.write(s)
- def writelines(self, lines) -> None:
- self.target.writelines(lines)
|