Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LDAP user search/custom mapping + TLS support #545

Merged
merged 9 commits into from
Jul 21, 2023
17 changes: 11 additions & 6 deletions coldfront/config/plugins/ldap_user_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
except ImportError:
raise ImproperlyConfigured('Please run: pip install ldap3')

#------------------------------------------------------------------------------
# This enables searching for users via LDAP
#------------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# This enables searching for users via LDAP
# ----------------------------------------------------------------------------

LDAP_USER_SEARCH_SERVER_URI = ENV.str('LDAP_USER_SEARCH_SERVER_URI')
LDAP_USER_SEARCH_BASE = ENV.str('LDAP_USER_SEARCH_BASE')
LDAP_USER_SEARCH_BIND_DN = ENV.str('LDAP_USER_SEARCH_BIND_DN')
LDAP_USER_SEARCH_BIND_PASSWORD = ENV.str('LDAP_USER_SEARCH_BIND_PASSWORD')
LDAP_USER_SEARCH_BIND_DN = ENV.str('LDAP_USER_SEARCH_BIND_DN', default=None)
LDAP_USER_SEARCH_BIND_PASSWORD = ENV.str('LDAP_USER_SEARCH_BIND_PASSWORD', default=None)
LDAP_USER_SEARCH_CONNECT_TIMEOUT = ENV.float('LDAP_USER_SEARCH_CONNECT_TIMEOUT', default=2.5)
LDAP_USER_SEARCH_USE_SSL = ENV.bool('LDAP_USER_SEARCH_USE_SSL', default=True)
ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch',]
LDAP_USER_SEARCH_USE_TLS = ENV.bool('LDAP_USER_SEARCH_USE_TLS', default=False)
LDAP_USER_SEARCH_PRIV_KEY_FILE = ENV.str("LDAP_USER_SEARCH_PRIV_KEY_FILE", default=None)
LDAP_USER_SEARCH_CERT_FILE = ENV.str("LDAP_USER_SEARCH_CERT_FILE", default=None)
LDAP_USER_SEARCH_CACERT_FILE = ENV.str("LDAP_USER_SEARCH_CACERT_FILE", default=None)

ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch']
101 changes: 94 additions & 7 deletions coldfront/plugins/ldap_user_search/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,106 @@ search.py code in the FreeIPA plugin.
## Design

ColdFront provides an API to define additional user search classes for
extending the default search functionality. This app implements a
LDAPUserSearch class in utils.py which performs the LDAP search. This class is
then registered with ColdFront by setting "ADDITIONAL\_USER\_SEARCH\_CLASSES"
in local\_settings.py.
extending the default search functionality. This app implements an
`LDAPUserSearch` class in `utils.py` which performs the LDAP search. This class is
then registered with ColdFront by setting `ADDITIONAL_USER_SEARCH_CLASSES`
in `config/plugins/ldap_user_search.py`. This class also allows customization
through Django settings of the attributes requested and how they're mapped to
ColdFront users.

## Requirements

- pip install python-ldap ldap3
- `pip install python-ldap ldap3`

## Usage

To enable this plugin add the following in your `local_settings.py` file:
To enable this plugin set the following applicable environment variables:

| Option | Default | Description |
| --- | --- | --- |
| `LDAP_USER_SEARCH_SERVER_URI` | N/A | URI for the LDAP server, required |
| `LDAP_USER_SEARCH_BASE` | N/A | Search base, required |
| `LDAP_USER_SEARCH_BIND_DN` | None | Bind DN |
| `LDAP_USER_SEARCH_BIND_PASSWORD` | None | Bind Password |
| `LDAP_USER_SEARCH_CONNECT_TIMEOUT` | 2.5 | Time in seconds before the connection times out |
| `LDAP_USER_SEARCH_USE_SSL` | True | Whether or not to use SSL |
| `LDAP_USER_SEARCH_USE_TLS` | False | Whether or not to use TLS |
| `LDAP_USER_SEARCH_SASL_MECHANISM` | None | One of `"EXTERNAL"`, `"DIGEST-MD5"`, `"GSSAPI"`, or `None` |
| `LDAP_USER_SEARCH_SASL_CREDENTIALS` | None | SASL authorization identity string. If you don't have one and `None` doesn't work, try `""`. |
| `LDAP_USER_SEARCH_PRIV_KEY_FILE` | None | Path to the private key file |
| `LDAP_USER_SEARCH_CERT_FILE` | None | Path to the certificate file |
| `LDAP_USER_SEARCH_CACERT_FILE` | None | Path to the CA certificate file |

