import datetime import logging import re from typing import Any, List, Optional from flask import abort, current_app, flash, g, redirect, request, session, url_for from flask_appbuilder._compat import as_unicode from flask_appbuilder.actions import action from flask_appbuilder.baseviews import BaseView from flask_appbuilder.charts.views import DirectByChartView from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget from flask_appbuilder.security.decorators import has_access, no_cache from flask_appbuilder.security.forms import ( DynamicForm, LoginForm_db, LoginForm_oid, ResetPasswordForm, SelectDataRequired, UserInfoEdit, ) from flask_appbuilder.security.utils import generate_random_string from flask_appbuilder.utils.base import get_safe_redirect, lazy_formatter_gettext from flask_appbuilder.validators import PasswordComplexityValidator from flask_appbuilder.views import expose, ModelView, SimpleFormView from flask_appbuilder.widgets import ListWidget, ShowWidget from flask_babel import lazy_gettext from flask_login import login_user, logout_user import jwt from werkzeug.security import generate_password_hash from werkzeug.wrappers import Response as WerkzeugResponse from wtforms import PasswordField, validators from wtforms.validators import EqualTo log = logging.getLogger(__name__) class PermissionModelView(ModelView): route_base = "/permissions" base_permissions = ["can_list"] list_title = lazy_gettext("List Base Permissions") show_title = lazy_gettext("Show Base Permission") add_title = lazy_gettext("Add Base Permission") edit_title = lazy_gettext("Edit Base Permission") label_columns = {"name": lazy_gettext("Name")} class ViewMenuModelView(ModelView): route_base = "/viewmenus" base_permissions = ["can_list"] list_title = lazy_gettext("List View Menus") show_title = lazy_gettext("Show View Menu") add_title = lazy_gettext("Add View Menu") edit_title = lazy_gettext("Edit View Menu") label_columns = {"name": lazy_gettext("Name")} class PermissionViewModelView(ModelView): route_base = "/permissionviews" base_permissions = ["can_list"] list_title = lazy_gettext("List Permissions on Views/Menus") show_title = lazy_gettext("Show Permission on Views/Menus") add_title = lazy_gettext("Add Permission on Views/Menus") edit_title = lazy_gettext("Edit Permission on Views/Menus") label_columns = { "permission": lazy_gettext("Permission"), "view_menu": lazy_gettext("View/Menu"), } list_columns = ["permission", "view_menu"] class ResetMyPasswordView(SimpleFormView): """ View for resetting own user password """ route_base = "/resetmypassword" form = ResetPasswordForm form_title = lazy_gettext("Reset Password Form") redirect_url = "/" message = lazy_gettext("Password Changed") def form_post(self, form: DynamicForm) -> None: self.appbuilder.sm.reset_password(g.user.id, form.password.data) flash(as_unicode(self.message), "info") class ResetPasswordView(SimpleFormView): """ View for reseting all users password """ route_base = "/resetpassword" form = ResetPasswordForm form_title = lazy_gettext("Reset Password Form") redirect_url = "/" message = lazy_gettext("Password Changed") def form_post(self, form: DynamicForm) -> None: pk = request.args.get("pk") self.appbuilder.sm.reset_password(pk, form.password.data) flash(as_unicode(self.message), "info") class UserInfoEditView(SimpleFormView): form = UserInfoEdit form_title = lazy_gettext("Edit User Information") redirect_url = "/" message = lazy_gettext("User information changed") def form_get(self, form: DynamicForm) -> None: item = self.appbuilder.sm.get_user_by_id(g.user.id) # fills the form generic solution for key, value in form.data.items(): if key == "csrf_token": continue form_field = getattr(form, key) form_field.data = getattr(item, key) def form_post(self, form: DynamicForm) -> None: form = self.form.refresh(request.form) item = self.appbuilder.sm.get_user_by_id(g.user.id) form.populate_obj(item) self.appbuilder.sm.update_user(item) flash(as_unicode(self.message), "info") def _roles_custom_formatter(string: str) -> str: if current_app.config.get("AUTH_ROLES_SYNC_AT_LOGIN", False): string += ( ". " ) return string class UserModelView(ModelView): route_base = "/users" list_title = lazy_gettext("List Users") show_title = lazy_gettext("Show User") add_title = lazy_gettext("Add User") edit_title = lazy_gettext("Edit User") label_columns = { "get_full_name": lazy_gettext("Full Name"), "first_name": lazy_gettext("First Name"), "last_name": lazy_gettext("Last Name"), "username": lazy_gettext("User Name"), "password": lazy_gettext("Password"), "active": lazy_gettext("Is Active?"), "email": lazy_gettext("Email"), "roles": lazy_gettext("Role"), "last_login": lazy_gettext("Last login"), "login_count": lazy_gettext("Login count"), "fail_login_count": lazy_gettext("Failed login count"), "created_on": lazy_gettext("Created on"), "created_by": lazy_gettext("Created by"), "changed_on": lazy_gettext("Changed on"), "changed_by": lazy_gettext("Changed by"), } description_columns = { "first_name": lazy_gettext("Write the user first name or names"), "last_name": lazy_gettext("Write the user last name"), "username": lazy_gettext( "Username valid for authentication on DB or LDAP, unused for OID auth" ), "password": lazy_gettext("The user's password for authentication"), "active": lazy_gettext( "It's not a good policy to remove a user, just make it inactive" ), "email": lazy_gettext("The user's email, this will also be used for OID auth"), "roles": lazy_formatter_gettext( "The user role on the application," " this will associate with a list of permissions", _roles_custom_formatter, ), "conf_password": lazy_gettext("Please rewrite the user's password to confirm"), } list_columns = ["first_name", "last_name", "username", "email", "active", "roles"] show_fieldsets = [ ( lazy_gettext("User info"), {"fields": ["username", "active", "roles", "login_count"]}, ), ( lazy_gettext("Personal Info"), {"fields": ["first_name", "last_name", "email"], "expanded": True}, ), ( lazy_gettext("Audit Info"), { "fields": [ "last_login", "fail_login_count", "created_on", "created_by", "changed_on", "changed_by", ], "expanded": False, }, ), ] user_show_fieldsets = [ ( lazy_gettext("User info"), {"fields": ["username", "active", "roles", "login_count"]}, ), ( lazy_gettext("Personal Info"), {"fields": ["first_name", "last_name", "email"], "expanded": True}, ), ] search_columns = [ "first_name", "last_name", "username", "email", "active", "roles", "created_on", "changed_on", "last_login", "login_count", "fail_login_count", ] add_columns = ["first_name", "last_name", "username", "active", "email", "roles"] edit_columns = ["first_name", "last_name", "username", "active", "email", "roles"] user_info_title = lazy_gettext("Your user information") @expose("/userinfo/") @has_access def userinfo(self) -> WerkzeugResponse: item = self.datamodel.get(g.user.id, self._base_filters) widgets = self._get_show_widget( g.user.id, item, show_fieldsets=self.user_show_fieldsets ) self.update_redirect() return self.render_template( self.show_template, title=self.user_info_title, widgets=widgets, appbuilder=self.appbuilder, ) @action("userinfoedit", lazy_gettext("Edit User"), "", "fa-edit", multiple=False) def userinfoedit(self, item: Any) -> WerkzeugResponse: return redirect( url_for(self.appbuilder.sm.userinfoeditview.__name__ + ".this_form_get") ) class UserOIDModelView(UserModelView): """ View that add OID specifics to User view. Override to implement your own custom view. Then override useroidmodelview property on SecurityManager """ pass class UserLDAPModelView(UserModelView): """ View that add LDAP specifics to User view. Override to implement your own custom view. Then override userldapmodelview property on SecurityManager """ pass class UserOAuthModelView(UserModelView): """ View that add OAUTH specifics to User view. Override to implement your own custom view. Then override userldapmodelview property on SecurityManager """ pass class UserRemoteUserModelView(UserModelView): """ View that add REMOTE_USER specifics to User view. Override to implement your own custom view. Then override userldapmodelview property on SecurityManager """ pass class UserDBModelView(UserModelView): """ View that add DB specifics to User view. Override to implement your own custom view. Then override userdbmodelview property on SecurityManager """ add_form_extra_fields = { "password": PasswordField( lazy_gettext("Password"), description=lazy_gettext("The user's password for authentication"), validators=[validators.DataRequired(), PasswordComplexityValidator()], widget=BS3PasswordFieldWidget(), ), "conf_password": PasswordField( lazy_gettext("Confirm Password"), description=lazy_gettext("Please rewrite the user's password to confirm"), validators=[ validators.DataRequired(), EqualTo("password", message=lazy_gettext("Passwords must match")), ], widget=BS3PasswordFieldWidget(), ), } add_columns = [ "first_name", "last_name", "username", "active", "email", "roles", "password", "conf_password", ] validators_columns = {"roles": [SelectDataRequired()]} @expose("/show/", methods=["GET"]) @has_access def show(self, pk: Any) -> WerkzeugResponse: actions = dict() actions["resetpasswords"] = self.actions.get("resetpasswords") item = self.datamodel.get(pk, self._base_filters) if not item: abort(404) widgets = self._get_show_widget(pk, item, actions=actions) self.update_redirect() return self.render_template( self.show_template, pk=pk, title=self.show_title, widgets=widgets, appbuilder=self.appbuilder, related_views=self._related_views, ) @expose("/userinfo/") @has_access def userinfo(self) -> WerkzeugResponse: actions = dict() actions["resetmypassword"] = self.actions.get("resetmypassword") actions["userinfoedit"] = self.actions.get("userinfoedit") item = self.datamodel.get(g.user.id, self._base_filters) widgets = self._get_show_widget( g.user.id, item, actions=actions, show_fieldsets=self.user_show_fieldsets ) self.update_redirect() return self.render_template( self.show_template, title=self.user_info_title, widgets=widgets, appbuilder=self.appbuilder, ) @action( "resetmypassword", lazy_gettext("Reset my password"), "", "fa-lock", multiple=False, ) def resetmypassword(self, item: Any): return redirect( url_for(self.appbuilder.sm.resetmypasswordview.__name__ + ".this_form_get") ) @action( "resetpasswords", lazy_gettext("Reset Password"), "", "fa-lock", multiple=False ) def resetpasswords(self, item: Any) -> WerkzeugResponse: return redirect( url_for( self.appbuilder.sm.resetpasswordview.__name__ + ".this_form_get", pk=item.id, ) ) def pre_update(self, item: Any) -> None: item.changed_on = datetime.datetime.now() item.changed_by_fk = g.user.id def pre_add(self, item: Any) -> None: item.password = generate_password_hash(item.password) class UserStatsChartView(DirectByChartView): chart_title = lazy_gettext("User Statistics") label_columns = { "username": lazy_gettext("User Name"), "login_count": lazy_gettext("Login count"), "fail_login_count": lazy_gettext("Failed login count"), } search_columns = UserModelView.search_columns definitions = [ {"label": "Login Count", "group": "username", "series": ["login_count"]}, { "label": "Failed Login Count", "group": "username", "series": ["fail_login_count"], }, ] class RoleListWidget(ListWidget): template = "appbuilder/general/widgets/roles/list.html" def __init__(self, **kwargs): kwargs["appbuilder"] = current_app.appbuilder super().__init__(**kwargs) class RoleShowWidget(ShowWidget): template = "appbuilder/general/widgets/roles/show.html" def __init__(self, **kwargs): kwargs["appbuilder"] = current_app.appbuilder super().__init__(**kwargs) class RoleModelView(ModelView): route_base = "/roles" list_title = lazy_gettext("List Roles") show_title = lazy_gettext("Show Role") add_title = lazy_gettext("Add Role") edit_title = lazy_gettext("Edit Role") list_widget = RoleListWidget show_widget = RoleShowWidget label_columns = { "name": lazy_gettext("Name"), "permissions": lazy_gettext("Permissions"), } list_columns = ["name", "permissions"] show_columns = ["name", "permissions"] edit_columns = ["name", "permissions"] add_columns = edit_columns order_columns = ["name"] @action( "copyrole", lazy_gettext("Copy Role"), lazy_gettext("Copy the selected roles?"), icon="fa-copy", single=False, ) def copy_role(self, items): self.update_redirect() for item in items: new_role = item.__class__() new_role.name = item.name new_role.permissions = item.permissions new_role.name = new_role.name + " copy" self.datamodel.add(new_role) return redirect(self.get_redirect()) class RegisterUserModelView(ModelView): route_base = "/registeruser" base_permissions = ["can_list", "can_show", "can_delete"] list_title = lazy_gettext("List of Registration Requests") show_title = lazy_gettext("Show Registration") list_columns = ["username", "registration_date", "email"] show_exclude_columns = ["password"] search_exclude_columns = ["password"] class AuthView(BaseView): route_base = "" login_template = "" invalid_login_message = lazy_gettext("Invalid login. Please try again.") title = lazy_gettext("Sign In") @expose("/login/", methods=["GET", "POST"]) def login(self): pass @expose("/logout/") def logout(self): logout_user() return redirect( self.appbuilder.app.config.get( "LOGOUT_REDIRECT_URL", self.appbuilder.get_url_for_index ) ) class AuthDBView(AuthView): login_template = "appbuilder/general/security/login_db.html" @expose("/login/", methods=["GET", "POST"]) @no_cache def login(self): if g.user is not None and g.user.is_authenticated: return redirect(self.appbuilder.get_url_for_index) form = LoginForm_db() if form.validate_on_submit(): next_url = get_safe_redirect(request.args.get("next", "")) user = self.appbuilder.sm.auth_user_db( form.username.data, form.password.data ) if not user: flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login_with(next_url)) login_user(user, remember=False) return redirect(next_url) return self.render_template( self.login_template, title=self.title, form=form, appbuilder=self.appbuilder ) class AuthLDAPView(AuthView): login_template = "appbuilder/general/security/login_ldap.html" @expose("/login/", methods=["GET", "POST"]) @no_cache def login(self): if g.user is not None and g.user.is_authenticated: return redirect(self.appbuilder.get_url_for_index) form = LoginForm_db() if form.validate_on_submit(): next_url = get_safe_redirect(request.args.get("next", "")) user = self.appbuilder.sm.auth_user_ldap( form.username.data, form.password.data ) if not user: flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login_with(next_url)) login_user(user, remember=False) return redirect(next_url) return self.render_template( self.login_template, title=self.title, form=form, appbuilder=self.appbuilder ) class AuthOIDView(AuthView): login_template = "appbuilder/general/security/login_oid.html" oid_ask_for = ["email"] oid_ask_for_optional: List[str] = [] @expose("/login/", methods=["GET", "POST"]) @no_cache def login(self, flag=True) -> WerkzeugResponse: @self.appbuilder.sm.oid.loginhandler def login_handler(self): if g.user is not None and g.user.is_authenticated: return redirect(self.appbuilder.get_url_for_index) form = LoginForm_oid() if form.validate_on_submit(): session["remember_me"] = form.remember_me.data identity_url = self.appbuilder.sm.get_oid_identity_url(form.openid.data) if identity_url is None: flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login) return self.appbuilder.sm.oid.try_login( identity_url, ask_for=self.oid_ask_for, ask_for_optional=self.oid_ask_for_optional, ) return self.render_template( self.login_template, title=self.title, form=form, providers=self.appbuilder.sm.openid_providers, appbuilder=self.appbuilder, ) @self.appbuilder.sm.oid.after_login def after_login(resp): if resp.email is None or resp.email == "": flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login) user = self.appbuilder.sm.auth_user_oid(resp.email) if user is None: flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login) remember_me = False if "remember_me" in session: remember_me = session["remember_me"] session.pop("remember_me", None) log.warning( "AUTH_OID is deprecated and will be removed in version 5. " "Migrate to other authentication methods." ) login_user(user, remember=remember_me) next_url = request.args.get("next", "") return redirect(get_safe_redirect(next_url)) return login_handler(self) class AuthOAuthView(AuthView): login_template = "appbuilder/general/security/login_oauth.html" @expose("/login/") @expose("/login/") def login(self, provider: Optional[str] = None) -> WerkzeugResponse: log.debug("Provider: %s", provider) if g.user is not None and g.user.is_authenticated: log.debug("Already authenticated %s", g.user) return redirect(self.appbuilder.get_url_for_index) if provider is None: return self.render_template( self.login_template, providers=self.appbuilder.sm.oauth_providers, title=self.title, appbuilder=self.appbuilder, ) log.debug("Going to call authorize for: %s", provider) random_state = generate_random_string() state = jwt.encode( request.args.to_dict(flat=False), random_state, algorithm="HS256" ) session["oauth_state"] = random_state try: if provider == "twitter": return self.appbuilder.sm.oauth_remotes[provider].authorize_redirect( redirect_uri=url_for( ".oauth_authorized", provider=provider, _external=True, state=state, ) ) else: return self.appbuilder.sm.oauth_remotes[provider].authorize_redirect( redirect_uri=url_for( ".oauth_authorized", provider=provider, _external=True ), state=state.decode("ascii") if isinstance(state, bytes) else state, ) except Exception as e: log.error("Error on OAuth authorize: %s", e) flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_index) @expose("/oauth-authorized/") def oauth_authorized(self, provider: str) -> WerkzeugResponse: log.debug("Authorized init") if provider not in self.appbuilder.sm.oauth_remotes: flash("Provider not supported.", "warning") log.warning("OAuth authorized got an unknown provider %s", provider) return redirect(self.appbuilder.get_url_for_login) try: resp = self.appbuilder.sm.oauth_remotes[provider].authorize_access_token() except Exception as e: log.error("Error authorizing OAuth access token: %s", e) flash("The request to sign in was denied.", "error") return redirect(self.appbuilder.get_url_for_login) if resp is None: flash("You denied the request to sign in.", "warning") return redirect(self.appbuilder.get_url_for_login) log.debug("OAUTH Authorized resp: %s", resp) # Retrieves specific user info from the provider try: self.appbuilder.sm.set_oauth_session(provider, resp) userinfo = self.appbuilder.sm.oauth_user_info(provider, resp) except Exception as e: log.error("Error returning OAuth user info: %s", e) user = None else: log.debug("User info retrieved from %s: %s", provider, userinfo) # User email is not whitelisted if provider in self.appbuilder.sm.oauth_whitelists: whitelist = self.appbuilder.sm.oauth_whitelists[provider] allow = False for email in whitelist: if "email" in userinfo and re.search(email, userinfo["email"]): allow = True break if not allow: flash("You are not authorized.", "warning") return redirect(self.appbuilder.get_url_for_login) else: log.debug("No whitelist for OAuth provider") user = self.appbuilder.sm.auth_user_oauth(userinfo) if user is None: flash(as_unicode(self.invalid_login_message), "warning") return redirect(self.appbuilder.get_url_for_login) else: try: state = jwt.decode( request.args["state"], session["oauth_state"], algorithms=["HS256"] ) except (jwt.InvalidTokenError, KeyError): flash(as_unicode("Invalid state signature"), "warning") return redirect(self.appbuilder.get_url_for_login) login_user(user) next_url = self.appbuilder.get_url_for_index # Check if there is a next url on state if "next" in state and len(state["next"]) > 0: next_url = get_safe_redirect(state["next"][0]) return redirect(next_url) class AuthRemoteUserView(AuthView): login_template = "" @expose("/login/") def login(self) -> WerkzeugResponse: username = request.environ.get(self.appbuilder.sm.auth_remote_user_env_var) if g.user is not None and g.user.is_authenticated: next_url = request.args.get("next", "") return redirect(get_safe_redirect(next_url)) if username: user = self.appbuilder.sm.auth_user_remote_user(username) if user is None: flash(as_unicode(self.invalid_login_message), "warning") else: login_user(user) else: flash(as_unicode(self.invalid_login_message), "warning") next_url = request.args.get("next", "") return redirect(get_safe_redirect(next_url))