123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import warnings
- from functools import wraps
- from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast
- from flask import Response, g
- from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
- from airflow.auth.managers.models.resource_details import (
- AccessView,
- ConfigurationDetails,
- ConnectionDetails,
- DagAccessEntity,
- DagDetails,
- DatasetDetails,
- PoolDetails,
- VariableDetails,
- )
- from airflow.exceptions import RemovedInAirflow3Warning
- from airflow.utils.airflow_flask_app import get_airflow_app
- from airflow.www.extensions.init_auth_manager import get_auth_manager
- if TYPE_CHECKING:
- from airflow.auth.managers.base_auth_manager import ResourceMethod
- T = TypeVar("T", bound=Callable)
- def check_authentication() -> None:
- """Check that the request has valid authorization information."""
- for auth in get_airflow_app().api_auth:
- response = auth.requires_authentication(Response)()
- if response.status_code == 200:
- return
- # Even if the current_user is anonymous, the AUTH_ROLE_PUBLIC might still have permission.
- appbuilder = get_airflow_app().appbuilder
- if appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None):
- return
- # since this handler only checks authentication, not authorization,
- # we should always return 401
- raise Unauthenticated(headers=response.headers)
- def requires_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]:
- """
- Check current user's permissions against required permissions.
- Deprecated. Do not use this decorator, use one of the decorator `has_access_*` defined in
- airflow/api_connexion/security.py instead.
- This decorator will only work with FAB authentication and not with other auth providers.
- This decorator might be used in user plugins, do not remove it.
- """
- warnings.warn(
- "The 'requires_access' decorator is deprecated. Please use one of the decorator `requires_access_*`"
- "defined in airflow/api_connexion/security.py instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- from airflow.providers.fab.auth_manager.decorators.auth import _requires_access_fab
- return _requires_access_fab(permissions)
- def _requires_access(*, is_authorized_callback: Callable[[], bool], func: Callable, args, kwargs) -> bool:
- """
- Define the behavior whether the user is authorized to access the resource.
- :param is_authorized_callback: callback to execute to figure whether the user is authorized to access
- the resource
- :param func: the function to call if the user is authorized
- :param args: the arguments of ``func``
- :param kwargs: the keyword arguments ``func``
- :meta private:
- """
- check_authentication()
- if is_authorized_callback():
- return func(*args, **kwargs)
- raise PermissionDenied()
- def requires_access_configuration(method: ResourceMethod) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- section: str | None = kwargs.get("section")
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_configuration(
- method=method, details=ConfigurationDetails(section=section)
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_connection(method: ResourceMethod) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- connection_id: str | None = kwargs.get("connection_id")
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_connection(
- method=method, details=ConnectionDetails(conn_id=connection_id)
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_dag(
- method: ResourceMethod, access_entity: DagAccessEntity | None = None
- ) -> Callable[[T], T]:
- def _is_authorized_callback(dag_id: str):
- def callback():
- access = get_auth_manager().is_authorized_dag(
- method=method,
- access_entity=access_entity,
- details=DagDetails(id=dag_id),
- )
- # ``access`` means here:
- # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
- # - if no DAG id is provided: is the user authorized to access all DAGs
- if dag_id or access or access_entity:
- return access
- # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
- # on DAG level
- # If method is "GET", return whether the user has read access to any DAGs
- # If method is "PUT", return whether the user has edit access to any DAGs
- return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
- method == "PUT" and any(get_auth_manager().get_permitted_dag_ids(methods=["PUT"]))
- )
- return callback
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- dag_id: str | None = kwargs.get("dag_id") if kwargs.get("dag_id") != "~" else None
- return _requires_access(
- is_authorized_callback=_is_authorized_callback(dag_id),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_dataset(method: ResourceMethod) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- uri: str | None = kwargs.get("uri")
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_dataset(
- method=method, details=DatasetDetails(uri=uri)
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_pool(method: ResourceMethod) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- pool_name: str | None = kwargs.get("pool_name")
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_pool(
- method=method, details=PoolDetails(name=pool_name)
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_variable(method: ResourceMethod) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- variable_key: str | None = kwargs.get("variable_key")
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_variable(
- method=method, details=VariableDetails(key=variable_key)
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_view(access_view: AccessView) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_view(access_view=access_view),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def requires_access_custom_view(
- method: ResourceMethod,
- resource_name: str,
- ) -> Callable[[T], T]:
- def requires_access_decorator(func: T):
- @wraps(func)
- def decorated(*args, **kwargs):
- return _requires_access(
- is_authorized_callback=lambda: get_auth_manager().is_authorized_custom_view(
- method=method, resource_name=resource_name
- ),
- func=func,
- args=args,
- kwargs=kwargs,
- )
- return cast(T, decorated)
- return requires_access_decorator
- def get_readable_dags() -> set[str]:
- return get_auth_manager().get_permitted_dag_ids(user=g.user)
|