Skip to content

Commit

Permalink
Merge pull request #545 from cecilialau6776/ldap_user_search/custom_m…
Browse files Browse the repository at this point in the history
…apping

LDAP user search/custom mapping + TLS support
  • Loading branch information
aebruno authored Jul 21, 2023
2 parents bc495b0 + 0f44ec6 commit dbd3c39
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 31 deletions.
17 changes: 11 additions & 6 deletions coldfront/config/plugins/ldap_user_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
except ImportError:
raise ImproperlyConfigured('Please run: pip install ldap3')

#------------------------------------------------------------------------------
# This enables searching for users via LDAP
#------------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# This enables searching for users via LDAP
# ----------------------------------------------------------------------------

LDAP_USER_SEARCH_SERVER_URI = ENV.str('LDAP_USER_SEARCH_SERVER_URI')
LDAP_USER_SEARCH_BASE = ENV.str('LDAP_USER_SEARCH_BASE')
LDAP_USER_SEARCH_BIND_DN = ENV.str('LDAP_USER_SEARCH_BIND_DN')
LDAP_USER_SEARCH_BIND_PASSWORD = ENV.str('LDAP_USER_SEARCH_BIND_PASSWORD')
LDAP_USER_SEARCH_BIND_DN = ENV.str('LDAP_USER_SEARCH_BIND_DN', default=None)
LDAP_USER_SEARCH_BIND_PASSWORD = ENV.str('LDAP_USER_SEARCH_BIND_PASSWORD', default=None)
LDAP_USER_SEARCH_CONNECT_TIMEOUT = ENV.float('LDAP_USER_SEARCH_CONNECT_TIMEOUT', default=2.5)
LDAP_USER_SEARCH_USE_SSL = ENV.bool('LDAP_USER_SEARCH_USE_SSL', default=True)
ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch',]
LDAP_USER_SEARCH_USE_TLS = ENV.bool('LDAP_USER_SEARCH_USE_TLS', default=False)
LDAP_USER_SEARCH_PRIV_KEY_FILE = ENV.str("LDAP_USER_SEARCH_PRIV_KEY_FILE", default=None)
LDAP_USER_SEARCH_CERT_FILE = ENV.str("LDAP_USER_SEARCH_CERT_FILE", default=None)
LDAP_USER_SEARCH_CACERT_FILE = ENV.str("LDAP_USER_SEARCH_CACERT_FILE", default=None)

ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch']
101 changes: 94 additions & 7 deletions coldfront/plugins/ldap_user_search/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,106 @@ search.py code in the FreeIPA plugin.
## Design

ColdFront provides an API to define additional user search classes for
extending the default search functionality. This app implements a
LDAPUserSearch class in utils.py which performs the LDAP search. This class is
then registered with ColdFront by setting "ADDITIONAL\_USER\_SEARCH\_CLASSES"
in local\_settings.py.
extending the default search functionality. This app implements an
`LDAPUserSearch` class in `utils.py` which performs the LDAP search. This class is
then registered with ColdFront by setting `ADDITIONAL_USER_SEARCH_CLASSES`
in `config/plugins/ldap_user_search.py`. This class also allows customization
through Django settings of the attributes requested and how they're mapped to
ColdFront users.

## Requirements

- pip install python-ldap ldap3
- `pip install python-ldap ldap3`

## Usage

To enable this plugin add the following in your `local_settings.py` file:
To enable this plugin set the following applicable environment variables:

