Source code for kadi.modules.records.uploads

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mimetypes import guess_type

from sqlalchemy.exc import IntegrityError

import kadi.lib.constants as const
from .files import get_custom_mimetype
from .models import Chunk
from .models import ChunkState
from .models import File
from .models import FileState
from .models import UploadState
from kadi.ext.db import db
from kadi.lib.db import acquire_lock
from kadi.lib.db import update_object
from kadi.lib.revisions.core import create_revision
from kadi.lib.storage.local import create_chunk_storage
from kadi.plugins.utils import signal_resource_change


def delete_upload(upload):
    """Mark an existing upload as deleted.

    This is a soft delete: only the state of the upload is switched to inactive.
    Nothing is removed from storage or from the database, and no commit is issued
    by this function.

    :param upload: The upload to mark for deletion.
    """
    upload.state = UploadState.INACTIVE
def remove_upload(upload):
    """Remove an upload, including all of its chunks, from storage and the database.

    The upload is first marked as inactive and this change is committed right away.
    Afterwards, the data and database row of each chunk are deleted, followed by
    the data and database row of the upload itself.

    Note that this function issues one or more database commits.

    :param upload: The upload to remove.
    """
    chunk_store = create_chunk_storage()
    upload_store = upload.storage

    # Persist the state change first, before any data is actually deleted.
    delete_upload(upload)
    db.session.commit()

    # Each chunk is stored separately as "<upload id>-<chunk index>".
    for chunk in upload.chunks:
        chunk_path = chunk_store.create_filepath(f"{upload.id}-{chunk.index}")
        chunk_store.delete(chunk_path)
        db.session.delete(chunk)

    # Finally, get rid of the (possibly already merged) upload data itself.
    upload_path = upload_store.create_filepath(str(upload.id))
    upload_store.delete(upload_path)
    db.session.delete(upload)

    db.session.commit()
def save_chunk(*, upload, file_object, index, size, checksum=None):
    """Store a single chunk of an upload and record its resulting state.

    The chunk data is saved using the UUID of the given upload (see
    :attr:`.Upload.id`) combined with the chunk's index as base name, i.e.
    ``"<uuid>-<index>"``. The complete path of the file is generated by the chunk
    storage based on this name.

    The chunk is created or updated in the database and committed before any data
    is written. Once the data has been saved and validated, the chunk is marked as
    active. If saving or validating fails, the chunk is marked as inactive and the
    error is re-raised. In both cases the timestamp of the upload is updated.

    Note that this function issues one or more database commits.

    :param upload: The upload the chunk belongs to.
    :param file_object: A file-like object representing the actual uploaded file.
    :param index: The index of the chunk.
    :param size: The size of the chunk in bytes.
    :param checksum: (optional) The MD5 checksum of the chunk. If given, it will be
        used to verify the checksum after saving the chunk.
    :raises KadiFilesizeExceededError: If the chunk exceeds the maximum size of its
        storage.
    :raises KadiFilesizeMismatchError: If the actual size of the chunk does not
        match the provided size.
    :raises KadiChecksumMismatchError: If the actual checksum of the chunk does not
        match the provided checksum.
    """
    chunk = Chunk.update_or_create(upload=upload, index=index, size=size)
    db.session.commit()

    storage = create_chunk_storage()

    try:
        chunk_path = storage.create_filepath(f"{upload.id}-{index}")

        storage.save(chunk_path, file_object)
        storage.validate_size(chunk_path, size)

        if checksum:
            storage.validate_checksum(chunk_path, checksum)

        chunk.state = ChunkState.ACTIVE
    except BaseException:
        # Whatever went wrong, the chunk must not be considered usable.
        chunk.state = ChunkState.INACTIVE
        raise
    finally:
        # The timestamp of the upload is used for checking its expiration, so it is
        # always refreshed manually, regardless of the outcome.
        upload.update_timestamp()
        db.session.commit()
def merge_chunks(upload, task=None):
    """Merge the chunks of an upload.

    The active chunks of the upload are appended, in order of their index, to a
    single file in the storage of the upload. Afterwards,
    :func:`complete_file_upload` is used to complete the file upload process.

    All opened file handles are closed again even if merging fails. In case of an
    error, the current database transaction is rolled back, the upload is marked as
    inactive and the original exception is re-raised.

    Note that this function issues one or more database commits or rollbacks.

    :param upload: The upload whose chunks should be merged.
    :param task: (optional) A :class:`.Task` object that can be provided if this
        function is executed in a task. In that case, the progress of the given
        task will be updated after each merged chunk.
    :return: See :func:`complete_file_upload`.
    """
    chunk_storage = create_chunk_storage()
    upload_storage = upload.storage

    try:
        upload_path = upload_storage.create_filepath(str(upload.id))
        upload_storage.ensure_filepath_exists(upload_path)
        upload_file = upload_storage.open(upload_path, mode="wb")

        try:
            # Merge the uploaded chunks.
            for chunk in upload.active_chunks.order_by(Chunk.index.asc()):
                chunk_path = chunk_storage.create_filepath(
                    f"{upload.id}-{chunk.index}"
                )
                chunk_file = chunk_storage.open(chunk_path)

                try:
                    upload_file.write(chunk_file.read())
                finally:
                    # Release the chunk handle even if reading or writing failed.
                    chunk_storage.close(chunk_file)

                if task is not None:
                    task.update_progress((chunk.index + 1) / upload.chunk_count * 100)
                    # Commit so that the updated progress is persisted right away.
                    db.session.commit()
        finally:
            # Always release the handle of the merged file, as it would otherwise
            # leak whenever merging one of the chunks fails.
            upload_storage.close(upload_file)
    except BaseException:
        db.session.rollback()

        upload.state = UploadState.INACTIVE
        db.session.commit()

        raise

    return complete_file_upload(upload)
