# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import render_template
from flask_babel import gettext as _
import kadi.lib.constants as const
from .models import NotificationNames
from kadi.ext.db import db
from kadi.lib.tasks.models import Task
from kadi.lib.tasks.models import TaskState
def create_notification_data(notification):
    """Create notification data suitable for presenting it to a client.

    For notifications that are not task status notifications, the name of the
    notification is used as both title and body. For task status notifications, the
    referenced task is looked up and the title and body are derived from its name
    and current state.

    :param notification: A :class:`.Notification` object to use for creating the
        notification data.
    :return: A tuple containing the title and the HTML body of the notification.
    """
    # Fall back to the plain notification name for both values.
    title = body = notification.name

    # Task status notifications.
    if notification.name == NotificationNames.TASK_STATUS:
        task = Task.query.filter(Task.id == notification.data["task_id"]).first()

        # Default task status notifications.
        title = _("Task status")

        # Without a task there is no state to report, so return early before any of
        # the task attributes are accessed below.
        if task is None:
            body = _("Task no longer exists.")
            return title, body

        # Generic body based on the task state. For any state not listed here the
        # body keeps the fallback value assigned above.
        if task.state == TaskState.PENDING:
            body = _("Waiting for available resources...")
        elif task.state == TaskState.RUNNING:
            body = _("Task running...")
        elif task.state == TaskState.SUCCESS:
            body = _("Task succeeded.")
        elif task.state == TaskState.FAILURE:
            body = _("Task failed.")
        elif task.state == TaskState.REVOKED:
            body = _("Task revoked.")

        # Publish resource task.
        if task.name == const.TASK_PUBLISH_RESOURCE:
            title = _("Publish resource")

            if task.state == TaskState.RUNNING:
                body = render_template(
                    "notifications/publish_resource.html", progress=task.progress
                )
            elif task.state in [TaskState.SUCCESS, TaskState.FAILURE]:
                # The task result may be missing or may not contain a template, in
                # which case a generic error message is shown instead.
                template = (
                    task.result.get("template") if task.result is not None else None
                )
                body = template if template is not None else _("Unexpected error.")

        # Check if the additional task notification metadata contains a custom title,
        # which overwrites previous values.
        task_meta = notification.data.get("task_meta")

        if isinstance(task_meta, dict) and "task_title" in task_meta:
            title = task_meta["task_title"]

        # Always append the human-readable task state to the final title.
        title = f"{title} ({task.pretty_state})"

    return title, body
def dismiss_notification(notification):
    """Dismiss a notification.

    If the notification is of type ``"task_status"``, the referenced task will be
    revoked as well, as long as it still exists. The notification itself is marked
    for deletion in the current database session in either case. This function
    does not commit the session itself.

    :param notification: The :class:`.Notification` to dismiss.
    """
    if notification.name == NotificationNames.TASK_STATUS:
        task = Task.query.filter(Task.id == notification.data["task_id"]).first()

        # The referenced task may not exist anymore, in which case there is nothing
        # to revoke.
        if task is not None:
            task.revoke()

    db.session.delete(notification)