feat(api): support OAuth2.0 and OIDC authentication, it has been tested with casdoor

feat(api): support OAuth2.0 and OIDC authentication, it has been tested with casdoor
This commit is contained in:
pycook
2023-12-12 20:29:57 +08:00
committed by GitHub
parent a06599ce33
commit 21c9d9accd
10 changed files with 340 additions and 10 deletions

View File

@@ -0,0 +1 @@
# -*- coding:utf-8 -*-

View File

@@ -0,0 +1,78 @@
# -*- coding:utf-8 -*-
"""
flask_cas.__init__
"""
import flask
from flask import current_app
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from . import routing
class CAS(object):
"""
Required Configs:
|Key |
|----------------|
|CAS_SERVER |
|CAS_AFTER_LOGIN |
Optional Configs:
|Key | Default |
|-------------------------|----------------|
|CAS_TOKEN_SESSION_KEY | _CAS_TOKEN |
|CAS_USERNAME_SESSION_KEY | CAS_USERNAME |
|CAS_LOGIN_ROUTE | '/cas' |
|CAS_LOGOUT_ROUTE | '/cas/logout' |
|CAS_VALIDATE_ROUTE | '/cas/validate'|
"""
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
def init_app(self, app, url_prefix=None):
# Configuration defaults
app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
app.config.setdefault('CAS_LOGIN_ROUTE', '/login')
app.config.setdefault('CAS_LOGOUT_ROUTE', '/logout')
app.config.setdefault('CAS_VALIDATE_ROUTE', '/serviceValidate')
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
# Use the newstyle teardown_appcontext if it's available,
# otherwise fall back to the request context
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def teardown(self, exception):
ctx = stack.top
@property
def app(self):
return self._app or current_app
@property
def username(self):
return flask.session.get(
self.app.config['CAS_USERNAME_SESSION_KEY'], None)
@property
def token(self):
return flask.session.get(
self.app.config['CAS_TOKEN_SESSION_KEY'], None)

View File

