manager.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. from datetime import datetime
  2. import json
  3. import logging
  4. from typing import Dict, List, Optional, Tuple, Union
  5. import uuid
  6. from sqlalchemy import and_, func, literal, update
  7. from sqlalchemy.engine.reflection import Inspector
  8. from sqlalchemy.orm import contains_eager
  9. from sqlalchemy.orm.exc import MultipleResultsFound
  10. from werkzeug.security import generate_password_hash
  11. from .apis import PermissionApi, PermissionViewMenuApi, RoleApi, UserApi, ViewMenuApi
  12. from .models import (
  13. assoc_permissionview_role,
  14. Permission,
  15. PermissionView,
  16. RegisterUser,
  17. Role,
  18. User,
  19. ViewMenu,
  20. )
  21. from ..manager import BaseSecurityManager
  22. from ... import const as c
  23. from ...models.sqla import Base
  24. from ...models.sqla.interface import SQLAInterface
  25. log = logging.getLogger(__name__)
  26. class SecurityManager(BaseSecurityManager):
  27. """
  28. Responsible for authentication, registering security views,
  29. role and permission auto management
  30. If you want to change anything just inherit and override, then
  31. pass your own security manager to AppBuilder.
  32. """
  33. user_model = User
  34. """ Override to set your own User Model """
  35. role_model = Role
  36. """ Override to set your own Role Model """
  37. permission_model = Permission
  38. viewmenu_model = ViewMenu
  39. permissionview_model = PermissionView
  40. registeruser_model = RegisterUser
  41. # APIs
  42. permission_api = PermissionApi
  43. role_api = RoleApi
  44. user_api = UserApi
  45. view_menu_api = ViewMenuApi
  46. permission_view_menu_api = PermissionViewMenuApi
  47. def __init__(self, appbuilder):
  48. """
  49. SecurityManager contructor
  50. param appbuilder:
  51. F.A.B AppBuilder main object
  52. """
  53. super(SecurityManager, self).__init__(appbuilder)
  54. user_datamodel = SQLAInterface(self.user_model)
  55. if self.auth_type == c.AUTH_DB:
  56. self.userdbmodelview.datamodel = user_datamodel
  57. elif self.auth_type == c.AUTH_LDAP:
  58. self.userldapmodelview.datamodel = user_datamodel
  59. elif self.auth_type == c.AUTH_OID:
  60. self.useroidmodelview.datamodel = user_datamodel
  61. elif self.auth_type == c.AUTH_OAUTH:
  62. self.useroauthmodelview.datamodel = user_datamodel
  63. elif self.auth_type == c.AUTH_REMOTE_USER:
  64. self.userremoteusermodelview.datamodel = user_datamodel
  65. if self.userstatschartview:
  66. self.userstatschartview.datamodel = user_datamodel
  67. if self.auth_user_registration:
  68. self.registerusermodelview.datamodel = SQLAInterface(
  69. self.registeruser_model
  70. )
  71. self.rolemodelview.datamodel = SQLAInterface(self.role_model)
  72. self.permissionmodelview.datamodel = SQLAInterface(self.permission_model)
  73. self.viewmenumodelview.datamodel = SQLAInterface(self.viewmenu_model)
  74. self.permissionviewmodelview.datamodel = SQLAInterface(
  75. self.permissionview_model
  76. )
  77. self.create_db()
  78. @property
  79. def get_session(self):
  80. return self.appbuilder.get_session
  81. def register_views(self):
  82. super(SecurityManager, self).register_views()
  83. if self.appbuilder.app.config.get("FAB_ADD_SECURITY_API", False):
  84. self.appbuilder.add_api(self.permission_api)
  85. self.appbuilder.add_api(self.role_api)
  86. self.appbuilder.add_api(self.user_api)
  87. self.appbuilder.add_api(self.view_menu_api)
  88. self.appbuilder.add_api(self.permission_view_menu_api)
  89. def create_db(self):
  90. try:
  91. engine = self.get_session.get_bind(mapper=None, clause=None)
  92. inspector = Inspector.from_engine(engine)
  93. if "ab_user" not in inspector.get_table_names():
  94. log.info(c.LOGMSG_INF_SEC_NO_DB)
  95. Base.metadata.create_all(engine)
  96. log.info(c.LOGMSG_INF_SEC_ADD_DB)
  97. super(SecurityManager, self).create_db()
  98. except Exception as e:
  99. log.error(c.LOGMSG_ERR_SEC_CREATE_DB, e)
  100. exit(1)
  101. def find_register_user(self, registration_hash):
  102. return (
  103. self.get_session.query(self.registeruser_model)
  104. .filter(self.registeruser_model.registration_hash == registration_hash)
  105. .scalar()
  106. )
  107. def add_register_user(
  108. self, username, first_name, last_name, email, password="", hashed_password=""
  109. ):
  110. """
  111. Add a registration request for the user.
  112. :rtype : RegisterUser
  113. """
  114. register_user = self.registeruser_model()
  115. register_user.username = username
  116. register_user.email = email
  117. register_user.first_name = first_name
  118. register_user.last_name = last_name
  119. if hashed_password:
  120. register_user.password = hashed_password
  121. else:
  122. register_user.password = generate_password_hash(password)
  123. register_user.registration_hash = str(uuid.uuid1())
  124. try:
  125. self.get_session.add(register_user)
  126. self.get_session.commit()
  127. return register_user
  128. except Exception as e:
  129. log.error(c.LOGMSG_ERR_SEC_ADD_REGISTER_USER, e)
  130. self.appbuilder.get_session.rollback()
  131. return None
  132. def del_register_user(self, register_user):
  133. """
  134. Deletes registration object from database
  135. :param register_user: RegisterUser object to delete
  136. """
  137. try:
  138. self.get_session.delete(register_user)
  139. self.get_session.commit()
  140. return True
  141. except Exception as e:
  142. log.error(c.LOGMSG_ERR_SEC_DEL_REGISTER_USER, e)
  143. self.get_session.rollback()
  144. return False
  145. def find_user(self, username=None, email=None):
  146. """
  147. Finds user by username or email
  148. """
  149. if username:
  150. try:
  151. if self.auth_username_ci:
  152. return (
  153. self.get_session.query(self.user_model)
  154. .filter(
  155. func.lower(self.user_model.username) == func.lower(username)
  156. )
  157. .one_or_none()
  158. )
  159. else:
  160. return (
  161. self.get_session.query(self.user_model)
  162. .filter(self.user_model.username == username)
  163. .one_or_none()
  164. )
  165. except MultipleResultsFound:
  166. log.error("Multiple results found for user %s", username)
  167. return None
  168. elif email:
  169. try:
  170. return (
  171. self.get_session.query(self.user_model)
  172. .filter_by(email=email)
  173. .one_or_none()
  174. )
  175. except MultipleResultsFound:
  176. log.error("Multiple results found for user with email %s", email)
  177. return None
  178. def get_all_users(self):
  179. return self.get_session.query(self.user_model).all()
  180. def add_user(
  181. self,
  182. username,
  183. first_name,
  184. last_name,
  185. email,
  186. role,
  187. password="",
  188. hashed_password="",
  189. ):
  190. """
  191. Generic function to create user
  192. """
  193. try:
  194. user = self.user_model()
  195. user.first_name = first_name
  196. user.last_name = last_name
  197. user.username = username
  198. user.email = email
  199. user.active = True
  200. user.roles = role if isinstance(role, list) else [role]
  201. if hashed_password:
  202. user.password = hashed_password
  203. else:
  204. user.password = generate_password_hash(password)
  205. self.get_session.add(user)
  206. self.get_session.commit()
  207. log.info(c.LOGMSG_INF_SEC_ADD_USER, username)
  208. return user
  209. except Exception as e:
  210. log.error(c.LOGMSG_ERR_SEC_ADD_USER, e)
  211. self.get_session.rollback()
  212. return False
  213. def count_users(self):
  214. return self.get_session.query(func.count(self.user_model.id)).scalar()
  215. def update_user(self, user):
  216. try:
  217. self.get_session.merge(user)
  218. self.get_session.commit()
  219. log.info(c.LOGMSG_INF_SEC_UPD_USER, user)
  220. except Exception as e:
  221. log.error(c.LOGMSG_ERR_SEC_UPD_USER, e)
  222. self.get_session.rollback()
  223. return False
  224. def get_user_by_id(self, pk):
  225. return self.get_session.query(self.user_model).get(pk)
  226. def get_first_user(self) -> "User":
  227. return self.get_session.query(self.user_model).first()
  228. def noop_user_update(self, user: "User") -> None:
  229. stmt = (
  230. update(self.user_model)
  231. .where(self.user_model.id == user.id)
  232. .values(login_count=user.login_count)
  233. )
  234. self.get_session.execute(stmt)
  235. self.get_session.commit()
  236. """
  237. -----------------------
  238. PERMISSION MANAGEMENT
  239. -----------------------
  240. """
  241. def add_role(
  242. self, name: str, permissions: Optional[List[PermissionView]] = None
  243. ) -> Optional[Role]:
  244. if not permissions:
  245. permissions = []
  246. role = self.find_role(name)
  247. if role is None:
  248. try:
  249. role = self.role_model()
  250. role.name = name
  251. role.permissions = permissions
  252. self.get_session.add(role)
  253. self.get_session.commit()
  254. log.info(c.LOGMSG_INF_SEC_ADD_ROLE, name)
  255. return role
  256. except Exception as e:
  257. log.error(c.LOGMSG_ERR_SEC_ADD_ROLE, e)
  258. self.get_session.rollback()
  259. return role
  260. def update_role(self, pk, name: str) -> Optional[Role]:
  261. role = self.get_session.query(self.role_model).get(pk)
  262. if not role:
  263. return
  264. try:
  265. role.name = name
  266. self.get_session.merge(role)
  267. self.get_session.commit()
  268. log.info(c.LOGMSG_INF_SEC_UPD_ROLE, role)
  269. except Exception as e:
  270. log.error(c.LOGMSG_ERR_SEC_UPD_ROLE, e)
  271. self.get_session.rollback()
  272. return
  273. def find_role(self, name):
  274. return (
  275. self.get_session.query(self.role_model).filter_by(name=name).one_or_none()
  276. )
  277. def get_all_roles(self):
  278. return self.get_session.query(self.role_model).all()
  279. def get_public_role(self):
  280. return (
  281. self.get_session.query(self.role_model)
  282. .filter_by(name=self.auth_role_public)
  283. .one_or_none()
  284. )
  285. def get_public_permissions(self):
  286. role = self.get_public_role()
  287. if role:
  288. return role.permissions
  289. return []
  290. def find_permission(self, name):
  291. """
  292. Finds and returns a Permission by name
  293. """
  294. return (
  295. self.get_session.query(self.permission_model)
  296. .filter_by(name=name)
  297. .one_or_none()
  298. )
  299. def exist_permission_on_roles(
  300. self, view_name: str, permission_name: str, role_ids: List[int]
  301. ) -> bool:
  302. """
  303. Method to efficiently check if a certain permission exists
  304. on a list of role id's. This is used by `has_access`
  305. :param view_name: The view's name to check if exists on one of the roles
  306. :param permission_name: The permission name to check if exists
  307. :param role_ids: a list of Role ids
  308. :return: Boolean
  309. """
  310. q = (
  311. self.appbuilder.get_session.query(self.permissionview_model)
  312. .join(
  313. assoc_permissionview_role,
  314. and_(
  315. (
  316. self.permissionview_model.id
  317. == assoc_permissionview_role.c.permission_view_id
  318. )
  319. ),
  320. )
  321. .join(self.role_model)
  322. .join(self.permission_model)
  323. .join(self.viewmenu_model)
  324. .filter(
  325. self.viewmenu_model.name == view_name,
  326. self.permission_model.name == permission_name,
  327. self.role_model.id.in_(role_ids),
  328. )
  329. .exists()
  330. )
  331. # Special case for MSSQL/Oracle (works on PG and MySQL > 8)
  332. if self.appbuilder.get_session.bind.dialect.name in ("mssql", "oracle"):
  333. return self.appbuilder.get_session.query(literal(True)).filter(q).scalar()
  334. return self.appbuilder.get_session.query(q).scalar()
  335. def find_roles_permission_view_menus(
  336. self, permission_name: str, role_ids: List[int]
  337. ):
  338. return (
  339. self.appbuilder.get_session.query(self.permissionview_model)
  340. .join(
  341. assoc_permissionview_role,
  342. and_(
  343. (
  344. self.permissionview_model.id
  345. == assoc_permissionview_role.c.permission_view_id
  346. )
  347. ),
  348. )
  349. .join(self.role_model)
  350. .join(self.permission_model)
  351. .join(self.viewmenu_model)
  352. .filter(
  353. self.permission_model.name == permission_name,
  354. self.role_model.id.in_(role_ids),
  355. )
  356. ).all()
  357. def get_user_roles_permissions(self, user) -> Dict[str, List[Tuple[str, str]]]:
  358. """
  359. Utility method for fetching all roles and permissions for a specific user.
  360. Example of the returned data:
  361. ```
  362. {
  363. 'Admin': [
  364. ('can_this_form_get', 'ResetPasswordView'),
  365. ('can_this_form_post', 'ResetPasswordView'),
  366. ...
  367. ]
  368. 'EmptyRole': [],
  369. }
  370. ```
  371. """
  372. if not user.roles:
  373. raise AttributeError("User object does not have roles")
  374. result: Dict[str, List[Tuple[str, str]]] = {}
  375. db_roles_ids = []
  376. for role in user.roles:
  377. # Make sure all db roles are included on the result
  378. result[role.name] = []
  379. if role.name in self.builtin_roles:
  380. for permission in self.builtin_roles[role.name]:
  381. result[role.name].append((permission[1], permission[0]))
  382. else:
  383. db_roles_ids.append(role.id)
  384. permission_views = (
  385. self.appbuilder.get_session.query(PermissionView)
  386. .join(Permission)
  387. .join(ViewMenu)
  388. .join(PermissionView.role)
  389. .filter(Role.id.in_(db_roles_ids))
  390. .options(contains_eager(PermissionView.permission))
  391. .options(contains_eager(PermissionView.view_menu))
  392. .options(contains_eager(PermissionView.role))
  393. ).all()
  394. for permission_view in permission_views:
  395. for role_item in permission_view.role:
  396. if role_item.name in result:
  397. result[role_item.name].append(
  398. (
  399. permission_view.permission.name,
  400. permission_view.view_menu.name,
  401. )
  402. )
  403. return result
  404. def get_db_role_permissions(self, role_id: int) -> List[PermissionView]:
  405. """
  406. Get all DB permissions from a role (one single query)
  407. """
  408. return (
  409. self.appbuilder.get_session.query(PermissionView)
  410. .join(Permission)
  411. .join(ViewMenu)
  412. .join(PermissionView.role)
  413. .filter(Role.id == role_id)
  414. .options(contains_eager(PermissionView.permission))
  415. .options(contains_eager(PermissionView.view_menu))
  416. .all()
  417. )
  418. def add_permission(self, name):
  419. """
  420. Adds a permission to the backend, model permission
  421. :param name:
  422. name of the permission: 'can_add','can_edit' etc...
  423. """
  424. perm = self.find_permission(name)
  425. if perm is None:
  426. try:
  427. perm = self.permission_model()
  428. perm.name = name
  429. self.get_session.add(perm)
  430. self.get_session.commit()
  431. return perm
  432. except Exception as e:
  433. log.error(c.LOGMSG_ERR_SEC_ADD_PERMISSION, e)
  434. self.get_session.rollback()
  435. return perm
  436. def del_permission(self, name: str) -> bool:
  437. """
  438. Deletes a permission from the backend, model permission
  439. :param name:
  440. name of the permission: 'can_add','can_edit' etc...
  441. """
  442. perm = self.find_permission(name)
  443. if not perm:
  444. log.warning(c.LOGMSG_WAR_SEC_DEL_PERMISSION, name)
  445. return False
  446. try:
  447. pvms = (
  448. self.get_session.query(self.permissionview_model)
  449. .filter(self.permissionview_model.permission == perm)
  450. .all()
  451. )
  452. if pvms:
  453. log.warning(c.LOGMSG_WAR_SEC_DEL_PERM_PVM, perm, pvms)
  454. return False
  455. self.get_session.delete(perm)
  456. self.get_session.commit()
  457. return True
  458. except Exception as e:
  459. log.error(c.LOGMSG_ERR_SEC_DEL_PERMISSION, e)
  460. self.get_session.rollback()
  461. return False
  462. """
  463. ----------------------
  464. PRIMITIVES VIEW MENU
  465. ----------------------
  466. """
  467. def find_view_menu(self, name):
  468. """
  469. Finds and returns a ViewMenu by name
  470. """
  471. return (
  472. self.get_session.query(self.viewmenu_model)
  473. .filter_by(name=name)
  474. .one_or_none()
  475. )
  476. def get_all_view_menu(self):
  477. return self.get_session.query(self.viewmenu_model).all()
  478. def add_view_menu(self, name):
  479. """
  480. Adds a view or menu to the backend, model view_menu
  481. param name:
  482. name of the view menu to add
  483. """
  484. view_menu = self.find_view_menu(name)
  485. if view_menu is None:
  486. try:
  487. view_menu = self.viewmenu_model()
  488. view_menu.name = name
  489. self.get_session.add(view_menu)
  490. self.get_session.commit()
  491. return view_menu
  492. except Exception as e:
  493. log.error(c.LOGMSG_ERR_SEC_ADD_VIEWMENU, e)
  494. self.get_session.rollback()
  495. return view_menu
  496. def del_view_menu(self, name: str) -> bool:
  497. """
  498. Deletes a ViewMenu from the backend
  499. :param name:
  500. name of the ViewMenu
  501. """
  502. view_menu = self.find_view_menu(name)
  503. if not view_menu:
  504. log.warning(c.LOGMSG_WAR_SEC_DEL_VIEWMENU, name)
  505. return False
  506. try:
  507. pvms = (
  508. self.get_session.query(self.permissionview_model)
  509. .filter(self.permissionview_model.view_menu == view_menu)
  510. .all()
  511. )
  512. if pvms:
  513. log.warning(c.LOGMSG_WAR_SEC_DEL_VIEWMENU_PVM, view_menu, pvms)
  514. return False
  515. self.get_session.delete(view_menu)
  516. self.get_session.commit()
  517. return True
  518. except Exception as e:
  519. log.error(c.LOGMSG_ERR_SEC_DEL_PERMISSION, e)
  520. self.get_session.rollback()
  521. return False
  522. """
  523. ----------------------
  524. PERMISSION VIEW MENU
  525. ----------------------
  526. """
  527. def find_permission_view_menu(self, permission_name, view_menu_name):
  528. """
  529. Finds and returns a PermissionView by names
  530. """
  531. permission = self.find_permission(permission_name)
  532. view_menu = self.find_view_menu(view_menu_name)
  533. if permission and view_menu:
  534. return (
  535. self.get_session.query(self.permissionview_model)
  536. .filter_by(permission=permission, view_menu=view_menu)
  537. .one_or_none()
  538. )
  539. def find_permissions_view_menu(self, view_menu):
  540. """
  541. Finds all permissions from ViewMenu, returns list of PermissionView
  542. :param view_menu: ViewMenu object
  543. :return: list of PermissionView objects
  544. """
  545. return (
  546. self.get_session.query(self.permissionview_model)
  547. .filter_by(view_menu_id=view_menu.id)
  548. .all()
  549. )
  550. def add_permission_view_menu(self, permission_name, view_menu_name):
  551. """
  552. Adds a permission on a view or menu to the backend
  553. :param permission_name:
  554. name of the permission to add: 'can_add','can_edit' etc...
  555. :param view_menu_name:
  556. name of the view menu to add
  557. """
  558. if not (permission_name and view_menu_name):
  559. return None
  560. pv = self.find_permission_view_menu(permission_name, view_menu_name)
  561. if pv:
  562. return pv
  563. vm = self.add_view_menu(view_menu_name)
  564. perm = self.add_permission(permission_name)
  565. pv = self.permissionview_model()
  566. pv.view_menu, pv.permission = vm, perm
  567. try:
  568. self.get_session.add(pv)
  569. self.get_session.commit()
  570. log.info(c.LOGMSG_INF_SEC_ADD_PERMVIEW, pv)
  571. return pv
  572. except Exception as e:
  573. log.error(c.LOGMSG_ERR_SEC_ADD_PERMVIEW, e)
  574. self.get_session.rollback()
  575. def del_permission_view_menu(self, permission_name, view_menu_name, cascade=True):
  576. if not (permission_name and view_menu_name):
  577. return
  578. pv = self.find_permission_view_menu(permission_name, view_menu_name)
  579. if not pv:
  580. return
  581. roles_pvs = (
  582. self.get_session.query(self.role_model)
  583. .filter(self.role_model.permissions.contains(pv))
  584. .first()
  585. )
  586. if roles_pvs:
  587. log.warning(
  588. c.LOGMSG_WAR_SEC_DEL_PERMVIEW,
  589. view_menu_name,
  590. permission_name,
  591. roles_pvs,
  592. )
  593. return
  594. try:
  595. # delete permission on view
  596. self.get_session.delete(pv)
  597. self.get_session.commit()
  598. # if no more permission on permission view, delete permission
  599. if not cascade:
  600. return
  601. if (
  602. not self.get_session.query(self.permissionview_model)
  603. .filter_by(permission=pv.permission)
  604. .all()
  605. ):
  606. self.del_permission(pv.permission.name)
  607. log.info(c.LOGMSG_INF_SEC_DEL_PERMVIEW, permission_name, view_menu_name)
  608. except Exception as e:
  609. log.error(c.LOGMSG_ERR_SEC_DEL_PERMVIEW, e)
  610. self.get_session.rollback()
  611. def exist_permission_on_views(self, lst, item):
  612. for i in lst:
  613. if i.permission and i.permission.name == item:
  614. return True
  615. return False
  616. def exist_permission_on_view(self, lst, permission, view_menu):
  617. for i in lst:
  618. if i.permission.name == permission and i.view_menu.name == view_menu:
  619. return True
  620. return False
  621. def add_permission_role(self, role, perm_view):
  622. """
  623. Add permission-ViewMenu object to Role
  624. :param role:
  625. The role object
  626. :param perm_view:
  627. The PermissionViewMenu object
  628. """
  629. if perm_view and perm_view not in role.permissions:
  630. try:
  631. role.permissions.append(perm_view)
  632. self.get_session.merge(role)
  633. self.get_session.commit()
  634. log.info(c.LOGMSG_INF_SEC_ADD_PERMROLE, perm_view, role.name)
  635. except Exception as e:
  636. log.error(c.LOGMSG_ERR_SEC_ADD_PERMROLE, e)
  637. self.get_session.rollback()
  638. def del_permission_role(self, role, perm_view):
  639. """
  640. Remove permission-ViewMenu object to Role
  641. :param role:
  642. The role object
  643. :param perm_view:
  644. The PermissionViewMenu object
  645. """
  646. if perm_view in role.permissions:
  647. try:
  648. role.permissions.remove(perm_view)
  649. self.get_session.merge(role)
  650. self.get_session.commit()
  651. log.info(c.LOGMSG_INF_SEC_DEL_PERMROLE, perm_view, role.name)
  652. except Exception as e:
  653. log.error(c.LOGMSG_ERR_SEC_DEL_PERMROLE, e)
  654. self.get_session.rollback()
  655. def export_roles(
  656. self, path: Optional[str] = None, indent: Optional[Union[int, str]] = None
  657. ) -> None:
  658. """Exports roles to JSON file."""
  659. timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
  660. filename = path or f"roles_export_{timestamp}.json"
  661. serialized_roles = []
  662. for role in self.get_all_roles():
  663. serialized_role = {"name": role.name, "permissions": []}
  664. for pvm in role.permissions:
  665. permission = pvm.permission
  666. view_menu = pvm.view_menu
  667. permission_view_menu = {
  668. "permission": {"name": permission.name},
  669. "view_menu": {"name": view_menu.name},
  670. }
  671. serialized_role["permissions"].append(permission_view_menu)
  672. serialized_roles.append(serialized_role)
  673. with open(filename, "w") as fd:
  674. fd.write(json.dumps(serialized_roles, indent=indent))
  675. def import_roles(self, path: str) -> None:
  676. """Imports roles from JSON file."""
  677. session = self.get_session()
  678. with open(path, "r") as fd:
  679. roles_json = json.loads(fd.read())
  680. roles = []
  681. for role_kwargs in roles_json:
  682. role = self.add_role(role_kwargs["name"])
  683. permission_view_menus = [
  684. self.add_permission_view_menu(
  685. permission_name=pvm_kwargs["permission"]["name"],
  686. view_menu_name=pvm_kwargs["view_menu"]["name"],
  687. )
  688. for pvm_kwargs in role_kwargs["permissions"]
  689. ]
  690. for permission_view_menu in permission_view_menus:
  691. if permission_view_menu not in role.permissions:
  692. role.permissions.append(permission_view_menu)
  693. roles.append(role)
  694. session.add_all(roles)
  695. session.commit()