# Source code for kadi.modules.accounts.schemas

# Copyright 2020 Karlsruhe Institute of Technology
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
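# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.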
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import has_request_context
from flask_login import current_user
from marshmallow import fields
from marshmallow import post_dump
from marshmallow import validates
from marshmallow.validate import ValidationError

from .models import User
from kadi.lib.permissions.models import Role
from kadi.lib.schemas import KadiSchema
from kadi.lib.web import url_for

class IdentitySchema(KadiSchema):
    """Schema to represent identities.

    See :class:`.Identity`.

    The ``email`` and ``email_confirmed`` fields are computed via methods and
    are removed from the dumped output again whenever they evaluate to
    ``None``, which is the case if the email address is not visible (see
    :meth:`_has_visible_email`).
    """

    type = fields.String(dump_only=True, data_key="identity_type")

    displayname = fields.String(dump_only=True)

    username = fields.String(dump_only=True)

    created_at = fields.DateTime(dump_only=True)

    last_modified = fields.DateTime(dump_only=True)

    identity_name = fields.Method("_get_identity_name")

    email = fields.Method("_get_email")

    email_confirmed = fields.Method("_get_email_confirmed")

    @post_dump
    def _post_dump(self, data, **kwargs):
        """Strip the email related keys whose serialized value is ``None``.

        :param data: The serialized identity data.
        :return: The same data without ``None``-valued email keys.
        """
        for key in ("email", "email_confirmed"):
            if key in data and data[key] is None:
                del data[key]

        return data

    def _has_visible_email(self, obj):
        """Check whether the email address of an identity may be exposed.

        :param obj: The identity that is being serialized.
        :return: ``True`` if the email address may be included in the output.
        """
        # If within a request context, only mark the email address as visible if the
        # current user is a sysadmin, matches the user to be serialized or set their
        # email to be publicly visible.
        return not has_request_context() or (
            current_user.is_sysadmin
            or obj.user == current_user
            or not obj.user.email_is_private
        )

    def _get_identity_name(self, obj):
        """Return the name of the identity type as a plain string."""
        return str(obj.Meta.identity_type["name"])

    def _get_email(self, obj):
        """Return the email address of the identity if it is visible.

        :param obj: The identity that is being serialized.
        :return: The email address or ``None`` if it must not be exposed.
        """
        if self._has_visible_email(obj):
            # NOTE(review): the returned expression was lost in the extracted
            # source; reconstructed as ``obj.email`` in analogy to
            # ``_get_email_confirmed`` -- confirm against the Identity model.
            return obj.email

        return None

    def _get_email_confirmed(self, obj):
        """Return the email confirmation flag of the identity if it is visible.

        :param obj: The identity that is being serialized.
        :return: The value of ``obj.email_confirmed`` or ``None`` if the email
            address must not be exposed.
        """
        if self._has_visible_email(obj):
            return obj.email_confirmed

        return None
class UserSchema(KadiSchema):
    """Schema to represent users.

    See :class:`.User`.

    Besides the plain user attributes, the dumped output contains the nested
    identity, the name of the system role and a ``_links`` mapping. The
    ``_actions`` mapping is only present if the schema is used internally.
    """

    id = fields.Integer(required=True)

    is_sysadmin = fields.Bool(dump_only=True)

    state = fields.String(dump_only=True)

    created_at = fields.DateTime(dump_only=True)

    last_modified = fields.DateTime(dump_only=True)

    identity = fields.Nested(IdentitySchema, dump_only=True)

    system_role = fields.Method("_get_system_role")

    _links = fields.Method("_generate_links")

    _actions = fields.Method("_generate_actions")

    @validates("id")
    def _validate_id(self, value):
        """Ensure that the given ID refers to an existing, active user.

        :param value: The user ID to validate.
        :raises ValidationError: If ``User.query.get_active`` finds no user.
        """
        if User.query.get_active(value) is None:
            raise ValidationError("No user with this ID exists.")

    @post_dump
    def _post_dump(self, data, **kwargs):
        """Strip the ``_actions`` key if no actions were generated.

        :param data: The serialized user data.
        :return: The same data without a ``None``-valued ``_actions`` key.
        """
        if "_actions" in data and data["_actions"] is None:
            del data["_actions"]

        return data

    def _get_system_role(self, obj):
        """Return the name of the system role of a user.

        The system role is the first role of the user that is not tied to any
        object, i.e. whose ``object`` and ``object_id`` are both ``NULL``.

        :param obj: The user that is being serialized.
        :return: The role name or ``None`` if the user has no such role.
        """
        role = obj.roles.filter(Role.object.is_(None), Role.object_id.is_(None)).first()
        # NOTE(review): the returned expression was lost in the extracted source;
        # reconstructed as ``role.name`` -- confirm against the Role model.
        return role.name if role is not None else None

    def _generate_links(self, obj):
        """Generate the links to the resources related to a user.

        :param obj: The user that is being serialized.
        :return: A dictionary mapping link names to URLs. The ``image`` and
            ``view`` links are only added if ``self._internal`` is truthy, the
            former additionally only if the user has an image.
        """
        # NOTE(review): the keyword arguments of all ``url_for`` calls in this
        # class were lost in the extracted source; reconstructed as
        # ``id=obj.id`` -- confirm against the route definitions.
        links = {
            "self": url_for("api.get_user", id=obj.id),
            "identities": url_for("api.get_user_identities", id=obj.id),
            "records": url_for("api.get_user_records", id=obj.id),
            "collections": url_for("api.get_user_collections", id=obj.id),
            "templates": url_for("api.get_user_templates", id=obj.id),
            "groups": url_for("api.get_user_groups", id=obj.id),
        }

        if self._internal:
            if obj.image_name:
                links["image"] = url_for("api.preview_user_image", id=obj.id)

            links["view"] = url_for("accounts.view_user", id=obj.id)

        return links

    def _generate_actions(self, obj):
        """Generate the links to the actions that can be performed on a user.

        :param obj: The user that is being serialized.
        :return: A dictionary mapping action names to URLs if
            ``self._internal`` is truthy, ``None`` otherwise. A ``None`` value
            is removed from the output again in :meth:`_post_dump`.
        """
        if self._internal:
            return {
                "change_role": url_for("api.change_system_role", id=obj.id),
                "toggle_state": url_for("api.toggle_user_state", id=obj.id),
                "toggle_sysadmin": url_for("api.toggle_user_sysadmin", id=obj.id),
                "delete": url_for("api.delete_user", id=obj.id),
            }

        return None