# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import wraps
from flask import abort
from flask import current_app
from flask import has_request_context
from flask import request
from werkzeug.http import HTTP_STATUS_CODES
import kadi.lib.constants as const
from kadi.lib.cache import memoize_request
from kadi.lib.oauth.models import OAuth2ServerToken
from kadi.lib.utils import compact_json
from kadi.lib.web import get_apispec_meta
from kadi.lib.web import get_error_description
from .models import PersonalToken
[docs]
def json_response(status_code, body=None):
"""Return a JSON response to a client.
:param status_code: The HTTP status code of the response.
:param body: (optional) The response body, which must be JSON serializable. Defaults
to an empty dictionary.
:return: The JSON response.
"""
body = body if body is not None else {}
return current_app.response_class(
response=compact_json(body), status=status_code, mimetype=const.MIMETYPE_JSON
)
[docs]
def json_error_response(status_code, message=None, description=None, **kwargs):
r"""Return a JSON error response to a client.
Uses :func:`json_response` with the given headers and a body in the following form,
assuming no additional error information was provided:
.. code-block:: json
{
"code": 404,
"message": "<message>",
"description": "<description>"
}
:param status_code: See :func:`json_response`.
:param message: (optional) The error message. Defaults to a message corresponding to
the given status code.
:param description: (optional) The error description. Defaults to the result of
:func:`kadi.lib.web.get_error_description` using the given status code.
:param \**kwargs: Additional error information that will be included in the response
body. All values need to be JSON serializable.
:return: The JSON response.
"""
message = (
message
if message is not None
else HTTP_STATUS_CODES.get(status_code, "Unknown Error")
)
description = (
description if description is not None else get_error_description(status_code)
)
return json_response(
status_code,
{
"code": status_code,
"message": message,
"description": description,
**kwargs,
},
)
[docs]
@memoize_request
def get_access_token():
"""Get an access token from the current request.
Currently, this will either be a personal token or an OAuth2 server token. The token
value has to be included as a *Bearer* token within an *Authorization* header.
Supports memoization via :func:`kadi.lib.cache.memoize_request`.
:return: An access token object or ``None`` if no valid token can be found or no
request context currently exists.
"""
if (
has_request_context()
and request.authorization is not None
and request.authorization.type == "bearer"
):
token = request.authorization.token
if token.startswith(const.ACCESS_TOKEN_PREFIX_PAT):
return PersonalToken.get_by_token(token)
if token.startswith(const.ACCESS_TOKEN_PREFIX_OAUTH):
return OAuth2ServerToken.get_by_access_token(token)
# Fall back to personal tokens, since these used to not include a prefix.
return PersonalToken.get_by_token(token)
return None
[docs]
def check_access_token_scopes(*scopes):
r"""Check if the current access token contains certain scope values.
The current access token will be retrieved using :func:`.utils.get_access_token`.
:param \*scopes: One or multiple scope values to check in the form of
``"<object>.<action>"``.
:return: ``True`` if the access token either contains all given scope values or if
the current request contains no valid access token at all, ``False`` otherwise.
"""
access_token = get_access_token()
if access_token is None:
return True
current_scopes = access_token.scope.split()
valid_scopes = [scope in current_scopes for scope in scopes]
return all(valid_scopes)
[docs]
def scopes_required(*scopes):
r"""Decorator to add required access token scope values to an API endpoint.
Uses :func:`check_access_token_scopes`, so the scopes are only checked if the
current request actually contains a valid access token. Therefore, this decorator
usually only makes sense for public API endpoints that can be accessed by using an
access token.
**Example:**
.. code-block:: python3
@blueprint.route("/records")
@login_required
@scopes_required("record.read")
def get_records():
pass
The information about the scope values is also used when generating the API
specification.
:param \*scopes: See :func:`check_access_token_scopes`.
"""
def decorator(func):
apispec_meta = get_apispec_meta(func)
apispec_meta[const.APISPEC_SCOPES_KEY] = scopes
@wraps(func)
def wrapper(*args, **kwargs):
if not check_access_token_scopes(*scopes):
abort(
json_error_response(
401,
description="Access token has insufficient scope.",
scopes=scopes,
)
)
return func(*args, **kwargs)
return wrapper
return decorator
[docs]
def internal(func):
"""Decorator to mark an API endpoint as internal.
Internal endpoints can only be accessed via the session, not via access tokens. This
is not to be confused with :func:`kadi.lib.api.utils.is_internal_api_request`.
The information about an endpoint being internal is also used when generating the
API specification.
"""
apispec_meta = get_apispec_meta(func)
apispec_meta[const.APISPEC_INTERNAL_KEY] = True
@wraps(func)
def wrapper(*args, **kwargs):
access_token = get_access_token()
if access_token is not None:
abort(404)
return func(*args, **kwargs)
return wrapper
[docs]
def experimental(func):
"""Decorator to mark an API endpoint as experimental.
Experimental endpoints can only be called if the ``EXPERIMENTAL_FEATURES`` flag in
the application's configuration is set.
The information about an endpoint being experimental is also used when generating
the API specification.
"""
apispec_meta = get_apispec_meta(func)
apispec_meta[const.APISPEC_EXPERIMENTAL_KEY] = True
@wraps(func)
def wrapper(*args, **kwargs):
if not current_app.config["EXPERIMENTAL_FEATURES"]:
abort(404)
return func(*args, **kwargs)
return wrapper