| Option | Default | Description |
| --- | --- | --- |
| `LDAP_USER_SEARCH_SERVER_URI` | N/A | URI for the LDAP server, required |
| `LDAP_USER_SEARCH_BASE` | N/A | Search base, required |
| `LDAP_USER_SEARCH_BIND_DN` | None | Bind DN |
| `LDAP_USER_SEARCH_BIND_PASSWORD` | None | Bind Password |
| `LDAP_USER_SEARCH_CONNECT_TIMEOUT` | 2.5 | Time in seconds before the connection times out |
| `LDAP_USER_SEARCH_USE_SSL` | True | Whether or not to use SSL |
| `LDAP_USER_SEARCH_USE_TLS` | False | Whether or not to use TLS |
| `LDAP_USER_SEARCH_SASL_MECHANISM` | None | One of `"EXTERNAL"`, `"DIGEST-MD5"`, `"GSSAPI"`, or `None` |
| `LDAP_USER_SEARCH_SASL_CREDENTIALS` | None | SASL authorization identity string. If you don't have one and `None` doesn't work, try `""`. |
| `LDAP_USER_SEARCH_PRIV_KEY_FILE` | None | Path to the private key file |
| `LDAP_USER_SEARCH_CERT_FILE` | None | Path to the certificate file |
| `LDAP_USER_SEARCH_CACERT_FILE` | None | Path to the CA certificate file |

The following can be set in your local settings:
| `LDAP_USER_SEARCH_ATTRIBUTE_MAP` | `{"username": "uid", "last_name": "sn", "first_name": "givenName", "email": "mail"}` | A mapping from ColdFront user attributes to LDAP attributes. |
| `LDAP_USER_SEARCH_MAPPING_CALLBACK` | See below. | Function that maps LDAP search results to ColdFront user attributes. See more below. |

`LDAP_USER_SEARCH_MAPPING_CALLBACK` default:
```py
def parse_ldap_entry(attribute_map, entry_dict):
user_dict = {}
for user_attr, ldap_attr in attribute_map.items():
user_dict[user_attr] = entry_dict.get(ldap_attr)[0] if entry_dict.get(ldap_attr) else ''
return user_dict
```

For custom attributes, set the Django variable `LDAP_USER_SEARCH_ATTRIBUTE_MAP` in ColdFront's [local settings](https://coldfront.readthedocs.io/en/latest/config/#configuration-files). This dictionary maps from ColdFront User attributes to LDAP attributes:
```py
# default
LDAP_USER_SEARCH_ATTRIBUTE_MAP = {
"username": "uid",
"last_name": "sn",
"first_name": "givenName",
"email": "mail",
}
```
ADDITIONAL_USER_SEARCH_CLASSES = ['coldfront.plugins.ldap_user_search.utils.LDAPUserSearch',]

You can also set the attribute to search by through the variable `LDAP_USER_SEARCH_USERNAME_ONLY_ATTR`. This might be useful if you wish to instead search LDAP with an email instead of username.
```py
# this will make the call to search_a_user("[email protected]", "email") search
# for "[email protected]" with the LDAP attribute "mail" if you're using the above map.
LDAP_USER_SEARCH_USERNAME_ONLY_ATTR = "email"
```

To set a custom mapping, define an `LDAP_USER_SEARCH_MAPPING_CALLBACK` function with parameters `attr_map` and `entry_dict` that returns a dictionary mapping ColdFront User attributes to their values. `attr_map` is just `LDAP_USER_SEARCH_ATTRIBUTE_MAP`, and `entry_dict` is further explained below.

For example, if your LDAP schema provides a full name and no first and last name attributes, you can define `LDAP_USER_SEARCH_ATTRIBUTE_MAP` and `LDAP_USER_SEARCH_MAPPING_CALLBACK` as follows:

```py
LDAP_USER_SEARCH_ATTRIBUTE_MAP = {
"username": "uid",
"email": "mail",
"full_name": "cn",
}

def LDAP_USER_SEARCH_MAPPING_CALLBACK(attr_map, entry_dict):
user_dict = {
"username": entry_dict.get(attr_map["username"])[0],
"email": entry_dict.get(attr_map["email"])[0],
"first_name": entry_dict.get(attr_map["full_name"])[0].split(" ")[0],
"last_name": entry_dict.get(attr_map["full_name"])[0].split(" ")[-1],
}
return user_dict
```

`entry_dict` is provided as a dictionary mapping from the LDAP attribute to a list of values.
```py
entry_dict = {
'mail': ['[email protected]'],
'cn': ['Jane E Doe'],
'uid': ['janedoe1234']
}
```

