修复一些BUG

将修改密码的代码逻辑做复用处理,简化代码。
This commit is contained in:
向乐🌌 2021-05-20 12:02:11 +08:00
parent 4ad2a126d8
commit ca84457c1f
9 changed files with 224 additions and 212 deletions

View File

@ -2,7 +2,6 @@
import os
import sys
from utils.ad_ops import AdOps
from django_redis import get_redis_connection
if __name__ == '__main__':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pwdselfservice.settings')
@ -15,10 +14,4 @@ if __name__ == '__main__':
"forget to activate a virtual environment?"
) from exc
try:
AdOps()
except Exception as e:
print(str(e))
print("未能连接到AD先决条件未满足Django不会运行..")
sys.exit(1)
execute_from_command_line(sys.argv)

View File

@ -1,4 +1,5 @@
import datetime
import time
from django_redis import get_redis_connection
@ -8,11 +9,12 @@ from utils.storage.kvstorage import KvStorage
try:
redis_conn = get_redis_connection()
cache_storage = KvStorage(redis_conn)
cache_storage.set('redis_connection', str(datetime.datetime))
cache_storage.get('redis_connection')
print("Redis连接成功set/get测试通过Token缓存将使用Redis处理")
cache_storage.set('redis_connection', str(datetime.datetime.now()))
redis_get = cache_storage.get('redis_connection')
print("Redis连接成功set/get测试通过--{}Token缓存将使用Redis处理" .format(redis_get))
except Exception as e:
cache_storage = MemoryStorage()
print("Redis无法连接Token缓存将使用MemoryStorage处理")
print("如果确定需要使用Redis作为缓存请排查Redis配置")
print("Exception: {}".format(e))
print("如果确定需要使用Redis作为缓存请排查Redis配置错误信息如下")
print("Redis Exception: {}".format(e))

View File

