# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import current_app
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import StaleDataError
from kadi.ext.db import db
from kadi.lib.cache import memoize_request
from kadi.lib.db import BaseTimestampMixin
from kadi.lib.db import NestedTransaction
from kadi.lib.db import escape_like
from kadi.lib.db import get_class_by_tablename
from kadi.lib.utils import rgetattr
from kadi.modules.accounts.models import User
from kadi.modules.groups.models import Group
from .models import Permission
from .models import Role
from .models import RoleRule
from .models import RoleRuleType
def _get_permissions(subject, action, object_name, check_groups=True):
    """Build a query for the role permissions of a subject.

    :param subject: The :class:`.User` or :class:`.Group`.
    :param action: The action the permissions have to match.
    :param object_name: The type of object the permissions have to match.
    :param check_groups: (optional) Flag indicating whether the role permissions of
        a user's groups should be included as well. Only has an effect if the
        subject is a user.
    :return: The matching permissions as query.
    """
    # Kept as a function-level import (presumably to avoid a circular import).
    from kadi.modules.groups.utils import get_user_groups

    def _role_permissions(relationship, *criteria):
        # Permissions matching the action and object type, joined through their
        # roles and the given role relationship.
        return (
            Permission.query.join(Permission.roles)
            .join(relationship)
            .filter(
                Permission.action == action,
                Permission.object == object_name,
                *criteria,
            )
        )

    subject_is_user = isinstance(subject, User)

    # Role permissions of the user's groups, if requested.
    group_permissions_query = None

    if subject_is_user and check_groups:
        group_ids_query = get_user_groups(subject).with_entities(Group.id)
        group_permissions_query = _role_permissions(
            Role.groups, Group.id.in_(group_ids_query)
        )

    # Role permissions held directly by the subject itself.
    permissions_query = _role_permissions(
        Role.users if subject_is_user else Role.groups,
        subject.__class__.id == subject.id,
    )

    if group_permissions_query is None:
        return permissions_query

    return permissions_query.union(group_permissions_query)
@memoize_request
def has_permission(
    subject, action, object_name, object_id, check_groups=True, check_defaults=True
):
    """Check whether a user or group is allowed to perform a specific action.

    The permissions are collected via the roles of the given subject. A global
    permission (one without an object ID) always grants access, followed by the
    default permissions of the object and finally the permissions referring to the
    specific object.

    :param subject: The :class:`.User` or :class:`.Group`.
    :param action: The action to check for.
    :param object_name: The type of object.
    :param object_id: The ID of a specific object or ``None`` for a global permission.
    :param check_groups: (optional) Flag indicating whether the groups of a user should
        be checked as well for their permissions.
    :param check_defaults: (optional) Flag indicating whether the default permissions of
        any object should be checked as well.
    :return: ``True`` if permission is granted, ``False`` otherwise or if the object
        type or the object instance to check does not exist.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        return False

    permissions_query = _get_permissions(
        subject, action, object_name, check_groups=check_groups
    )

    # A global permission applies regardless of the concrete object.
    global_permission = permissions_query.filter(
        Permission.object_id.is_(None)
    ).first()

    if global_permission is not None:
        return True

    # Without a global permission, a concrete and existing object is required.
    if object_id is None:
        return False

    object_instance = model.query.get(object_id)

    if object_instance is None:
        return False

    if check_defaults:
        permissions_meta = rgetattr(object_instance, "Meta.permissions", {})
        default_permissions = permissions_meta.get("default_permissions", {})

        # A single matching attribute value is enough to grant the action.
        if action in default_permissions and any(
            getattr(object_instance, attr, None) == val
            for attr, val in default_permissions[action].items()
        ):
            return True

    # Finally, fall back to the permissions referring to the object itself.
    object_permission = permissions_query.filter(
        Permission.object_id == object_id
    ).first()

    return object_permission is not None
def get_permitted_objects(
    subject, action, object_name, check_groups=True, check_defaults=True
):
    """Get all objects a user or group is allowed to perform a specific action on.

    The permissions are collected via the roles of the given subject. If the
    subject has a global permission for the action, all objects are returned.

    :param subject: The :class:`.User` or :class:`.Group`.
    :param action: The action to check for.
    :param object_name: The type of object.
    :param check_groups: (optional) Flag indicating whether the groups of a user should
        be checked as well for their permissions.
    :param check_defaults: (optional) Flag indicating whether the default permissions of
        the objects should be checked as well.
    :return: The permitted objects as query or ``None`` if the object type does not
        exist.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        return None

    permissions_query = _get_permissions(
        subject, action, object_name, check_groups=check_groups
    )

    # A global permission makes every object of this type permitted.
    global_permission = permissions_query.filter(
        Permission.object_id.is_(None)
    ).first()

    if global_permission is not None:
        return model.query

    # Objects that are referred to by one of the regular permissions.
    permitted_ids_query = permissions_query.with_entities(Permission.object_id)
    objects_query = model.query.filter(model.id.in_(permitted_ids_query))

    if not check_defaults:
        return objects_query

    default_permissions = rgetattr(model, "Meta.permissions", {}).get(
        "default_permissions", {}
    )

    if action not in default_permissions:
        return objects_query

    # Objects matching at least one of the default permission conditions.
    filters = [
        getattr(model, attr, None) == val
        for attr, val in default_permissions[action].items()
    ]

    if not filters:
        return objects_query

    return objects_query.union(model.query.filter(db.or_(*filters)))
