Source code for kadi.modules.records.forms

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial

from flask_babel import gettext as _
from flask_babel import lazy_gettext as _l
from flask_login import current_user
from marshmallow import ValidationError
from wtforms.validators import DataRequired
from wtforms.validators import InputRequired
from wtforms.validators import Length
from wtforms.validators import NumberRange
from wtforms.validators import StopValidation

import kadi.lib.constants as const
from kadi.ext.db import db
from kadi.lib.conversion import empty_str
from kadi.lib.conversion import lower
from kadi.lib.conversion import none
from kadi.lib.conversion import normalize
from kadi.lib.conversion import strip
from kadi.lib.format import filesize
from kadi.lib.forms import BooleanField
from kadi.lib.forms import DynamicMultiSelectField
from kadi.lib.forms import DynamicSelectField
from kadi.lib.forms import FileField
from kadi.lib.forms import IntegerField
from kadi.lib.forms import JSONField
from kadi.lib.forms import KadiForm
from kadi.lib.forms import LFTextAreaField
from kadi.lib.forms import StringField
from kadi.lib.forms import SubmitField
from kadi.lib.forms import convert_schema_validation_msg
from kadi.lib.forms import validate_iri
from kadi.lib.forms import validate_mimetype
from kadi.lib.licenses.models import License
from kadi.lib.permissions.core import get_permitted_objects
from kadi.lib.permissions.core import has_permission
from kadi.lib.resources.forms import BaseResourceForm
from kadi.lib.resources.forms import RolesField
from kadi.lib.resources.forms import TagsField
from kadi.lib.resources.forms import check_duplicate_identifier
from kadi.lib.tags.models import Tag
from kadi.modules.collections.models import Collection
from kadi.modules.collections.models import CollectionState
from kadi.modules.records.links import get_permitted_record_links
from kadi.modules.templates.models import TemplateType

from .extras import ExtrasField
from .links import get_linked_record
from .models import Chunk
from .models import File
from .models import FileState
from .models import Record
from .models import RecordLink
from .models import Upload
from .schemas import RecordLinkDataSchema