@ -116,6 +116,10 @@ TEMPLATES = [
WSGI_APPLICATION = 'pwdselfservice.wsgi.application'
# 514 66050是AD中账号被禁用的特定代码这个可以在微软官网查到。
# 可能不是太准确,如果使用者能确定还有其它状态码,可以自行在此处添加
AD_ACCOUNT_DISABLE_CODE = [514, 66050]
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",

View File

@ -6,7 +6,7 @@ urlpatterns = {
path("favicon.ico", RedirectView.as_view(url='static/img/favicon.ico')),
path('', resetpwd.views.index, name='index'),
path('callbackCheck', resetpwd.views.callback_check, name='callbackCheck'),
path('resetPassword', resetpwd.views.reset_pwd_by_ding_callback, name='resetPassword'),
path('resetPassword', resetpwd.views.reset_pwd_by_callback, name='resetPassword'),
path('unlockAccount', resetpwd.views.unlock_account, name='unlockAccount'),
path('messages', resetpwd.views.messages, name='messages'),
}

137
resetpwd/utils.py Normal file
View File

@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @FileName utils.py
# @Software:
# @Author: Leven Xiang
# @Mail: xiangle0109@outlook.com
# @Date 2021/5/20 8:47
from django.shortcuts import render
import logging
from utils.crypto import Crypto
from pwdselfservice.local_settings import CRYPTO_KEY
from django.conf import settings
logger = logging.getLogger('django')
def code_2_user_id(ops, request, msg_template, home_url, code):
_status, user_id = ops.get_user_id_by_code(code)
# 判断 user_id 在本企业钉钉/微信中是否存在
if not _status:
context = {
'msg': '获取钉钉userid失败错误信息{}'.format(user_id),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
detail_status, user_info = ops.get_user_detail_by_user_id(user_id)
if not detail_status:
context = {
'msg': '获取钉钉用户信息失败,错误信息:{}'.format(user_info),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
return user_id, user_info
def tmpid_2_user_info(ops, request, msg_template, home_url, scan_app_tag):
try:
tmpid_crypto = request.COOKIES.get('tmpid')
except Exception as e:
tmpid_crypto = None
logger.error('[异常] %s' % str(e))
if not tmpid_crypto:
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到TmpID或会话己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
# 解密
crypto = Crypto(CRYPTO_KEY)
user_id = crypto.decrypt(tmpid_crypto)
# 通过user_id拿到用户的邮箱并格式化为username
userid_status, user_info = ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取{}用户信息失败,错误信息:{}'.format(user_info, scan_app_tag),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
return user_info
def tmpid_2_user_id(request, msg_template, home_url):
try:
tmpid_crypto = request.COOKIES.get('tmpid')
except Exception as e:
logger.error('[异常] %s' % str(e))
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到TmpID或会话己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
# 解密
crypto = Crypto(CRYPTO_KEY)
return crypto.decrypt(tmpid_crypto)
def ops_account(ad_ops, request, msg_template, home_url, username, new_password):
if ad_ops.ad_ensure_user_by_account(username) is False:
context = {
'msg': "账号[%s]在AD中不存在请确认当前钉钉扫码账号绑定的邮箱是否和您正在使用的邮箱一致或者该账号己被禁用\n猜测:您的账号或邮箱是否是带有数字或其它字母区分?" % username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
account_code = ad_ops.ad_get_user_status_by_account(username)
if account_code in settings.AD_ACCOUNT_DISABLE_CODE:
context = {
'msg': "此账号状态为己禁用请联系HR确认账号是否正确。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
if new_password:
reset_status, result = ad_ops.ad_reset_user_pwd_by_account(username=username, new_password=new_password)
if reset_status:
# 重置密码并执行一次解锁,防止重置后账号还是锁定状态。
unlock_status, result = ad_ops.ad_unlock_user_by_account(username)
if unlock_status:
context = {
'msg': "密码己修改/重置成功,请妥善保管。你可以点击返回主页或直接关闭此页面!",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
context = {
'msg': "密码未修改/重置成功,错误信息:{}".format(result),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
unlock_status, result = ad_ops.ad_unlock_user_by_account(username)
if unlock_status:
context = {
'msg': "账号己解锁成功。你可以点击返回主页或直接关闭此页面!",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
context = {
'msg': "账号未能解锁,错误信息:{}".format(result),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)

View File

@ -1,6 +1,6 @@
import logging
from django.http import *
from django.http import HttpResponseRedirect
from django.shortcuts import render
from pwdselfservice.local_settings import SCAN_CODE_TYPE, DING_MO_APP_ID, WEWORK_CORP_ID, WEWORK_AGENT_ID, HOME_URL, CRYPTO_KEY, TMPID_COOKIE_AGE
@ -8,6 +8,7 @@ from utils.ad_ops import AdOps
from utils.crypto import Crypto
from utils.format_username import format2username, get_user_is_active
from .form import CheckForm
from .utils import code_2_user_id, tmpid_2_user_info, ops_account, tmpid_2_user_id
msg_template = 'messages.html'
logger = logging.getLogger('django')
@ -30,7 +31,12 @@ class PARAMS(object):
scan_params = PARAMS()
_ops = scan_params.ops
ad_ops = AdOps()
try:
ad_ops = AdOps()
except Exception as e:
ad_ops = None
print("初始化Active Directory连接失败")
print(str(e))
def index(request):
@ -77,33 +83,7 @@ def index(request):
'button_display': "返回"
}
return render(request, msg_template, context)
# 514 66050是AD中账号被禁用的特定代码这个可以在微软官网查到。
# 可能不是太准确,如果使用者能确定还有其它状态码,可以自行在此处添加
account_code = ad_ops.ad_get_user_status_by_account(username)
if account_code == 514 or account_code == 66050:
context = {
'msg': "此账号状态为己禁用请联系HR确认账号是否正确。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
reset_status, reset_result = ad_ops.ad_reset_user_pwd_by_account(username=username, new_password=new_password)
if reset_status:
context = {
'msg': "密码己修改成功,新密码稍后生效,请妥善保管。您可直接关闭此页面!",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
context = {
'msg': "密码未修改成功,原因:{}".format(reset_result),
'button_click': "window.history.back()",
'button_display': "返回"
}
return render(request, msg_template, context)
return ops_account(ad_ops, request, msg_template, home_url, username, new_password)
else:
context = {
'msg': "请从主页进行修改密码操作或扫码验证用户信息。",
@ -127,23 +107,7 @@ def callback_check(request):
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到CODE。' % (request.method, request.path))
try:
_status, user_id = _ops.get_user_id_by_code(code)
# 判断 user_id 在本企业钉钉/微信中是否存在
if not _status:
context = {
'msg': '获取钉钉userid失败错误信息{}'.format(user_id),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
detail_status, user_info = _ops.get_user_detail_by_user_id(user_id)
if not detail_status:
context = {
'msg': '获取钉钉用户信息失败,错误信息:{}'.format(user_info),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
user_id, user_info = code_2_user_id(_ops, request, msg_template, home_url, code)
# 账号是否是激活的
if get_user_is_active(user_info):
crypto = Crypto(CRYPTO_KEY)
@ -168,7 +132,6 @@ def callback_check(request):
}
logger.error('[异常] %s' % str(KeyError))
return render(request, msg_template, context)
except Exception as callback_e:
context = {
'msg': "错误[%s],请与管理员联系." % str(callback_e),
@ -179,52 +142,35 @@ def callback_check(request):
return render(request, msg_template, context)
def reset_pwd_by_ding_callback(request):
def reset_pwd_by_callback(request):
"""
钉钉扫码并验证信息通过之后在重置密码页面将用户账号进行绑定
:param request:
:return:
"""
global tmpid_crypto
home_url = '%s://%s' % (request.scheme, HOME_URL)
# 从cookie中提取union_id并解密然后对当前union_id的用户进行重置密码
if request.method == 'GET':
try:
tmpid_crypto = request.COOKIES.get('tmpid')
except Exception as e:
tmpid_crypto = None
logger.error('[异常] %s' % str(e))
if not tmpid_crypto:
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到TmpID或会话己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
# 解密
crypto = Crypto(CRYPTO_KEY)
user_id = crypto.decrypt(tmpid_crypto)
# 通过user_id拿到用户的邮箱并格式化为username
userid_status, user_result = _ops.get_user_detail_by_user_id(user_id)
user_id = tmpid_2_user_id(request, msg_template, home_url)
userid_status, user_info = _ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取{}用户信息失败,错误信息:{}'.format(user_result, scan_params.SCAN_APP),
'msg': '获取{}用户信息失败,错误信息:{}'.format(user_info, scan_params.SCAN_APP),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
username = format2username(user_result['email'])
# 通过user_id拿到用户信息并格式化为username
username = format2username(user_info.get('email'))
# 如果邮箱能提取到,则格式化之后,提取出账号提交到前端绑定
if username:
context = {
'username': username,
}
return render(request, 'resetPassword.html', context)
# 否则就是用户未配置邮箱,返回相关提示
else:
context = {
'msg': "{},您好,企业{}中未能找到您账号的邮箱配置请联系HR完善信息。" .format(user_result.get('name'), scan_params.SCAN_APP),
'msg': "{},您好,企业{}中未能找到您账号的邮箱配置请联系HR完善信息。" .format(user_info.get('name'), scan_params.SCAN_APP),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
@ -233,67 +179,9 @@ def reset_pwd_by_ding_callback(request):
# 重置密码页面,输入新密码后点击提交
elif request.method == 'POST':
_new_password = request.POST.get('new_password').strip()
try:
tmpid_crypto = request.COOKIES.get('tmpid')
except Exception as e:
tmpid_crypto = None
logger.error('[异常] %s' % str(e))
if not tmpid_crypto:
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到CODE或CODE己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
crypto = Crypto(CRYPTO_KEY)
user_id = crypto.decrypt(tmpid_crypto)
userid_status, user_result = _ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取企业{}用户信息失败,错误信息:{}'.format(scan_params.SCAN_APP, user_result),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
username = format2username(user_result['email'])
if ad_ops.ad_ensure_user_by_account(username) is False:
context = {
'msg': "账号[%s]在AD中不存在请确认当前钉钉扫码账号绑定的邮箱是否和您正在使用的邮箱一致或者该账号己被禁用\n猜测:您的账号或邮箱是否是带有数字或其它字母区分?" % username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
# 514 66050是AD中账号被禁用的特定代码这个可以在微软官网查到。
# 可能不是太准确,如果使用者能确定还有其它状态码,可以自行在此处添加
account_code = ad_ops.ad_get_user_status_by_account(username)
if account_code == 514 or account_code == 66050:
context = {
'msg': "此账号状态为己禁用请联系HR确认账号是否正确。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
reset_status, result = ad_ops.ad_reset_user_pwd_by_account(username=username, new_password=_new_password)
if reset_status:
# 重置密码并执行一次解锁,防止重置后账号还是锁定状态。
unlock_status, result = ad_ops.ad_unlock_user_by_account(username)
if unlock_status:
context = {
'msg': "密码己重置成功,请妥善保管。你可以点击返回主页或直接关闭此页面!",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
context = {
'msg': "密码未重置成功,错误信息:{}".format(result),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
user_info = tmpid_2_user_info(_ops, request, msg_template, home_url, scan_params.SCAN_APP)
username = format2username(user_info.get('email'))
return ops_account(ad_ops, request, msg_template, home_url, username, _new_password)
else:
context = {
'msg': "请从主页开始进行操作。",
@ -311,74 +199,17 @@ def unlock_account(request):
"""
home_url = '%s://%s' % (request.scheme, HOME_URL)
if request.method == 'GET':
_union_id_crypto = request.COOKIES.get('tmpid')
if not _union_id_crypto:
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
crypto = Crypto(CRYPTO_KEY)
user_id = crypto.decrypt(_union_id_crypto)
userid_status, user_info = _ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取{}用户信息失败,错误信息:{}'.format(scan_params.SCAN_APP, user_info),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
username = format2username(user_info['email'])
user_info = tmpid_2_user_info(_ops, request, msg_template, home_url, scan_params.SCAN_APP)
username = format2username(user_info.get('email'))
context = {
'username': username,
}
return render(request, 'resetPassword.html', context)
elif request.method == 'POST':
_union_id_crypto = request.COOKIES.get('tmpid')
if not _union_id_crypto:
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
crypto = Crypto(CRYPTO_KEY)
user_id = crypto.decrypt(_union_id_crypto)
userid_status, user_info = _ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取{}用户信息失败,错误信息:{}'.format(scan_params.SCAN_APP, user_info),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
username = format2username(user_info['email'])
if ad_ops.ad_ensure_user_by_account(username=username) is False:
context = {
'msg': "账号[%s]在AD中未能正确检索到请确认当前钉钉扫码账号绑定的邮箱是否和您正在使用的邮箱一致或者该账号己被禁用\n猜测:您的账号或邮箱是否是带有数字或其它字母区分?" %
username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
unlock_status, result = ad_ops.ad_unlock_user_by_account(username)
if unlock_status:
context = {
'msg': "账号己解锁成功。你可以点击返回主页或直接关闭此页面!",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
else:
context = {
'msg': "账号未能解锁,错误信息:{}".format(result),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
user_info = tmpid_2_user_info(_ops, request, msg_template, home_url, scan_params.SCAN_APP)
username = format2username(user_info.get('email'))
return ops_account(ad_ops, request, msg_template, home_url, username, None)
else:
context = {
'msg': "请从主页开始进行操作。",

View File

@ -31,7 +31,7 @@
</div>
<ul>
<li class="con_r_right" style="display: block;">
<form name="resetcheck" method="post" action="" autocomplete="off">
<form name="resetPassword" method="post" action="" autocomplete="off">
{% csrf_token %}
<div class="user">
<div><span class="user-icon"></span>
@ -51,7 +51,7 @@
</li>
<li class="con_r_left" style="display: none;">
<form name="resetunlock" method="post" action="resetunlock" autocomplete="off">
<form name="unlockAccount" method="post" action="unlockAccount" autocomplete="off">
{% csrf_token %}
<div class="user" style="height: 168px">
<div><span class="user-icon"></span>

View File

@ -52,8 +52,8 @@ class AdOps(object):
authentication=self.authentication, raise_exceptions=True)
except LDAPOperationResult as e:
raise LDAPOperationResult("LDAPOperationResult: " + str(e))
except Exception:
raise Exception('出现错误无法连接到AD控制器。')
except Exception as e:
raise Exception('LDAP Exception无法连接到AD控制器 --- {}' .format(e))
def ad_auth_user(self, username, password):
"""

View File

@ -2,11 +2,56 @@
from __future__ import absolute_import, unicode_literals
import json
import random
import string
import json
import six
from dingtalk.core.utils import to_text
from utils.storage import BaseStorage
def to_text(value, encoding='utf-8'):
"""Convert value to unicode, default encoding is utf-8
:param value: Value to be converted
:param encoding: Desired encoding
"""
if not value:
return ''
if isinstance(value, six.text_type):
return value
if isinstance(value, six.binary_type):
return value.decode(encoding)
return six.text_type(value)
def to_binary(value, encoding='utf-8'):
"""Convert value to binary string, default encoding is utf-8
:param value: Value to be converted
:param encoding: Desired encoding
"""
if not value:
return b''
if isinstance(value, six.binary_type):
return value
if isinstance(value, six.text_type):
return value.encode(encoding)
return to_text(value).encode(encoding)
def random_string(length=16):
rule = string.ascii_letters + string.digits
rand_list = random.sample(rule, length)
return ''.join(rand_list)
def byte2int(c):
if six.PY2:
return ord(c)
return c
class KvStorage(BaseStorage):
def __init__(self, kvdb, prefix='wework'):