123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- from datetime import datetime
- import json
- import logging
- from typing import List, Optional, Union
- import uuid
- from werkzeug.security import generate_password_hash
- from .models import Permission, PermissionView, RegisterUser, Role, User, ViewMenu
- from ..manager import BaseSecurityManager
- from ... import const as c
- from ...models.mongoengine.interface import MongoEngineInterface
- log = logging.getLogger(__name__)
class SecurityManager(BaseSecurityManager):
    """
    Security manager that stores users, roles and permissions in
    MongoEngine documents.

    Responsible for authentication, registering security views,
    role and permission auto management.

    If you want to change anything just inherit and override, then
    pass your own security manager to AppBuilder.
    """

    # Override to set your own User model.
    user_model = User
    # Override to set your own Role model.
    role_model = Role
    permission_model = Permission
    viewmenu_model = ViewMenu
    permissionview_model = PermissionView
    registeruser_model = RegisterUser

    def __init__(self, appbuilder):
        """
        SecurityManager constructor.

        Attaches a ``MongoEngineInterface`` datamodel to each security
        view and finally calls ``create_db()``.

        :param appbuilder:
            F.A.B AppBuilder main object
        """
        super().__init__(appbuilder)
        log.warning("MongoEngine is deprecated and will be removed in version 5.")

        # Only the user view that matches the configured auth type gets
        # the user datamodel.
        user_datamodel = MongoEngineInterface(self.user_model)
        if self.auth_type == c.AUTH_DB:
            self.userdbmodelview.datamodel = user_datamodel
        elif self.auth_type == c.AUTH_LDAP:
            self.userldapmodelview.datamodel = user_datamodel
        elif self.auth_type == c.AUTH_OID:
            self.useroidmodelview.datamodel = user_datamodel
        elif self.auth_type == c.AUTH_OAUTH:
            self.useroauthmodelview.datamodel = user_datamodel
        elif self.auth_type == c.AUTH_REMOTE_USER:
            self.userremoteusermodelview.datamodel = user_datamodel

        # The stats chart gets its own interface instance.
        self.userstatschartview.datamodel = MongoEngineInterface(self.user_model)
        if self.auth_user_registration:
            self.registerusermodelview.datamodel = MongoEngineInterface(
                self.registeruser_model
            )

        self.rolemodelview.datamodel = MongoEngineInterface(self.role_model)
        self.permissionmodelview.datamodel = MongoEngineInterface(self.permission_model)
        self.viewmenumodelview.datamodel = MongoEngineInterface(self.viewmenu_model)
        self.permissionviewmodelview.datamodel = MongoEngineInterface(
            self.permissionview_model
        )
        self.create_db()

    def find_register_user(self, registration_hash):
        """
        Finds a pending registration by its registration hash.

        :param registration_hash:
            value of the ``registration_hash`` field to look for
        :return:
            the first matching RegisterUser document, None if no match
        """
        return self.registeruser_model.objects(
            registration_hash=registration_hash
        ).first()

    def add_register_user(
        self, username, first_name, last_name, email, password="", hashed_password=""
    ):
        """
        Creates and saves a pending user registration.

        :param password:
            clear text password, hashed before it is stored; only used
            when ``hashed_password`` is empty
        :param hashed_password:
            already hashed password, stored as given
        :return:
            the saved RegisterUser document, or False when saving failed
        """
        try:
            register_user = self.registeruser_model()
            register_user.first_name = first_name
            register_user.last_name = last_name
            register_user.username = username
            register_user.email = email
            if hashed_password:
                register_user.password = hashed_password
            else:
                register_user.password = generate_password_hash(password)
            # uuid4 is random; uuid1 is built from the host address and
            # the current time, which makes the token guessable.
            register_user.registration_hash = str(uuid.uuid4())
            register_user.save()
            return register_user
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_ADD_REGISTER_USER, e)
            return False

    def del_register_user(self, register_user):
        """
        Deletes a pending registration; failures are logged, not raised.

        :param register_user:
            the RegisterUser document to delete
        """
        try:
            register_user.delete()
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_DEL_REGISTER_USER, e)

    def find_user(self, username=None, email=None):
        """
        Finds a user by username or, when no username is given, by email.

        :return:
            the first matching user document, None if no match or if
            neither argument was given
        """
        if username:
            return self.user_model.objects(username=username).first()
        if email:
            return self.user_model.objects(email=email).first()
        return None

    def get_all_users(self):
        """
        Returns the queryset of all users of the configured user model.
        """
        # Go through user_model so subclasses that override it are honoured.
        return self.user_model.objects

    def add_user(
        self,
        username,
        first_name,
        last_name,
        email,
        role,
        password="",
        hashed_password="",
    ):
        """
        Generic function to create user

        :param role:
            a single role or a list of roles
        :param password:
            clear text password, hashed before it is stored; only used
            when ``hashed_password`` is empty
        :param hashed_password:
            already hashed password, stored as given
        :return:
            the saved user document, or False when saving failed
        """
        try:
            user = self.user_model()
            user.first_name = first_name
            user.last_name = last_name
            user.username = username
            user.email = email
            user.active = True
            user.roles = role if isinstance(role, list) else [role]
            if hashed_password:
                user.password = hashed_password
            else:
                user.password = generate_password_hash(password)
            user.save()
            log.info(c.LOGMSG_INF_SEC_ADD_USER, username)
            return user
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_ADD_USER, e)
            return False

    def count_users(self):
        """
        Returns the number of stored users.
        """
        # count() asks the database instead of loading every document.
        return self.user_model.objects.count()

    def update_user(self, user):
        """
        Saves an already modified user document.

        :return:
            False when saving failed, None otherwise
        """
        try:
            user.save()
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_UPD_USER, e)
            return False

    def get_user_by_id(self, pk):
        """
        Returns the user with primary key ``pk``, None if there is none.
        """
        return self.user_model.objects(pk=pk).first()

    def load_user(self, pk):
        """
        Returns the user with primary key ``pk`` (see get_user_by_id).
        """
        return self.get_user_by_id(pk)

    # -----------------------
    #  PERMISSION MANAGEMENT
    # -----------------------

    def add_role(
        self, name: str, permissions: Optional[List[PermissionView]] = None
    ) -> Optional[Role]:
        """
        Returns the role called ``name``, creating it when it is missing.

        :param name:
            name of the role
        :param permissions:
            PermissionView objects given to a newly created role; ignored
            when the role already exists
        :return:
            the existing or newly created role
        """
        if not permissions:
            permissions = []

        role = self.find_role(name)
        if role is None:
            try:
                role = self.role_model(name=name, permissions=permissions)
                role.save()
                log.info(c.LOGMSG_INF_SEC_ADD_ROLE, name)
                return role
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_ADD_ROLE, e)
        return role

    def update_role(self, pk, name: str) -> Optional[Role]:
        """
        Renames the role identified by ``pk``.

        :param pk:
            id of the role to rename
        :param name:
            the new role name
        :return:
            the updated role, None when the role does not exist or the
            update failed
        """
        try:
            role = self.role_model.objects(id=pk).first()
            if role is None:
                return None
            role.name = name
            role.save()
            log.info(c.LOGMSG_INF_SEC_UPD_ROLE, role)
            return role
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_UPD_ROLE, e)
            return None

    def find_role(self, name):
        """
        Finds and returns a Role by name, None if there is none.
        """
        return self.role_model.objects(name=name).first()

    def get_all_roles(self):
        """
        Returns the queryset of all roles.
        """
        return self.role_model.objects

    def get_public_permissions(self):
        """
        Returns the permissions of the public role.

        :return:
            the permissions of the role named by ``auth_role_public``,
            an empty list when that role does not exist
        """
        role = self.find_role(self.auth_role_public)
        if role is None:
            return []
        return role.permissions

    def find_permission(self, name):
        """
        Finds and returns a Permission by name
        """
        return self.permission_model.objects(name=name).first()

    def exist_permission_on_roles(
        self, view_name: str, permission_name: str, role_ids: List[int]
    ) -> bool:
        """
        Tells whether any of the given roles holds a permission on a view.

        :param view_name:
            name of the view menu
        :param permission_name:
            name of the permission
        :param role_ids:
            ids of the roles to inspect; ids without a role are skipped
        :return:
            True as soon as one role holds the permission, else False
        """
        for role_id in role_ids:
            role = self.role_model.objects(id=role_id).first()
            if role is None:
                continue
            for permission in role.permissions or []:
                if (view_name == permission.view_menu.name) and (
                    permission_name == permission.permission.name
                ):
                    return True
        return False

    def add_permission(self, name):
        """
        Adds a permission to the backend, model permission

        :param name:
            name of the permission: 'can_add','can_edit' etc...
        :return:
            the existing or newly created permission
        """
        perm = self.find_permission(name)
        if perm is None:
            try:
                perm = self.permission_model(name=name)
                perm.save()
                return perm
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_ADD_PERMISSION, e)
        return perm

    def del_permission(self, name):
        """
        Deletes a permission from the backend, model permission

        Does nothing when no permission has that name.

        :param name:
            name of the permission: 'can_add','can_edit' etc...
        """
        perm = self.find_permission(name)
        if perm:
            try:
                perm.delete()
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_DEL_PERMISSION, e)

    # ----------------------
    #  PRIMITIVES VIEW MENU
    # ----------------------

    def find_view_menu(self, name):
        """
        Finds and returns a ViewMenu by name
        """
        return self.viewmenu_model.objects(name=name).first()

    def get_all_view_menu(self):
        """
        Returns the queryset of all ViewMenu objects.
        """
        return self.viewmenu_model.objects

    def add_view_menu(self, name):
        """
        Adds a view or menu to the backend, model view_menu

        :param name:
            name of the view menu to add
        :return:
            the existing or newly created ViewMenu
        """
        view_menu = self.find_view_menu(name)
        if view_menu is None:
            try:
                view_menu = self.viewmenu_model(name=name)
                view_menu.save()
                return view_menu
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_ADD_VIEWMENU, e)
        return view_menu

    def del_view_menu(self, name):
        """
        Deletes a ViewMenu from the backend

        Does nothing when no view menu has that name.

        :param name:
            name of the ViewMenu
        """
        obj = self.find_view_menu(name)
        if obj:
            try:
                obj.delete()
            except Exception as e:
                # NOTE(review): this reuses the permission delete message
                # for a view menu failure; confirm whether a dedicated
                # constant exists.
                log.error(c.LOGMSG_ERR_SEC_DEL_PERMISSION, e)

    # ----------------------
    #  PERMISSION VIEW MENU
    # ----------------------

    def find_permission_view_menu(self, permission_name, view_menu_name):
        """
        Finds and returns a PermissionView by names

        :return:
            the PermissionView, None when the permission, the view menu
            or their association does not exist
        """
        permission = self.find_permission(permission_name)
        view_menu = self.find_view_menu(view_menu_name)
        if permission and view_menu:
            return self.permissionview_model.objects(
                permission=permission, view_menu=view_menu
            ).first()
        return None

    def find_permissions_view_menu(self, view_menu):
        """
        Finds all permissions from ViewMenu, returns list of PermissionView

        :param view_menu: ViewMenu object
        :return: list of PermissionView objects
        """
        return self.permissionview_model.objects(view_menu=view_menu)

    def add_permission_view_menu(self, permission_name, view_menu_name):
        """
        Adds a permission on a view or menu to the backend

        :param permission_name:
            name of the permission to add: 'can_add','can_edit' etc...
        :param view_menu_name:
            name of the view menu to add
        :return:
            the existing or newly created PermissionView, None when a
            name is empty or saving failed
        """
        if not (permission_name and view_menu_name):
            return None
        pv = self.find_permission_view_menu(permission_name, view_menu_name)
        if pv:
            return pv
        vm = self.add_view_menu(view_menu_name)
        perm = self.add_permission(permission_name)
        pv = self.permissionview_model()
        pv.view_menu, pv.permission = vm, perm
        try:
            pv.save()
            log.info(c.LOGMSG_INF_SEC_ADD_PERMVIEW, pv)
            return pv
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_ADD_PERMVIEW, e)
        return None

    def del_permission_view_menu(self, permission_name, view_menu_name, cascade=True):
        """
        Removes a permission from a view or menu.

        Does nothing when the association does not exist.

        :param permission_name:
            name of the permission
        :param view_menu_name:
            name of the view menu
        :param cascade:
            when True, also delete the permission itself once no
            PermissionView references it any more
        """
        try:
            pv = self.find_permission_view_menu(permission_name, view_menu_name)
            if pv is None:
                return
            # Keep the permission: it is needed after pv is deleted.
            permission = pv.permission
            # delete permission on view
            pv.delete()
            if not cascade:
                return
            # if no more permission on permission view, delete permission
            remaining = self.permissionview_model.objects(permission=permission)
            if remaining.first() is None:
                self.del_permission(permission.name)
            log.info(c.LOGMSG_INF_SEC_DEL_PERMVIEW, permission_name, view_menu_name)
        except Exception as e:
            log.error(c.LOGMSG_ERR_SEC_DEL_PERMVIEW, e)

    def exist_permission_on_views(self, lst, item):
        """
        Tells whether any PermissionView in ``lst`` carries the
        permission named ``item``.
        """
        return any(pv.permission.name == item for pv in lst)

    def exist_permission_on_view(self, lst, permission, view_menu):
        """
        Tells whether any PermissionView in ``lst`` pairs the permission
        named ``permission`` with the view menu named ``view_menu``.
        """
        return any(
            pv.permission.name == permission and pv.view_menu.name == view_menu
            for pv in lst
        )

    def add_permission_role(self, role, perm_view):
        """
        Add permission-ViewMenu object to Role

        Does nothing when ``perm_view`` is empty or already on the role.

        :param role:
            The role object
        :param perm_view:
            The PermissionViewMenu object
        """
        if perm_view and perm_view not in role.permissions:
            try:
                role.permissions.append(perm_view)
                role.save()
                log.info(c.LOGMSG_INF_SEC_ADD_PERMROLE, perm_view, role.name)
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_ADD_PERMROLE, e)

    def del_permission_role(self, role, perm_view):
        """
        Remove permission-ViewMenu object from Role

        Does nothing when ``perm_view`` is not on the role.

        :param role:
            The role object
        :param perm_view:
            The PermissionViewMenu object
        """
        if perm_view in role.permissions:
            try:
                role.permissions.remove(perm_view)
                role.save()
                log.info(c.LOGMSG_INF_SEC_DEL_PERMROLE, perm_view, role.name)
            except Exception as e:
                log.error(c.LOGMSG_ERR_SEC_DEL_PERMROLE, e)

    def export_roles(
        self, path: Optional[str] = None, indent: Optional[Union[int, str]] = None
    ) -> None:
        """
        Exports roles to JSON file.

        :param path:
            file to write; defaults to ``roles_export_<timestamp>.json``
            in the current working directory
        :param indent:
            passed through to the JSON serializer
        """
        timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
        filename = path or f"roles_export_{timestamp}.json"

        serialized_roles = [
            {
                "name": role.name,
                "permissions": [
                    {
                        "permission": {"name": pvm.permission.name},
                        "view_menu": {"name": pvm.view_menu.name},
                    }
                    for pvm in role.permissions
                ],
            }
            for role in self.get_all_roles()
        ]

        with open(filename, "w") as fd:
            fd.write(json.dumps(serialized_roles, indent=indent))

    def import_roles(self, path: str) -> None:
        """
        Imports roles from JSON file.

        The file must have the layout written by ``export_roles``. The
        permissions of each imported role are replaced by the ones
        listed in the file.

        :param path:
            file to read
        """
        with open(path, "r") as fd:
            serialized_roles = json.loads(fd.read())

        for role_kwargs in serialized_roles:
            role = self.add_role(role_kwargs["name"])
            if role is None:
                # add_role already logged why the role could not be created.
                continue
            permission_view_menus = [
                self.add_permission_view_menu(
                    permission_name=pvm_kwargs["permission"]["name"],
                    view_menu_name=pvm_kwargs["view_menu"]["name"],
                )
                for pvm_kwargs in role_kwargs["permissions"]
            ]
            # add_permission_view_menu returns None on failure; never
            # store that on the role.
            role.permissions = [pvm for pvm in permission_view_menus if pvm is not None]
            role.save()

    def get_first_user(self) -> "User":
        """
        Returns the first user document, None when there are no users.
        """
        return self.user_model.objects.first()

    def noop_user_update(self, user: "User") -> None:
        """
        Intentionally does nothing with ``user``.
        """
        pass
|