diff --git a/cmdb-api/api/lib/common_setting/acl.py b/cmdb-api/api/lib/common_setting/acl.py index 67774ce..5591bc0 100644 --- a/cmdb-api/api/lib/common_setting/acl.py +++ b/cmdb-api/api/lib/common_setting/acl.py @@ -10,6 +10,11 @@ from api.lib.perm.acl.role import RoleCRUD, RoleRelationCRUD from api.lib.perm.acl.user import UserCRUD +def validate_app(app_id): + app = AppCache.get(app_id) + return app.id if app else None + + class ACLManager(object): def __init__(self, app_name='acl', uid=None): self.log = current_app.logger @@ -133,7 +138,8 @@ class ACLManager(object): numfound, res = ResourceCRUD.search(q, u, self.validate_app().id, rt_id, page, page_size) return res - def grant_resource(self, rid, resource_id, perms): + @staticmethod + def grant_resource(rid, resource_id, perms): PermissionCRUD.grant(rid, perms, resource_id=resource_id, group_id=None) @staticmethod @@ -141,3 +147,7 @@ class ACLManager(object): rt = AppCRUD.add(**payload) return rt.to_dict() + + def role_has_perms(self, rid, resource_name, resource_type_name, perm): + app_id = validate_app(self.app_name) + return RoleCRUD.has_permission(rid, resource_name, resource_type_name, app_id, perm) diff --git a/cmdb-api/api/lib/common_setting/const.py b/cmdb-api/api/lib/common_setting/const.py index a68dcbf..9f654fe 100644 --- a/cmdb-api/api/lib/common_setting/const.py +++ b/cmdb-api/api/lib/common_setting/const.py @@ -64,3 +64,9 @@ MIMEExtMap = { 'text/plain': '.txt', 'text/csv': '.csv', } + + +class RoleType(BaseEnum): + System = 'system' + Technician = 'technician' + User = 'user' diff --git a/cmdb-api/api/lib/common_setting/decorator.py b/cmdb-api/api/lib/common_setting/decorator.py new file mode 100644 index 0000000..f2aa249 --- /dev/null +++ b/cmdb-api/api/lib/common_setting/decorator.py @@ -0,0 +1,35 @@ +import functools + +from flask import abort, session +from api.lib.common_setting.acl import ACLManager +from api.lib.common_setting.resp_format import ErrFormat + + +def perms_role_required(app_name, resource_type_name, resource_name, perm, role_name=None): + def decorator_perms_role_required(func): + @functools.wraps(func) + def wrapper_required(*args, **kwargs): + acl = ACLManager(app_name) + has_perms = False + try: + has_perms = acl.role_has_perms(session["acl"]['rid'], resource_name, resource_type_name, perm) + except Exception as e: + # resource_type not exist, continue check role + if role_name: + if role_name not in session.get("acl", {}).get("parentRoles", []): + abort(403, ErrFormat.role_required.format(role_name)) + else: + abort(403, ErrFormat.resource_no_permission.format(resource_name, perm)) + + if not has_perms: + if role_name: + if role_name not in session.get("acl", {}).get("parentRoles", []): + abort(403, ErrFormat.role_required.format(role_name)) + else: + abort(403, ErrFormat.resource_no_permission.format(resource_name, perm)) + + return func(*args, **kwargs) + + return wrapper_required + + return decorator_perms_role_required diff --git a/cmdb-api/api/lib/common_setting/resp_format.py b/cmdb-api/api/lib/common_setting/resp_format.py index 676e17b..6561d65 100644 --- a/cmdb-api/api/lib/common_setting/resp_format.py +++ b/cmdb-api/api/lib/common_setting/resp_format.py @@ -80,3 +80,5 @@ class ErrFormat(CommonErrFormat): ldap_test_username_required = _l("LDAP test username required") # LDAP测试用户名必填 company_wide = _l("Company wide") # 全公司 + + resource_no_permission = _l("No permission to access resource {}, perm {} ") # 没有权限访问 {} 资源的 {} 权限" diff --git a/cmdb-api/api/lib/common_setting/role_perm_base.py b/cmdb-api/api/lib/common_setting/role_perm_base.py new file mode 100644 index 0000000..0eb0279 --- /dev/null +++ b/cmdb-api/api/lib/common_setting/role_perm_base.py @@ -0,0 +1,128 @@ +from api.lib.common_setting.const import RoleType + + +class OperationPermission(object): + + def __init__(self, resource_perms): + for _r in resource_perms: + setattr(self, f"{_r['page']}", _r['page']) + for _p in _r['perms']: + setattr(self, f"{_p}", _p) + + +class BaseApp(object): + resource_type_name = 'OperationPermission' + all_resource_perms = [] + + def __init__(self): + self.admin_name = None + self.roles = [] + self.app_name = 'acl' + self.require_create_resource_type = self.resource_type_name + self.extra_create_resource_type_list = [] + + self.op = None + + @staticmethod + def format_role(role_name, role_type, acl_rid, resource_perms, description=''): + return dict( + role_name=role_name, + role_type=role_type, + acl_rid=acl_rid, + description=description, + resource_perms=resource_perms, + ) + + +class CMDBApp(BaseApp): + all_resource_perms = [ + {"page": "Big_Screen", "page_cn": "大屏", "perms": ["read"]}, + {"page": "Dashboard", "page_cn": "仪表盘", "perms": ["read"]}, + {"page": "Resource_Search", "page_cn": "资源搜索", "perms": ["read"]}, + {"page": "Auto_Discovery_Pool", "page_cn": "自动发现池", "perms": ["read"]}, + {"page": "My_Subscriptions", "page_cn": "我的订阅", "perms": ["read"]}, + {"page": "Bulk_Import", "page_cn": "批量导入", "perms": ["read"]}, + {"page": "Model_Configuration", "page_cn": "模型配置", + "perms": ["read", "create_CIType", "create_CIType_group", "update_CIType_group", + "delete_CIType_group", "download_CIType"]}, + {"page": "Backend_Management", "page_cn": "后台管理", "perms": ["read"]}, + {"page": "Customized_Dashboard", "page_cn": "定制仪表盘", "perms": ["read"]}, + {"page": "Service_Tree_Definition", "page_cn": "服务树定义", "perms": ["read"]}, + {"page": "Model_Relationships", "page_cn": "模型关系", "perms": ["read"]}, + {"page": "Operation_Audit", "page_cn": "操作审计", "perms": ["read"]}, + {"page": "Relationship_Types", "page_cn": "关系类型", "perms": ["read"]}, + {"page": "Auto_Discovery", "page_cn": "自动发现", "perms": ["read"]}] + + def __init__(self): + super().__init__() + + self.admin_name = 'cmdb_admin' + self.app_name = 'cmdb' + self.roles = self.parse_roles() + + self.op = OperationPermission(self.all_resource_perms) + + def parse_roles(self): + return [self.cmdb_admin_role, self.cmdb_technician, self.cmdb_user] + + @property + def cmdb_admin_role(self): + return self.format_role( + 'CMDB管理员', RoleType.System, 0, self.all_resource_perms + ) + + @property + def cmdb_technician(self): + resource_perms_map = dict( + Big_Screen=["read"], + Dashboard=["read"], + Resource_Search=["read"], + Auto_Discovery_Pool=["read"], + My_Subscriptions=["read"], + Bulk_Import=["read"], + Model_Configuration=["read", "create_CIType"], + Backend_Management=[], + Customized_Dashboard=[], + Service_Tree_Definition=[], + Model_Relationships=[], + Operation_Audit=[], + Relationship_Types=[], + Auto_Discovery=[] + ) + resource_perms = [] + for _page, _perms in resource_perms_map.items(): + resource_perms.append( + dict(page=_page, perms=_perms) + ) + + return self.format_role( + 'CMDB技术员', RoleType.Technician, 0, resource_perms + ) + + @property + def cmdb_user(self): + resource_perms_map = dict( + Big_Screen=["read"], + Dashboard=["read"], + Resource_Search=["read"], + Auto_Discovery_Pool=["read"], + My_Subscriptions=["read"], + Bulk_Import=["read"], + Model_Configuration=[], + Backend_Management=[], + Customized_Dashboard=[], + Service_Tree_Definition=[], + Model_Relationships=[], + Operation_Audit=[], + Relationship_Types=[], + Auto_Discovery=[] + ) + resource_perms = [] + for _page, _perms in resource_perms_map.items(): + resource_perms.append( + dict(page=_page, perms=_perms) + ) + + return self.format_role( + 'CMDB用户', RoleType.User, 0, resource_perms + )