def add_role(subject, object_name, object_id, role_name, update_timestamp=True):
    """Give a user or group an existing role of an object.

    :param subject: The :class:`.User` or :class:`.Group`.
    :param object_name: The type of object the role refers to.
    :param object_id: The ID of the object.
    :param role_name: The name of the role.
    :param update_timestamp: (optional) Flag indicating whether the timestamp of the
        underlying object should be updated or not. The object needs to implement
        :class:`.BaseTimestampMixin` in that case.
    :return: ``True`` if the role was added successfully, ``False`` if the subject
        already has a role related to the given object or if the nested transaction
        did not succeed.
    :raises ValueError: If no object or role with the given arguments exists or when
        trying to add a role to the object that is being referred to by that role.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        raise ValueError(f"Object type '{object_name}' does not exist.")

    object_instance = model.query.get(object_id)

    if object_instance is None:
        raise ValueError(f"Object '{object_name}' with ID {object_id} does not exist.")

    # A subject may not hold a role that refers to the subject itself.
    refers_to_subject = subject.__tablename__ == object_name and subject.id == object_id

    if refers_to_subject:
        raise ValueError("Cannot add a role to the object to which the role refers.")

    # Only a single role per object is allowed for each subject.
    existing_roles = subject.roles.filter(
        Role.object == object_name, Role.object_id == object_id
    )

    if existing_roles.count() > 0:
        return False

    role = Role.query.filter_by(
        name=role_name, object=object_name, object_id=object_id
    ).first()

    if not role:
        raise ValueError("A role with that name does not exist.")

    with NestedTransaction(exc=IntegrityError) as transaction:
        subject.roles.append(role)

    touch_object = (
        transaction.success
        and update_timestamp
        and isinstance(object_instance, BaseTimestampMixin)
    )

    if touch_object:
        object_instance.update_timestamp()

    return transaction.success
def remove_role(subject, object_name, object_id, update_timestamp=True):
    """Take away all roles of a user or group that relate to a given object.

    :param subject: The :class:`.User` or :class:`.Group`.
    :param object_name: The type of object the role refers to.
    :param object_id: The ID of the object.
    :param update_timestamp: (optional) Flag indicating whether the timestamp of the
        underlying object should be updated or not. The object needs to implement
        :class:`.BaseTimestampMixin` in that case.
    :return: ``True`` if the role was removed successfully, ``False`` if there was no
        role to remove or if the nested transaction did not succeed.
    :raises ValueError: If no object with the given arguments exists.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        raise ValueError(f"Object type '{object_name}' does not exist.")

    object_instance = model.query.get(object_id)

    if object_instance is None:
        raise ValueError(f"Object '{object_name}' with ID {object_id} does not exist.")

    object_roles = subject.roles.filter(
        Role.object == object_name, Role.object_id == object_id
    )

    if object_roles.count() == 0:
        return False

    with NestedTransaction(exc=StaleDataError) as transaction:
        # A subject may end up with several roles for the same object (e.g. after
        # merging two users or due to race conditions when adding roles), so every
        # role related to the given object is removed.
        for role in object_roles:
            subject.roles.remove(role)

    touch_object = (
        transaction.success
        and update_timestamp
        and isinstance(object_instance, BaseTimestampMixin)
    )

    if touch_object:
        object_instance.update_timestamp()

    return transaction.success
def set_system_role(user, system_role):
    """Replace the system role(s) of a user with an existing system role.

    System roles are the roles that refer to neither an object type nor an object
    ID.

    :param user: The user to set the system role for.
    :param system_role: The name of the system role to set as defined in
        :const:`kadi.lib.constants.SYSTEM_ROLES`.
    :return: ``True`` if the system role was set successfully, ``False`` otherwise or if
        the given system role does not exist.
    """
    new_role = Role.query.filter_by(
        name=system_role, object=None, object_id=None
    ).first()

    if new_role is None:
        return False

    current_system_roles = user.roles.filter(
        Role.object.is_(None), Role.object_id.is_(None)
    )

    with NestedTransaction(exc=StaleDataError) as removal:
        # A user may have several system roles (e.g. after merging two users), so
        # all of them are removed before the new one is set.
        for role in current_system_roles:
            user.roles.remove(role)

    if not removal.success:
        return False

    with NestedTransaction(exc=IntegrityError) as addition:
        user.roles.append(new_role)

    return addition.success
