修改钉钉/企业微信直接使用内部应用免密登录的方式来验证,不再支持扫码。

由于一些API的权限发生变化,导致一些关键信息无法获取,所以做以上改变。
删除了无用的代码,其它没啥变化,没太多时间重写,先就这么着吧。
This commit is contained in:
Leven
2022-12-20 13:20:40 +08:00
parent 2e886dc6e8
commit c5bc154924
11 changed files with 69 additions and 205 deletions

View File

@@ -13,7 +13,6 @@ from utils.crypto_ops import Crypto
from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPOperationResult, LDAPExceptionError, LDAPException
from django.conf import settings
from pwdselfservice import crypto_key
import os
APP_ENV = os.getenv('APP_ENV')
if APP_ENV == 'dev':
@@ -56,70 +55,6 @@ def code_2_user_info_with_oauth2(ops, request, msg_template, home_url, code):
return True, user_id, user_info
def crypto_id_2_user_info(ops, request, msg_template, home_url, scan_app_tag):
"""
能过前端提交的加密的userid来获取用户信息<userinfo>
"""
try:
crypto_tmp_id = request.COOKIES.get('tmpid')
if not crypto_tmp_id:
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到TmpID或会话己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return False, context
# 解密
crypto = Crypto(crypto_key)
user_id = crypto.decrypt(crypto_tmp_id)
# 通过user_id拿到用户的邮箱并格式化为username
userid_status, user_info = ops.get_user_detail_by_user_id(user_id)
if not userid_status:
context = {
'msg': '获取{}用户信息失败,错误信息:{}'.format(user_info, scan_app_tag),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return False, context
return True, user_info
except Exception as e:
return False, str(e)
def crypto_user_id_2_cookie(user_id):
"""
加密userid写入到cookie
"""
crypto = Crypto(crypto_key)
# 对user_id进行加密因为user_id基本上固定不变的为了防止user_id泄露而导致重复使用进行加密后再传回。
_id_cryto = crypto.encrypt(user_id)
# 配置cookie通过cookie把加密后的用户user_id传到重置密码页面并重定向到重置密码页面。
set_cookie = HttpResponseRedirect('resetPassword')
set_cookie.set_cookie('tmpid', _id_cryto, expires=TMPID_COOKIE_AGE)
return set_cookie
def crypto_id_2_user_id(request, msg_template, home_url):
"""
前端提交的加密的userid解密出真实的userid
"""
try:
crypto_tmp_id = request.COOKIES.get('tmpid')
# 解密
crypto = Crypto(crypto_key)
return True, crypto.decrypt(crypto_tmp_id)
except Exception as e:
logger.error('[异常] %s' % str(e))
logger.error('[异常] 请求方法:%s,请求路径:%s未能拿到TmpID或会话己超时。' % (request.method, request.path))
context = {
'msg': "会话己超时,请重新扫码验证用户信息。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return False, context
def ops_account(ad_ops, request, msg_template, home_url, username, new_password):
"""
ad 账号操作,判断账号状态,重置密码或解锁账号

View File

@@ -7,7 +7,7 @@ from ldap3.core.exceptions import LDAPException
import urllib.parse as url_encode
from utils.format_username import format2username, get_user_is_active, get_email_from_userinfo
from .form import CheckForm
from .utils import code_2_user_detail, crypto_id_2_user_info, ops_account
from .utils import code_2_user_detail, ops_account
from django.conf import settings
APP_ENV = os.getenv('APP_ENV')
if APP_ENV == 'dev':
@@ -34,13 +34,6 @@ class PARAMS(object):
AUTH_APP = '微信'
from utils.wework_ops import WeWorkOps
ops = WeWorkOps()
else:
corp_id = None
app_id = WEWORK_CORP_ID
agent_id = WEWORK_AGENT_ID
AUTH_APP = '微信'
from utils.wework_ops import WeWorkOps
ops = WeWorkOps()
scan_params = PARAMS()
@@ -63,8 +56,6 @@ def index(request):
return render(request, 'ding_index.v1.html', locals())
elif request.method == 'GET' and AUTH_CODE_TYPE == 'WEWORK':
return render(request, 'we_index.v1.html', locals())
elif request.method == 'GET' and AUTH_CODE_TYPE == 'FEISHU':
return render(request, 'index.v1.html', locals())
else:
logger.error('[异常] 请求方法:%s,请求路径%s' % (request.method, request.path))
#
@@ -123,7 +114,6 @@ def reset_password(request):
:return:
"""
home_url = '%s://%s' % (request.scheme, HOME_URL)
# 从cookie中提取union_id并解密然后对当前union_id的用户进行重置密码
if request.method == 'GET':
code = request.GET.get('code')
if code:
@@ -138,13 +128,13 @@ def reset_password(request):
return render(request, msg_template, context)
try:
# 用code换取用户基本信息
_status, user_id, user_info = code_2_user_detail(_ops, home_url, code)
if not _status:
return render(request, msg_template, user_id)
# 账号是否是激活的
# 账号在企业微信或钉钉中是否是激活的
_, res = get_user_is_active(user_info)
if not _:
# 否则账号不存在或未激活
context = {
'msg': '当前扫码的用户未激活或可能己离职,用户信息如下:%s' % user_info,
'button_click': "window.location.href='%s'" % home_url,
@@ -160,7 +150,7 @@ def reset_password(request):
logger.error('[异常] %s' % str(callback_e))
return render(request, msg_template, context)
# 通过user_info拿到用户信息并格式化为username
# 通过user_info拿到用户邮箱并格式化为username
_, email = get_email_from_userinfo(user_info)
if not _:
context = {
@@ -181,8 +171,10 @@ def reset_password(request):
# 如果邮箱能提取到,则格式化之后,提取出账号提交到前端绑定
if username:
request.session[username] = code
context = {
'username': username,
'code': code,
}
return render(request, 'resetPassword.v1.html', context)
else:
@@ -195,43 +187,25 @@ def reset_password(request):
# 重置密码页面,输入新密码后点击提交
elif request.method == 'POST':
try:
username = request.POST.get('username')
code = request.POST.get('code')
if request.session.get(username) and request.session.get(username) == code:
_new_password = request.POST.get('new_password').strip()
_status, user_info = crypto_id_2_user_info(_ops, request, msg_template, home_url, scan_params.AUTH_APP)
if not _status:
return render(request, msg_template, user_info)
# 通过user_info拿到用户信息并格式化为username
_, email = get_email_from_userinfo(user_info)
if not _:
try:
return ops_account(ad_ops=AdOps(), request=request, msg_template=msg_template, home_url=home_url, username=username, new_password=_new_password)
except Exception as reset_e:
context = {
'msg': email,
'msg': "错误[%s],请与管理员联系." % str(reset_e),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
logger.error('[异常] %s' % str(reset_e))
return render(request, msg_template, context)
# 格式化用户名
_, username = format2username(email)
if _ is False:
context = {
'msg': username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
return ops_account(ad_ops=AdOps(), request=request, msg_template=msg_template, home_url=home_url, username=username, new_password=_new_password)
except Exception as reset_e:
context = {
'msg': "错误[%s],请与管理员联系." % str(reset_e),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
logger.error('[异常] %s' % str(reset_e))
return render(request, msg_template, context)
finally:
del request.session[username]
else:
context = {
'msg': "请从主页开始进行操作。",
'msg': "认证已经失效,请从主页重新进行操作。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
@@ -245,62 +219,26 @@ def unlock_account(request):
:return:
"""
home_url = '%s://%s' % (request.scheme, HOME_URL)
if request.method == 'GET':
_status, user_info = crypto_id_2_user_info(_ops, request, msg_template, home_url, scan_params.AUTH_APP)
if not _status:
return render(request, msg_template, user_info)
# 通过user_info拿到用户信息并格式化为username
_, email = get_email_from_userinfo(user_info)
if not _:
context = {
'msg': email,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
_, username = format2username(email)
if _ is False:
context = {
'msg': username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
context = {
'username': username,
}
return render(request, 'resetPassword.v1.html', context)
elif request.method == 'POST':
_status, user_info = crypto_id_2_user_info(_ops, request, msg_template, home_url, scan_params.AUTH_APP)
if not _status:
return render(request, msg_template, user_info)
# 通过user_info拿到用户信息并格式化为username
_, email = get_email_from_userinfo(user_info)
if not _:
context = {
'msg': email,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
# 格式化用户名
_, username = format2username(email)
if _ is False:
context = {
'msg': username,
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
return render(request, msg_template, context)
return ops_account(AdOps(), request, msg_template, home_url, username, None)
if request.method == 'POST':
username = request.POST.get('username')
code = request.POST.get('code')
if request.session.get(username) and request.session.get(username) == code:
try:
return ops_account(AdOps(), request, msg_template, home_url, username, None)
except Exception as reset_e:
context = {
'msg': "错误[%s],请与管理员联系." % str(reset_e),
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}
logger.error('[异常] %s' % str(reset_e))
return render(request, msg_template, context)
finally:
del request.session[username]
else:
context = {
'msg': "请从主页开始进行操作。",
'msg': "认证已经失效,请从主页重新进行操作。",
'button_click': "window.location.href='%s'" % home_url,
'button_display': "返回主页"
}