mirror of
				https://github.com/veops/cmdb.git
				synced 2025-11-04 21:56:16 +08:00 
			
		
		
		
	fix: auth config (#317)
This commit is contained in:
		@@ -11,7 +11,7 @@ from api.extensions import db
 | 
			
		||||
from api.lib.common_setting.resp_format import ErrFormat
 | 
			
		||||
from api.models.common_setting import CommonData
 | 
			
		||||
from api.lib.utils import AESCrypto
 | 
			
		||||
from api.lib.common_setting.const import AuthCommonConfig, AuthenticateType, AuthCommonConfigAutoRedirect
 | 
			
		||||
from api.lib.common_setting.const import AuthCommonConfig, AuthenticateType, AuthCommonConfigAutoRedirect, TestType
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CommonDataCRUD(object):
 | 
			
		||||
@@ -29,6 +29,7 @@ class CommonDataCRUD(object):
 | 
			
		||||
    def create_new_data(data_type, **kwargs):
 | 
			
		||||
        try:
 | 
			
		||||
            CommonDataCRUD.check_auth_type(data_type)
 | 
			
		||||
 | 
			
		||||
            return CommonData.create(data_type=data_type, **kwargs)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            db.session.rollback()
 | 
			
		||||
@@ -143,6 +144,9 @@ class AuthenticateDataCRUD(object):
 | 
			
		||||
 | 
			
		||||
    def create(self, data) -> CommonData:
 | 
			
		||||
        self.check_by_type()
 | 
			
		||||
        encrypt = data.pop('encrypt', None)
 | 
			
		||||
        if encrypt is False:
 | 
			
		||||
            return CommonData.create(data_type=self._type, data=data)
 | 
			
		||||
        encrypted_data = self.encrypt(data)
 | 
			
		||||
        try:
 | 
			
		||||
            return CommonData.create(data_type=self._type, data=encrypted_data)
 | 
			
		||||
@@ -151,6 +155,9 @@ class AuthenticateDataCRUD(object):
 | 
			
		||||
            abort(400, str(e))
 | 
			
		||||
 | 
			
		||||
    def update_by_record(self, record, data) -> CommonData:
 | 
			
		||||
        encrypt = data.pop('encrypt', None)
 | 
			
		||||
        if encrypt is False:
 | 
			
		||||
            return record.update(data=data)
 | 
			
		||||
        encrypted_data = self.encrypt(data)
 | 
			
		||||
        try:
 | 
			
		||||
            return record.update(data=encrypted_data)
 | 
			
		||||
@@ -228,25 +235,33 @@ class AuthenticateDataCRUD(object):
 | 
			
		||||
            auth_auto_redirect=auth_auto_redirect,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def test(self, data):
 | 
			
		||||
    def test(self, test_type, data):
 | 
			
		||||
        type_lower = self._type.lower()
 | 
			
		||||
        func_name = f'test_{type_lower}'
 | 
			
		||||
        if hasattr(self, func_name):
 | 
			
		||||
            try:
 | 
			
		||||
                return getattr(self, f'test_{type_lower}')(data)
 | 
			
		||||
                return getattr(self, f'test_{type_lower}')(test_type, data)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                abort(400, str(e))
 | 
			
		||||
        abort(400, ErrFormat.not_support_test.format(self._type))
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def test_ldap(data):
 | 
			
		||||
    def test_ldap(test_type, data):
 | 
			
		||||
        ldap_server = data.get('ldap_server')
 | 
			
		||||
        ldap_user_dn = data.get('ldap_user_dn', '{}')
 | 
			
		||||
        username = data.get('username', '')
 | 
			
		||||
        user = ldap_user_dn.format(username)
 | 
			
		||||
        password = data.get('password', '')
 | 
			
		||||
 | 
			
		||||
        server = Server(ldap_server, connect_timeout=2)
 | 
			
		||||
        if not server.check_availability():
 | 
			
		||||
            raise Exception(ErrFormat.ldap_server_connect_not_available)
 | 
			
		||||
        else:
 | 
			
		||||
            if test_type == TestType.Connect:
 | 
			
		||||
                return True
 | 
			
		||||
 | 
			
		||||
        username = data.get('username', None)
 | 
			
		||||
        if not username:
 | 
			
		||||
            raise Exception(ErrFormat.ldap_test_username_required)
 | 
			
		||||
        user = ldap_user_dn.format(username)
 | 
			
		||||
        password = data.get('password', None)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            Connection(server, user=user, password=password, auto_bind=AUTO_BIND_NO_TLS)
 | 
			
		||||
 
 | 
			
		||||
@@ -30,3 +30,8 @@ class AuthenticateType(BaseEnum):
 | 
			
		||||
 | 
			
		||||
AuthCommonConfig = 'AuthCommonConfig'
 | 
			
		||||
AuthCommonConfigAutoRedirect = 'auto_redirect'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestType(BaseEnum):
 | 
			
		||||
    Connect = 'connect'
 | 
			
		||||
    Login = 'login'
 | 
			
		||||
 
 | 
			
		||||
@@ -70,6 +70,8 @@ class ErrFormat(CommonErrFormat):
 | 
			
		||||
    not_support_test = "不支持的测试类型: {}"
 | 
			
		||||
    not_support_auth_type = "不支持的认证类型: {}"
 | 
			
		||||
    ldap_server_connect_timeout = "LDAP服务器连接超时"
 | 
			
		||||
    ldap_server_connect_not_available = "LDAP服务器连接不可用"
 | 
			
		||||
    ldap_test_unknown_error = "LDAP测试未知错误: {}"
 | 
			
		||||
    common_data_not_support_auth_type = "通用数据不支持auth类型: {}"
 | 
			
		||||
    ldap_test_username_required = "LDAP测试用户名必填"
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -2,7 +2,7 @@ from flask import abort, request
 | 
			
		||||
 | 
			
		||||
from api.lib.perm.acl.acl import role_required
 | 
			
		||||
from api.resource import APIView
 | 
			
		||||
from api.lib.common_setting.common_data import AuthenticateDataCRUD, CommonDataCRUD
 | 
			
		||||
from api.lib.common_setting.common_data import AuthenticateDataCRUD
 | 
			
		||||
from api.lib.common_setting.resp_format import ErrFormat
 | 
			
		||||
 | 
			
		||||
prefix = '/auth_config'
 | 
			
		||||
@@ -33,7 +33,8 @@ class AuthConfigView(APIView):
 | 
			
		||||
 | 
			
		||||
        params = request.json
 | 
			
		||||
        if auth_type in cli.common_type_list:
 | 
			
		||||
            CommonDataCRUD.create_new_data(auth_type, **params)
 | 
			
		||||
            params['encrypt'] = False
 | 
			
		||||
            cli.create(**params)
 | 
			
		||||
        else:
 | 
			
		||||
            cli.create(params.get('data', {}))
 | 
			
		||||
 | 
			
		||||
@@ -51,10 +52,12 @@ class AuthConfigViewWithId(APIView):
 | 
			
		||||
            abort(400, ErrFormat.not_support_auth_type.format(auth_type))
 | 
			
		||||
 | 
			
		||||
        params = request.json
 | 
			
		||||
        data = params.get('data', {})
 | 
			
		||||
        if auth_type in cli.common_type_list:
 | 
			
		||||
            res = CommonDataCRUD.update_data(_id, **params)
 | 
			
		||||
            data['encrypt'] = False
 | 
			
		||||
            res = cli.update(_id, data)
 | 
			
		||||
        else:
 | 
			
		||||
            res = cli.update(_id, params.get('data', {}))
 | 
			
		||||
            res = cli.update(_id, data)
 | 
			
		||||
 | 
			
		||||
        return self.jsonify(res.to_dict())
 | 
			
		||||
 | 
			
		||||
@@ -81,5 +84,6 @@ class AuthConfigTestView(APIView):
 | 
			
		||||
    url_prefix = (f'{prefix}/<string:auth_type>/test',)
 | 
			
		||||
 | 
			
		||||
    def post(self, auth_type):
 | 
			
		||||
        test_type = request.values.get('test_type', TestType.Connect)
 | 
			
		||||
        params = request.json
 | 
			
		||||
        return self.jsonify(AuthenticateDataCRUD(auth_type).test(params.get('data')))
 | 
			
		||||
        return self.jsonify(AuthenticateDataCRUD(auth_type).test(test_type, params.get('data')))
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user