Source code for kadi.modules.records.uploads

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mimetypes import guess_type

from sqlalchemy.exc import IntegrityError

import kadi.lib.constants as const
from kadi.ext.db import db
from kadi.lib.db import acquire_lock
from kadi.lib.db import update_object
from kadi.lib.plugins.utils import signal_resource_change
from kadi.lib.revisions.core import create_revision
from kadi.lib.security import hash_value

from .files import get_custom_mimetype
from .models import Chunk
from .models import ChunkState
from .models import File
from .models import FileState
from .models import UploadState


[docs]def delete_upload(upload): """Delete an existing upload. This will mark the upload for deletion, i.e. only the upload's state will be changed. :param upload: The upload to delete. """ upload.state = UploadState.INACTIVE
[docs]def remove_upload(upload): """Remove an upload from storage and from the database. Note that this function issues one or more database commits. :param upload: The upload to remove. """ delete_upload(upload) db.session.commit() # Remove any chunks related to the upload as well. for chunk in upload.chunks: upload.storage.delete(chunk.identifier) db.session.delete(chunk) upload.storage.delete(upload.identifier) db.session.delete(upload) db.session.commit()
[docs]def save_chunk_data(upload, stream, *, index, size, checksum=None): """Save the uploaded chunk data of a chunked upload. Note that this function issues one or more database commits. :param upload: The upload. :param stream: The chunk data as a readable binary stream. :param index: The index of the chunk. :param size: The size of the chunk in bytes. :param checksum: (optional) The checksum of the chunk. If provided, it will be used to verify the calculated checksum after saving the chunk. :return: The created or updated :class:`.Chunk` object. :raises KadiFilesizeExceededError: If the chunk data exceeds the size of its upload or the maximum chunk size. :raises KadiFilesizeMismatchError: If the actual size of the chunk data does not match the provided size. :raises KadiChecksumMismatchError: If the actual checksum of the chunk data does not match the provided checksum. """ chunk = Chunk.update_or_create(upload=upload, index=index, size=size) db.session.commit() try: chunk.checksum = upload.storage.save( chunk.identifier, stream, max_size=min(upload.size, const.UPLOAD_CHUNK_SIZE) ) calculated_size = upload.storage.get_size(chunk.identifier) upload.storage.validate_size(calculated_size, chunk.size) if checksum is not None: upload.storage.validate_checksum(chunk.checksum, checksum) chunk.state = ChunkState.ACTIVE except: chunk.state = ChunkState.INACTIVE raise finally: # Always update the upload's timestamp, since it is relevant it's expiration. upload.update_timestamp() db.session.commit() return chunk
[docs]def merge_chunk_data(upload): """Merge the chunk data of a chunked upload. Note that this function issues one or more database commits or rollbacks. :param upload: The upload. :return: The newly created or updated file or ``None`` if a conflict occured while completing the upload. :raises KadiFilesizeMismatchError: If the actual final size of the upload data does not match the size provided when the upload was created. """ chunk_identifiers = [] chunk_checksums = "" for chunk in upload.active_chunks.order_by(Chunk.index): chunk_identifiers.append(chunk.identifier) chunk_checksums += chunk.checksum or "" try: combined_checksum = hash_value(bytes.fromhex(chunk_checksums), alg="md5") upload.checksum = f"{combined_checksum}-{upload.chunk_count}" except ValueError: pass try: upload.storage.merge(upload.identifier, chunk_identifiers) calculated_size = upload.storage.get_size(upload.identifier) upload.storage.validate_size(calculated_size, upload.size) except: delete_upload(upload) raise finally: db.session.commit() return _complete_file_upload(upload)
[docs]def save_upload_data(upload, stream, checksum=None): """Save the uploaded data of a direct upload. Note that this function issues one or more database commits or rollbacks. :param upload: The upload. :param stream: The upload data as a readable binary stream. :param checksum: (optional) The checksum of the upload. If provided, it will be used to verify the calculated checksum after saving the upload. :return: The newly created or updated file or ``None`` if a conflict occured while completing the upload. :raises KadiFilesizeExceededError: If the upload data exceeds the size of its upload or the maximum direct upload size. :raises KadiFilesizeMismatchError: If the actual size of the upload data does not match match the size provided when the upload was created. :raises KadiChecksumMismatchError: If the actual checksum of the upload data does not match the provided checksum. """ try: upload.checksum = upload.storage.save( upload.identifier, stream, max_size=min(upload.size, const.UPLOAD_CHUNKED_BOUNDARY), ) calculated_size = upload.storage.get_size(upload.identifier) upload.storage.validate_size(calculated_size, upload.size) if checksum is not None: upload.storage.validate_checksum(upload.checksum, checksum) except: delete_upload(upload) raise finally: db.session.commit() return _complete_file_upload(upload)
def _complete_file_upload(upload): if upload.file is None: file = File.create( creator=upload.creator, record=upload.record, name=upload.name, size=upload.size, checksum=upload.checksum, storage_type=upload.storage_type, ) db.session.commit() else: # Lock the file to make sure replacing the metadata and actual file data happens # in a single transaction. file = acquire_lock(upload.file) # Check if the file still exists and is active at this point. if file is None or file.state != FileState.ACTIVE: delete_upload(upload) # Releases the file lock as well. db.session.commit() return None update_object(file, size=upload.size, checksum=upload.checksum) if upload.description is not None: file.description = upload.description try: # When replacing a file with a different storage type we have to delete any # previous data before updating the storage type. if file.storage_type != upload.storage_type: file.storage.delete(file.identifier) file.storage_type = upload.storage_type upload.storage.move(upload.identifier, file.identifier) # Determine the magic MIME type, and possibly a custom MIME type, based on the # file's content. base_mimetype = file.storage.get_mimetype(file.identifier) custom_mimetype = get_custom_mimetype(file, base_mimetype=base_mimetype) magic_mimetype = base_mimetype if custom_mimetype is None else custom_mimetype # Determine the regular MIME type. If no MIME type was given explicitly for the # upload, the custom MIME type is taken, if applicable. Otherwise, try to guess # the regular MIME type from the filename and fall back to the magic MIME type. mimetype = upload.mimetype if mimetype is None: if custom_mimetype is not None: mimetype = custom_mimetype else: mimetype = guess_type(file.name)[0] or magic_mimetype update_object( file, mimetype=mimetype, magic_mimetype=magic_mimetype, state=FileState.ACTIVE, ) delete_upload(upload) if db.session.is_modified(file): file.record.update_timestamp() revision_created = create_revision(file, user=upload.creator) # Releases the file lock as well. db.session.commit() except Exception as e: db.session.rollback() # If something went wrong at this point when replacing a file, the existing file # data has most likely already been deleted or overwritten, so we have to delete # the file. if upload.file is not None: from .files import delete_file delete_file(upload.file, user=upload.creator) delete_upload(upload) db.session.commit() if isinstance(e, IntegrityError): return None raise if revision_created: signal_resource_change(file, user=upload.creator, created=upload.file is None) return file