@@ -0,0 +1,122 @@
# -*- coding:utf-8 -*-
"""
flask_cas.cas_urls
Functions for creating urls to access CAS.
"""
from six.moves.urllib.parse import quote
from six.moves.urllib.parse import urlencode
from six.moves.urllib.parse import urljoin
def create_url(base, path=None, *query):
""" Create a url.
Creates a url by combining base, path, and the query's list of
key/value pairs. Escaping is handled automatically. Any
key/value pair with a value that is None is ignored.
Keyword arguments:
base -- The left most part of the url (ex. http://localhost:5000).
path -- The path after the base (ex. /foo/bar).
query -- A list of key value pairs (ex. [('key', 'value')]).
Example usage:
>>> create_url(
... 'http://localhost:5000',
... 'foo/bar',
... ('key1', 'value'),
... ('key2', None), # Will not include None
... ('url', 'http://example.com'),
... )
'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
"""
url = base
# Add the path to the url if it's not None.
if path is not None:
url = urljoin(url, quote(path))
# Remove key/value pairs with None values.
query = filter(lambda pair: pair[1] is not None, query)
# Add the query string to the url
url = urljoin(url, '?{0}'.format(urlencode(list(query))))
return url
def create_cas_login_url(cas_url, cas_route, service,
renew=None, gateway=None):
""" Create a CAS login URL .
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas)
service -- (ex. http://localhost:5000/login)
renew -- "true" or "false"
gateway -- "true" or "false"
Example usage:
>>> create_cas_login_url(
... 'http://sso.pdx.edu',
... '/cas',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
('service', service),
('renew', renew),
('gateway', gateway),
)
def create_cas_logout_url(cas_url, cas_route, url=None):
""" Create a CAS logout URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/logout)
url -- (ex. http://localhost:5000/login)
Example usage:
>>> create_cas_logout_url(
... 'http://sso.pdx.edu',
... '/cas/logout',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas/logout?url=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
('service', url),
)
def create_cas_validate_url(cas_url, cas_route, service, ticket,
renew=None):
""" Create a CAS validate URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/validate)
service -- (ex. http://localhost:5000/login)
ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
renew -- "true" or "false"
Example usage:
>>> create_cas_validate_url(
... 'http://sso.pdx.edu',
... '/cas/validate',
... 'http://localhost:5000/login',
... 'ST-58274-x839euFek492ou832Eena7ee-cas'
... )
"""
return create_url(
cas_url,
cas_route,
('service', service),
('ticket', ticket),
('renew', renew),
)

View File

@@ -0,0 +1,183 @@
# -*- coding:utf-8 -*-
import uuid
import bs4
from flask import Blueprint
from flask import current_app
from flask import redirect
from flask import request
from flask import session
from flask import url_for
from flask_login import login_user
from flask_login import logout_user
from six.moves.urllib_request import urlopen
from api.lib.perm.acl.cache import UserCache
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
blueprint = Blueprint('cas', __name__)
@blueprint.route('/api/cas/login')
# @blueprint.route('/api/sso/login')
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY`.
"""
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
if request.values.get("next"):
session["next"] = request.values.get("next")
_service = url_for('cas.login', _external=True)
redirect_url = create_cas_login_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGIN_ROUTE'],
_service)
if 'ticket' in request.args:
session[cas_token_session_key] = request.args.get('ticket')
if request.args.get('ticket'):
if validate(request.args['ticket']):
redirect_url = session.get("next") or current_app.config.get("CAS_AFTER_LOGIN")
username = session.get("CAS_USERNAME")
user = UserCache.get(username)
login_user(user)
session.permanent = True
else:
del session[cas_token_session_key]
redirect_url = create_cas_login_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGIN_ROUTE'],
url_for('cas.login', _external=True),
renew=True)
current_app.logger.info("redirect to: {0}".format(redirect_url))
return redirect(redirect_url)
@blueprint.route('/api/cas/logout')
# @blueprint.route('/api/sso/logout')
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
cas_username_session_key in session and session.pop(cas_username_session_key)
"acl" in session and session.pop("acl")
"uid" in session and session.pop("uid")
cas_token_session_key in session and session.pop(cas_token_session_key)
"next" in session and session.pop("next")
redirect_url = create_cas_logout_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGOUT_ROUTE'],
url_for('cas.login', _external=True, next=request.referrer))
logout_user()
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY`.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config['CAS_VALIDATE_SERVER'],
current_app.config['CAS_VALIDATE_ROUTE'],
url_for('cas.login', _external=True),
ticket)
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
try:
response = urlopen(cas_validate_url).read()
ticket_id = _parse_tag(response, "cas:user")
strs = [s.strip() for s in ticket_id.split('|') if s.strip()]
username, is_valid = None, False
if len(strs) == 1:
username = strs[0]
is_valid = True
except ValueError:
current_app.logger.error("CAS returned unexpected result")
is_valid = False
return is_valid
if is_valid:
current_app.logger.debug("{}: {}".format(cas_username_session_key, username))
session[cas_username_session_key] = username
user = UserCache.get(username)
if user is None:
current_app.logger.info("create user: {}".format(username))
from api.lib.perm.acl.user import UserCRUD
soup = bs4.BeautifulSoup(response)
cas_user_map = current_app.config.get('CAS_USER_MAP')
user_dict = dict()
for k in cas_user_map:
v = soup.find(cas_user_map[k]['tag'], cas_user_map[k].get('attrs', {}))
user_dict[k] = v and v.text or None
user_dict['password'] = uuid.uuid4().hex
UserCRUD.add(**user_dict)
from api.lib.perm.acl.acl import ACLManager
user_info = ACLManager.get_user_info(username)
session["acl"] = dict(uid=user_info.get("uid"),
avatar=user.avatar if user else user_info.get("avatar"),
userId=user_info.get("uid"),
rid=user_info.get("rid"),
userName=user_info.get("username"),
nickName=user_info.get("nickname"),
parentRoles=user_info.get("parents"),
childRoles=user_info.get("children"),
roleName=user_info.get("role"))
session["uid"] = user_info.get("uid")
current_app.logger.debug(session)
current_app.logger.debug(request.url)
else:
current_app.logger.debug("invalid")
return is_valid
def _parse_tag(string, tag):
"""
Used for parsing xml. Search string for the first occurence of
<tag>.....</tag> and return text (stripped of leading and tailing
whitespace) between tags. Return "" if tag not found.
"""
soup = bs4.BeautifulSoup(string)
if soup.find(tag) is None:
return ''
return soup.find(tag).string.strip()

