import warnings
from itertools import repeat
import numpy as np
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.exceptions import FitFailedWarning
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import ridge_regression
from sklearn.linear_model.base import _rescale_data
from sklearn.linear_model.base import LinearModel
from sklearn.utils.validation import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import check_X_y
def print_model(coef, input_features, errors=None, intercept=None, error_intercept=None, precision=3, pm="±"):
    """Build a human-readable equation string for a (sparse) linear model.

    Terms whose coefficient (and error, if given) round to zero at the
    requested precision are dropped from the output.

    Args:
        coef: sequence of coefficients, one per input feature.
        input_features: sequence of feature names, aligned with ``coef``.
        errors: optional sequence of coefficient uncertainties; a term with
            an uncertainty is printed as ``(coef ± error) name``.
        intercept: optional intercept value.
        error_intercept: optional uncertainty of the intercept.
        precision: number of decimal digits used for rounding and printing.
        pm: symbol placed between a value and its uncertainty.

    Returns:
        The formatted equation as a string, e.g. ``"1.000 x + 0.500"``.
    """
    def term(c, sigma, name):
        # Suppress terms that vanish at the displayed precision.
        rounded_coef = np.round(c, precision)
        if rounded_coef == 0 and sigma is None:
            return ""
        elif sigma is None:
            return f"{c:.{precision}f} {name}"
        elif rounded_coef == 0 and np.round(sigma, precision) == 0:
            return ""
        else:
            return f"({c:.{precision}f} {pm} {sigma:.{precision}f}) {name}"

    errors = errors if errors is not None else repeat(None)
    components = [term(c, e, i) for c, e, i in zip(coef, errors, input_features)]
    eq = " + ".join(filter(bool, components))

    if not eq or intercept or error_intercept is not None:
        intercept = intercept or 0
        intercept_str = term(intercept, error_intercept, "").strip()
        if eq and intercept_str:
            eq += " + "
            eq += intercept_str
        elif not eq:
            # Every term vanished: fall back to printing only the intercept.
            eq = f"{intercept:.{precision}f}"
    return eq
def equation(pipeline, input_features=None, precision=3, input_fmt=None):
    """Return the fitted model of a sklearn pipeline as a readable equation.

    Args:
        pipeline: fitted pipeline whose first step provides
            ``get_feature_names`` and whose last step exposes ``coef_``
            and ``intercept_``.
        input_features: optional raw feature names forwarded to the first
            step's ``get_feature_names``.
        precision: number of decimal digits used when printing coefficients.
        input_fmt: optional callable applied to each generated feature name
            (e.g. for LaTeX formatting).

    Returns:
        The equation string produced by :func:`print_model`.
    """
    input_features = pipeline.steps[0][1].get_feature_names(input_features)
    if input_fmt:
        input_features = [input_fmt(i) for i in input_features]
    coef = pipeline.steps[-1][1].coef_
    intercept = pipeline.steps[-1][1].intercept_
    return print_model(coef, input_features, intercept=intercept, precision=precision)
class RationalFunctionMixin:
    """Mixin turning a linear regressor into a rational-function model.

    Fits ``y = (b0 + x @ p) / (1 + x @ q)`` by rewriting it as the linear
    problem ``y = b0 + x @ p - y * (x @ q)`` and regressing on the augmented
    feature matrix ``[x, y * x]``.
    """

    def _transform(self, x, y):
        # Augment the features with y-scaled copies so the denominator
        # coefficients can be estimated by a single linear fit.
        return np.hstack((x, y.reshape(-1, 1) * x))

    def fit(self, x, y, **kwargs):
        """Fit the underlying linear model on the augmented features."""
        super().fit(self._transform(x, y), y, **kwargs)
        self._arrange_coef()
        return self

    def _arrange_coef(self):
        # First half of coef_ belongs to the nominator; the second half is
        # sign-flipped because it was moved across the equation.
        nom = len(self.coef_) // 2
        self.coef_nominator_ = self.coef_[:nom]
        self.coef_denominator_ = -self.coef_[nom:]

    def predict(self, x):
        """Evaluate the fitted rational function at ``x``."""
        check_is_fitted(self, "coef_")
        x = check_array(x)
        return (self.intercept_ + x @ self.coef_nominator_) / (1 + x @ self.coef_denominator_)

    def print_model(self, input_features=None, precision=3):
        """Return the fitted rational function as a readable string."""
        input_features = input_features or ["x_{}".format(i) for i in range(len(self.coef_nominator_))]
        nominator = print_model(
            self.coef_nominator_, input_features, intercept=self.intercept_, precision=precision
        )
        if np.any(self.coef_denominator_):
            # BUG FIX: propagate ``precision`` to the denominator as well;
            # previously it always used the default precision.
            denominator = print_model(
                self.coef_denominator_, input_features, intercept=1, precision=precision
            )

            def add_parentheses(expr):
                return "( " + expr + " )" if len(expr.split("+")) > 1 else expr

            model = " / ".join(map(add_parentheses, [nominator, denominator]))
        else:
            model = nominator
        return model
