mirror of https://github.com/veops/cmdb.git
68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
# -*- coding:utf-8 -*-
|
|
|
|
import uuid
|
|
|
|
from flask import abort
|
|
from flask import current_app
|
|
from flask import session
|
|
from ldap3 import ALL
|
|
from ldap3 import AUTO_BIND_NO_TLS
|
|
from ldap3 import Connection
|
|
from ldap3 import Server
|
|
from ldap3.core.exceptions import LDAPBindError
|
|
from ldap3.core.exceptions import LDAPCertificateError
|
|
from ldap3.core.exceptions import LDAPSocketOpenError
|
|
|
|
from api.lib.common_setting.common_data import AuthenticateDataCRUD
|
|
from api.lib.common_setting.const import AuthenticateType
|
|
from api.lib.perm.acl.audit import AuditCRUD
|
|
from api.lib.perm.acl.resp_format import ErrFormat
|
|
from api.models.acl import User
|
|
|
|
|
|
def authenticate_with_ldap(username, password):
    """Authenticate ``username``/``password`` against the configured LDAP server.

    A bind is first attempted with the identity produced by the
    ``ldap_user_dn`` template; if ldap3 raises ``LDAPBindError`` a second
    bind is attempted as ``<login>@<ldap_domain>``. On success a login audit
    record is written, its id is stored in ``session['LOGIN_ID']`` and, when
    no local user exists for the login name, one is created with a random
    password.

    :param username: login name, either bare (``alice``) or containing an
        ``@`` (``alice@example.com``). Only the part before the first ``@``
        is used for the local account lookup and the bind template.
    :param password: password used for the LDAP bind. An empty value is
        rejected without contacting the server.
    :return: ``(user, authenticated)``. ``user`` is whatever
        ``User.query.get_by_username`` returned (or the newly created user
        on a first successful login); ``authenticated`` is a bool.
    :raises: aborts the request with HTTP 403 when ldap3 raises
        ``LDAPSocketOpenError`` while connecting.
    """
    config = AuthenticateDataCRUD(AuthenticateType.LDAP).get()
    ldap_conf = config.get('LDAP')
    domain = ldap_conf.get('ldap_domain')

    server = Server(ldap_conf.get('ldap_server'), get_info=ALL, connect_timeout=3)

    login_name = username.split('@')[0]
    who = ldap_conf.get('ldap_user_dn').format(login_name)
    if '@' in username:
        email = username
    else:
        # Build the address from the login name. The previous code used the
        # formatted bind DN here, which produced values such as
        # "cn=alice,ou=users,...@domain" for newly created users.
        email = "{}@{}".format(login_name, domain)

    user = User.query.get_by_username(login_name)

    if not password:
        # Never send an empty password: per RFC 4513 a simple bind with a
        # name and an empty password may be accepted by the server as an
        # unauthenticated bind. The previous code raised
        # LDAPCertificateError here, which no handler below catches.
        current_app.logger.info("LDAP login rejected: empty password for %s", login_name)
        return user, False

    try:
        try:
            conn = Connection(server, user=who, password=password, auto_bind=AUTO_BIND_NO_TLS)
        except LDAPBindError:
            # Retry with "<login>@<domain>" as the bind identity.
            conn = Connection(server,
                              user=f"{login_name}@{domain}",
                              password=password,
                              auto_bind=AUTO_BIND_NO_TLS)

        # Capture the bind outcome, then release the socket; the connection
        # is only needed to verify the credentials.
        result_code = conn.result['result']
        conn.unbind()

        if result_code != 0:
            # NOTE(review): with auto_bind a rejected bind appears to raise
            # LDAPBindError before reaching this point, in which case this
            # failure audit entry is never written - confirm and consider
            # logging the failure in the handler below.
            AuditCRUD.add_login_log(login_name, False, ErrFormat.invalid_password)
            raise LDAPBindError

        session['LOGIN_ID'] = AuditCRUD.add_login_log(login_name, True, ErrFormat.login_succeed)

        if not user:
            # Imported here as in the original code (presumably to avoid a
            # circular import - verify before moving to module level).
            from api.lib.perm.acl.user import UserCRUD
            user = UserCRUD.add(username=login_name, email=email, password=uuid.uuid4().hex)

        return user, True

    except LDAPBindError as e:
        current_app.logger.info(e)
        return user, False

    except LDAPSocketOpenError as e:
        current_app.logger.info(e)
        return abort(403, ErrFormat.ldap_connection_failed)
|