If this was the input to the above callback, `user_dict` would look like this:
```py
user_dict = {
"username": "janedoe1234",
"email": "[email protected]",
"first_name": "Jane",
"last_name": "Doe",
}
```

## Details
The `search_a_user` function also allows searching for a specific attribute. Providing the `search_by` parameter with a key to the attribute map will have it search for the corresponding attribute.
75 changes: 57 additions & 18 deletions coldfront/plugins/ldap_user_search/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import ldap.filter
from coldfront.core.user.utils import UserSearch
from coldfront.core.utils.common import import_from_settings
from ldap3 import Connection, Server
from ldap3 import Connection, Server, Tls, get_config_parameter, set_config_parameter, SASL

logger = logging.getLogger(__name__)


class LDAPUserSearch(UserSearch):
search_source = 'LDAP'

Expand All @@ -19,42 +20,80 @@ def __init__(self, user_search_string, search_by):
self.LDAP_BIND_PASSWORD = import_from_settings('LDAP_USER_SEARCH_BIND_PASSWORD', None)
self.LDAP_CONNECT_TIMEOUT = import_from_settings('LDAP_USER_SEARCH_CONNECT_TIMEOUT', 2.5)
self.LDAP_USE_SSL = import_from_settings('LDAP_USER_SEARCH_USE_SSL', True)
self.LDAP_USE_TLS = import_from_settings("LDAP_USER_SEARCH_USE_TLS", False)
self.LDAP_SASL_MECHANISM = import_from_settings("LDAP_USER_SEARCH_SASL_MECHANISM", None)
self.LDAP_SASL_CREDENTIALS = import_from_settings("LDAP_USER_SEARCH_SASL_CREDENTIALS", None)
self.LDAP_PRIV_KEY_FILE = import_from_settings('LDAP_USER_SEARCH_PRIV_KEY_FILE', None)
self.LDAP_CERT_FILE = import_from_settings('LDAP_USER_SEARCH_CERT_FILE', None)
self.LDAP_CACERT_FILE = import_from_settings('LDAP_USER_SEARCH_CACERT_FILE', None)
self.USERNAME_ONLY_ATTR = import_from_settings('LDAP_USER_SEARCH_USERNAME_ONLY_ATTR', 'username')
self.ATTRIBUTE_MAP = import_from_settings('LDAP_USER_SEARCH_ATTRIBUTE_MAP', {
"username": "uid",
"last_name": "sn",
"first_name": "givenName",
"email": "mail",
})
self.MAPPING_CALLBACK = import_from_settings('LDAP_USER_SEARCH_MAPPING_CALLBACK', self.parse_ldap_entry)

self.server = Server(self.LDAP_SERVER_URI, use_ssl=self.LDAP_USE_SSL, connect_timeout=self.LDAP_CONNECT_TIMEOUT)
self.conn = Connection(self.server, self.LDAP_BIND_DN, self.LDAP_BIND_PASSWORD, auto_bind=True)

def parse_ldap_entry(self, entry):
entry_dict = json.loads(entry.entry_to_json()).get('attributes')
tls = None
if self.LDAP_USE_TLS:
tls = Tls(
local_private_key_file=self.LDAP_PRIV_KEY_FILE,
local_certificate_file=self.LDAP_CERT_FILE,
ca_certs_file=self.LDAP_CACERT_FILE,
)

user_dict = {
'last_name': entry_dict.get('sn')[0] if entry_dict.get('sn') else '',
'first_name': entry_dict.get('givenName')[0] if entry_dict.get('givenName') else '',
'username': entry_dict.get('uid')[0] if entry_dict.get('uid') else '',
'email': entry_dict.get('mail')[0] if entry_dict.get('mail') else '',
'source': self.search_source,
}
self.server = Server(self.LDAP_SERVER_URI, use_ssl=self.LDAP_USE_SSL, connect_timeout=self.LDAP_CONNECT_TIMEOUT, tls=tls)
conn_params = {"auto_bind": True}
if self.LDAP_SASL_MECHANISM:
conn_params["sasl_mechanism"] = self.LDAP_SASL_MECHANISM
conn_params["sasl_credentials"] = self.LDAP_SASL_CREDENTIALS
conn_params["authentication"] = SASL
self.conn = Connection(self.server, self.LDAP_BIND_DN, self.LDAP_BIND_PASSWORD, **conn_params)