class PrintMixin:
    """Adds a human-readable ``print_model`` to fitted linear estimators."""

    def print_model(self, input_features=None, precision=3):
        """Format the fitted linear model (coef_ and intercept_) as an equation string."""
        if not input_features:
            input_features = [f"x_{j}" for j in range(len(self.coef_))]
        return print_model(self.coef_, input_features, intercept=self.intercept_, precision=precision)
class STRidge(LinearModel, RegressorMixin):
    """Sequentially thresholded ridge regression.

    Repeatedly solves a ridge regression and zeroes out coefficients whose
    magnitude falls below ``threshold``, yielding a sparse linear model.

    Args:
        threshold: magnitude below which coefficients are forced to zero.
        alpha: ridge regularization strength.
        max_iter: maximum number of threshold/regress iterations.
        normalize: forwarded to ``_preprocess_data``.
        fit_intercept: whether to fit an intercept.
        threshold_intercept: if True, also zero the intercept when its
            magnitude is below ``threshold``.
        copy_X: whether to copy the design matrix.
        unbias: if True, refit the selected features without regularization.
        ridge_kw: extra keyword arguments forwarded to ``ridge_regression``.
    """

    def __init__(
        self,
        threshold=0.01,
        alpha=0.1,
        max_iter=100,
        normalize=True,
        fit_intercept=True,
        threshold_intercept=False,
        copy_X=True,
        unbias=True,
        ridge_kw=None,
    ):
        self.threshold = threshold
        self.max_iter = max_iter
        self.fit_intercept = fit_intercept
        self.threshold_intercept = threshold_intercept
        self.normalize = normalize
        self.copy_X = copy_X
        self.alpha = alpha
        self.unbias = unbias
        self.ridge_kw = ridge_kw
        self.history_ = []

    def _sparse_coefficients(self, dim, ind, coef, threshold):
        # Scatter the reduced coefficients back to full length, apply the
        # hard threshold, and record the result for convergence tracking.
        c = np.zeros(dim)
        c[ind] = coef
        big_ind = np.abs(c) >= threshold
        c[~big_ind] = 0
        self.history_.append(c)
        return c, big_ind

    def _regress(self, x, y, alpha):
        # One ridge solve on the currently selected columns.
        kw = self.ridge_kw or {}
        coef = ridge_regression(x, y, alpha, **kw)
        self.iters += 1
        return coef

    def _no_change(self):
        # Converged when the support (nonzero pattern) did not change
        # between the last two recorded coefficient vectors.
        this_coef = self.history_[-1]
        if len(self.history_) > 1:
            last_coef = self.history_[-2]
        else:
            last_coef = np.zeros_like(this_coef)
        return all(bool(i) == bool(j) for i, j in zip(this_coef, last_coef))

    def _reduce(self, x, y):
        """Iterates the thresholding. Assumes an initial guess is saved in self.coef_ and self.ind_"""
        ind = self.ind_
        n_samples, n_features = x.shape
        n_features_selected = sum(ind)
        for _ in range(self.iters, self.max_iter):
            if np.count_nonzero(ind) == 0:
                warnings.warn(
                    "Sparsity parameter is too big ({}) and eliminated all coefficients".format(self.threshold)
                )
                coef = np.zeros_like(ind, dtype=float)
                break
            coef = self._regress(x[:, ind], y, self.alpha)
            coef, ind = self._sparse_coefficients(n_features, ind, coef, self.threshold)
            if sum(ind) == n_features_selected or self._no_change():
                # could not (further) select important features
                break
        else:
            warnings.warn(
                "STRidge._reduce did not converge after {} iterations.".format(self.max_iter),
                ConvergenceWarning,
            )
        try:
            coef
        except NameError:
            # NOTE(review): reached when the loop body never ran (iters >=
            # max_iter); falls back to self.coef_, which only exists if a
            # previous fit set it — confirm callers guarantee that.
            coef = self.coef_
            warnings.warn("STRidge._reduce has no iterations left to determine coef", ConvergenceWarning)
        self.coef_ = coef
        self.ind_ = ind

    def _unbias(self, x, y):
        # Refit the selected features with alpha=0 to remove ridge shrinkage.
        if np.any(self.ind_):
            coef = self._regress(x[:, self.ind_], y, 0)
            self.coef_, self.ind_ = self._sparse_coefficients(x.shape[1], self.ind_, coef, self.threshold)

    def fit(self, x_, y, sample_weight=None):
        """Fit the sparse linear model.

        Args:
            x_: training data of shape (n_samples, n_features).
            y: target values of shape (n_samples,).
            sample_weight: optional per-sample weights.

        Returns:
            self
        """
        x_, y = check_X_y(x_, y, accept_sparse=[], y_numeric=True, multi_output=False)
        x, y, X_offset, y_offset, X_scale = self._preprocess_data(
            x_,
            y,
            fit_intercept=self.fit_intercept,
            normalize=self.normalize,
            copy=self.copy_X,
            sample_weight=sample_weight,
        )
        if sample_weight is not None:
            x, y = _rescale_data(x, y, sample_weight)
        # BUG FIX: reset per-fit state so repeated fit() calls do not reuse
        # the coefficient history of a previous fit in _no_change().
        self.history_ = []
        self.iters = 0
        self.ind_ = np.ones(x.shape[1], dtype=bool)  # initial guess
        if self.threshold > 0:
            self._reduce(x, y)
        else:
            self.coef_ = self._regress(x[:, self.ind_], y, self.alpha)
        if self.unbias and self.alpha >= 0:
            self._unbias(x, y)
        self._set_intercept(X_offset, y_offset, X_scale)
        if self.threshold_intercept and abs(self.intercept_) < self.threshold:
            self.intercept_ = 0
        return self

    @property
    def complexity(self):
        # Number of active coefficients plus one if the intercept survives
        # the threshold.
        return np.count_nonzero(self.coef_) + np.count_nonzero([abs(self.intercept_) >= self.threshold])
