# Source code for kadi.modules.records.links

# Copyright 2021 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask_babel import gettext as _
from flask_login import current_user

from .models import File
from .models import Record
from .models import RecordLink
from .models import RecordState
from .schemas import RecordRevisionSchema
from kadi.ext.db import db
from kadi.lib.conversion import truncate
from kadi.lib.db import update_object
from kadi.lib.exceptions import KadiPermissionError
from kadi.lib.permissions.core import get_permitted_objects
from kadi.lib.permissions.core import has_permission
from kadi.lib.revisions.core import create_revision
from kadi.lib.revisions.models import Revision
from kadi.lib.revisions.schemas import ObjectRevisionSchema
from kadi.lib.web import url_for
from kadi.plugins.utils import signal_resource_change

def _trigger_record_revisions(record_from, record_to, user):
    """Create a revision for each of two records and signal the resulting changes.

    Revision creation is attempted for both records first. Afterwards, a resource
    change is signalled for every record for which ``create_revision`` returned a
    truthy value.

    :param record_from: The first record to create a revision for.
    :param record_to: The second record to create a revision for.
    :param user: The user passed on to both the revision creation and the resource
        change signal.
    """
    # Both revisions are created before any signal is sent, so the results are
    # collected first and evaluated in a second pass.
    outcomes = [
        (record, create_revision(record, user=user))
        for record in (record_from, record_to)
    ]

    for record, revision_created in outcomes:
        if revision_created:
            signal_resource_change(record, user=user)

def get_record_changes(record_link):
    """Get all changes of linked records since the given record link was created.

    The changes are based on the record and file revisions of the two linked records
    that were triggered after the record link was initially created.

    :param record_link: The record link representing the two linked records whose
        changes should be collected.
    :return: The changes of the linked records, consisting of the amount of new record
        and file revisions for each record. For the record revisions, the revision at
        the point of creation of the record link will also be included. The changes are
        returned as a dictionary in the following form:

        .. code-block:: python3

            {
                <record_from_id>: {
                    "record": {
                        "count": 0,
                        "revision": <revision>,
                    },
                    "files": {
                        "count": 0,
                    },
                },
                <record_to_id>: {
                    "record": {
                        "count": 1,
                        "revision": <revision>,
                    },
                    "files": {
                        "count": 2,
                    },
                },
            }
    """
    changes = {}

    for record in [record_link.record_from, record_link.record_to]:
        # Number of record revisions that were created after the link itself.
        num_revisions = record.ordered_revisions.filter(
            Revision.timestamp > record_link.created_at
        ).count()
        # First revision (in the order given by "ordered_revisions") that was created
        # before the link.
        nearest_revision = record.ordered_revisions.filter(
            Revision.timestamp < record_link.created_at
        ).first()

        # Due to the creation of record links also triggering new revisions, the child
        # of the nearest revision is actually the correct one. However, as record link
        # revisions were added later to the existing revisions, previous links might
        # appear together with other changes in a single revision, which needs to be
        # taken into account.
        if nearest_revision is not None and nearest_revision.child is not None:
            schema = ObjectRevisionSchema(RecordRevisionSchema, only=["diff"])
            diff = schema.dump(nearest_revision.child)["diff"]

            # Check if the diff only contains a single change related to creating a
            # single record link, in which case the child revision is taken instead.
            if len(diff) == 1:
                link_revision_key = None

                if "links_to" in diff:
                    link_revision_key = "links_to"
                elif "linked_from" in diff:
                    link_revision_key = "linked_from"

                if link_revision_key is not None:
                    link_revision = diff[link_revision_key]

                    if len(link_revision["new"]) - len(link_revision["prev"]) == 1:
                        nearest_revision = nearest_revision.child
                        # Also adjust the total number of revisions accordingly.
                        num_revisions -= 1

        # The result is keyed by the ID of each record, see the docstring.
        changes[record.id] = {
            "record": {
                "count": num_revisions,
                "revision": nearest_revision,
            }
        }

        # Number of file revisions of the current record created after the link.
        num_revisions = (
            File.revision_class.query.join(File)
            .join(Revision)
            .order_by(Revision.timestamp.desc())
            .filter(
                File.record_id == record.id,
                Revision.timestamp > record_link.created_at,
            )
            .count()
        )
        changes[record.id]["files"] = {"count": num_revisions}

    return changes
def _get_record_data(record, depth=1): return { "id":, "identifier": truncate(record.identifier, 25), "identifier_full": record.identifier, "type": truncate(record.type, 25), "type_full": record.type, "url": url_for( "records.view_record",, tab="links", visualize="true", depth=depth, ), } def _calculate_link_meta(data): link_indices = {} link_lengths = {} for link_data in data: source_id = link_data["source"] target_id = link_data["target"] # The index of a link is increased for each link that has the same source and # target, starting at 1. link_index = 1 key = (source_id, target_id) if key in link_indices: link_index = link_indices[key] + 1 link_indices[key] = link_index link_data["link_index"] = link_index # The link length is determined by the maximum length of the (truncated) link # names between two records, independent of link direction. link_length = len(link_data["name"]) key = ( (source_id, target_id) if source_id < target_id else (target_id, source_id) ) if key in link_lengths: link_lengths[key] = max(link_length, link_lengths[key]) else: link_lengths[key] = link_length for link_data in data: source_id = link_data["source"] target_id = link_data["target"] key = ( (source_id, target_id) if source_id < target_id else (target_id, source_id) ) if key in link_lengths: link_data["link_length"] = link_lengths[key] def _collect_link_data( record_id, link_direction, depth, records, record_links, processed_record_ids, added_record_ids, user, ): new_record_ids = set() # Limit the links per record to a maximum of 100. record_links_query = ( get_permitted_record_links(record_id, direction=link_direction, user=user) .order_by(RecordLink.last_modified.desc()) .limit(100) ) for record_link in record_links_query: # Skip all links involving records that were already checked for their links. 
if ( record_link.record_from_id in processed_record_ids or record_link.record_to_id in processed_record_ids ): continue source = record_link.record_from target = record_link.record_to for record in [source, target]: new_record_ids.add( if not in added_record_ids: records.append(_get_record_data(record, depth)) added_record_ids.add( record_links.append( { "id":, "source":, "target":, "name": truncate(, 25), "name_full":, # We simply take the outgoing record as base for the URL. "url": url_for( "records.view_record_link", record_id=record_link.record_from_id,, ), } ) # Add the link indices and lengths to the data. _calculate_link_meta(record_links) processed_record_ids.add(record_id) return new_record_ids