Source code for kadi.modules.accounts.utils

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from uuid import uuid4

from flask import current_app
from flask_login import current_user
from flask_login import login_user as _login_user
from flask_login import logout_user as _logout_user

import kadi.lib.constants as const
from kadi.ext.db import db
from kadi.lib.db import escape_like
from kadi.lib.security import decode_jwt
from kadi.lib.storage.misc import delete_thumbnail
from kadi.lib.storage.misc import save_as_thumbnail
from kadi.lib.utils import compact_json
from kadi.lib.utils import get_class_by_name
from kadi.lib.web import url_for

from .models import User
from .models import UserState
from .providers import ShibProvider
from .schemas import UserSchema


[docs]def login_user(identity): """Log in a user by their identity. Wraps Flask-Login's ``login_user`` function but also ensures that the user's latest identity is updated. Note that this function requires an active request context. :param identity: The identity to log in with. :return: ``True`` if the login was successful, ``False`` otherwise. """ user = identity.user user.identity = identity return _login_user(user, force=True)
[docs]def logout_user(): """Log out the current user. Wraps Flask-Login's ``logout_user`` function. Note that this function requires an active request context. :return: The URL to redirect the user to after logging out, depending on their latest identity. """ redirect_url = url_for("main.index") if ( current_user.is_authenticated and current_user.identity is not None and current_user.identity.type == const.AUTH_PROVIDER_TYPE_SHIB and ShibProvider.is_registered() ): redirect_url = ShibProvider.get_logout_initiator(redirect_url) _logout_user() return redirect_url
[docs]def save_user_image(user, stream): """Save image data as a user's profile image. Uses :func:`kadi.lib.storage.misc.save_as_thumbnail` to store the actual image data. Note that any previous profile image will be deleted beforehand using :func:`delete_user_image`, which will also be called if the image could not be saved. :param user: The user to set the profile image for. :param stream: The image data as a readable binary stream. """ delete_user_image(user) user.image_name = uuid4() if not save_as_thumbnail(str(user.image_name), stream): delete_user_image(user)
[docs]def delete_user_image(user): """Delete a user's profile image. This is the inverse operation of :func:`save_user_image`. :param user: The user whose profile image should be deleted. """ if user.image_name: delete_thumbnail(str(user.image_name)) user.image_name = None
[docs]def json_user(user): """Convert a user into a JSON representation for use in HTML templates. :param user: The user to convert. :return: The converted user. """ json_data = UserSchema(_internal=True).dump(user) return compact_json(json_data, ensure_ascii=True, sort_keys=False)
def _decode_token(token, token_type): payload = decode_jwt(token) if payload is None or payload.get("type") != token_type: return None return payload
[docs]def decode_email_confirmation_token(token): """Decode the given JSON web token used for email confirmation. :param token: The token to decode. :return: The token's decoded payload or ``None`` if the token is invalid or expired. """ return _decode_token(token, const.JWT_TYPE_EMAIL_CONFIRMATION)
[docs]def decode_password_reset_token(token): """Decode the given JSON web token used for password resets. :param token: The token to decode. :return: The token's decoded payload or ``None`` if the token is invalid or expired. """ return _decode_token(token, const.JWT_TYPE_PASSWORD_RESET)
[docs]def get_filtered_user_ids(filter_term): """Get all IDs of users filtered by the given term. Convenience function to filter users based on their displayname and username of their latest identity. :param filter_term: A (case insensitive) term to filter the users by their display name or username. :return: The filtered user IDs as query. """ filter_term = escape_like(filter_term) user_queries = [] # Always consider all authentication provider types, not just the currently active # ones. for provider_type in const.AUTH_PROVIDER_TYPES.values(): identity_class = get_class_by_name(provider_type["identity"]) user_query = ( User.query.join( identity_class, identity_class.id == User.latest_identity_id ) .filter( db.or_( User.displayname.ilike(f"%{filter_term}%"), identity_class.username.ilike(f"%{filter_term}%"), ) ) .with_entities(User.id) ) user_queries.append(user_query) return user_queries[0].union(*user_queries[1:])
[docs]def clean_users(inside_task=False): """Clean all deleted users. Note that this function may issue one or more database commits. :param inside_task: (optional) A flag indicating whether the function is executed in a task. In that case, additional information will be logged. """ from .core import purge_user users = User.query.filter(User.state == UserState.DELETED) if inside_task and users.count() > 0: current_app.logger.info(f"Cleaning {users.count()} deleted user(s).") for user in users: purge_user(user)