#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
- from __future__ import annotations
- import json
- import logging
- from typing import TYPE_CHECKING, Any
- from sqlalchemy import Boolean, Column, Integer, String, Text, delete, select
- from sqlalchemy.dialects.mysql import MEDIUMTEXT
- from sqlalchemy.orm import declared_attr, reconstructor, synonym
- from airflow.api_internal.internal_api_call import internal_api_call
- from airflow.configuration import ensure_secrets_loaded
- from airflow.models.base import ID_LEN, Base
- from airflow.models.crypto import get_fernet
- from airflow.secrets.cache import SecretCache
- from airflow.secrets.metastore import MetastoreBackend
- from airflow.utils.log.logging_mixin import LoggingMixin
- from airflow.utils.log.secrets_masker import mask_secret
- from airflow.utils.session import provide_session
- if TYPE_CHECKING:
- from sqlalchemy.orm import Session
# Module-level logger: the static helpers below (``check_for_write_conflict`` and
# ``get_variable_from_secrets``) have no instance and therefore no ``self.log``.
log = logging.getLogger(__name__)
class Variable(Base, LoggingMixin):
    """
    A generic way to store and retrieve arbitrary content or settings as a simple key/value store.

    Values are written and read through the ``val`` synonym: writes go through ``set_val``
    (which encodes the value with the object returned by ``get_fernet()``) and reads go through
    ``get_val`` (which decrypts the stored text when ``is_encrypted`` is set).
    """

    __tablename__ = "variable"
    # Sentinel that lets ``get`` tell "no default supplied" apart from an explicit ``None``.
    __NO_DEFAULT_SENTINEL = object()

    id = Column(Integer, primary_key=True)
    key = Column(String(ID_LEN), unique=True)
    # Raw stored text; callers should use the ``val`` synonym instead of touching this directly.
    _val = Column("val", Text().with_variant(MEDIUMTEXT, "mysql"))
    description = Column(Text)
    is_encrypted = Column(Boolean, unique=False, default=False)

    def __init__(self, key=None, val=None, description=None):
        """
        Create a Variable row object.

        :param key: Variable Key
        :param val: Value of the Variable; assigned through the ``val`` synonym (see ``set_val``)
        :param description: Description of the Variable
        """
        super().__init__()
        self.key = key
        self.val = val
        self.description = description

    @reconstructor
    def on_db_load(self):
        """Register the decoded value with the secrets masker when the row has a stored value."""
        if self._val:
            mask_secret(self.val, self.key)

    def __repr__(self):
        # Shows the raw stored column (``_val``), never the result of ``get_val``: when the row
        # is encrypted this is the ciphertext, otherwise it is the stored text as-is.
        return f"{self.key} : {self._val}"

    def get_val(self):
        """
        Get Airflow Variable from Metadata DB and decode it using the Fernet Key.

        :return: the stored text when it is ``None`` or not flagged as encrypted, the decrypted
            text when decryption succeeds, and ``None`` when decryption fails (the failure is
            logged, not raised).
        """
        from cryptography.fernet import InvalidToken as InvalidFernetToken

        # Nothing to decrypt: hand back the stored text (or None) untouched.
        if self._val is None or not self.is_encrypted:
            return self._val

        try:
            fernet = get_fernet()
            return fernet.decrypt(bytes(self._val, "utf-8")).decode()
        except InvalidFernetToken:
            self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
            return None
        except Exception:
            # Best effort: any other failure is reported as a configuration problem.
            self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
            return None

    def set_val(self, value):
        """
        Encode the specified value with Fernet Key and store it in Variables Table.

        A ``None`` value is ignored: neither ``_val`` nor ``is_encrypted`` is modified.

        :param value: text to store
        """
        if value is not None:
            fernet = get_fernet()
            self._val = fernet.encrypt(bytes(value, "utf-8")).decode()
            # Record what the fernet object reports about itself, so ``get_val`` knows
            # whether the stored text needs decrypting.
            self.is_encrypted = fernet.is_encrypted

    @declared_attr
    def val(cls):
        """Get Airflow Variable from Metadata DB and decode it using the Fernet Key."""
        return synonym("_val", descriptor=property(cls.get_val, cls.set_val))

    @classmethod
    def setdefault(cls, key, default, description=None, deserialize_json=False):
        """
        Return the current value for a key or store the default value and return it.

        Works the same as the Python builtin dict object.

        :param key: Dict key for this Variable
        :param default: Default value to set and return if the variable
            isn't already in the DB
        :param description: Description to store with the Variable when the default is written
        :param deserialize_json: Store this as a JSON encoded value in the DB
            and un-encode it when retrieving a value
        :return: Mixed
        :raises ValueError: if the variable does not exist and ``default`` is ``None``
        """
        obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json)
        if obj is not None:
            return obj
        if default is None:
            raise ValueError("Default Value must be set")
        Variable.set(key=key, value=default, description=description, serialize_json=deserialize_json)
        return default

    @classmethod
    def get(
        cls,
        key: str,
        default_var: Any = __NO_DEFAULT_SENTINEL,
        deserialize_json: bool = False,
    ) -> Any:
        """
        Get a value for an Airflow Variable Key.

        :param key: Variable Key
        :param default_var: Default value of the Variable if the Variable doesn't exist
        :param deserialize_json: Deserialize the value to a Python dict
        :return: the value (parsed with ``json.loads`` when ``deserialize_json`` is set),
            or ``default_var`` when the variable does not exist and a default was supplied
        :raises KeyError: if the variable does not exist and no default was supplied
        """
        var_val = Variable.get_variable_from_secrets(key=key)
        if var_val is None:
            if default_var is not cls.__NO_DEFAULT_SENTINEL:
                return default_var
            raise KeyError(f"Variable {key} does not exist")

        result = json.loads(var_val) if deserialize_json else var_val
        # Register whatever is handed back to the caller with the secrets masker.
        mask_secret(result, key)
        return result

    @staticmethod
    @provide_session
    def set(
        key: str,
        value: Any,
        description: str | None = None,
        serialize_json: bool = False,
        session: Session = None,
    ) -> None:
        """
        Set a value for an Airflow Variable with a given Key.

        This operation overwrites an existing variable.

        :param key: Variable Key
        :param value: Value to set for the Variable
        :param description: Description of the Variable
        :param serialize_json: Serialize the value to a JSON string
        :param session: Session
        """
        Variable._set(
            key=key, value=value, description=description, serialize_json=serialize_json, session=session
        )
        # invalidate key in cache for faster propagation
        # we cannot save the value set because it's possible that it's shadowed by a custom backend
        # (see call to check_for_write_conflict in ``_set``)
        SecretCache.invalidate_variable(key)

    @staticmethod
    @provide_session
    @internal_api_call
    def _set(
        key: str,
        value: Any,
        description: str | None = None,
        serialize_json: bool = False,
        session: Session = None,
    ) -> None:
        """
        Set a value for an Airflow Variable with a given Key.

        This operation overwrites an existing variable: any row with the same key is deleted
        before the new row is added and flushed.

        :param key: Variable Key
        :param value: Value to set for the Variable
        :param description: Description of the Variable
        :param serialize_json: Serialize the value to a JSON string
        :param session: Session
        """
        # check if the secret exists in the custom secrets' backend.
        Variable.check_for_write_conflict(key=key)

        stored_value = json.dumps(value, indent=2) if serialize_json else str(value)

        Variable.delete(key, session=session)
        session.add(Variable(key=key, val=stored_value, description=description))
        session.flush()
        # invalidate key in cache for faster propagation
        # we cannot save the value set because it's possible that it's shadowed by a custom backend
        # (see call to check_for_write_conflict above)
        SecretCache.invalidate_variable(key)

    @staticmethod
    @provide_session
    def update(
        key: str,
        value: Any,
        serialize_json: bool = False,
        session: Session = None,
    ) -> None:
        """
        Update a given Airflow Variable with the Provided value.

        :param key: Variable Key
        :param value: Value to set for the Variable
        :param serialize_json: Serialize the value to a JSON string
        :param session: Session
        :raises KeyError: if no secrets backend returns a value for ``key``
        :raises AttributeError: if the variable has no row in the database
        """
        Variable._update(key=key, value=value, serialize_json=serialize_json, session=session)
        # We need to invalidate the cache for internal API cases on the client side
        SecretCache.invalidate_variable(key)

    @staticmethod
    @provide_session
    @internal_api_call
    def _update(
        key: str,
        value: Any,
        serialize_json: bool = False,
        session: Session = None,
    ) -> None:
        """
        Update a given Airflow Variable with the Provided value.

        The existing row's description is carried over to the rewritten variable.

        :param key: Variable Key
        :param value: Value to set for the Variable
        :param serialize_json: Serialize the value to a JSON string
        :param session: Session
        :raises KeyError: if no secrets backend returns a value for ``key``
        :raises AttributeError: if the variable has no row in the database
        """
        Variable.check_for_write_conflict(key=key)

        if Variable.get_variable_from_secrets(key=key) is None:
            raise KeyError(f"Variable {key} does not exist")

        # The value may come from another backend; only a database row can be updated.
        obj = session.scalar(select(Variable).where(Variable.key == key))
        if obj is None:
            raise AttributeError(f"Variable {key} does not exist in the Database and cannot be updated.")

        Variable.set(
            key=key, value=value, description=obj.description, serialize_json=serialize_json, session=session
        )

    @staticmethod
    @provide_session
    def delete(key: str, session: Session = None) -> int:
        """
        Delete an Airflow Variable for a given key.

        :param key: Variable Keys
        :param session: Session
        :return: the row count reported by the DELETE statement
        """
        rows = Variable._delete(key=key, session=session)
        SecretCache.invalidate_variable(key)
        return rows

    @staticmethod
    @provide_session
    @internal_api_call
    def _delete(key: str, session: Session = None) -> int:
        """
        Delete an Airflow Variable for a given key.

        :param key: Variable Keys
        :param session: Session
        :return: the row count reported by the DELETE statement
        """
        rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
        SecretCache.invalidate_variable(key)
        return rows

    def rotate_fernet_key(self):
        """Rotate Fernet Key: re-encode the stored text when the row is encrypted and non-empty."""
        fernet = get_fernet()
        if self._val and self.is_encrypted:
            self._val = fernet.rotate(self._val.encode("utf-8")).decode()

    @staticmethod
    def check_for_write_conflict(key: str) -> None:
        """
        Log a warning if a variable exists outside the metastore.

        If we try to write a variable to the metastore while the same key
        exists in an environment variable or custom secrets backend, then
        subsequent reads will not read the set value.

        Every backend that is not a ``MetastoreBackend`` is consulted in order; the check stops
        at the first one that returns a value. A backend that raises is logged and skipped.

        :param key: Variable Key
        """
        for secrets_backend in ensure_secrets_loaded():
            if isinstance(secrets_backend, MetastoreBackend):
                continue
            try:
                var_val = secrets_backend.get_variable(key=key)
                if var_val is not None:
                    _backend_name = type(secrets_backend).__name__
                    log.warning(
                        "The variable %s is defined in the %s secrets backend, which takes "
                        "precedence over reading from the database. The value in the database will be "
                        "updated, but to read it you have to delete the conflicting variable "
                        "from %s",
                        key,
                        _backend_name,
                        _backend_name,
                    )
                    return
            except Exception:
                # Deliberate best effort: a broken backend must not block the write.
                log.exception(
                    "Unable to retrieve variable from secrets backend (%s). "
                    "Checking subsequent secrets backend.",
                    type(secrets_backend).__name__,
                )
        return None

    @staticmethod
    def get_variable_from_secrets(key: str) -> str | None:
        """
        Get Airflow Variable by iterating over all Secret Backends.

        The cache is consulted first; on a miss the backends are tried in order and the outcome
        (including ``None``) is saved back to the cache.

        :param key: Variable Key
        :return: Variable Value
        """
        # check cache first
        # enabled only if SecretCache.init() has been called first
        try:
            return SecretCache.get_variable(key)
        except SecretCache.NotPresentException:
            pass  # continue business

        var_val = None
        # iterate over backends if not in cache (or expired)
        for secrets_backend in ensure_secrets_loaded():
            try:
                var_val = secrets_backend.get_variable(key=key)
                if var_val is not None:
                    break
            except Exception:
                log.exception(
                    "Unable to retrieve variable from secrets backend (%s). "
                    "Checking subsequent secrets backend.",
                    type(secrets_backend).__name__,
                )

        SecretCache.save_variable(key, var_val)  # we save None as well
        return var_val
|