Source code for kadi.lib.ldap

# Copyright 2020 Karlsruhe Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ssl

import ldap3 as ldap
from flask import current_app
from ldap3.utils.dn import parse_dn


def make_upn(username, dn):
    """Create a user principal name (UPN) for use in an Active Directory.

    :param username: The name of a user to use as a prefix for the UPN.
    :param dn: A DN to parse in order to retrieve the suffix of the UPN.
    :return: The UPN in the form of ``"<username>@<domain>"`` or ``None`` if the
        given DN could not be parsed or does not contain any domain components.
    """
    try:
        rdns = parse_dn(dn)
    except Exception:
        # Any parsing failure means no UPN can be derived. Only ``Exception`` is
        # caught so that KeyboardInterrupt/SystemExit still propagate.
        return None

    # Keep the value (second item) of every component whose attribute type
    # (first item) is "dc", compared case-insensitively, in DN order.
    dcs = [rdn[1] for rdn in rdns if rdn[0].lower() == "dc"]

    if not dcs:
        return None

    return f"{username}@{'.'.join(dcs)}"
def bind(connection):
    """Perform the BIND operation on an LDAP connection.

    LDAP connections start out anonymous; BIND establishes an authentication
    state between the client and the server.

    :param connection: The connection object, see also :func:`make_connection`.
    :return: The result of the connection's BIND operation (``True`` on success,
        ``False`` otherwise). If the operation raises, the exception is logged,
        the connection is unbound and ``False`` is returned.
    """
    try:
        outcome = connection.bind()
    except Exception as exc:
        current_app.logger.exception(exc)
        # Release the connection again, as it is unusable after a failed BIND.
        unbind(connection)
        return False
    else:
        return outcome
def unbind(connection):
    """Disconnect a given LDAP connection.

    This is a best-effort operation: any regular error raised while unbinding is
    ignored, so the function can safely be used for cleanup.

    :param connection: The connection object, see also :func:`make_connection`.
    """
    try:
        connection.unbind()
    except Exception:
        # Deliberately ignored, there is nothing useful to do if the disconnect
        # itself fails. Only ``Exception`` is caught so that
        # KeyboardInterrupt/SystemExit are not silently swallowed.
        pass
def make_server(
    host, port=389, use_ssl=False, validate_cert="REQUIRED", ciphers=None
):
    """Build an LDAP ``Server`` object together with its TLS configuration.

    :param host: The host name or IP address of the LDAP server.
    :param port: (optional) The port the LDAP server is listening on.
    :param use_ssl: (optional) Flag indicating whether the entire connection
        should be secured via SSL/TLS.
    :param validate_cert: (optional) Whether the certificate of the server should
        be validated. One of ``"NONE"``, ``"OPTIONAL"`` or ``"REQUIRED"``. Any
        value without a matching ``ssl.CERT_*`` constant is treated like
        ``"REQUIRED"``.
    :param ciphers: (optional) One or more SSL/TLS ciphers to use as a single
        string according to the OpenSSL cipher list format. May also be set to
        ``"DEFAULT"``, in which case the default ciphers of the installed OpenSSL
        version are used.
    :return: The new ``Server`` object or ``None`` if it could not be created, in
        which case the error is logged.
    """
    try:
        # Resolve the textual mode to the corresponding ``ssl`` constant.
        cert_mode = getattr(ssl, f"CERT_{validate_cert}", ssl.CERT_REQUIRED)
        tls_config = ldap.Tls(validate=cert_mode, ciphers=ciphers)
        server = ldap.Server(host, port=port, use_ssl=use_ssl, tls=tls_config)
    except Exception as exc:
        current_app.logger.exception(exc)
        return None

    return server
def make_connection(server, user=None, password=None, use_starttls=False):
    """Create a new LDAP ``Connection`` object.

    The connection is created with ``fast_decoder=False`` and ``read_only=True``.

    :param server: The server object to use for the connection. See
        :func:`make_server`.
    :param user: (optional) The user for simple BIND.
    :param password: (optional) The password of the user for simple BIND.
    :param use_starttls: (optional) Flag indicating whether the connection should
        be secured via STARTTLS.
    :return: The new ``Connection`` object or ``None`` if it could not be created
        or if STARTTLS failed. In both cases the error is logged.
    """
    connection = None

    try:
        connection = ldap.Connection(
            server, user=user, password=password, fast_decoder=False, read_only=True
        )

        if use_starttls:
            connection.start_tls()
    except Exception as e:
        current_app.logger.exception(e)

        # If the connection object was already created (i.e. STARTTLS failed),
        # release it so no half-initialized connection is left behind.
        if connection is not None:
            unbind(connection)

        return None

    return connection
def modify_password(
    connection, user, new_password, old_password=None, active_directory=False
):
    """Change the LDAP password of a user via an extended operation.

    If the given connection is not bound yet, a BIND is attempted first.

    :param connection: The LDAP connection to use. See :func:`make_connection`.
    :param user: The user whose password should be changed.
    :param new_password: The new password of the user.
    :param old_password: (optional) The old password of the user, if the LDAP
        server requires it.
    :param active_directory: (optional) Flag indicating whether the LDAP server is
        an Active Directory, which does not support the standard extended password
        modification operation.
    :return: ``False`` if the connection could not be bound, otherwise the result
        of the respective password modification operation.
    """
    # Nothing can be changed without an authenticated connection.
    if not (connection.bound or bind(connection)):
        return False

    if not active_directory:
        return connection.extend.standard.modify_password(
            user=user, new_password=new_password, old_password=old_password
        )

    # Active Directory requires its own, Microsoft-specific operation.
    return connection.extend.microsoft.modify_password(
        user, new_password, old_password=old_password
    )