variable.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import json
  20. import logging
  21. from typing import TYPE_CHECKING, Any
  22. from sqlalchemy import Boolean, Column, Integer, String, Text, delete, select
  23. from sqlalchemy.dialects.mysql import MEDIUMTEXT
  24. from sqlalchemy.orm import declared_attr, reconstructor, synonym
  25. from airflow.api_internal.internal_api_call import internal_api_call
  26. from airflow.configuration import ensure_secrets_loaded
  27. from airflow.models.base import ID_LEN, Base
  28. from airflow.models.crypto import get_fernet
  29. from airflow.secrets.cache import SecretCache
  30. from airflow.secrets.metastore import MetastoreBackend
  31. from airflow.utils.log.logging_mixin import LoggingMixin
  32. from airflow.utils.log.secrets_masker import mask_secret
  33. from airflow.utils.session import provide_session
  34. if TYPE_CHECKING:
  35. from sqlalchemy.orm import Session
# Module-level logger: the static helpers in this module (e.g. ``check_for_write_conflict`` and
# ``get_variable_from_secrets``) have no instance and therefore cannot use ``self.log``.
log = logging.getLogger(__name__)
  37. class Variable(Base, LoggingMixin):
  38. """A generic way to store and retrieve arbitrary content or settings as a simple key/value store."""
  39. __tablename__ = "variable"
  40. __NO_DEFAULT_SENTINEL = object()
  41. id = Column(Integer, primary_key=True)
  42. key = Column(String(ID_LEN), unique=True)
  43. _val = Column("val", Text().with_variant(MEDIUMTEXT, "mysql"))
  44. description = Column(Text)
  45. is_encrypted = Column(Boolean, unique=False, default=False)
  46. def __init__(self, key=None, val=None, description=None):
  47. super().__init__()
  48. self.key = key
  49. self.val = val
  50. self.description = description
  51. @reconstructor
  52. def on_db_load(self):
  53. if self._val:
  54. mask_secret(self.val, self.key)
  55. def __repr__(self):
  56. # Hiding the value
  57. return f"{self.key} : {self._val}"
  58. def get_val(self):
  59. """Get Airflow Variable from Metadata DB and decode it using the Fernet Key."""
  60. from cryptography.fernet import InvalidToken as InvalidFernetToken
  61. if self._val is not None and self.is_encrypted:
  62. try:
  63. fernet = get_fernet()
  64. return fernet.decrypt(bytes(self._val, "utf-8")).decode()
  65. except InvalidFernetToken:
  66. self.log.error("Can't decrypt _val for key=%s, invalid token or value", self.key)
  67. return None
  68. except Exception:
  69. self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing", self.key)
  70. return None
  71. else:
  72. return self._val
  73. def set_val(self, value):
  74. """Encode the specified value with Fernet Key and store it in Variables Table."""
  75. if value is not None:
  76. fernet = get_fernet()
  77. self._val = fernet.encrypt(bytes(value, "utf-8")).decode()
  78. self.is_encrypted = fernet.is_encrypted
  79. @declared_attr
  80. def val(cls):
  81. """Get Airflow Variable from Metadata DB and decode it using the Fernet Key."""
  82. return synonym("_val", descriptor=property(cls.get_val, cls.set_val))
  83. @classmethod
  84. def setdefault(cls, key, default, description=None, deserialize_json=False):
  85. """
  86. Return the current value for a key or store the default value and return it.
  87. Works the same as the Python builtin dict object.
  88. :param key: Dict key for this Variable
  89. :param default: Default value to set and return if the variable
  90. isn't already in the DB
  91. :param description: Default value to set Description of the Variable
  92. :param deserialize_json: Store this as a JSON encoded value in the DB
  93. and un-encode it when retrieving a value
  94. :param session: Session
  95. :return: Mixed
  96. """
  97. obj = Variable.get(key, default_var=None, deserialize_json=deserialize_json)
  98. if obj is None:
  99. if default is not None:
  100. Variable.set(key=key, value=default, description=description, serialize_json=deserialize_json)
  101. return default
  102. else:
  103. raise ValueError("Default Value must be set")
  104. else:
  105. return obj
  106. @classmethod
  107. def get(
  108. cls,
  109. key: str,
  110. default_var: Any = __NO_DEFAULT_SENTINEL,
  111. deserialize_json: bool = False,
  112. ) -> Any:
  113. """
  114. Get a value for an Airflow Variable Key.
  115. :param key: Variable Key
  116. :param default_var: Default value of the Variable if the Variable doesn't exist
  117. :param deserialize_json: Deserialize the value to a Python dict
  118. """
  119. var_val = Variable.get_variable_from_secrets(key=key)
  120. if var_val is None:
  121. if default_var is not cls.__NO_DEFAULT_SENTINEL:
  122. return default_var
  123. else:
  124. raise KeyError(f"Variable {key} does not exist")
  125. else:
  126. if deserialize_json:
  127. obj = json.loads(var_val)
  128. mask_secret(obj, key)
  129. return obj
  130. else:
  131. mask_secret(var_val, key)
  132. return var_val
  133. @staticmethod
  134. @provide_session
  135. def set(
  136. key: str,
  137. value: Any,
  138. description: str | None = None,
  139. serialize_json: bool = False,
  140. session: Session = None,
  141. ) -> None:
  142. """
  143. Set a value for an Airflow Variable with a given Key.
  144. This operation overwrites an existing variable.
  145. :param key: Variable Key
  146. :param value: Value to set for the Variable
  147. :param description: Description of the Variable
  148. :param serialize_json: Serialize the value to a JSON string
  149. :param session: Session
  150. """
  151. Variable._set(
  152. key=key, value=value, description=description, serialize_json=serialize_json, session=session
  153. )
  154. # invalidate key in cache for faster propagation
  155. # we cannot save the value set because it's possible that it's shadowed by a custom backend
  156. # (see call to check_for_write_conflict above)
  157. SecretCache.invalidate_variable(key)
  158. @staticmethod
  159. @provide_session
  160. @internal_api_call
  161. def _set(
  162. key: str,
  163. value: Any,
  164. description: str | None = None,
  165. serialize_json: bool = False,
  166. session: Session = None,
  167. ) -> None:
  168. """
  169. Set a value for an Airflow Variable with a given Key.
  170. This operation overwrites an existing variable.
  171. :param key: Variable Key
  172. :param value: Value to set for the Variable
  173. :param description: Description of the Variable
  174. :param serialize_json: Serialize the value to a JSON string
  175. :param session: Session
  176. """
  177. # check if the secret exists in the custom secrets' backend.
  178. Variable.check_for_write_conflict(key=key)
  179. if serialize_json:
  180. stored_value = json.dumps(value, indent=2)
  181. else:
  182. stored_value = str(value)
  183. Variable.delete(key, session=session)
  184. session.add(Variable(key=key, val=stored_value, description=description))
  185. session.flush()
  186. # invalidate key in cache for faster propagation
  187. # we cannot save the value set because it's possible that it's shadowed by a custom backend
  188. # (see call to check_for_write_conflict above)
  189. SecretCache.invalidate_variable(key)
  190. @staticmethod
  191. @provide_session
  192. def update(
  193. key: str,
  194. value: Any,
  195. serialize_json: bool = False,
  196. session: Session = None,
  197. ) -> None:
  198. """
  199. Update a given Airflow Variable with the Provided value.
  200. :param key: Variable Key
  201. :param value: Value to set for the Variable
  202. :param serialize_json: Serialize the value to a JSON string
  203. :param session: Session
  204. """
  205. Variable._update(key=key, value=value, serialize_json=serialize_json, session=session)
  206. # We need to invalidate the cache for internal API cases on the client side
  207. SecretCache.invalidate_variable(key)
  208. @staticmethod
  209. @provide_session
  210. @internal_api_call
  211. def _update(
  212. key: str,
  213. value: Any,
  214. serialize_json: bool = False,
  215. session: Session = None,
  216. ) -> None:
  217. """
  218. Update a given Airflow Variable with the Provided value.
  219. :param key: Variable Key
  220. :param value: Value to set for the Variable
  221. :param serialize_json: Serialize the value to a JSON string
  222. :param session: Session
  223. """
  224. Variable.check_for_write_conflict(key=key)
  225. if Variable.get_variable_from_secrets(key=key) is None:
  226. raise KeyError(f"Variable {key} does not exist")
  227. obj = session.scalar(select(Variable).where(Variable.key == key))
  228. if obj is None:
  229. raise AttributeError(f"Variable {key} does not exist in the Database and cannot be updated.")
  230. Variable.set(
  231. key=key, value=value, description=obj.description, serialize_json=serialize_json, session=session
  232. )
  233. @staticmethod
  234. @provide_session
  235. def delete(key: str, session: Session = None) -> int:
  236. """
  237. Delete an Airflow Variable for a given key.
  238. :param key: Variable Keys
  239. """
  240. rows = Variable._delete(key=key, session=session)
  241. SecretCache.invalidate_variable(key)
  242. return rows
  243. @staticmethod
  244. @provide_session
  245. @internal_api_call
  246. def _delete(key: str, session: Session = None) -> int:
  247. """
  248. Delete an Airflow Variable for a given key.
  249. :param key: Variable Keys
  250. """
  251. rows = session.execute(delete(Variable).where(Variable.key == key)).rowcount
  252. SecretCache.invalidate_variable(key)
  253. return rows
  254. def rotate_fernet_key(self):
  255. """Rotate Fernet Key."""
  256. fernet = get_fernet()
  257. if self._val and self.is_encrypted:
  258. self._val = fernet.rotate(self._val.encode("utf-8")).decode()
  259. @staticmethod
  260. def check_for_write_conflict(key: str) -> None:
  261. """
  262. Log a warning if a variable exists outside the metastore.
  263. If we try to write a variable to the metastore while the same key
  264. exists in an environment variable or custom secrets backend, then
  265. subsequent reads will not read the set value.
  266. :param key: Variable Key
  267. """
  268. for secrets_backend in ensure_secrets_loaded():
  269. if not isinstance(secrets_backend, MetastoreBackend):
  270. try:
  271. var_val = secrets_backend.get_variable(key=key)
  272. if var_val is not None:
  273. _backend_name = type(secrets_backend).__name__
  274. log.warning(
  275. "The variable %s is defined in the %s secrets backend, which takes "
  276. "precedence over reading from the database. The value in the database will be "
  277. "updated, but to read it you have to delete the conflicting variable "
  278. "from %s",
  279. key,
  280. _backend_name,
  281. _backend_name,
  282. )
  283. return
  284. except Exception:
  285. log.exception(
  286. "Unable to retrieve variable from secrets backend (%s). "
  287. "Checking subsequent secrets backend.",
  288. type(secrets_backend).__name__,
  289. )
  290. return None
  291. @staticmethod
  292. def get_variable_from_secrets(key: str) -> str | None:
  293. """
  294. Get Airflow Variable by iterating over all Secret Backends.
  295. :param key: Variable Key
  296. :return: Variable Value
  297. """
  298. # check cache first
  299. # enabled only if SecretCache.init() has been called first
  300. try:
  301. return SecretCache.get_variable(key)
  302. except SecretCache.NotPresentException:
  303. pass # continue business
  304. var_val = None
  305. # iterate over backends if not in cache (or expired)
  306. for secrets_backend in ensure_secrets_loaded():
  307. try:
  308. var_val = secrets_backend.get_variable(key=key)
  309. if var_val is not None:
  310. break
  311. except Exception:
  312. log.exception(
  313. "Unable to retrieve variable from secrets backend (%s). "
  314. "Checking subsequent secrets backend.",
  315. type(secrets_backend).__name__,
  316. )
  317. SecretCache.save_variable(key, var_val) # we save None as well
  318. return var_val