The following can be set in your local settings:
| `LDAP_USER_SEARCH_ATTRIBUTE_MAP` | `{"username": "uid", "last_name": "sn", "first_name": "givenName", "email": "mail"}` | A mapping from ColdFront user attributes to LDAP attributes. |
| `LDAP_USER_SEARCH_MAPPING_CALLBACK` | See below. | Function that maps LDAP search results to ColdFront user attributes. See more below. |

`LDAP_USER_SEARCH_MAPPING_CALLBACK` default:
```py
def parse_ldap_entry(attribute_map, entry_dict):
user_dict = {}
for user_attr, ldap_attr in attribute_map.items():
user_dict[user_attr] = entry_dict.get(ldap_attr)[0] if entry_dict.get(ldap_attr) else ''
return user_dict
```

For custom attributes, set the Django variable `LDAP_USER_SEARCH_ATTRIBUTE_MAP` in ColdFront's [local settings](https://coldfront.readthedocs.io/en/latest/config/#configuration-files). This dictionary maps from ColdFront User attributes to LDAP attributes:
```py
# default
LDAP_USER_SEARCH_ATTRIBUTE_MAP = {
"username": "uid",
"last_name": "sn",
"first_name": "givenName",
"email": "mail",
}
```
ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch',]

You can also set the attribute to search by through the variable `LDAP_USER_SEARCH_USERNAME_ONLY_ATTR`. This might be useful if you wish to instead search LDAP with an email instead of username.
```py
# this will make the call to search_a_user("[email protected]", "email") search
# for "[email protected]" with the LDAP attribute "mail" if you're using the above map.
LDAP_USER_SEARCH_USERNAME_ONLY_ATTR = "email"
```

To set a custom mapping, define an `LDAP_USER_SEARCH_MAPPING_CALLBACK` function with parameters `attr_map` and `entry_dict` that returns a dictionary mapping ColdFront User attributes to their values. `attr_map` is just `LDAP_USER_SEARCH_ATTRIBUTE_MAP`, and `entry_dict` is further explained below.

For example, if your LDAP schema provides a full name and no first and last name attributes, you can define `LDAP_USER_SEARCH_ATTRIBUTE_MAP` and `LDAP_USER_SEARCH_MAPPING_CALLBACK` as follows:

```py
LDAP_USER_SEARCH_ATTRIBUTE_MAP = {
"username": "uid",
"email": "mail",
"full_name": "cn",
}

def LDAP_USER_SEARCH_MAPPING_CALLBACK(attr_map, entry_dict):
user_dict = {
"username": entry_dict.get(attr_map["username"])[0],
"email": entry_dict.get(attr_map["email"])[0],
"first_name": entry_dict.get(attr_map["full_name"])[0].split(" ")[0],
"last_name": entry_dict.get(attr_map["full_name"])[0].split(" ")[-1],
}
return user_dict
```

`entry_dict` is provided as a dictionary mapping from the LDAP attribute to a list of values.
```py
entry_dict = {
'mail': ['[email protected]'],
'cn': ['Jane E Doe'],
'uid': ['janedoe1234']
}
```

