-
Jose Manuel Lopez Lujan authored
- fixes #73
Jose Manuel Lopez Lujan authored- fixes #73
__init__.py 15.21 KiB
import re
from functools import wraps
import ldap
from ldap import filter as ldap_filter
from flask import abort, current_app, g, make_response, redirect, url_for, \
request
__all__ = ['LDAP']
class LDAPException(RuntimeError):
message = None
def __init__(self, message):
self.message = message
def __str__(self):
return self.message
class LDAP(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
@staticmethod
def init_app(app):
"""Initialize the `app` for use with this :class:`~LDAP`. This is
called automatically if `app` is passed to :meth:`~LDAP.__init__`.
:param flask.Flask app: the application to configure for use with
this :class:`~LDAP`
"""
app.config.setdefault('LDAP_HOST', 'localhost')
app.config.setdefault('LDAP_PORT', 389)
app.config.setdefault('LDAP_SCHEMA', 'ldap')
app.config.setdefault('LDAP_USERNAME', None)
app.config.setdefault('LDAP_PASSWORD', None)
app.config.setdefault('LDAP_TIMEOUT', 10)
app.config.setdefault('LDAP_USE_SSL', False)
app.config.setdefault('LDAP_USE_TLS', False)
app.config.setdefault('LDAP_REQUIRE_CERT', False)
app.config.setdefault('LDAP_CERT_PATH', '/path/to/cert')
app.config.setdefault('LDAP_BASE_DN', None)
app.config.setdefault('LDAP_OBJECTS_DN', 'distinguishedName')
app.config.setdefault('LDAP_USER_FIELDS', [])
app.config.setdefault('LDAP_USER_OBJECT_FILTER',
'(&(objectclass=Person)(userPrincipalName=%s))')
app.config.setdefault('LDAP_USER_GROUPS_FIELD', 'memberOf')
app.config.setdefault('LDAP_GROUP_FIELDS', [])
app.config.setdefault('LDAP_GROUP_OBJECT_FILTER',
'(&(objectclass=Group)(userPrincipalName=%s))')
app.config.setdefault('LDAP_GROUP_MEMBERS_FIELD', 'member')
app.config.setdefault('LDAP_LOGIN_VIEW', 'login')
app.config.setdefault('LDAP_REALM_NAME', 'LDAP authentication')
app.config.setdefault('LDAP_OPENLDAP', False)
app.config.setdefault('LDAP_GROUP_MEMBER_FILTER', '*')
app.config.setdefault('LDAP_GROUP_MEMBER_FILTER_FIELD', '*')
app.config.setdefault('LDAP_CUSTOM_OPTIONS', None)
if app.config['LDAP_USE_SSL'] or app.config['LDAP_USE_TLS']:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
if app.config['LDAP_REQUIRE_CERT']:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_DEMAND)
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE,
current_app.config['LDAP_CERT_PATH'])
for option in ['USERNAME', 'PASSWORD', 'BASE_DN']:
if app.config['LDAP_{0}'.format(option)] is None:
raise LDAPException('LDAP_{0} cannot be None!'.format(option))
@staticmethod
def _set_custom_options(conn):
options = current_app.config['LDAP_CUSTOM_OPTIONS']
if options:
for k, v in options.items():
conn.set_option(k, v)
return conn
@property
def initialize(self):
"""Initialize a connection to the LDAP server.
:return: LDAP connection object.
"""
try:
conn = ldap.initialize('{0}://{1}:{2}'.format(
current_app.config['LDAP_SCHEMA'],
current_app.config['LDAP_HOST'],
current_app.config['LDAP_PORT']))
conn.set_option(ldap.OPT_NETWORK_TIMEOUT,
current_app.config['LDAP_TIMEOUT'])
conn = self._set_custom_options(conn)
conn.protocol_version = ldap.VERSION3
if current_app.config['LDAP_USE_TLS']:
conn.start_tls_s()
return conn
except ldap.LDAPError as e:
raise LDAPException(self.error(e.args))
@property
def bind(self):
"""Attempts to bind to the LDAP server using the credentials of the
service account.
:return: Bound LDAP connection object if successful or ``None`` if
unsuccessful.
"""
conn = self.initialize
try:
conn.simple_bind_s(
current_app.config['LDAP_USERNAME'],
current_app.config['LDAP_PASSWORD'])
return conn
except ldap.LDAPError as e:
raise LDAPException(self.error(e.args))
def bind_user(self, username, password):
"""Attempts to bind a user to the LDAP server using the credentials
supplied.
.. note::
Many LDAP servers will grant anonymous access if ``password`` is
the empty string, causing this method to return :obj:`True` no
matter what username is given. If you want to use this method to
validate a username and password, rather than actually connecting
to the LDAP server as a particular user, make sure ``password`` is
not empty.
:param str username: The username to attempt to bind with.
:param str password: The password of the username we're attempting to
bind with.
:return: Returns ``True`` if successful or ``None`` if the credentials
are invalid.
"""
user_dn = self.get_object_details(user=username, dn_only=True)
if user_dn is None:
return
try:
conn = self.initialize
_user_dn = user_dn.decode('utf-8') \
if isinstance(user_dn, bytes) else user_dn
conn.simple_bind_s(_user_dn, password)
return True
except ldap.LDAPError:
return
def get_object_details(self, user=None, group=None, query_filter=None,
dn_only=False):
"""Returns a ``dict`` with the object's (user or group) details.
:param str user: Username of the user object you want details for.
:param str group: Name of the group object you want details for.
:param str query_filter: If included, will be used to query object.
:param bool dn_only: If we should only retrieve the object's
distinguished name or not. Default: ``False``.
"""
query = None
fields = None
if user is not None:
if not dn_only:
fields = current_app.config['LDAP_USER_FIELDS']
query_filter = query_filter or \
current_app.config['LDAP_USER_OBJECT_FILTER']
query = ldap_filter.filter_format(query_filter, (user,))
elif group is not None:
if not dn_only:
fields = current_app.config['LDAP_GROUP_FIELDS']
query_filter = query_filter or \
current_app.config['LDAP_GROUP_OBJECT_FILTER']
query = ldap_filter.filter_format(query_filter, (group,))
conn = self.bind
try:
records = conn.search_s(current_app.config['LDAP_BASE_DN'],
ldap.SCOPE_SUBTREE, query, fields)
conn.unbind_s()
result = {}
if records:
if dn_only:
if current_app.config['LDAP_OPENLDAP']:
if records:
return records[0][0]
else:
if current_app.config['LDAP_OBJECTS_DN'] \
in records[0][1]:
dn = records[0][1][
current_app.config['LDAP_OBJECTS_DN']]
return dn[0]
for k, v in list(records[0][1].items()):
result[k] = v
return result
except ldap.LDAPError as e:
raise LDAPException(self.error(e.args))
def get_user_groups(self, user):
"""Returns a ``list`` with the user's groups or ``None`` if
unsuccessful.
:param str user: User we want groups for.
"""
conn = self.bind
try:
if current_app.config['LDAP_OPENLDAP']:
fields = \
[str(current_app.config['LDAP_GROUP_MEMBER_FILTER_FIELD'])]
records = conn.search_s(
current_app.config['LDAP_BASE_DN'], ldap.SCOPE_SUBTREE,
ldap_filter.filter_format(
current_app.config['LDAP_GROUP_MEMBER_FILTER'],
(self.get_object_details(user, dn_only=True),)),
fields)
else:
records = conn.search_s(
current_app.config['LDAP_BASE_DN'], ldap.SCOPE_SUBTREE,
ldap_filter.filter_format(
current_app.config['LDAP_USER_OBJECT_FILTER'],
(user,)),
[current_app.config['LDAP_USER_GROUPS_FIELD']])
conn.unbind_s()
if records:
if current_app.config['LDAP_OPENLDAP']:
group_member_filter = \
current_app.config['LDAP_GROUP_MEMBER_FILTER_FIELD']
groups = [record[1][group_member_filter][0].decode(
'utf-8') for record in records]
return groups
else:
if current_app.config['LDAP_USER_GROUPS_FIELD'] in \
records[0][1]:
groups = records[0][1][
current_app.config['LDAP_USER_GROUPS_FIELD']]
result = [re.findall(b'(?:cn=|CN=)(.*?),', group)[0]
for group in groups]
result = [r.decode('utf-8') for r in result]
return result
except ldap.LDAPError as e:
raise LDAPException(self.error(e.args))
def get_group_members(self, group):
"""Returns a ``list`` with the group's members or ``None`` if
unsuccessful.
:param str group: Group we want users for.
"""
conn = self.bind
try:
records = conn.search_s(
current_app.config['LDAP_BASE_DN'], ldap.SCOPE_SUBTREE,
ldap_filter.filter_format(
current_app.config['LDAP_GROUP_OBJECT_FILTER'], (group,)),
[current_app.config['LDAP_GROUP_MEMBERS_FIELD']])
conn.unbind_s()
if records:
if current_app.config['LDAP_GROUP_MEMBERS_FIELD'] in \
records[0][1]:
members = records[0][1][
current_app.config['LDAP_GROUP_MEMBERS_FIELD']]
members = [m.decode('utf-8') for m in members]
return members
except ldap.LDAPError as e:
raise LDAPException(self.error(e.args))
@staticmethod
def error(e):
e = e[0]
if 'desc' in e:
return e['desc']
else:
return e
@staticmethod
def login_required(func):
"""When applied to a view function, any unauthenticated requests will
be redirected to the view named in LDAP_LOGIN_VIEW. Authenticated
requests do NOT require membership from a specific group.
The login view is responsible for asking for credentials, checking
them, and setting ``flask.g.user`` to the name of the authenticated
user if the credentials are acceptable.
:param func: The view function to decorate.
"""
@wraps(func)
def wrapped(*args, **kwargs):
if g.user is None:
next_path=request.full_path or request.path
if next_path == '/?':
return redirect(
url_for(current_app.config['LDAP_LOGIN_VIEW']))
return redirect(url_for(current_app.config['LDAP_LOGIN_VIEW'],
next=next_path))
return func(*args, **kwargs)
return wrapped
@staticmethod
def group_required(groups=None):
"""When applied to a view function, any unauthenticated requests will
be redirected to the view named in LDAP_LOGIN_VIEW. Authenticated
requests are only permitted if they belong to one of the listed groups.
The login view is responsible for asking for credentials, checking
them, and setting ``flask.g.user`` to the name of the authenticated
user and ``flask.g.ldap_groups`` to the authenticated user's groups
if the credentials are acceptable.
:param list groups: List of groups that should be able to access the
view function.
"""
def wrapper(func):
@wraps(func)
def wrapped(*args, **kwargs):
if g.user is None:
return redirect(
url_for(current_app.config['LDAP_LOGIN_VIEW'],
next=request.full_path or request.path))
match = [group for group in groups if group in g.ldap_groups]
if not match:
abort(403)
return func(*args, **kwargs)
return wrapped
return wrapper
def basic_auth_required(self, func):
"""When applied to a view function, any unauthenticated requests are
asked to authenticate via HTTP's standard Basic Authentication system.
Requests with credentials are checked with :meth:`.bind_user()`.
The user's browser will typically show them the contents of
LDAP_REALM_NAME as a prompt for which username and password to enter.
If the request's credentials are accepted by the LDAP server, the
username is stored in ``flask.g.ldap_username`` and the password in
``flask.g.ldap_password``.
:param func: The view function to decorate.
"""
def make_auth_required_response():
response = make_response('Unauthorized', 401)
response.www_authenticate.set_basic(
current_app.config['LDAP_REALM_NAME'])
return response
@wraps(func)
def wrapped(*args, **kwargs):
if request.authorization is None:
req_username = None
req_password = None
else:
req_username = request.authorization.username
req_password = request.authorization.password
# Many LDAP servers will grant you anonymous access if you log in
# with an empty password, even if you supply a non-anonymous user
# ID, causing .bind_user() to return True. Therefore, only accept
# non-empty passwords.
if req_username in ['', None] or req_password in ['', None]:
current_app.logger.debug('Got a request without auth data')
return make_auth_required_response()
if not self.bind_user(req_username, req_password):
current_app.logger.debug('User {0!r} gave wrong '
'password'.format(req_username))
return make_auth_required_response()
g.ldap_username = req_username
g.ldap_password = req_password
return func(*args, **kwargs)
return wrapped