def setup_permissions(object_name, object_id):
    """Create the default permissions and roles of an object.

    The actions and roles to create are taken from the ``Meta.permissions``
    attribute of the object's model.

    **Example:**

    .. code-block:: python3

        class Foo:
            class Meta:
                permissions = {
                    "actions": [
                        ("read", "Read this object."),
                        ("update", "Edit this object."),
                    ],
                    "roles": [("admin", ["read", "update"])],
                }

    :param object_name: The type of object the permissions refer to.
    :param object_id: The ID of the object.
    :raises ValueError: If no object with the given arguments exists.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        raise ValueError(f"Object type '{object_name}' does not exist.")

    object_instance = model.query.get(object_id)

    if object_instance is None:
        raise ValueError(f"Object '{object_name}' with ID {object_id} does not exist.")

    permissions_meta = model.Meta.permissions

    # One permission per action, keyed by action so the roles can look them up.
    permissions = {
        action: Permission.create(
            action=action, object=object_name, object_id=object_id
        )
        for action, _ in permissions_meta["actions"]
    }

    # One role per configured name, linked to the permissions of its actions.
    for name, actions in permissions_meta["roles"]:
        role = Role.create(name=name, object=object_name, object_id=object_id)

        for action in actions:
            role.permissions.append(permissions[action])
def delete_permissions(object_name, object_id):
    """Delete all roles and permissions that refer to an object.

    :param object_name: The type of object the permissions refer to.
    :param object_id: The ID of the object.
    """
    roles_query = Role.query.filter(
        Role.object == object_name, Role.object_id == object_id
    )
    permissions_query = Permission.query.filter(
        Permission.object == object_name, Permission.object_id == object_id
    )

    # The queries are only executed once iterated, so the roles are still deleted
    # before the permissions are retrieved and deleted.
    for query in (roles_query, permissions_query):
        for item in query:
            db.session.delete(item)
def create_role_rule(
    object_name, object_id, role_name, rule_type, condition, update_timestamp=True
):
    """Create a new rule for an existing role of an object.

    :param object_name: The type of object the role refers to.
    :param object_id: The ID of the object.
    :param role_name: The name of the role.
    :param rule_type: The type of the role rule.
    :param condition: The condition of the role rule. For rules of type
        ``RoleRuleType.USERNAME`` this has to be a dictionary.
    :param update_timestamp: (optional) Flag indicating whether the timestamp of the
        underlying object should be updated or not. The object needs to implement
        :class:`.BaseTimestampMixin` in that case.
    :return: The created role rule or ``None`` if the role rule could not be created.
    """
    model = get_class_by_tablename(object_name)

    if model is None:
        return None

    object_instance = model.query.get(object_id)

    if object_instance is None:
        return None

    # Only the basic structure of the condition data is validated here.
    if rule_type == RoleRuleType.USERNAME:
        if not isinstance(condition, dict):
            return None

    role = Role.query.filter_by(
        name=role_name, object=object_name, object_id=object_id
    ).first()

    if not role:
        return None

    touch_object = update_timestamp and isinstance(
        object_instance, BaseTimestampMixin
    )

    if touch_object:
        object_instance.update_timestamp()

    return RoleRule.create(role=role, type=rule_type, condition=condition)
def remove_role_rule(role_rule, update_timestamp=True):
    """Remove an existing role rule.

    :param role_rule: The role rule to remove.
    :param update_timestamp: (optional) Flag indicating whether the timestamp of the
        underlying object should be updated or not. The object needs to implement
        :class:`.BaseTimestampMixin` in that case.
    """
    if update_timestamp:
        role = role_rule.role
        model = get_class_by_tablename(role.object)

        # The model lookup may return ``None`` for an unknown object type. In that
        # case there is no timestamp to update, but the rule is still removed.
        object_instance = None

        if model is not None:
            object_instance = model.query.get(role.object_id)

        if isinstance(object_instance, BaseTimestampMixin):
            object_instance.update_timestamp()

    db.session.delete(role_rule)
def apply_role_rule(role_rule, user=None):
    """Apply a given role rule by adding its role to all matching users.

    Only rules of type ``RoleRuleType.USERNAME`` are handled; any other rule type
    and rules referring to an unconfigured identity type are ignored.

    :param role_rule: The role rule to apply.
    :param user: (optional) A specific user to apply the role rule to. If not given, all
        existing users are considered.
    """
    role = role_rule.role

    if role_rule.type != RoleRuleType.USERNAME:
        return

    condition = role_rule.condition
    provider_config = current_app.config["AUTH_PROVIDERS"].get(
        condition.get("identity_type")
    )

    if provider_config is None:
        return

    # The pattern ends up in a LIKE query, so it is escaped first and only then are
    # its wildcards (*) translated to the ones used by the database (%).
    like_pattern = escape_like(condition.get("pattern", "")).replace("*", "%")

    identity_class = provider_config["identity_class"]
    identities_query = identity_class.query.filter(
        identity_class.username.like(like_pattern)
    )

    if user is None:
        for identity in identities_query:
            add_role(identity.user, role.object, role.object_id, role.name)

        return

    # A single matching identity is sufficient, as the role only needs to be added
    # once, even if the user has multiple matching identities.
    identity = identities_query.filter(identity_class.user_id == user.id).first()

    if identity is not None:
        add_role(identity.user, role.object, role.object_id, role.name)