@staticmethod
def parse_ldap_entry(attribute_map, entry_dict):
user_dict = {}
for user_attr, ldap_attr in attribute_map.items():
user_dict[user_attr] = entry_dict.get(ldap_attr)[0] if entry_dict.get(ldap_attr) else ''
return user_dict

def search_a_user(self, user_search_string=None, search_by='all_fields'):
size_limit = 50
ldap_attrs = list(self.ATTRIBUTE_MAP.values())
attrs = get_config_parameter("ATTRIBUTES_EXCLUDED_FROM_CHECK")
attrs.extend(ldap_attrs)
set_config_parameter("ATTRIBUTES_EXCLUDED_FROM_CHECK", attrs)
if user_search_string and search_by == 'all_fields':
filter = ldap.filter.filter_format("(|(givenName=*%s*)(sn=*%s*)(uid=*%s*)(mail=*%s*))", [user_search_string] * 4)
filter = ldap.filter.filter_format(
f"(|({ldap_attrs[0]}=*%s*)({ldap_attrs[1]}=*%s*)({ldap_attrs[2]}=*%s*)({ldap_attrs[3]}=*%s*))",
[user_search_string] * 4)
elif user_search_string and search_by == 'username_only':
filter = ldap.filter.filter_format("(uid=%s)", [user_search_string])
attr = self.USERNAME_ONLY_ATTR
filter = ldap.filter.filter_format(
f"({self.ATTRIBUTE_MAP[attr]}=%s)", [user_search_string]
)
size_limit = 1
elif user_search_string and search_by in self.ATTRIBUTE_MAP.keys():
filter = ldap.filter.filter_format(
f"({self.ATTRIBUTE_MAP[search_by]}=%s)", [user_search_string]
)
size_limit = 1
else:
filter = '(objectclass=person)'

searchParameters = {'search_base': self.LDAP_USER_SEARCH_BASE,
'search_filter': filter,
'attributes': ['uid', 'sn', 'givenName', 'mail'],
'attributes': ldap_attrs,
'size_limit': size_limit}
logger.debug(f"search params: {searchParameters}")
self.conn.search(**searchParameters)
users = []
for idx, entry in enumerate(self.conn.entries, 1):
user_dict = self.parse_ldap_entry(entry)
entry_dict = json.loads(entry.entry_to_json()).get('attributes')
logger.debug(f"Entry dict: {entry_dict}")
user_dict = self.MAPPING_CALLBACK(self.ATTRIBUTE_MAP, entry_dict)
user_dict["source"] = self.search_source
users.append(user_dict)

logger.info("LDAP user search for %s found %s results", user_search_string, len(users))
return users
4 changes: 4 additions & 0 deletions docs/pages/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ exist in your backend LDAP to show up in the ColdFront user search.
| LDAP_USER_SEARCH_BASE | User search base dn |
| LDAP_USER_SEARCH_CONNECT_TIMEOUT | Time in seconds to wait before timing out. Default 2.5 |
| LDAP_USER_SEARCH_USE_SSL | Whether to use ssl when connecting to LDAP server. Default True |
| LDAP_USER_SEARCH_USE_TLS | Whether to use tls when connecting to LDAP server. Default False |
| LDAP_USER_SEARCH_PRIV_KEY_FILE | Path to the private key file. |
| LDAP_USER_SEARCH_CERT_FILE | Path to the certificate file. |
| LDAP_USER_SEARCH_CACERT_FILE | Path to the CA cert file. |

## Advanced Configuration

Expand Down

0 comments on commit dbd3c39

Please sign in to comment.