Source code for kadi.lib.search.core

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from elasticsearch.exceptions import ConnectionError as ESConnectionError
from elasticsearch.exceptions import NotFoundError as ESNotFoundError
from elasticsearch.exceptions import RequestError as ESRequestError
from elasticsearch_dsl import Search
from elasticsearch_dsl import analyzer
from elasticsearch_dsl import tokenizer
from flask import current_app

import kadi.lib.constants as const
from kadi.ext.db import db
from kadi.ext.elasticsearch import es
from kadi.lib.format import timestamp
from kadi.lib.utils import as_list
from kadi.lib.utils import get_class_by_name


[docs]class MappingMixin: """Mixin for custom search mappings."""
[docs] class Analyzers: """Container to store common analyzers.""" trigram = analyzer( "trigram_analyzer", tokenizer=tokenizer("trigram", "ngram", min_gram=3, max_gram=3), filter=["lowercase"], )
[docs] @classmethod def get_attributes(cls): """Get a list of all attributes from a mapping class.""" properties = cls._doc_type.mapping.properties return list(properties._params["properties"].keys())
[docs] @classmethod def create_document(cls, obj): """Create a new document to be indexed in ElasticSearch :param obj: The object to be indexed. :return: The created document. """ document = cls() document.meta.id = obj.id for attr in cls.get_attributes(): if hasattr(obj, attr): setattr(document, attr, getattr(obj, attr)) return document
[docs]class SearchableMixin: """Mixin for SQLALchemy models to add support for searching. The columns to index have to be specified in a mapping class, which has to be configured with its fully qualified name using ``Meta.search_mapping``. **Example:** .. code-block:: python3 class Foo: class Meta: search_mapping = "kadi.modules.record.mapping.RecordMapping" After calling :meth:`register_search_listeners`, the search index will automatically get updated whenever an object is created or deleted or if any of the indexed columns (or the ``state`` column, if present) are updated using :func:`add_to_index` and :func:`remove_from_index`. """
[docs] @classmethod def get_mapping_class(cls): """Convenience method to get the mapping class of a model.""" return get_class_by_name(cls.Meta.search_mapping)
[docs] @classmethod def search(cls, query=None, sort="_score", filter_ids=None, start=0, end=10): """Query the search index corresponding to this model. Uses :func:`search_index`, but returns the actual results instead of the raw search response. :param query: (optional) See :func:`search_index`. :param sort: (optional) See :func:`search_index`. :param filter_ids: (optional) See :func:`search_index`. :param start: (optional) See :func:`search_index`. :param end: (optional) See :func:`search_index`. :return: A tuple containing a list of the search results and the total amount of hits. :raises elasticsearch.exceptions.ConnectionError: If no connection could be established to Elasticsearch. """ response = search_index( cls.__tablename__, query=query, sort=sort, filter_ids=filter_ids, start=start, end=end, ) if response is None or not response.hits: return [], 0 ids = [int(hit.meta.id) for hit in response.hits] whens = [] for index, id in enumerate(ids): whens.append((id, index)) results = ( cls.query.filter(cls.id.in_(ids)) .order_by(db.case(*whens, value=cls.id)) .all() ) return results, response.hits.total.value
@classmethod def _before_flush_search(cls, session, flush_context, instances): if not hasattr(session, "_changes"): session._changes = {"add": set(), "remove": set()} for obj in session.new: if isinstance(obj, cls): session._changes["add"].add(obj) for obj in session.deleted: if isinstance(obj, cls): session._changes["remove"].add(obj) for obj in session.dirty: if isinstance(obj, cls) and session.is_modified(obj): if ( getattr(obj, "state", const.MODEL_STATE_ACTIVE) == const.MODEL_STATE_ACTIVE ): session._changes["add"].add(obj) session._changes["remove"].discard(obj) else: session._changes["remove"].add(obj) session._changes["add"].discard(obj) @classmethod def _after_commit_search(cls, session): if hasattr(session, "_changes"): for obj in session._changes["add"]: add_to_index(obj) for obj in session._changes["remove"]: remove_from_index(obj) del session._changes @classmethod def _after_rollback_search(cls, session): if hasattr(session, "_changes"): del session._changes
[docs] @classmethod def register_search_listeners(cls): """Register listeners to automatically update the search index. Uses SQLAlchemy's ``before_flush``, ``after_commit`` and ``after_rollback`` events and propagates to all inheriting models. """ db.event.listen( db.session, "before_flush", cls._before_flush_search, propagate=True ) db.event.listen( db.session, "after_commit", cls._after_commit_search, propagate=True ) db.event.listen( db.session, "after_rollback", cls._after_rollback_search, propagate=True )
[docs]def create_index(model, force=False): """Create a new search index if it does not exist yet. The name of the index will be in the form of ``"<tablename>_<timestamp>"``, where ``<tablename>`` depends on the given model. An alias ``<tablename>`` pointing to the actual index will also be created automatically if it does not exist yet. :param model: The model to create the index of. See also :class:`.SearchableMixin`. :param force: (optional) Flag indicating whether a new index should be created even if one already exists. Note that if an alias already exists it will not be updated automatically to point to the new index. :return: The name of the newly created or existing index or ``None`` if no new index could be created. """ alias = model.__tablename__ try: alias_exists = es.indices.exists(index=alias) if alias_exists and not force: return list(es.indices.get_alias(index=alias).keys())[0] index = f"{alias}_{timestamp(include_micro=True)}" mapping_class = model.get_mapping_class() mapping_class.init(using=es, index=index) if not alias_exists: es.indices.put_alias(index=index, name=alias) return index except Exception as e: current_app.logger.exception(e) return None
[docs]def add_to_index(obj, index=None): """Add an object to its corresponding search index. :param obj: The object to index. See also :class:`.SearchableMixin`. :param index: (optional) The name of an index that should be used instead of the alias corresponding to the given object. See also :func:`create_index`. :return: ``True`` if the object was indexed successfully, ``False`` otherwise. """ index = index if index is not None else obj.__tablename__ try: # Check if the index actually exists, as it will be created dynamically # otherwise (even if dynamic mapping is turned off for newly added fields). if not es.indices.exists(index=index): return False document = obj.get_mapping_class().create_document(obj) document.save(using=es, index=index) except ESConnectionError as e: if not current_app.config["ELASTICSEARCH_ENABLE_FALLBACK"]: current_app.logger.exception(e) return False except Exception as e: current_app.logger.exception(e) return False return True
[docs]def remove_from_index(obj, index=None): """Remove an object from its corresponding search index. :param obj: The object to remove from the index. See also :class:`.SearchableMixin`. :param index: (optional) The name of an index that should be used instead of the alias corresponding to the given object. See also :func:`create_index`. :return: ``True`` if the object was removed successfully, ``False`` otherwise. """ index = index if index is not None else obj.__tablename__ mapping_class = obj.get_mapping_class() try: document = mapping_class.get(using=es, index=index, id=obj.id) document.delete(using=es) except ESNotFoundError: pass except ESConnectionError as e: if not current_app.config["ELASTICSEARCH_ENABLE_FALLBACK"]: current_app.logger.exception(e) return False except Exception as e: current_app.logger.exception(e) return False return True
[docs]def search_index(index, query=None, sort="_score", filter_ids=None, start=0, end=10): """Query a specific search index. :param index: The name of the search index. :param query: (optional) The search query in form of an Elasticsearch DSL query object. :param sort: (optional) The name of a field or a list of multiple fields to sort on. :param filter_ids: (optional) A list of IDs to restrict the search results to. :param start: (optional) The start index of the results to return. :param end: (optional) The end index of the results to return. :return: A response object as returned by the Elasticsearch DSL or ``None`` if the request returned an error. Note that only the metadata will be returned as part of the search results, without any field data. :raises elasticsearch.exceptions.ConnectionError: If no connection could be established to Elasticsearch. """ search = Search(using=es, index=index) if query: search = search.query(query) if filter_ids is not None: search = search.filter("ids", values=filter_ids) search = ( search.sort(*as_list(sort)) .extra(track_total_hits=True) .source(False)[start:end] ) try: response = search.execute() except ESRequestError: # Ignore malformed requests. return None except ESConnectionError as e: if not current_app.config["ELASTICSEARCH_ENABLE_FALLBACK"]: current_app.logger.exception(e) raise except Exception as e: current_app.logger.exception(e) return None return response