class BoATS(LinearModel, RegressorMixin):
    """Sparse linear regression by thresholding against noisy refits.

    Fits a linear model, estimates baseline coefficients by refitting on
    noise-perturbed targets, and zeroes out coefficients that are small
    relative to that baseline (see :func:`fit_with_noise`).

    Args:
        alpha: relative threshold used for coefficient selection.
        sigma: standard deviation of the Gaussian noise added to ``y``.
        n: number of noisy refits used to estimate baseline coefficients.
        copy_X: whether to copy the design matrix.
        fit_intercept: whether to fit an intercept.
        normalize: forwarded to ``_preprocess_data``.
    """

    def __init__(self, alpha=0.01, sigma=0.01, n=10, copy_X=True, fit_intercept=True, normalize=True):
        self.alpha = alpha
        self.sigma = sigma
        self.n = n
        self.copy_X = copy_X
        self.fit_intercept = fit_intercept
        self.normalize = normalize

    def fit(self, x_, y, sample_weight=None):
        """Fit the thresholded linear model.

        Args:
            x_: training data of shape (n_samples, n_features).
            y: target values of shape (n_samples,).
            sample_weight: optional per-sample weights.

        Returns:
            self
        """
        # Use the validated arrays from check_X_y (the original assigned
        # them to an unused name and reused the raw input).
        x_, y = check_X_y(x_, y, accept_sparse=[], y_numeric=True, multi_output=False)
        x, y, X_offset, y_offset, X_scale = self._preprocess_data(
            x_,
            y,
            fit_intercept=self.fit_intercept,
            normalize=self.normalize,
            copy=self.copy_X,
            sample_weight=None,  # weights are applied below via _rescale_data
        )
        if sample_weight is not None:
            # Sample weight can be implemented via a simple rescaling.
            x, y = _rescale_data(x, y, sample_weight)
        coefs, intercept = fit_with_noise(x, y, self.sigma, self.alpha, self.n)
        self.intercept_ = intercept
        self.coef_ = coefs
        self._set_intercept(X_offset, y_offset, X_scale)
        return self
def fit_with_noise(x, y, sigma, alpha, n, lmc=LinearRegression):
    """Estimate a sparse linear model by thresholding against noisy refits.

    Baseline coefficients ``beta_0`` are obtained by averaging ``n`` fits on
    targets perturbed with Gaussian noise of scale ``sigma``.  Coefficients
    of the noise-free fit smaller than ``alpha * |beta_0|`` are zeroed, and
    the surviving features are refit.

    Args:
        x: design matrix of shape (n_samples, n_features).
        y: target vector of shape (n_samples,).
        sigma: standard deviation of the added noise.
        alpha: relative threshold for dropping coefficients.
        n: number of noisy refits to average.
        lmc: linear-model class used for all fits; must expose
            ``fit``/``coef_``/``intercept_``.

    Returns:
        Tuple ``(coef, intercept)`` of the sparse, refitted model.

    Raises:
        FitFailedWarning: if the threshold removes every coefficient.
    """
    # NOTE(review): draws from the global numpy RNG, so results are not
    # reproducible unless the caller seeds np.random.
    beta_0 = np.mean(
        [lmc().fit(x, y + np.random.normal(size=y.shape, scale=sigma)).coef_ for _ in range(n)], axis=0
    )
    beta_init = lmc().fit(x, y).coef_
    beta_sel = beta_init.copy()
    mask = np.abs(beta_init) < alpha * np.abs(beta_0)
    if np.all(mask):
        raise FitFailedWarning("alpha too high: {}".format(alpha))
    beta_sel[mask] = 0
    # Refit only the surviving features to remove bias from the selection.
    model = lmc().fit(x[:, ~mask], y)
    beta_sel[~mask] = model.coef_
    return beta_sel, model.intercept_