cmdb/api/flask_cas/routing.py

163 lines
5.7 KiB
Python

# -*- coding:utf-8 -*-
import json
import bs4
from flask import Blueprint
from flask import current_app, session, request, url_for, redirect
from flask_login import login_user, logout_user
from six.moves.urllib_request import urlopen
from api.models.account import UserCache
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
blueprint = Blueprint('cas', __name__)
@blueprint.route('/api/sso/login')
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY`.
"""
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
if request.values.get("next"):
session["next"] = request.values.get("next")
_service = url_for('cas.login', _external=True, next=session["next"]) \
if session.get("next") else url_for('cas.login', _external=True)
redirect_url = create_cas_login_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGIN_ROUTE'],
_service)
if 'ticket' in request.args:
session[cas_token_session_key] = request.args.get('ticket')
if request.args.get('ticket'):
if validate(request.args['ticket']):
redirect_url = session.get("next") or \
current_app.config.get("CAS_AFTER_LOGIN")
username = session.get("CAS_USERNAME")
user = UserCache.get(username)
login_user(user)
session.permanent = True
else:
del session[cas_token_session_key]
redirect_url = create_cas_login_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGIN_ROUTE'],
url_for('cas.login', _external=True),
renew=True)
current_app.logger.info("redirect to: {0}".format(redirect_url))
return redirect(redirect_url)
@blueprint.route('/api/sso/logout')
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
cas_username_session_key in session and session.pop(cas_username_session_key)
"acl" in session and session.pop("acl")
"uid" in session and session.pop("uid")
cas_token_session_key in session and session.pop(cas_token_session_key)
"next" in session and session.pop("next")
redirect_url = create_cas_logout_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGOUT_ROUTE'],
url_for('cas.login', _external=True, next=request.referrer))
logout_user()
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY`.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config['CAS_VALIDATE_SERVER'],
current_app.config['CAS_VALIDATE_ROUTE'],
url_for('cas.login', _external=True),
ticket)
current_app.logger.debug("Making GET request to {0}".format(cas_validate_url))
try:
response = urlopen(cas_validate_url).read()
ticketid = _parse_tag(response, "cas:user")
strs = [s.strip() for s in ticketid.split('|') if s.strip()]
username, is_valid = None, False
if len(strs) == 1:
username = strs[0]
is_valid = True
user_info = json.loads(_parse_tag(response, "cas:other"))
current_app.logger.info(user_info)
except ValueError:
current_app.logger.error("CAS returned unexpected result")
is_valid = False
return is_valid
if is_valid:
current_app.logger.debug("valid")
session[cas_username_session_key] = username
user = UserCache.get(username)
session["acl"] = dict(uid=user_info.get("uuid"),
avatar=user.avatar if user else user_info.get("avatar"),
userId=user_info.get("id"),
userName=user_info.get("name"),
nickName=user_info.get("nickname"),
parentRoles=user_info.get("parents"),
childRoles=user_info.get("children"),
roleName=user_info.get("role"))
session["uid"] = user_info.get("uuid")
current_app.logger.debug(session)
current_app.logger.debug(request.url)
else:
current_app.logger.debug("invalid")
return is_valid
def _parse_tag(string, tag):
"""
Used for parsing xml. Search string for the first occurence of
<tag>.....</tag> and return text (stripped of leading and tailing
whitespace) between tags. Return "" if tag not found.
"""
soup = bs4.BeautifulSoup(string)
if soup.find(tag) is None:
return ''
return soup.find(tag).string.strip()