[docs]class RecordLinksField(JSONField): """Custom field to process and validate record links. Uses :class:`.RecordLinkDataSchema` for its validation. """ def __init__(self, *args, **kwargs): kwargs["default"] = [] super().__init__(*args, **kwargs) self.initial = [] self._validation_errors = {} def _value(self): return self.initial
[docs] def process_formdata(self, valuelist): super().process_formdata(valuelist) if valuelist: try: schema = RecordLinkDataSchema(many=True) self.data = schema.load(self.data) except ValidationError as e: self._validation_errors = e.messages # Check if at least the basic structure of the data is valid and discard # it otherwise. if not isinstance(self.data, list): self.data = self.default self._validation_errors = {} else: for item in self.data: if not isinstance(item, dict): self.data = self.default self._validation_errors = {} break raise ValueError("Invalid data structure.") from e
[docs] def to_dict(self): data = super().to_dict() constraints = RecordLink.Meta.check_constraints max_term_len = constraints["term"]["length"]["max"] data["validation"]["max"] = { "name": constraints["name"]["length"]["max"], "term": max_term_len, } data["errors"] = self._validation_errors for error_data in data["errors"].values(): for field_name, error_msgs in error_data.items(): for index, error_msg in enumerate(error_msgs): interpolations = {} if field_name == "term": interpolations["max"] = max_term_len error_msgs[index] = convert_schema_validation_msg( error_msg, **interpolations ) return data
[docs] def set_initial_data(self, data=None, record=None, user=None): """Set the initial data of this field. :param data: (optional) The form data to use for prefilling. Defaults to the submitted data of the current field instance. :param record: (optional) An existing record, which can be used to set the initial data instead of the given form data. :param user: (optional) A user that will be used for checking various access permissions when setting the data. Defaults to the current user. """ data = data if data is not None else getattr(self, "data", []) user = user if user is not None else current_user initial_data = [] if record is not None: record_links_query = get_permitted_record_links( record, actions=["link"], user=user ).order_by(RecordLink.created_at) for record_link in record_links_query: if record_link.record_from_id == record.id: direction = "out" linked_record = record_link.record_to else: direction = "in" linked_record = record_link.record_from initial_data.append( { "direction": direction, "record": [linked_record.id, f"@{linked_record.identifier}"], "name": record_link.name, "term": record_link.term, } ) else: # Make sure to not rely on the data contents, as invalid (submitted) form # data is only discarded if the overall structure is invalid. for link_meta in data: linked_record_data = None record_id = link_meta.get("record") # pylint: disable=unidiomatic-typecheck if type(record_id) is int: linked_record = Record.query.get_active(record_id) if linked_record is not None and has_permission( user, "link", "record", linked_record.id ): linked_record_data = [ linked_record.id, f"@{linked_record.identifier}", ] initial_data.append( { "direction": link_meta.get("direction", "out"), "record": linked_record_data, "name": link_meta.get("name"), "term": link_meta.get("term"), } ) self.initial = initial_data
[docs]class BaseRecordForm(BaseResourceForm): """Base form class for use in creating or updating records. :param import_data: (optional) A dictionary containing data imported from a file used for prefilling the form. :param record: (optional) A record used for prefilling the form. :param template: (optional) A record or extras template used for prefilling the form. """ identifier = BaseResourceForm.identifier_field( description=_l("Unique identifier of this record.") ) type = DynamicSelectField( _l("Type"), filters=[lower, normalize], validators=[Length(max=Record.Meta.check_constraints["type"]["length"]["max"])], description=_l( "Optional type of this record, e.g. dataset, experimental device, etc." ), ) license = DynamicSelectField( _l("License"), description=_l( "Specifying an optional license can determine the conditions for the" " correct reuse of data and metadata when the record is published or simply" " shared with other users. A license can also be uploaded as a file, in" ' which case one of the "Other" licenses can be chosen.' ), ) visibility = BaseResourceForm.visibility_field( description=_l( "Public visibility automatically grants EVERY logged-in user read" " permissions for this record." ) ) tags = TagsField( _l("Tags"), max_len=Tag.Meta.check_constraints["name"]["length"]["max"], description=_l("An optional list of keywords further describing the record."), ) extras = ExtrasField(_l("Extra metadata")) def _prefill_type(self, type_data): if type_data is not None: self.type.initial = (type_data, type_data) def _prefill_license(self, license): if license is None: return if not isinstance(license, License): license = License.query.filter_by(name=license).first() if license is not None: self.license.initial = (license.name, license.title) def _prefill_tags(self, tags): if tags is not None: self.tags.initial = [(tag, tag) for tag in sorted(tags)] def __init__(self, *args, import_data=None, record=None, template=None, **kwargs): # Prefill all simple fields directly. accessor = None if import_data is not None: accessor = import_data.get elif record is not None: accessor = partial(getattr, record) elif template is not None: if template.type == TemplateType.RECORD: accessor = template.data.get elif template.type == TemplateType.EXTRAS: accessor = {"extras": template.data}.get if accessor is not None: kwargs["data"] = { "title": accessor("title", ""), "identifier": accessor("identifier", ""), "description": accessor("description", ""), "visibility": accessor("visibility", const.RESOURCE_VISIBILITY_PRIVATE), "extras": accessor("extras", []), } super().__init__(*args, **kwargs) # Prefill all other fields separately, also taking into account whether the form # was submitted. However, check for import data first, as the default form # submission check only considers the current HTTP request method. if import_data is not None: self._prefill_type(import_data.get("type")) self._prefill_license(import_data.get("license")) self._prefill_tags(import_data.get("tags")) elif self.is_submitted(): self._prefill_type(self.type.data) self._prefill_license(self.license.data) self._prefill_tags(self.tags.data) elif record is not None: self._prefill_type(record.type) self._prefill_license(record.license) self._prefill_tags([tag.name for tag in record.tags]) elif template is not None and template.type == TemplateType.RECORD: self._prefill_type(template.data.get("type")) self._prefill_license(template.data.get("license")) self._prefill_tags(template.data.get("tags")) def validate_license(self, field): # pylint: disable=missing-function-docstring if ( field.data is not None and License.query.filter_by(name=field.data).first() is None ): raise StopValidation(_("Not a valid license."))
[docs]class NewRecordForm(BaseRecordForm): """A form for use in creating new records. :param import_data: (optional) See :class:`BaseRecordForm`. :param record: (optional) See :class:`BaseRecordForm`. :param template: (optional) See :class:`BaseRecordForm`. :param collection: (optional) A collection used for prefilling the linked collections. :param user: (optional) A user that will be used for checking various access permissions when prefilling the form. Defaults to the current user. """ collections = DynamicMultiSelectField( _l("Collections"), coerce=int, description=_l("Directly link this record with one or more collections."), ) record_links = RecordLinksField( _l("Record links"), description=_l("Directly link this record with one or more other records."), ) roles = RolesField( _l("Permissions"), roles=[(r, r.capitalize()) for r in Record.Meta.permissions["roles"]], description=_l("Directly add user or group roles to this record."), ) submit = SubmitField(_l("Create record")) submit_files = SubmitField(_l("Create record and add files")) def _prefill_collections(self, collections): self.collections.initial = [(c.id, f"@{c.identifier}") for c in collections] def __init__( self, *args, import_data=None, record=None, template=None, collection=None, user=None, **kwargs, ): user = user if user is not None else current_user super().__init__( *args, import_data=import_data, record=record, template=template, **kwargs ) linkable_collections_ids = ( get_permitted_objects(user, "link", "collection") .filter(Collection.state == CollectionState.ACTIVE) .with_entities(Collection.id) ) if self.is_submitted(): if self.collections.data: collections = Collection.query.filter( db.and_( Collection.id.in_(linkable_collections_ids), Collection.id.in_(self.collections.data), ) ) self._prefill_collections(collections) self.record_links.set_initial_data(user=user) self.roles.set_initial_data(user=user) else: if record is not None: collections = record.collections.filter( Collection.id.in_(linkable_collections_ids) ) self._prefill_collections(collections) self.record_links.set_initial_data(record=record, user=user) self.roles.set_initial_data(resource=record, user=user) elif template is not None and template.type == TemplateType.RECORD: if template.data.get("collections"): collections = Collection.query.filter( db.and_( Collection.id.in_(linkable_collections_ids), Collection.id.in_(template.data["collections"]), ) ) self._prefill_collections(collections) self.record_links.set_initial_data( data=template.data.get("record_links", []), user=user ) self.roles.set_initial_data( data=template.data.get("roles", []), user=user ) # If a collection is given, overwrite all values set previously for the # linked collections. if collection is not None: self._prefill_collections([collection]) def validate_identifier(self, field): # pylint: disable=missing-function-docstring check_duplicate_identifier(Record, field.data)
[docs]class EditRecordForm(BaseRecordForm): """A form for use in editing existing records. :param record: The record to edit, used for prefilling the form. """ submit = SubmitField(_l("Save changes")) submit_quit = SubmitField(_l("Save changes and quit")) def __init__(self, record, *args, **kwargs): self.record = record super().__init__(*args, record=record, **kwargs) def validate_identifier(self, field): # pylint: disable=missing-function-docstring check_duplicate_identifier(Record, field.data, exclude=self.record)
[docs]class AddRecordLinksForm(KadiForm): """A form for use in creating new record links. :param user: (optional) A user that will be used for checking various access permissions when prefilling the form. Defaults to the current user. """ record_links = RecordLinksField(_l("New record links")) submit = SubmitField(_l("Link records")) def __init__(self, *args, user=None, **kwargs): user = user if user is not None else current_user super().__init__(*args, **kwargs) if self.is_submitted(): self.record_links.set_initial_data(user=user)
[docs]class EditRecordLinkForm(KadiForm): """A form for use in editing existing record links. :param record_link: The record link to edit, used for prefilling the form. :param record: The record in whose context the given record link is edited. :param user: (optional) A user that will be used for checking various access permissions when prefilling the form. Defaults to the current user. """ record = DynamicSelectField(_l("Record"), validators=[DataRequired()], coerce=int) name = DynamicSelectField( _l("Name"), filters=[normalize], validators=[ DataRequired(), Length(max=RecordLink.Meta.check_constraints["name"]["length"]["max"]), ], description=_l("The name of the link."), ) term = StringField( _l("Term IRI"), filters=[strip, none], validators=[ Length(max=RecordLink.Meta.check_constraints["term"]["length"]["max"]), validate_iri, ], description=_l( "An IRI specifying an existing term that the link should represent." ), ) submit = SubmitField(_l("Save changes")) def _prefill_record(self, record, user): if record is None: return if not isinstance(record, Record): record = Record.query.get_active(record) if record is not None and has_permission(user, "read", "record", record.id): self.record.initial = (record.id, f"@{record.identifier}") def _prefill_name(self, name): if name is not None: self.name.initial = (name, name) def __init__(self, record_link, record, *args, user=None, **kwargs): user = user if user is not None else current_user # Prefill all simple fields directly. if record_link is not None: kwargs["data"] = {"term": record_link.term} super().__init__(*args, **kwargs) # Prefill all other fields separately, also taking into account whether the form # was submitted. if self.is_submitted(): self._prefill_record(self.record.data, user) self._prefill_name(self.name.data) elif record_link is not None: linked_record = get_linked_record(record_link, record) self._prefill_record(linked_record, user) self._prefill_name(record_link.name)
[docs]class LinkCollectionsForm(KadiForm): """A form for use in linking records with collections.""" collections = DynamicMultiSelectField( _l("Collections"), validators=[DataRequired()], coerce=int ) submit = SubmitField(_l("Link collections"))
[docs]class AddRolesForm(KadiForm): """A form for use in adding user or group roles to a record.""" roles = RolesField( _l("New permissions"), roles=[(r, r.capitalize()) for r in Record.Meta.permissions["roles"]], ) submit = SubmitField(_l("Add permissions"))
[docs]class UploadDataForm(KadiForm): """A form for use in uploading data for direct uploads. Currently only used within the API with data sent via custom headers. """
[docs] class Meta: """Container to store meta class attributes.""" csrf = False
checksum = StringField( filters=[strip, none], validators=[ Length(max=Upload.Meta.check_constraints["checksum"]["length"]["max"]) ], )
# Custom validation messages for form fields used in an API context to better match the # usual schema validation. FORM_VALIDATION_MISSING_DATA = "Missing data for required field." FORM_VALIDATION_RANGE_GTE = "Must be greater than or equal to %(min)s."
[docs]class UploadChunkForm(KadiForm): """A form for use in uploading data for chunked uploads. Currently only used within the API with data sent via custom headers. :param chunk_count: The total amount of chunks that the upload this chunk is part of has. Will be used to validate the chunk's index and size. """
[docs] class Meta: """Container to store meta class attributes.""" csrf = False
index = IntegerField( validators=[ InputRequired(message=FORM_VALIDATION_MISSING_DATA), NumberRange( min=Chunk.Meta.check_constraints["index"]["range"]["min"], message=FORM_VALIDATION_RANGE_GTE, ), ] ) size = IntegerField( validators=[ InputRequired(message=FORM_VALIDATION_MISSING_DATA), NumberRange( min=Chunk.Meta.check_constraints["size"]["range"]["min"], message=FORM_VALIDATION_RANGE_GTE, ), ], ) checksum = StringField( filters=[strip, none], validators=[ Length(max=Chunk.Meta.check_constraints["checksum"]["length"]["max"]) ], ) def __init__(self, chunk_count, *args, **kwargs): self.chunk_count = chunk_count super().__init__(*args, **kwargs) def validate_index(self, field): # pylint: disable=missing-function-docstring if field.data is not None and field.data >= self.chunk_count: raise StopValidation(f"Must be less than {self.chunk_count}.") def validate_size(self, field): # pylint: disable=missing-function-docstring chunk_size = field.data if chunk_size is not None: if chunk_size > const.UPLOAD_CHUNK_SIZE: raise StopValidation( f"Chunk size ({filesize(const.UPLOAD_CHUNK_SIZE)}) exceeded." ) chunk_index = self.index.data if ( chunk_index is not None and chunk_size < const.UPLOAD_CHUNK_SIZE and chunk_index < self.chunk_count - 1 ): raise StopValidation( "Only the last chunk may be smaller than the chunk size" f" ({filesize(const.UPLOAD_CHUNK_SIZE)})." )
[docs]class EditFileForm(KadiForm): """A form for use in editing file metadata. :param file: A file used for prefilling the form and checking for duplicate file names. """ name = StringField( _l("Filename"), filters=[normalize], validators=[ DataRequired(), Length(max=File.Meta.check_constraints["name"]["length"]["max"]), ], ) mimetype = StringField( _l("MIME type"), filters=[lower, normalize], validators=[ DataRequired(), Length(max=File.Meta.check_constraints["mimetype"]["length"]["max"]), validate_mimetype, ], ) description = LFTextAreaField( _l("Description"), filters=[empty_str, strip], validators=[ Length(max=File.Meta.check_constraints["description"]["length"]["max"]) ], ) submit = SubmitField(_l("Save changes")) def __init__(self, file, *args, **kwargs): self.file = file super().__init__(*args, obj=file, **kwargs) def validate_name(self, field): # pylint: disable=missing-function-docstring file = File.query.filter( File.record_id == self.file.record_id, File.state == FileState.ACTIVE, File.name == field.data, ).first() if file is not None and self.file != file: raise StopValidation(_("Name is already in use."))
[docs]class LegacyUploadChunkForm(UploadChunkForm): """A form for use in uploading chunks. Currently only kept for backwards compatibility. """ blob = FileField(validators=[DataRequired(message=FORM_VALIDATION_MISSING_DATA)])
[docs]class LegacyUploadDataForm(KadiForm): """A form for use in directly uploading files. Currently only kept for backwards compatibility. """
[docs] class Meta: """Container to store meta class attributes.""" csrf = False
name = StringField( filters=[normalize], validators=[ DataRequired(message=FORM_VALIDATION_MISSING_DATA), Length( max=Upload.Meta.check_constraints["name"]["length"]["max"], message="Longer than maximum length %(max)s.", ), ], ) size = IntegerField( validators=[ InputRequired(message=FORM_VALIDATION_MISSING_DATA), NumberRange( min=Upload.Meta.check_constraints["size"]["range"]["min"], message=FORM_VALIDATION_RANGE_GTE, ), ], ) blob = FileField(validators=[DataRequired(message=FORM_VALIDATION_MISSING_DATA)]) description = StringField( validators=[ Length(max=Upload.Meta.check_constraints["description"]["length"]["max"]) ], ) mimetype = StringField( filters=[lower, normalize], validators=[ Length(max=Upload.Meta.check_constraints["mimetype"]["length"]["max"]), validate_mimetype, ], ) checksum = StringField( filters=[strip], validators=[ Length(max=Upload.Meta.check_constraints["checksum"]["length"]["max"]) ], ) storage_type = StringField() replace_file = BooleanField()