Merge pull request #499 from simontigers/common_decorator_perms

feat(api): role perm
This commit is contained in:
simontigers 2024-04-28 19:22:43 +08:00 committed by GitHub
commit 3da43b6cef
5 changed files with 182 additions and 1 deletions

View File

@ -10,6 +10,11 @@ from api.lib.perm.acl.role import RoleCRUD, RoleRelationCRUD
from api.lib.perm.acl.user import UserCRUD
def validate_app(app_id):
app = AppCache.get(app_id)
return app.id if app else None
class ACLManager(object):
def __init__(self, app_name='acl', uid=None):
self.log = current_app.logger
@ -133,7 +138,8 @@ class ACLManager(object):
numfound, res = ResourceCRUD.search(q, u, self.validate_app().id, rt_id, page, page_size)
return res
def grant_resource(self, rid, resource_id, perms):
@staticmethod
def grant_resource(rid, resource_id, perms):
PermissionCRUD.grant(rid, perms, resource_id=resource_id, group_id=None)
@staticmethod
@ -141,3 +147,7 @@ class ACLManager(object):
rt = AppCRUD.add(**payload)
return rt.to_dict()
def role_has_perms(self, rid, resource_name, resource_type_name, perm):
app_id = validate_app(self.app_name)
return RoleCRUD.has_permission(rid, resource_name, resource_type_name, app_id, perm)

View File

@ -64,3 +64,9 @@ MIMEExtMap = {
'text/plain': '.txt',
'text/csv': '.csv',
}
class RoleType(BaseEnum):
System = 'system'
Technician = 'technician'
User = 'user'

View File

@ -0,0 +1,35 @@
import functools
from flask import abort, session
from api.lib.common_setting.acl import ACLManager
from api.lib.common_setting.resp_format import ErrFormat
def perms_role_required(app_name, resource_type_name, resource_name, perm, role_name=None):
def decorator_perms_role_required(func):
@functools.wraps(func)
def wrapper_required(*args, **kwargs):
acl = ACLManager(app_name)
has_perms = False
try:
has_perms = acl.role_has_perms(session["acl"]['rid'], resource_name, resource_type_name, perm)
except Exception as e:
# resource_type not exist, continue check role
if role_name:
if role_name not in session.get("acl", {}).get("parentRoles", []):
abort(403, ErrFormat.role_required.format(role_name))
else:
abort(403, ErrFormat.resource_no_permission.format(resource_name, perm))
if not has_perms:
if role_name:
if role_name not in session.get("acl", {}).get("parentRoles", []):
abort(403, ErrFormat.role_required.format(role_name))
else:
abort(403, ErrFormat.resource_no_permission.format(resource_name, perm))
return func(*args, **kwargs)
return wrapper_required
return decorator_perms_role_required

View File

@ -80,3 +80,5 @@ class ErrFormat(CommonErrFormat):
ldap_test_username_required = _l("LDAP test username required") # LDAP测试用户名必填
company_wide = _l("Company wide") # 全公司
resource_no_permission = _l("No permission to access resource {}, perm {} ") # 没有权限访问 {} 资源的 {} 权限"

View File

@ -0,0 +1,128 @@
from api.lib.common_setting.const import RoleType
class OperationPermission(object):
def __init__(self, resource_perms):
for _r in resource_perms:
setattr(self, f"{_r['page']}", _r['page'])
for _p in _r['perms']:
setattr(self, f"{_p}", _p)
class BaseApp(object):
resource_type_name = 'OperationPermission'
all_resource_perms = []
def __init__(self):
self.admin_name = None
self.roles = []
self.app_name = 'acl'
self.require_create_resource_type = self.resource_type_name
self.extra_create_resource_type_list = []
self.op = None
@staticmethod
def format_role(role_name, role_type, acl_rid, resource_perms, description=''):
return dict(
role_name=role_name,
role_type=role_type,
acl_rid=acl_rid,
description=description,
resource_perms=resource_perms,
)
class CMDBApp(BaseApp):
all_resource_perms = [
{"page": "Big_Screen", "page_cn": "大屏", "perms": ["read"]},
{"page": "Dashboard", "page_cn": "仪表盘", "perms": ["read"]},
{"page": "Resource_Search", "page_cn": "资源搜索", "perms": ["read"]},
{"page": "Auto_Discovery_Pool", "page_cn": "自动发现池", "perms": ["read"]},
{"page": "My_Subscriptions", "page_cn": "我的订阅", "perms": ["read"]},
{"page": "Bulk_Import", "page_cn": "批量导入", "perms": ["read"]},
{"page": "Model_Configuration", "page_cn": "模型配置",
"perms": ["read", "create_CIType", "create_CIType_group", "update_CIType_group",
"delete_CIType_group", "download_CIType"]},
{"page": "Backend_Management", "page_cn": "后台管理", "perms": ["read"]},
{"page": "Customized_Dashboard", "page_cn": "定制仪表盘", "perms": ["read"]},
{"page": "Service_Tree_Definition", "page_cn": "服务树定义", "perms": ["read"]},
{"page": "Model_Relationships", "page_cn": "模型关系", "perms": ["read"]},
{"page": "Operation_Audit", "page_cn": "操作审计", "perms": ["read"]},
{"page": "Relationship_Types", "page_cn": "关系类型", "perms": ["read"]},
{"page": "Auto_Discovery", "page_cn": "自动发现", "perms": ["read"]}]
def __init__(self):
super().__init__()
self.admin_name = 'cmdb_admin'
self.app_name = 'cmdb'
self.roles = self.parse_roles()
self.op = OperationPermission(self.all_resource_perms)
def parse_roles(self):
return [self.cmdb_admin_role, self.cmdb_technician, self.cmdb_user]
@property
def cmdb_admin_role(self):
return self.format_role(
'CMDB管理员', RoleType.System, 0, self.all_resource_perms
)
@property
def cmdb_technician(self):
resource_perms_map = dict(
Big_Screen=["read"],
Dashboard=["read"],
Resource_Search=["read"],
Auto_Discovery_Pool=["read"],
My_Subscriptions=["read"],
Bulk_Import=["read"],
Model_Configuration=["read", "create_CIType"],
Backend_Management=[],
Customized_Dashboard=[],
Service_Tree_Definition=[],
Model_Relationships=[],
Operation_Audit=[],
Relationship_Types=[],
Auto_Discovery=[]
)
resource_perms = []
for _page, _perms in resource_perms_map.items():
resource_perms.append(
dict(page=_page, perms=_perms)
)
return self.format_role(
'CMDB技术员', RoleType.Technician, 0, resource_perms
)
@property
def cmdb_user(self):
resource_perms_map = dict(
Big_Screen=["read"],
Dashboard=["read"],
Resource_Search=["read"],
Auto_Discovery_Pool=["read"],
My_Subscriptions=["read"],
Bulk_Import=["read"],
Model_Configuration=[],
Backend_Management=[],
Customized_Dashboard=[],
Service_Tree_Definition=[],
Model_Relationships=[],
Operation_Audit=[],
Relationship_Types=[],
Auto_Discovery=[]
)
resource_perms = []
for _page, _perms in resource_perms_map.items():
resource_perms.append(
dict(page=_page, perms=_perms)
)
return self.format_role(
'CMDB用户', RoleType.User, 0, resource_perms
)