Source code for kadi.modules.records.files

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import current_app
from flask_login import current_user
from sqlalchemy.exc import IntegrityError
from zipstream import ZipStream

import kadi.lib.constants as const
from kadi.ext.db import db
from kadi.lib.db import acquire_lock
from kadi.lib.db import escape_like
from kadi.lib.db import update_object
from kadi.lib.permissions.core import get_permitted_objects
from kadi.lib.permissions.core import has_permission
from kadi.lib.plugins.core import run_hook
from kadi.lib.plugins.utils import signal_resource_change
from kadi.lib.revisions.core import create_revision as _create_revision
from kadi.lib.revisions.core import delete_revisions
from kadi.lib.validation import validate_mimetype
from kadi.lib.web import download_bytes

from .models import File
from .models import FileState
from .models import Record
from .models import RecordState


def update_file(file, user=None, **kwargs):
    r"""Update an existing file.

    The update is only performed if both the file and the record it belongs to are
    active. Note that this function issues a database commit or rollback.

    :param file: The file to update.
    :param user: (optional) The user who triggered the update. Defaults to the current
        user.
    :param \**kwargs: Keyword arguments that will be passed to
        :func:`kadi.lib.db.update_object`.
    :return: ``True`` if the file was updated successfully, ``False`` if the file or
        its record is not active or if flushing the changes raised an integrity error.
    """
    if user is None:
        user = current_user

    file = acquire_lock(file)

    is_active = (
        file.state == FileState.ACTIVE and file.record.state == RecordState.ACTIVE
    )

    if not is_active:
        # Nothing to update, but the file lock still has to be released.
        db.session.commit()
        return False

    update_object(file, **kwargs)

    # Changing a file counts as a change of its record as well.
    if db.session.is_modified(file):
        file.record.update_timestamp()

    try:
        db.session.flush()
    except IntegrityError:
        db.session.rollback()
        return False

    has_new_revision = _create_revision(file, user=user)

    # Committing releases the file lock as well.
    db.session.commit()

    # Only signal a change once it has actually been committed.
    if has_new_revision:
        signal_resource_change(file, user=user)

    return True
def delete_file(file, create_revision=True, user=None):
    """Delete an existing file.

    This will mark the file for deletion, i.e. only the file's state will be changed.
    Any uploads related to the file are marked for deletion as well. Note that this
    function issues one or more database commits.

    :param file: The file to delete.
    :param create_revision: (optional) Flag indicating whether a revision should be
        created for the deletion.
    :param user: (optional) The user who triggered the deletion. Defaults to the
        current user.
    """
    from .uploads import delete_upload

    if user is None:
        user = current_user

    file = acquire_lock(file)
    file.state = FileState.INACTIVE

    # Changing a file counts as a change of its record as well.
    if db.session.is_modified(file):
        file.record.update_timestamp()

    has_new_revision = (
        _create_revision(file, user=user) if create_revision else False
    )

    # Mark any uploads related to the file for deletion as well.
    for related_upload in file.uploads:
        delete_upload(related_upload)

    # Committing releases the file lock as well.
    db.session.commit()

    if has_new_revision:
        signal_resource_change(file, user=user)
def remove_file(file, delete_from_db=True):
    """Remove a file from storage and optionally from the database.

    The file is first marked for deletion via :func:`delete_file` (without creating a
    revision), then its uploads and its stored data are removed. Note that this
    function issues one or more database commits.

    :param file: The file to remove.
    :param delete_from_db: (optional) A flag indicating whether the file should be
        deleted from the database as well, instead of just doing a soft deletion.
    """
    from .uploads import remove_upload

    delete_file(file, create_revision=False)

    # Remove any uploads related to the file as well.
    for related_upload in file.uploads:
        remove_upload(related_upload)

    file.storage.delete(file.identifier)

    if not delete_from_db:
        # Soft deletion: keep the database entry and only record the new state.
        file.state = FileState.DELETED
    else:
        delete_revisions(file)
        db.session.delete(file)

    db.session.commit()