View File

@@ -0,0 +1,26 @@
# -*- coding:utf-8 -*-
from flask import current_app
from . import routing
class OAuth2(object):
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
@staticmethod
def init_app(app, url_prefix=None):
# Configuration defaults
app.config.setdefault('OAUTH2_GRANT_TYPE', 'authorization_code')
app.config.setdefault('OAUTH2_RESPONSE_TYPE', 'code')
app.config.setdefault('OAUTH2_AFTER_LOGIN', '/')
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
@property
def app(self):
return self._app or current_app

View File

@@ -0,0 +1,118 @@
# -*- coding:utf-8 -*-
import secrets
import uuid
import requests
from flask import Blueprint
from flask import abort
from flask import current_app
from flask import redirect
from flask import request
from flask import session
from flask import url_for
from flask_login import login_user, logout_user
from six.moves.urllib.parse import urlencode
from api.lib.perm.acl.cache import UserCache
blueprint = Blueprint('oauth2', __name__)
@blueprint.route('/api/oauth2/login')
@blueprint.route('/api/sso/login')
def login():
if request.values.get("next"):
session["next"] = request.values.get("next")
session['oauth2_state'] = secrets.token_urlsafe(16)
qs = urlencode({
'client_id': current_app.config['OAUTH2_CLIENT_ID'],
'redirect_uri': url_for('oauth2.callback', _external=True),
'response_type': current_app.config['OAUTH2_RESPONSE_TYPE'],
'scope': ' '.join(current_app.config['OAUTH2_SCOPES'] or []),
'state': session['oauth2_state'],
})
return redirect("{}?{}".format(current_app.config['OAUTH2_AUTHORIZE_URL'].split('?')[0], qs))
@blueprint.route('/api/oauth2/callback')
def callback():
redirect_url = session.get("next") or current_app.config.get("OAUTH2_AFTER_LOGIN")
if request.values['state'] != session.get('oauth2_state'):
return abort(401, "state is invalid")
if 'code' not in request.values:
return abort(401, 'code is invalid')
response = requests.post(current_app.config['OAUTH2_TOKEN_URL'], data={
'client_id': current_app.config['OAUTH2_CLIENT_ID'],
'client_secret': current_app.config['OAUTH2_CLIENT_SECRET'],
'code': request.values['code'],
'grant_type': current_app.config['OAUTH2_GRANT_TYPE'],
'redirect_uri': url_for('oauth2.callback', _external=True),
}, headers={'Accept': 'application/json'})
if response.status_code != 200:
current_app.logger.error(response.text)
return abort(401)
oauth2_token = response.json().get('access_token')
if not oauth2_token:
return abort(401)
response = requests.get(current_app.config['OAUTH2_USER_INFO']['url'], headers={
'Authorization': 'Bearer {}'.format(oauth2_token),
'Accept': 'application/json',
})
if response.status_code != 200:
return abort(401)
email = current_app.config['OAUTH2_USER_INFO']['email'](response.json())
username = current_app.config['OAUTH2_USER_INFO']['username'](response.json())
user = UserCache.get(username)
if user is None:
current_app.logger.info("create user: {}".format(username))
from api.lib.perm.acl.user import UserCRUD
user_dict = dict(username=username, email=email)
user_dict['password'] = uuid.uuid4().hex
user = UserCRUD.add(**user_dict)
# log the user in
login_user(user)
from api.lib.perm.acl.acl import ACLManager
user_info = ACLManager.get_user_info(username)
session["acl"] = dict(uid=user_info.get("uid"),
avatar=user.avatar if user else user_info.get("avatar"),
userId=user_info.get("uid"),
rid=user_info.get("rid"),
userName=user_info.get("username"),
nickName=user_info.get("nickname") or user_info.get("username"),
parentRoles=user_info.get("parents"),
childRoles=user_info.get("children"),
roleName=user_info.get("role"))
session["uid"] = user_info.get("uid")
return redirect(redirect_url)
@blueprint.route('/api/oauth2/logout')
@blueprint.route('/api/sso/logout')
def logout():
"acl" in session and session.pop("acl")
"uid" in session and session.pop("uid")
'oauth2_state' in session and session.pop('oauth2_state')
"next" in session and session.pop("next")
redirect_url = url_for('oauth2.login', _external=True, next=request.referrer)
logout_user()
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return redirect(redirect_url)

