security.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import warnings
  19. from functools import wraps
  20. from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast
  21. from flask import Response, g
  22. from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
  23. from airflow.auth.managers.models.resource_details import (
  24. AccessView,
  25. ConfigurationDetails,
  26. ConnectionDetails,
  27. DagAccessEntity,
  28. DagDetails,
  29. DatasetDetails,
  30. PoolDetails,
  31. VariableDetails,
  32. )
  33. from airflow.exceptions import RemovedInAirflow3Warning
  34. from airflow.utils.airflow_flask_app import get_airflow_app
  35. from airflow.www.extensions.init_auth_manager import get_auth_manager
  36. if TYPE_CHECKING:
  37. from airflow.auth.managers.base_auth_manager import ResourceMethod
  38. T = TypeVar("T", bound=Callable)
  39. def check_authentication() -> None:
  40. """Check that the request has valid authorization information."""
  41. for auth in get_airflow_app().api_auth:
  42. response = auth.requires_authentication(Response)()
  43. if response.status_code == 200:
  44. return
  45. # Even if the current_user is anonymous, the AUTH_ROLE_PUBLIC might still have permission.
  46. appbuilder = get_airflow_app().appbuilder
  47. if appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None):
  48. return
  49. # since this handler only checks authentication, not authorization,
  50. # we should always return 401
  51. raise Unauthenticated(headers=response.headers)
  52. def requires_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]:
  53. """
  54. Check current user's permissions against required permissions.
  55. Deprecated. Do not use this decorator, use one of the decorator `has_access_*` defined in
  56. airflow/api_connexion/security.py instead.
  57. This decorator will only work with FAB authentication and not with other auth providers.
  58. This decorator might be used in user plugins, do not remove it.
  59. """
  60. warnings.warn(
  61. "The 'requires_access' decorator is deprecated. Please use one of the decorator `requires_access_*`"
  62. "defined in airflow/api_connexion/security.py instead.",
  63. RemovedInAirflow3Warning,
  64. stacklevel=2,
  65. )
  66. from airflow.providers.fab.auth_manager.decorators.auth import _requires_access_fab
  67. return _requires_access_fab(permissions)
  68. def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
  69. """
  70. Define the behavior whether the user is authorized to access the resource.
  71. :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
  72. the resource
  73. :param func: the function to call if the user is authorized
  74. :param args: the arguments of ``func``
  75. :param kwargs: the keyword arguments ``func``
  76. :meta private:
  77. """
  78. check_authentication()
  79. if is_authorized_callback():
  80. return func(*args, **kwargs)
  81. raise PermissionDenied()
  82. def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
  83. def requires_access_decorator(func: T):
  84. @wraps(func)
  85. def decorated(*args, **kwargs):
  86. section: str | None = kwargs.get("section")
  87. return _requires_access(
  88. is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
  89. method=method, details=ConfigurationDetails(section=section)
  90. ),
  91. func=func,
  92. args=args,
  93. kwargs=kwargs,
  94. )
  95. return cast(T, decorated)
  96. return requires_access_decorator
  97. def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
  98. def requires_access_decorator(func: T):
  99. @wraps(func)
  100. def decorated(*args, **kwargs):
  101. connection_id: str | None = kwargs.get("connection_id")
  102. return _requires_access(
  103. is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
  104. method=method, details=ConnectionDetails(conn_id=connection_id)
  105. ),
  106. func=func,
  107. args=args,
  108. kwargs=kwargs,
  109. )
  110. return cast(T, decorated)
  111. return requires_access_decorator
  112. def requires_access_dag(
  113. method: ResourceMethod, access_entity: DagAccessEntity | None = None
  114. ) -> Callable[[T], T]:
  115. def _is_authorized_callback(dag_id: str):
  116. def callback():
  117. access = get_auth_manager().is_authorized_dag(
  118. method=method,
  119. access_entity=access_entity,
  120. details=DagDetails(id=dag_id),
  121. )
  122. # ``access`` means here:
  123. # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
  124. # - if no DAG id is provided: is the user authorized to access all DAGs
  125. if dag_id or access or access_entity:
  126. return access
  127. # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
  128. # on DAG level
  129. # If method is "GET", return whether the user has read access to any DAGs
  130. # If method is "PUT", return whether the user has edit access to any DAGs
  131. return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
  132. method == "PUT" and any(get_auth_manager().get_permitted_dag_ids(methods=["PUT"]))
  133. )
  134. return callback
  135. def requires_access_decorator(func: T):
  136. @wraps(func)
  137. def decorated(*args, **kwargs):
  138. dag_id: str | None = kwargs.get("dag_id") if kwargs.get("dag_id") != "~" else None
  139. return _requires_access(
  140. is_authorized_callback=_is_authorized_callback(dag_id),
  141. func=func,
  142. args=args,
  143. kwargs=kwargs,
  144. )
  145. return cast(T, decorated)
  146. return requires_access_decorator
  147. def requires_access_dataset(method: ResourceMethod) -> Callable[[T], T]:
  148. def requires_access_decorator(func: T):
  149. @wraps(func)
  150. def decorated(*args, **kwargs):
  151. uri: str | None = kwargs.get("uri")
  152. return _requires_access(
  153. is_authorized_callback=lambda: get_auth_manager().is_authorized_dataset(
  154. method=method, details=DatasetDetails(uri=uri)
  155. ),
  156. func=func,
  157. args=args,
  158. kwargs=kwargs,
  159. )
  160. return cast(T, decorated)
  161. return requires_access_decorator
  162. def requires_access_pool(method: ResourceMethod) -> Callable[[T], T]:
  163. def requires_access_decorator(func: T):
  164. @wraps(func)
  165. def decorated(*args, **kwargs):
  166. pool_name: str | None = kwargs.get("pool_name")
  167. return _requires_access(
  168. is_authorized_callback=lambda: get_auth_manager().is_authorized_pool(
  169. method=method, details=PoolDetails(name=pool_name)
  170. ),
  171. func=func,
  172. args=args,
  173. kwargs=kwargs,
  174. )
  175. return cast(T, decorated)
  176. return requires_access_decorator
  177. def requires_access_variable(method: ResourceMethod) -> Callable[[T], T]:
  178. def requires_access_decorator(func: T):
  179. @wraps(func)
  180. def decorated(*args, **kwargs):
  181. variable_key: str | None = kwargs.get("variable_key")
  182. return _requires_access(
  183. is_authorized_callback=lambda: get_auth_manager().is_authorized_variable(
  184. method=method, details=VariableDetails(key=variable_key)
  185. ),
  186. func=func,
  187. args=args,
  188. kwargs=kwargs,
  189. )
  190. return cast(T, decorated)
  191. return requires_access_decorator
  192. def requires_access_view(access_view: AccessView) -> Callable[[T], T]:
  193. def requires_access_decorator(func: T):
  194. @wraps(func)
  195. def decorated(*args, **kwargs):
  196. return _requires_access(
  197. is_authorized_callback=lambda: get_auth_manager().is_authorized_view(access_view=access_view),
  198. func=func,
  199. args=args,
  200. kwargs=kwargs,
  201. )
  202. return cast(T, decorated)
  203. return requires_access_decorator
  204. def requires_access_custom_view(
  205. method: ResourceMethod,
  206. resource_name: str,
  207. ) -> Callable[[T], T]:
  208. def requires_access_decorator(func: T):
  209. @wraps(func)
  210. def decorated(*args, **kwargs):
  211. return _requires_access(
  212. is_authorized_callback=lambda: get_auth_manager().is_authorized_custom_view(
  213. method=method, resource_name=resource_name
  214. ),
  215. func=func,
  216. args=args,
  217. kwargs=kwargs,
  218. )
  219. return cast(T, decorated)
  220. return requires_access_decorator
  221. def get_readable_dags() -> set[str]:
  222. return get_auth_manager().get_permitted_dag_ids(user=g.user)