If this was the input to the above callback, `user_dict` would look like this:
```py
user_dict = {
"username": "janedoe1234",
"email": "[email protected]",
"first_name": "Jane",
"last_name": "Doe",
}
```

## Details
The `search_a_user` function also allows searching for a specific attribute. Providing the `search_by` parameter with a key to the attribute map will have it search for the corresponding attribute.
75 changes: 57 additions & 18 deletions coldfront/plugins/ldap_user_search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import ldap.filter
from coldfront.core.user.utils import UserSearch
from coldfront.core.utils.common import import_from_settings
from ldap3 import Connection, Server
from ldap3 import Connection, Server, Tls, get_config_parameter, set_config_parameter, SASL

logger = logging.getLogger(__name__)


class LDAPUserSearch(UserSearch):
search_source = 'LDAP'

Expand All @@ -19,42 +20,80 @@ def __init__(self, user_search_string, search_by):
self.LDAP_BIND_PASSWORD = import_from_settings('LDAP_USER_SEARCH_BIND_PASSWORD', None)
self.LDAP_CONNECT_TIMEOUT = import_from_settings('LDAP_USER_SEARCH_CONNECT_TIMEOUT', 2.5)
self.LDAP_USE_SSL = import_from_settings('LDAP_USER_SEARCH_USE_SSL', True)
self.LDAP_USE_TLS = import_from_settings("LDAP_USER_SEARCH_USE_TLS", False)
self.LDAP_SASL_MECHANISM = import_from_settings("LDAP_USER_SEARCH_SASL_MECHANISM", None)
self.LDAP_SASL_CREDENTIALS = import_from_settings("LDAP_USER_SEARCH_SASL_CREDENTIALS", None)
self.LDAP_PRIV_KEY_FILE = import_from_settings('LDAP_USER_SEARCH_PRIV_KEY_FILE', None)
self.LDAP_CERT_FILE = import_from_settings('LDAP_USER_SEARCH_CERT_FILE', None)
self.LDAP_CACERT_FILE = import_from_settings('LDAP_USER_SEARCH_CACERT_FILE', None)
self.USERNAME_ONLY_ATTR = import_from_settings('LDAP_USER_SEARCH_USERNAME_ONLY_ATTR', 'username')
self.ATTRIBUTE_MAP = import_from_settings('LDAP_USER_SEARCH_ATTRIBUTE_MAP', {
"username": "uid",
"last_name": "sn",
"first_name": "givenName",
"email": "mail",
})
self.MAPPING_CALLBACK = import_from_settings('LDAP_USER_SEARCH_MAPPING_CALLBACK', self.parse_ldap_entry)

self.server = Server(self.LDAP_SERVER_URI, use_ssl=self.LDAP_USE_SSL, connect_timeout=self.LDAP_CONNECT_TIMEOUT)
self.conn = Connection(self.server, self.LDAP_BIND_DN, self.LDAP_BIND_PASSWORD, auto_bind=True)

def parse_ldap_entry(self, entry):
entry_dict = json.loads(entry.entry_to_json()).get('attributes')
tls = None
if self.LDAP_USE_TLS:
tls = Tls(
local_private_key_file=self.LDAP_PRIV_KEY_FILE,
local_certificate_file=self.LDAP_CERT_FILE,
ca_certs_file=self.LDAP_CACERT_FILE,
)

user_dict = {
'last_name': entry_dict.get('sn')[0] if entry_dict.get('sn') else '',
'first_name': entry_dict.get('givenName')[0] if entry_dict.get('givenName') else '',
'username': entry_dict.get('uid')[0] if entry_dict.get('uid') else '',
'email': entry_dict.get('mail')[0] if entry_dict.get('mail') else '',
'source': self.search_source,
}
self.server = Server(self.LDAP_SERVER_URI, use_ssl=self.LDAP_USE_SSL, connect_timeout=self.LDAP_CONNECT_TIMEOUT, tls=tls)
conn_params = {"auto_bind": True}
if self.LDAP_SASL_MECHANISM:
conn_params["sasl_mechanism"] = self.LDAP_SASL_MECHANISM
conn_params["sasl_credentials"] = self.LDAP_SASL_CREDENTIALS
conn_params["authentication"] = SASL
self.conn = Connection(self.server, self.LDAP_BIND_DN, self.LDAP_BIND_PASSWORD, **conn_params)

