mirror of https://github.com/veops/cmdb.git
140 lines
4.1 KiB
Python
140 lines
4.1 KiB
Python
# -*- coding:utf-8 -*-
|
|
|
|
import functools
|
|
|
|
import six
|
|
|
|
from flask import current_app, g, request
|
|
from flask import session, abort
|
|
|
|
from api.extensions import cache
|
|
|
|
|
|
def get_access_token():
|
|
return
|
|
|
|
|
|
class AccessTokenCache(object):
|
|
@classmethod
|
|
def get(cls):
|
|
if cache.get("AccessToken") is not None:
|
|
return cache.get("AccessToken")
|
|
|
|
res = get_access_token() or ""
|
|
|
|
cache.set("AccessToken", res, timeout=60 * 60)
|
|
return res
|
|
|
|
@classmethod
|
|
def clean(cls):
|
|
cache.clear("AccessToken")
|
|
|
|
|
|
class ACLManager(object):
|
|
def __init__(self):
|
|
self.access_token = AccessTokenCache.get()
|
|
self.acl_session = dict(uid=session.get("uid"),
|
|
token=self.access_token)
|
|
|
|
self.user_info = session["acl"] if "acl" in session else {}
|
|
|
|
def add_resource(self, name, resource_type_name=None):
|
|
pass
|
|
|
|
def grant_resource_to_role(self, name, role, resource_type_name=None):
|
|
pass
|
|
|
|
def del_resource(self, name, resource_type_name=None):
|
|
pass
|
|
|
|
def get_user_info(self, username):
|
|
return dict()
|
|
|
|
def get_resources(self, resource_type_name=None):
|
|
if "acl" not in session:
|
|
abort(405)
|
|
return []
|
|
|
|
def has_permission(self, resource_name, resource_type, perm):
|
|
if "acl" not in session:
|
|
abort(405)
|
|
return True
|
|
|
|
|
|
def validate_permission(resources, resource_type, perm):
|
|
if not resources:
|
|
return
|
|
|
|
if current_app.config.get("USE_ACL"):
|
|
if g.user.username == "worker":
|
|
return
|
|
|
|
resources = [resources] if isinstance(resources, six.string_types) else resources
|
|
for resource in resources:
|
|
if not ACLManager().has_permission(resource, resource_type, perm):
|
|
return abort(403, "has no permission")
|
|
|
|
|
|
def can_access_resources(resource_type):
|
|
def decorator_can_access_resources(func):
|
|
@functools.wraps(func)
|
|
def wrapper_can_access_resources(*args, **kwargs):
|
|
if current_app.config.get("USE_ACL"):
|
|
res = ACLManager().get_resources(resource_type)
|
|
result = {i.get("name"): i.get("permissions") for i in res}
|
|
if hasattr(g, "resources"):
|
|
g.resources.update({resource_type: result})
|
|
else:
|
|
g.resources = {resource_type: result}
|
|
return func(*args, **kwargs)
|
|
return wrapper_can_access_resources
|
|
return decorator_can_access_resources
|
|
|
|
|
|
def has_perm(resources, resource_type, perm):
|
|
def decorator_has_perm(func):
|
|
@functools.wraps(func)
|
|
def wrapper_has_perm(*args, **kwargs):
|
|
if not resources:
|
|
return
|
|
|
|
if current_app.config.get("USE_ACL"):
|
|
validate_permission(resources, resource_type, perm)
|
|
|
|
return func(*args, **kwargs)
|
|
return wrapper_has_perm
|
|
return decorator_has_perm
|
|
|
|
|
|
def has_perm_from_args(arg_name, resource_type, perm, callback=None):
|
|
def decorator_has_perm(func):
|
|
@functools.wraps(func)
|
|
def wrapper_has_perm(*args, **kwargs):
|
|
if not arg_name:
|
|
return
|
|
resource = request.view_args.get(arg_name) or request.values.get(arg_name)
|
|
if callback is not None and resource:
|
|
resource = callback(resource)
|
|
|
|
if current_app.config.get("USE_ACL") and resource:
|
|
validate_permission(resource, resource_type, perm)
|
|
|
|
return func(*args, **kwargs)
|
|
return wrapper_has_perm
|
|
return decorator_has_perm
|
|
|
|
|
|
def role_required(role_name):
|
|
def decorator_role_required(func):
|
|
@functools.wraps(func)
|
|
def wrapper_role_required(*args, **kwargs):
|
|
if not role_name:
|
|
return
|
|
|
|
if current_app.config.get("USE_ACL"):
|
|
if role_name not in session.get("acl", {}).get("parentRoles", []):
|
|
return abort(403, "Role {0} is required".format(role_name))
|
|
return func(*args, **kwargs)
|
|
return wrapper_role_required
|
|
return decorator_role_required
|