def complete_file_upload(upload):
    """Perform the necessary steps to complete a file upload.

    Validates the upload in regards to its stored data and creates or updates the
    corresponding file. Afterwards, the uploaded data is moved to the location of
    the file, the MIME types of the file are determined, the upload is marked as
    inactive and a revision of the file is created. A resource change is only
    signalled if a revision was actually created.

    If anything goes wrong, the current transaction is rolled back, the upload is
    marked as inactive and the exception is re-raised. If the upload was supposed
    to replace an existing file and the checksum of that file's stored data can no
    longer be validated, the file is deleted as well.

    Note that this function issues one or more database commits or rollbacks.

    :param upload: The upload to complete.
    :return: The newly created or updated file or ``None`` if it could not be
        created due to a file name conflict or updated due to a replaced file
        already being deleted.
    :raises KadiFilesizeExceededError: If the upload exceeds the maximum size of its
        storage.
    :raises KadiFilesizeMismatchError: If the actual size of the upload does not
        match the provided size.
    :raises KadiChecksumMismatchError: If the actual checksum of the upload does not
        match the provided checksum.
    """
    storage = upload.storage

    try:
        upload_path = storage.create_filepath(str(upload.id))
        storage.validate_size(upload_path, upload.size)

        checksum = upload.checksum
        calculated_checksum = upload.calculated_checksum

        # Only calculate the checksum from the stored data if the upload does not
        # already provide a calculated one.
        if calculated_checksum is None:
            calculated_checksum = storage.get_checksum(upload_path)

        # The checksum supplied with the upload is optional and is only compared
        # against the calculated checksum if it was given.
        if checksum is not None:
            storage.validate_checksum(upload_path, checksum, actual=calculated_checksum)

        new_file_created = False

        # Check whether the upload replaces an existing file.
        if upload.file is None:
            try:
                file = File.create(
                    creator=upload.creator,
                    record=upload.record,
                    storage=upload.storage,
                    name=upload.name,
                    description=upload.description,
                    size=upload.size,
                    checksum=calculated_checksum,
                )
                # Commit here already, so the file can be referenced and deleted later
                # if something went wrong.
                db.session.commit()
            except IntegrityError:
                # The file could not be created, see the documented return value
                # regarding file name conflicts.
                db.session.rollback()
                return None

            new_file_created = True
        else:
            # Lock the file to make sure replacing the metadata and actual file data
            # happens in a single transaction.
            file = acquire_lock(upload.file)

            # Check if the file still exists and is active.
            if file is None or file.state != FileState.ACTIVE:
                # Release the file lock.
                db.session.commit()
                return None

            update_object(
                file,
                description=upload.description,
                size=upload.size,
                checksum=calculated_checksum,
            )

        # Move the completed upload to the correct location.
        filepath = storage.create_filepath(str(file.id))
        storage.move(upload_path, filepath)

        # Determine the magic MIME type, and possibly a custom MIME type, based on the
        # file's content.
        base_mimetype = storage.get_mimetype(filepath)
        custom_mimetype = get_custom_mimetype(file, base_mimetype=base_mimetype)
        magic_mimetype = base_mimetype if custom_mimetype is None else custom_mimetype

        # Determine the regular MIME type. If no MIME type was given explicitly for the
        # upload, or it is equal to the default MIME type, the custom MIME type is
        # taken, if applicable. Otherwise, try to guess the regular MIME type from the
        # filename and fall back to the magic MIME type.
        mimetype = upload.mimetype

        if mimetype == const.MIMETYPE_BINARY:
            if custom_mimetype is not None:
                mimetype = custom_mimetype
            else:
                mimetype = guess_type(file.name)[0] or magic_mimetype

        update_object(
            file,
            mimetype=mimetype,
            magic_mimetype=magic_mimetype,
            state=FileState.ACTIVE,
        )
        upload.state = UploadState.INACTIVE

        # Only touch the timestamp of the record if the file has pending changes.
        if db.session.is_modified(file):
            file.record.update_timestamp()

        # Note that the creator of the upload will be used for the revision. For
        # existing files, the original creator of the file will stay the same.
        revision_created = create_revision(file, user=upload.creator)
        # Releases the file lock as well.
        db.session.commit()

        if revision_created:
            signal_resource_change(file, user=upload.creator, created=new_file_created)

        return file

    except:
        db.session.rollback()

        # If something went wrong when replacing a file, check whether the old file is
        # still intact and delete it if not.
        if upload.file is not None:
            try:
                filepath = storage.create_filepath(str(upload.file.id))
                storage.validate_checksum(filepath, upload.file.checksum)
            except:
                # Imported locally rather than at module level.
                # NOTE(review): presumably to avoid a circular import, confirm.
                from .files import delete_file

                delete_file(upload.file, user=upload.creator)

        # Mark the upload as inactive in any case before propagating the error.
        upload.state = UploadState.INACTIVE
        db.session.commit()

        raise