def stream_files(record):
    """Stream all files of a record to a client as ZIP archive.

    Only the active files of the record are included. Slashes in file names are
    replaced by underscores in the corresponding archive member names.

    :param record: The record the files belong to.
    :return: The response object.
    """
    archive = ZipStream(sized=True)

    def _iter_file_content(storage, identifier):
        # Opened lazily, i.e. only once the archive actually consumes this generator.
        with storage.open(identifier) as f:
            yield from f

    for active_file in record.active_files:
        member_name = active_file.name.replace("/", "_")
        content = _iter_file_content(active_file.storage, active_file.identifier)

        archive.add(content, arcname=member_name, size=active_file.size)

    download_name = f"{record.identifier}.zip"

    return download_bytes(
        archive, filename=download_name, content_length=len(archive)
    )
def get_custom_mimetype(file, base_mimetype=None):
    """Get a custom MIME type of a file based on its content.

    Uses the :func:`kadi.plugins.spec.kadi_get_custom_mimetype` plugin hook for custom
    MIME types based on the file's content.

    :param file: The file to get the MIME type of.
    :param base_mimetype: (optional) The base MIME type of the file on which to base
        the custom MIME type. If not provided, it is retrieved from the file's storage.
    :return: The custom MIME type or ``None`` if no valid custom MIME type was found.
    """
    if base_mimetype is None:
        base_mimetype = file.storage.get_mimetype(file.identifier)

    try:
        custom_mimetype = run_hook(
            "kadi_get_custom_mimetype", file=file, base_mimetype=base_mimetype
        )

        # A missing result is passed through as is, anything else must be valid.
        if custom_mimetype is not None:
            validate_mimetype(custom_mimetype)
    except Exception as exc:
        # Any failure in the hook or the validation is logged and treated as "no
        # custom MIME type".
        current_app.logger.exception(exc)
        return None

    return custom_mimetype
def get_direct_upload_type(file):
    """Get the direct upload type of a file.

    This type can be used to determine whether a file can be directly edited via a
    corresponding editor in the frontend. Such files must have a certain format and
    must not exceed a size of 10 MB.

    :param file: The file to get the direct upload type of.
    :return: The direct upload type (``"image"``, ``"text"`` or ``"workflow"``) or
        ``None`` if no suitable type can be determined.
    """
    if file.size > 10 * const.ONE_MB:
        return None

    # NOTE(review): assumes the magic MIME type is always a string here -- confirm
    # that it cannot be ``None`` for the files passed to this function.
    mimetype = file.magic_mimetype

    if mimetype in const.IMAGE_MIMETYPES:
        return "image"

    if mimetype.startswith("text/") or mimetype in [
        const.MIMETYPE_JSON,
        const.MIMETYPE_XML,
        const.MIMETYPE_TOOL,
    ]:
        try:
            # As only UTF-8 encoded files are supported for direct editing, try to
            # read a decent amount of data using this encoding first.
            with file.storage.open(file.identifier, mode="r", encoding="utf-8") as f:
                f.read(const.ONE_MB)
        except Exception:
            # Deliberately best-effort: a file that cannot be opened or decoded is
            # simply not editable as text. Unlike a bare ``except``, this does not
            # swallow ``KeyboardInterrupt`` or ``SystemExit``.
            return None

        return "text"

    if mimetype == const.MIMETYPE_FLOW:
        return "workflow"

    return None
def get_permitted_files(filter_term="", record_id=None, user=None):
    """Convenience function to get all active record files that a user can access.

    In this context having access to a file means having read permission for the
    record the file belongs to.

    :param filter_term: (optional) A (case insensitive) term to filter the files by
        their name or record identifier.
    :param record_id: (optional) A record ID by which to filter the files.
    :param user: (optional) The user to check for access permissions. Defaults to the
        current user.
    :return: The permitted file objects as query.
    """
    if user is None:
        user = current_user

    like_pattern = f"%{escape_like(filter_term)}%"

    if record_id is None:
        # No specific record requested: consider all active records the user can read.
        permitted_record_ids = (
            get_permitted_objects(user, "read", "record")
            .filter(Record.state == RecordState.ACTIVE)
            .with_entities(Record.id)
        )
    else:
        record = Record.query.get_active(record_id)
        can_read = record is not None and has_permission(
            user, "read", "record", record.id
        )

        # An empty list makes the query below match no files at all.
        permitted_record_ids = [record.id] if can_read else []

    matches_term = db.or_(
        File.name.ilike(like_pattern),
        Record.identifier.ilike(like_pattern),
    )

    return File.query.join(Record).filter(
        File.state == FileState.ACTIVE,
        Record.id.in_(permitted_record_ids),
        matches_term,
    )