mirror of
https://github.com/capricornxl/ad-password-self-service.git
synced 2025-08-12 04:01:50 +08:00
修改钉钉/企业微信直接使用内部应用免密登录的方式来验证,不再支持扫码。
由于一些API的权限发生变化,导致一些关键信息无法获取,所以做以上改变。
This commit is contained in:
@@ -1,16 +1,10 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import base64
|
||||
import hmac
|
||||
import time
|
||||
from hashlib import sha256
|
||||
from urllib.parse import quote
|
||||
|
||||
import requests
|
||||
from dingtalk.client import AppKeyClient
|
||||
from pwdselfservice import cache_storage
|
||||
|
||||
|
||||
import os
|
||||
APP_ENV = os.getenv('APP_ENV')
|
||||
|
||||
@@ -31,44 +25,17 @@ class DingDingOps(AppKeyClient):
|
||||
self.mo_app_secret = mo_app_secret
|
||||
self.storage = storage
|
||||
|
||||
def get_union_id_by_code(self, code):
|
||||
"""
|
||||
通过移动应用接入扫码返回的临时授权码,使用临时授权码换取用户的 unionid
|
||||
:param code: 临时授权码
|
||||
:return: unionid
|
||||
"""
|
||||
time_stamp = int(round(time.time() * 1000))
|
||||
# 通过appSecret计算出来的签名值,该参数值在HTTP请求参数中需要urlEncode(因为签名中可能包含特殊字符+)。
|
||||
signature = quote(base64.b64encode(hmac.new(
|
||||
self.mo_app_secret.encode('utf-8'),
|
||||
str(time_stamp).encode('utf-8'),
|
||||
digestmod=sha256).digest()).decode("utf-8"))
|
||||
# accessKey 是 登录开发者后台,选择应用开发 > 移动接入应用 > 登录所看到应用的appId。
|
||||
url = '{}/sns/getuserinfo_bycode?accessKey={}&signature={}×tamp={}'.format(DING_URL, self.mo_app_id, signature, time_stamp)
|
||||
resp = requests.post(
|
||||
url=url,
|
||||
json=dict(tmp_auth_code=code),
|
||||
)
|
||||
try:
|
||||
resp = resp.json()
|
||||
if resp['errcode'] != 0:
|
||||
return False, 'get_union_id_by_code: %s' % str(resp)
|
||||
else:
|
||||
return True, resp["user_info"]["unionid"]
|
||||
except Exception:
|
||||
return False, 'get_union_id_by_code: %s' % str(resp)
|
||||
|
||||
def get_user_id_by_code(self, code):
|
||||
"""
|
||||
通过code获取用户的 userid
|
||||
:param id: 用户在当前钉钉开放平台账号范围内的唯一标识
|
||||
:return:
|
||||
"""
|
||||
_status, union_id = self.get_union_id_by_code(code)
|
||||
if _status:
|
||||
return True, self.user.get_userid_by_unionid(union_id).get('userid')
|
||||
user_id_data = self.user.getuserinfo(code)
|
||||
if user_id_data.get('errcode') == 0:
|
||||
user_id = user_id_data.get('userid')
|
||||
return True, user_id
|
||||
else:
|
||||
return False, 'get_user_id_by_code: %s' % str(union_id)
|
||||
return False, '通过临时Code换取用户ID失败: %s' % str(user_id_data)
|
||||
|
||||
def get_user_detail_by_user_id(self, user_id):
|
||||
"""
|
||||
|
Reference in New Issue
Block a user