Source code for kadi.modules.records.links

# Copyright 2021 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask_babel import gettext as _
from flask_login import current_user

from kadi.ext.db import db
from kadi.lib.conversion import truncate
from kadi.lib.db import update_object
from kadi.lib.exceptions import KadiPermissionError
from kadi.lib.permissions.core import get_permitted_objects
from kadi.lib.permissions.core import has_permission
from kadi.lib.plugins.utils import signal_resource_change
from kadi.lib.revisions.core import create_revision
from kadi.lib.revisions.models import Revision
from kadi.lib.web import url_for

from .models import File
from .models import Record
from .models import RecordLink
from .models import RecordState


def _create_record_link(creator, record_from, record_to, name, term=None):
    if not has_permission(
        creator, "link", "record", record_from.id
    ) or not has_permission(creator, "link", "record", record_to.id):
        raise KadiPermissionError("No permission to link records.")

    if record_from == record_to:
        raise ValueError("Cannot link record with itself.")

    # Check for duplicate links. Note that this constraint is not enforced on the
    # database level.
    link_to_check = (
        RecordLink.query.filter_by(
            name=name, record_from_id=record_from.id, record_to_id=record_to.id
        )
        .with_entities(RecordLink.id)
        .first()
    )

    if link_to_check is not None:
        raise ValueError(_("Link already exists."))

    return RecordLink.create(
        creator=creator,
        name=name,
        record_from=record_from,
        record_to=record_to,
        term=term,
    )


def _trigger_record_revisions(updated_records, user):
    for record in updated_records:
        record._revision_created = create_revision(record, user=user)

    db.session.commit()

    for record in updated_records:
        if record._revision_created:
            signal_resource_change(record, user=user)








def _get_updated_record(record, user):
    if not isinstance(record, Record):
        record = Record.query.get_active(record)

    if record is None:
        return None

    if not has_permission(user, "link", "record", record.id):
        raise KadiPermissionError("No permission to link records.")

    return record








[docs]def get_linked_record(record_link, record): """Get the record another record is linked to based on a given record link. :record_link: The record link. :record: One of the records linked via the given record link. :return: A record linked to the given record via the given record link or ``None`` if the given record is not part of the given record link. """ if record.id not in {record_link.record_from_id, record_link.record_to_id}: return None if record_link.record_from_id == record.id: return record_link.record_to return record_link.record_from
def _get_record_data(record, depth=1): return { "id": record.id, "identifier": truncate(record.identifier, 25), "identifier_full": record.identifier, "type": truncate(record.type, 25), "type_full": record.type, "url": url_for( "records.view_record", id=record.id, tab="links", visualize="true", depth=depth, ), } def _calculate_link_meta(data): link_indices = {} link_lengths = {} for link_data in data: source_id = link_data["source"] target_id = link_data["target"] # The index of a link is increased for each link that has the same source and # target, starting at 1. link_index = 1 key = (source_id, target_id) if key in link_indices: link_index = link_indices[key] + 1 link_indices[key] = link_index link_data["link_index"] = link_index # The link length is determined by the maximum length of the (truncated) link # names between two records, independent of link direction. link_length = len(link_data["name"]) key = ( (source_id, target_id) if source_id < target_id else (target_id, source_id) ) if key in link_lengths: link_lengths[key] = max(link_length, link_lengths[key]) else: link_lengths[key] = link_length for link_data in data: source_id = link_data["source"] target_id = link_data["target"] key = ( (source_id, target_id) if source_id < target_id else (target_id, source_id) ) if key in link_lengths: link_data["link_length"] = link_lengths[key] def _collect_link_data( record_id, link_direction, depth, records, record_links, processed_record_ids, added_record_ids, user, ): new_record_ids = set() # Limit the links per record to a maximum of 100. record_links_query = ( get_permitted_record_links(record_id, direction=link_direction, user=user) .order_by(RecordLink.last_modified.desc()) .limit(100) ) for record_link in record_links_query: # Skip all links involving records that were already checked for their links. if ( record_link.record_from_id in processed_record_ids or record_link.record_to_id in processed_record_ids ): continue source = record_link.record_from target = record_link.record_to for record in [source, target]: new_record_ids.add(record.id) if record.id not in added_record_ids: records.append(_get_record_data(record, depth)) added_record_ids.add(record.id) record_links.append( { "id": record_link.id, "source": source.id, "target": target.id, "name": truncate(record_link.name, 25), "name_full": record_link.name, # We simply take the outgoing record as base for the URL. "url": url_for( "records.view_record_link", record_id=record_link.record_from_id, link_id=record_link.id, ), } ) # Add the link indices and lengths to the data. _calculate_link_meta(record_links) processed_record_ids.add(record_id) return new_record_ids