# Authors: Fabian Pedregosa
# Alexandre Gramfort
# Nelle Varoquaux
# License: BSD 3 clause
import numpy as np
from scipy import interpolate
from scipy.stats import spearmanr
import warnings
import math
from .base import BaseEstimator, TransformerMixin, RegressorMixin
from .utils import check_array, check_consistent_length
from .utils.validation import _check_sample_weight
from ._isotonic import _inplace_contiguous_isotonic_regression, _make_unique
__all__ = ["check_increasing", "isotonic_regression", "IsotonicRegression"]
def check_increasing(x, y):
"""Determine whether y is monotonically correlated with x.
y is found increasing or decreasing with respect to x based on a Spearman
correlation test.
Parameters
----------
x : array-like of shape (n_samples,)
Training data.
y : array-like of shape (n_samples,)
Training target.
Returns
-------
increasing_bool : boolean
Whether the relationship is increasing or decreasing.
Notes
-----
The Spearman correlation coefficient is estimated from the data, and the
sign of the resulting estimate is used as the result.
In the event that the 95% confidence interval based on Fisher transform
spans zero, a warning is raised.
References
----------
Fisher transformation. Wikipedia.
https://en.wikipedia.org/wiki/Fisher_transformation
"""
# Calculate Spearman rho estimate and set return accordingly.
rho, _ = spearmanr(x, y)
increasing_bool = rho >= 0
# Run Fisher transform to get the rho CI, but handle rho=+/-1
if rho not in [-1.0, 1.0] and len(x) > 3:
F = 0.5 * math.log((1.0 + rho) / (1.0 - rho))
F_se = 1 / math.sqrt(len(x) - 3)
# Use a 95% CI, i.e., +/-1.96 S.E.
# https://en.wikipedia.org/wiki/Fisher_transformation
rho_0 = math.tanh(F - 1.96 * F_se)
rho_1 = math.tanh(F + 1.96 * F_se)
# Warn if the CI spans zero.
if np.sign(rho_0) != np.sign(rho_1):
warnings.warn(
"Confidence interval of the Spearman "
"correlation coefficient spans zero. "
"Determination of ``increasing`` may be "
"suspect."
)
return increasing_bool
def isotonic_regression(
y, *, sample_weight=None, y_min=None, y_max=None, increasing=True
):
"""Solve the isotonic regression model.
Read more in the :ref:`User Guide `.
Parameters
----------
y : array-like of shape (n_samples,)
The data.
sample_weight : array-like of shape (n_samples,), default=None
Weights on each point of the regression.
If None, weight is set to 1 (equal weights).
y_min : float, default=None
Lower bound on the lowest predicted value (the minimum value may
still be higher). If not set, defaults to -inf.
y_max : float, default=None
Upper bound on the highest predicted value (the maximum may still be
lower). If not set, defaults to +inf.
increasing : bool, default=True
Whether to compute ``y_`` is increasing (if set to True) or decreasing
(if set to False).
Returns
-------
y_ : list of floats
Isotonic fit of y.
References
----------
"Active set algorithms for isotonic regression; A unifying framework"
by Michael J. Best and Nilotpal Chakravarti, section 3.
"""
order = np.s_[:] if increasing else np.s_[::-1]
y = check_array(y, ensure_2d=False, input_name="y", dtype=[np.float64, np.float32])
y = np.array(y[order], dtype=y.dtype)
sample_weight = _check_sample_weight(sample_weight, y, dtype=y.dtype, copy=True)
sample_weight = np.ascontiguousarray(sample_weight[order])
_inplace_contiguous_isotonic_regression(y, sample_weight)
if y_min is not None or y_max is not None:
# Older versions of np.clip don't accept None as a bound, so use np.inf
if y_min is None:
y_min = -np.inf
if y_max is None:
y_max = np.inf
np.clip(y, y_min, y_max, y)
return y[order]
class IsotonicRegression(RegressorMixin, TransformerMixin, BaseEstimator):
"""Isotonic regression model.
Read more in the :ref:`User Guide `.
.. versionadded:: 0.13
Parameters
----------
y_min : float, default=None
Lower bound on the lowest predicted value (the minimum value may
still be higher). If not set, defaults to -inf.
y_max : float, default=None
Upper bound on the highest predicted value (the maximum may still be
lower). If not set, defaults to +inf.
increasing : bool or 'auto', default=True
Determines whether the predictions should be constrained to increase
or decrease with `X`. 'auto' will decide based on the Spearman
correlation estimate's sign.
out_of_bounds : {'nan', 'clip', 'raise'}, default='nan'
Handles how `X` values outside of the training domain are handled
during prediction.
- 'nan', predictions will be NaN.
- 'clip', predictions will be set to the value corresponding to
the nearest train interval endpoint.
- 'raise', a `ValueError` is raised.
Attributes
----------
X_min_ : float
Minimum value of input array `X_` for left bound.
X_max_ : float
Maximum value of input array `X_` for right bound.
X_thresholds_ : ndarray of shape (n_thresholds,)
Unique ascending `X` values used to interpolate
the y = f(X) monotonic function.
.. versionadded:: 0.24
y_thresholds_ : ndarray of shape (n_thresholds,)
De-duplicated `y` values suitable to interpolate the y = f(X)
monotonic function.
.. versionadded:: 0.24
f_ : function
The stepwise interpolating function that covers the input domain ``X``.
increasing_ : bool
Inferred value for ``increasing``.
See Also
--------
sklearn.linear_model.LinearRegression : Ordinary least squares Linear
Regression.
sklearn.ensemble.HistGradientBoostingRegressor : Gradient boosting that
is a non-parametric model accepting monotonicity constraints.
isotonic_regression : Function to solve the isotonic regression model.
Notes
-----
Ties are broken using the secondary method from de Leeuw, 1977.
References
----------
Isotonic Median Regression: A Linear Programming Approach
Nilotpal Chakravarti
Mathematics of Operations Research
Vol. 14, No. 2 (May, 1989), pp. 303-308
Isotone Optimization in R : Pool-Adjacent-Violators
Algorithm (PAVA) and Active Set Methods
de Leeuw, Hornik, Mair
Journal of Statistical Software 2009
Correctness of Kruskal's algorithms for monotone regression with ties
de Leeuw, Psychometrica, 1977
Examples
--------
>>> from sklearn.datasets import make_regression
>>> from sklearn.isotonic import IsotonicRegression
>>> X, y = make_regression(n_samples=10, n_features=1, random_state=41)
>>> iso_reg = IsotonicRegression().fit(X, y)
>>> iso_reg.predict([.1, .2])
array([1.8628..., 3.7256...])
"""
def __init__(self, *, y_min=None, y_max=None, increasing=True, out_of_bounds="nan"):
self.y_min = y_min
self.y_max = y_max
self.increasing = increasing
self.out_of_bounds = out_of_bounds
def _check_input_data_shape(self, X):
if not (X.ndim == 1 or (X.ndim == 2 and X.shape[1] == 1)):
msg = (
"Isotonic regression input X should be a 1d array or "
"2d array with 1 feature"
)
raise ValueError(msg)
def _build_f(self, X, y):
"""Build the f_ interp1d function."""
# Handle the out_of_bounds argument by setting bounds_error
if self.out_of_bounds not in ["raise", "nan", "clip"]:
raise ValueError(
"The argument ``out_of_bounds`` must be in "
"'nan', 'clip', 'raise'; got {0}".format(self.out_of_bounds)
)
bounds_error = self.out_of_bounds == "raise"
if len(y) == 1:
# single y, constant prediction
self.f_ = lambda x: y.repeat(x.shape)
else:
self.f_ = interpolate.interp1d(
X, y, kind="linear", bounds_error=bounds_error
)
def _build_y(self, X, y, sample_weight, trim_duplicates=True):
"""Build the y_ IsotonicRegression."""
self._check_input_data_shape(X)
X = X.reshape(-1) # use 1d view
# Determine increasing if auto-determination requested
if self.increasing == "auto":
self.increasing_ = check_increasing(X, y)
else:
self.increasing_ = self.increasing
# If sample_weights is passed, removed zero-weight values and clean
# order
sample_weight = _check_sample_weight(sample_weight, X, dtype=X.dtype)
mask = sample_weight > 0
X, y, sample_weight = X[mask], y[mask], sample_weight[mask]
order = np.lexsort((y, X))
X, y, sample_weight = [array[order] for array in [X, y, sample_weight]]
unique_X, unique_y, unique_sample_weight = _make_unique(X, y, sample_weight)
X = unique_X
y = isotonic_regression(
unique_y,
sample_weight=unique_sample_weight,
y_min=self.y_min,
y_max=self.y_max,
increasing=self.increasing_,
)
# Handle the left and right bounds on X
self.X_min_, self.X_max_ = np.min(X), np.max(X)
if trim_duplicates:
# Remove unnecessary points for faster prediction
keep_data = np.ones((len(y),), dtype=bool)
# Aside from the 1st and last point, remove points whose y values
# are equal to both the point before and the point after it.
keep_data[1:-1] = np.logical_or(
np.not_equal(y[1:-1], y[:-2]), np.not_equal(y[1:-1], y[2:])
)
return X[keep_data], y[keep_data]
else:
# The ability to turn off trim_duplicates is only used to it make
# easier to unit test that removing duplicates in y does not have
# any impact the resulting interpolation function (besides
# prediction speed).
return X, y
def fit(self, X, y, sample_weight=None):
"""Fit the model using X, y as training data.
Parameters
----------
X : array-like of shape (n_samples,) or (n_samples, 1)
Training data.
.. versionchanged:: 0.24
Also accepts 2d array with 1 feature.
y : array-like of shape (n_samples,)
Training target.
sample_weight : array-like of shape (n_samples,), default=None
Weights. If set to None, all weights will be set to 1 (equal
weights).
Returns
-------
self : object
Returns an instance of self.
Notes
-----
X is stored for future use, as :meth:`transform` needs X to interpolate
new input data.
"""
check_params = dict(accept_sparse=False, ensure_2d=False)
X = check_array(
X, input_name="X", dtype=[np.float64, np.float32], **check_params
)
y = check_array(y, input_name="y", dtype=X.dtype, **check_params)
check_consistent_length(X, y, sample_weight)
# Transform y by running the isotonic regression algorithm and
# transform X accordingly.
X, y = self._build_y(X, y, sample_weight)
# It is necessary to store the non-redundant part of the training set
# on the model to make it possible to support model persistence via
# the pickle module as the object built by scipy.interp1d is not
# picklable directly.
self.X_thresholds_, self.y_thresholds_ = X, y
# Build the interpolation function
self._build_f(X, y)
return self
def transform(self, T):
"""Transform new data by linear interpolation.
Parameters
----------
T : array-like of shape (n_samples,) or (n_samples, 1)
Data to transform.
.. versionchanged:: 0.24
Also accepts 2d array with 1 feature.
Returns
-------
y_pred : ndarray of shape (n_samples,)
The transformed data.
"""
if hasattr(self, "X_thresholds_"):
dtype = self.X_thresholds_.dtype
else:
dtype = np.float64
T = check_array(T, dtype=dtype, ensure_2d=False)
self._check_input_data_shape(T)
T = T.reshape(-1) # use 1d view
# Handle the out_of_bounds argument by clipping if needed
if self.out_of_bounds not in ["raise", "nan", "clip"]:
raise ValueError(
"The argument ``out_of_bounds`` must be in "
"'nan', 'clip', 'raise'; got {0}".format(self.out_of_bounds)
)
if self.out_of_bounds == "clip":
T = np.clip(T, self.X_min_, self.X_max_)
res = self.f_(T)
# on scipy 0.17, interp1d up-casts to float64, so we cast back
res = res.astype(T.dtype)
return res
def predict(self, T):
"""Predict new data by linear interpolation.
Parameters
----------
T : array-like of shape (n_samples,) or (n_samples, 1)
Data to transform.
Returns
-------
y_pred : ndarray of shape (n_samples,)
Transformed data.
"""
return self.transform(T)
# We implement get_feature_names_out here instead of using
# `_ClassNamePrefixFeaturesOutMixin`` because `input_features` are ignored.
# `input_features` are ignored because `IsotonicRegression` accepts 1d
# arrays and the semantics of `feature_names_in_` are not clear for 1d arrays.
def get_feature_names_out(self, input_features=None):
"""Get output feature names for transformation.
Parameters
----------
input_features : array-like of str or None, default=None
Ignored.
Returns
-------
feature_names_out : ndarray of str objects
An ndarray with one string i.e. ["isotonicregression0"].
"""
class_name = self.__class__.__name__.lower()
return np.asarray([f"{class_name}0"], dtype=object)
def __getstate__(self):
"""Pickle-protocol - return state of the estimator."""
state = super().__getstate__()
# remove interpolation method
state.pop("f_", None)
return state
def __setstate__(self, state):
"""Pickle-protocol - set state of the estimator.
We need to rebuild the interpolation function.
"""
super().__setstate__(state)
if hasattr(self, "X_thresholds_") and hasattr(self, "y_thresholds_"):
self._build_f(self.X_thresholds_, self.y_thresholds_)
def _more_tags(self):
return {"X_types": ["1darray"]}