Source code for kadi.modules.records.tasks

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import current_app
from flask_login import current_user

import kadi.lib.constants as const
from kadi.ext.celery import celery
from kadi.ext.db import db
from kadi.lib.exceptions import KadiFilesizeMismatchError
from kadi.lib.tasks.core import launch_task
from kadi.lib.tasks.models import Task
from kadi.lib.tasks.models import TaskState

from .core import purge_record
from .models import Record
from .models import RecordState
from .models import Upload
from .models import UploadState
from .uploads import merge_chunk_data


@celery.task(name=const.TASK_MERGE_CHUNKS, soft_time_limit=const.ONE_HOUR, bind=True)
def _merge_chunks_task(self, upload_id, **kwargs):
    task = Task.query.get(self.request.id)
    upload = Upload.query.get(upload_id)

    # Check if the upload is still ready to be processed at this point.
    if upload is None or upload.state != UploadState.PROCESSING:
        return None

    file = None

    try:
        file = merge_chunk_data(upload)

        if file is not None:
            task.result = {"file": str(file.id)}
        else:
            task.state = TaskState.FAILURE
            task.result = {"error": "Error creating or updating file."}

    # Catches time limit exceeded exceptions as well.
    except Exception as e:
        db.session.rollback()
        task.state = TaskState.FAILURE

        if isinstance(e, KadiFilesizeMismatchError):
            task.result = {"error": str(e)}
        else:
            current_app.logger.exception(e)
            task.result = {"error": "Internal server error."}

    db.session.commit()
    return str(file.id) if file is not None else None


[docs]def start_merge_chunks_task(upload, user=None): """Merge the chunks of a local file upload in a background task. Uses :func:`kadi.modules.records.uploads.merge_chunk_data`. The created task will be kept in the database. Note that this function issues one or more database commits. :param upload: The upload that the chunks belong to. :param user: (optional) The user who started the task. Defaults to the current user. :return: The new task object if the task was started successfully, ``None`` otherwise. """ user = user if user is not None else current_user return launch_task( const.TASK_MERGE_CHUNKS, args=(str(upload.id),), user=user, keep=True )
@celery.task(name=const.TASK_PURGE_RECORD, soft_time_limit=const.ONE_HOUR) def _purge_record_task(record_id, **kwargs): record = Record.query.get(record_id) try: purge_record(record) # Catches time limit exceeded exceptions as well. except Exception as e: current_app.logger.exception(e) db.session.rollback() # In case the state of the record was already changed, we reset it so another # attempt can be made to purge it (including the periodic cleanup task). record.state = RecordState.DELETED db.session.commit() return False return True
[docs]def start_purge_record_task(record): """Purge an existing record in a background task. Uses :func:`kadi.modules.records.core.purge_record`. :param record: The record to purge. """ return launch_task(const.TASK_PURGE_RECORD, args=(record.id,))