View File

@@ -0,0 +1,25 @@
# -*- coding:utf-8 -*-
from flask import current_app
from . import routing
class OIDC(object):
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
@staticmethod
def init_app(app, url_prefix=None):
# Configuration defaults
app.config.setdefault('OIDC_GRANT_TYPE', 'authorization_code')
app.config.setdefault('OIDC_RESPONSE_TYPE', 'code')
app.config.setdefault('OIDC_AFTER_LOGIN', '/')
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
@property
def app(self):
return self._app or current_app

View File

@@ -0,0 +1,118 @@
# -*- coding:utf-8 -*-
import secrets
import uuid
import requests
from flask import Blueprint
from flask import abort
from flask import current_app
from flask import redirect
from flask import request
from flask import session
from flask import url_for
from flask_login import login_user, logout_user
from six.moves.urllib.parse import urlencode
from api.lib.perm.acl.cache import UserCache
blueprint = Blueprint('oidc', __name__)
@blueprint.route('/api/oidc/login')
@blueprint.route('/api/sso/login')
def login():
if request.values.get("next"):
session["next"] = request.values.get("next")
session['oidc_state'] = secrets.token_urlsafe(16)
qs = urlencode({
'client_id': current_app.config['OIDC_CLIENT_ID'],
'redirect_uri': url_for('oidc.callback', _external=True),
'response_type': current_app.config['OIDC_RESPONSE_TYPE'],
'scope': ' '.join(current_app.config['OIDC_SCOPES'] or []),
'state': session['oidc_state'],
})
return redirect("{}?{}".format(current_app.config['OIDC_AUTHORIZE_URL'].split('?')[0], qs))
@blueprint.route('/api/oidc/callback')
def callback():
redirect_url = session.get("next") or current_app.config.get("OIDC_AFTER_LOGIN")
if request.values['state'] != session.get('oidc_state'):
return abort(401, "state is invalid")
if 'code' not in request.values:
return abort(401, 'code is invalid')
response = requests.post(current_app.config['OIDC_TOKEN_URL'], data={
'client_id': current_app.config['OIDC_CLIENT_ID'],
'client_secret': current_app.config['OIDC_CLIENT_SECRET'],
'code': request.values['code'],
'grant_type': current_app.config['OIDC_GRANT_TYPE'],
'redirect_uri': url_for('oidc.callback', _external=True),
}, headers={'Accept': 'application/json'})
if response.status_code != 200:
current_app.logger.error(response.text)
return abort(401)
oidc_token = response.json().get('access_token')
if not oidc_token:
return abort(401)
response = requests.get(current_app.config['OIDC_USER_INFO']['url'], headers={
'Authorization': 'Bearer {}'.format(oidc_token),
'Accept': 'application/json',
})
if response.status_code != 200:
return abort(401)
email = current_app.config['OIDC_USER_INFO']['email'](response.json())
username = current_app.config['OIDC_USER_INFO']['username'](response.json())
user = UserCache.get(username)
if user is None:
current_app.logger.info("create user: {}".format(username))
from api.lib.perm.acl.user import UserCRUD
user_dict = dict(username=username, email=email)
user_dict['password'] = uuid.uuid4().hex
user = UserCRUD.add(**user_dict)
# log the user in
login_user(user)
from api.lib.perm.acl.acl import ACLManager
user_info = ACLManager.get_user_info(username)
session["acl"] = dict(uid=user_info.get("uid"),
avatar=user.avatar if user else user_info.get("avatar"),
userId=user_info.get("uid"),
rid=user_info.get("rid"),
userName=user_info.get("username"),
nickName=user_info.get("nickname") or user_info.get("username"),
parentRoles=user_info.get("parents"),
childRoles=user_info.get("children"),
roleName=user_info.get("role"))
session["uid"] = user_info.get("uid")
return redirect(redirect_url)
@blueprint.route('/api/oidc/logout')
@blueprint.route('/api/sso/logout')
def logout():
"acl" in session and session.pop("acl")
"uid" in session and session.pop("uid")
'oidc_state' in session and session.pop('oidc_state')
"next" in session and session.pop("next")
redirect_url = url_for('oidc.login', _external=True, next=request.referrer)
logout_user()
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return redirect(redirect_url)