@staticmethod
def parse_ldap_entry(attribute_map, entry_dict):
user_dict = {}
for user_attr, ldap_attr in attribute_map.items():
user_dict[user_attr] = entry_dict.get(ldap_attr)[0] if entry_dict.get(ldap_attr) else ''
return user_dict

def search_a_user(self, user_search_string=None, search_by='all_fields'):
size_limit = 50
ldap_attrs = list(self.ATTRIBUTE_MAP.values())
attrs = get_config_parameter("ATTRIBUTES_EXCLUDED_FROM_CHECK")
attrs.extend(ldap_attrs)
set_config_parameter("ATTRIBUTES_EXCLUDED_FROM_CHECK", attrs)
if user_search_string and search_by == 'all_fields':
filter = ldap.filter.filter_format("(|(givenName=*%s*)(sn=*%s*)(uid=*%s*)(mail=*%s*))", [user_search_string] * 4)
filter = ldap.filter.filter_format(
f"(|({ldap_attrs[0]}=*%s*)({ldap_attrs[1]}=*%s*)({ldap_attrs[2]}=*%s*)({ldap_attrs[3]}=*%s*))",
[user_search_string] * 4)
elif user_search_string and search_by == 'username_only':
filter = ldap.filter.filter_format("(uid=%s)", [user_search_string])
attr = self.USERNAME_ONLY_ATTR
filter = ldap.filter.filter_format(
f"({self.ATTRIBUTE_MAP[attr]}=%s)", [user_search_string]
)
size_limit = 1
elif user_search_string and search_by in self.ATTRIBUTE_MAP.keys():
filter = ldap.filter.filter_format(
f"({self.ATTRIBUTE_MAP[search_by]}=%s)", [user_search_string]
)
size_limit = 1
else:
filter = '(objectclass=person)'

searchParameters = {'search_base': self.LDAP_USER_SEARCH_BASE,
'search_filter': filter,
'attributes': ['uid', 'sn', 'givenName', 'mail'],
'attributes': ldap_attrs,
'size_limit': size_limit}
logger.debug(f"search params: {searchParameters}")
self.conn.search(**searchParameters)
users = []
for idx, entry in enumerate(self.conn.entries, 1):
user_dict = self.parse_ldap_entry(entry)
entry_dict = json.loads(entry.entry_to_json()).get('attributes')
logger.debug(f"Entry dict: {entry_dict}")
user_dict = self.MAPPING_CALLBACK(self.ATTRIBUTE_MAP, entry_dict)
user_dict["source"] = self.search_source
users.append(user_dict)

logger.info("LDAP user search for %s found %s results", user_search_string, len(users))
return users
4 changes: 4 additions & 0 deletions docs/pages/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ exist in your backend LDAP to show up in the ColdFront user search.
| LDAP_USER_SEARCH_BASE | User search base dn |
| LDAP_USER_SEARCH_CONNECT_TIMEOUT | Time in seconds to wait before timing out. Default 2.5 |
| LDAP_USER_SEARCH_USE_SSL | Whether to use ssl when connecting to LDAP server. Default True |
| LDAP_USER_SEARCH_USE_TLS | Whether to use tls when connecting to LDAP server. Default False |
| LDAP_USER_SEARCH_PRIV_KEY_FILE | Path to the private key file. |
| LDAP_USER_SEARCH_CERT_FILE | Path to the certificate file. |
| LDAP_USER_SEARCH_CACERT_FILE | Path to the CA cert file. |

## Advanced Configuration

Expand Down