import warnings
import numpy as np
from sklearn.base import RegressorMixin
from sklearn.exceptions import ConvergenceWarning
try:  # sklearn >= 0.22 moved this module to a private path
    from sklearn.linear_model._base import LinearModel
except ImportError:  # older sklearn releases
    from sklearn.linear_model.base import LinearModel
from sklearn.utils.validation import check_X_y
from sparsereg.model.base import PrintMixin
eps = np.finfo(np.float64).eps


def scale_sigma(est, X_offset, X_scale):
    """Rescale the posterior standard deviations back to the units of the unscaled data."""
    if est.fit_intercept:
        std_intercept = np.sqrt(np.abs(X_offset @ np.diag(est.sigma_)))
    else:
        std_intercept = 0
    # Undo the feature scaling applied during preprocessing; eps guards
    # against division by zero for constant features.
    sigma = np.diag(est.sigma_) / (X_scale + eps)
    with warnings.catch_warnings():
        # Suppress RuntimeWarnings from np.sqrt if numerical noise makes a variance slightly negative.
        warnings.filterwarnings("ignore")
        std_coef = np.sqrt(sigma)
    return std_intercept, std_coef
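
# Illustrative sketch (an assumed example, not from the original module):
# scale_sigma only needs ``fit_intercept`` and the posterior covariance
# ``sigma_``, so a bare namespace suffices to demonstrate it:
#
#     from types import SimpleNamespace
#     est = SimpleNamespace(fit_intercept=True, sigma_=np.eye(2) * 0.04)
#     std_intercept, std_coef = scale_sigma(est, X_offset=np.ones(2), X_scale=np.full(2, 2.0))
#     # std_coef ~= sqrt(0.04 / 2) ~= 0.141 per feature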


class JMAP(LinearModel, RegressorMixin, PrintMixin):
    """Linear regression using joint maximum a posteriori (JMAP) estimation.

    Places inverse-gamma hyperpriors on the noise and coefficient variances;
    see :func:`jmap` for the underlying model and update rules.
    """

    def __init__(
        self,
        ae0=1e-6,
        be0=1e-6,
        af0=1e-6,
        bf0=1e-6,
        max_iter=300,
        tol=1e-3,
        normalize=False,
        fit_intercept=True,
        copy_X=True,
    ):
        self.max_iter = max_iter
        self.normalize = normalize
        self.fit_intercept = fit_intercept
        self.copy_X = copy_X
        self.tol = tol
        self.ae0 = ae0
        self.be0 = be0
        self.af0 = af0
        self.bf0 = bf0
        warnings.warn(
            f"Consider using sklearn.linear_model.BayesianRidge instead of {self.__class__.__name__}."
        )
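
    # For reference, a roughly comparable sklearn configuration (an assumed
    # near-equivalent, not an exact match) would be:
    #
    #     from sklearn.linear_model import BayesianRidge
    #     BayesianRidge(alpha_1=1e-6, alpha_2=1e-6, lambda_1=1e-6, lambda_2=1e-6)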

    def fit(self, x, y):
        """Fit the model to the data via the iterative jmap solver."""
        x, y = check_X_y(x, y, accept_sparse=[], y_numeric=True, multi_output=False)  # standard sklearn validation
        x, y, X_offset, y_offset, X_scale = self._preprocess_data(
            x, y, fit_intercept=self.fit_intercept, normalize=self.normalize, copy=self.copy_X
        )
        fh, vf, ve, sigma = jmap(
            y, x, self.ae0, self.be0, self.af0, self.bf0, max_iter=self.max_iter, tol=self.tol
        )
        self.X_offset_ = X_offset
        self.X_scale_ = X_scale
        self.sigma_ = sigma
        self.ve_ = ve
        self.vf_ = vf
        self.coef_ = fh
        # Scalar precisions analogous to BayesianRidge's alpha_ (noise) and lambda_ (weights).
        self.alpha_ = 1.0 / np.mean(ve)
        self.lambda_ = 1.0 / np.mean(vf)
        self.std_intercept_, self.std_coef_ = scale_sigma(self, X_offset, X_scale)
        self._set_intercept(X_offset, y_offset, X_scale)
        return self

    def predict(self, X, return_std=False):
        """Predict using the linear model; optionally return the predictive standard deviation."""
        y_mean = self._decision_function(X)
        if not return_std:
            return y_mean
        if self.normalize:
            X = (X - self.X_offset_) / self.X_scale_
        # Predictive variance: data contribution x^T @ sigma_ @ x plus noise variance 1 / alpha_.
        sigmas_squared_data = ((X @ self.sigma_) * X).sum(axis=1)
        y_std = np.sqrt(sigmas_squared_data + (1.0 / self.alpha_))
        return y_mean, y_std
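
# Minimal usage sketch (illustrative only; the synthetic data below is an
# assumption, not part of the original module):
#
#     rng = np.random.default_rng(0)
#     X = rng.normal(size=(100, 3))
#     y = X @ np.array([1.0, 0.0, -2.0]) + 0.1 * rng.normal(size=100)
#     model = JMAP(max_iter=500).fit(X, y)
#     y_mean, y_std = model.predict(X, return_std=True)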


def _converged(fhs, tol=0.1):
    """Return True once the summed relative change between the last two iterates is below tol."""
    if len(fhs) < 2:
        return False
    rtol = np.sum(np.abs((fhs[-1] - fhs[-2]) / fhs[-1]))
    return rtol <= tol
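
# For example, _converged([np.array([1.0, 2.0]), np.array([1.0, 2.1])])
# sums the elementwise relative changes (0 + 0.1 / 2.1 ~= 0.048 <= 0.1)
# and therefore reports convergence.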


def jmap(g, H, ae0, be0, af0, bf0, max_iter=1000, tol=1e-4, rcond=None, observer=None):
    """Joint maximum a posteriori (JMAP) estimator for g = H @ f + e.

    Hierarchical model:

        p(g | f, ve) = normal(H f, ve I)
        p(ve) = inverse_gamma(ae0, be0)
        p(f | vf) = normal(0, vf I)
        p(vf) = inverse_gamma(af0, bf0)

    JMAP maximizes the joint posterior

        p(f, ve, vf | g) = p(g | f, ve) p(f | vf) p(ve) p(vf) / p(g)

    with respect to f, ve and vf by alternating closed-form updates.

    Original author: Ali Mohammad-Djafari, April 2015.

    Args:
        g: Target vector of shape (n_samples,).
        H: Design matrix of shape (n_samples, n_features).
        ae0: Shape hyperparameter of the inverse-gamma prior on the noise variances ve.
        be0: Scale hyperparameter of the inverse-gamma prior on the noise variances ve.
        af0: Shape hyperparameter of the inverse-gamma prior on the coefficient variances vf.
        bf0: Scale hyperparameter of the inverse-gamma prior on the coefficient variances vf.
        max_iter: Maximum number of update iterations.
        tol: Convergence tolerance on the summed relative change of f between iterations.
        rcond: Cut-off ratio for small singular values, passed to np.linalg.lstsq.
        observer: Optional callable invoked as observer(fh, vf, ve) after each iteration.

    Returns:
        Tuple (fh, vf, ve, sigma) of the coefficient estimate, coefficient
        variances, noise variances, and posterior covariance of fh.
    """
    n_samples, n_features = H.shape
    HtH = H.T @ H
    Htg = H.T @ g
    # Prior means of the inverse-gamma distributions (b / (a - 1)); only
    # their ratio enters the ridge-style initialization below.
    ve0 = be0 / (ae0 - 1)
    vf0 = bf0 / (af0 - 1)
    lambda_ = ve0 / vf0
    # Initialize f with a ridge-regularized least-squares solution.
    fh, *_ = np.linalg.lstsq(HtH + lambda_ * np.eye(n_features), Htg, rcond=rcond)
    fhs = [fh]
    for _ in range(max_iter):
        dg = g - H @ fh  # residual of the current estimate
        # Posterior update for the per-sample noise variances ve.
        ae = ae0 + 1.5
        be = be0 + 0.5 * dg ** 2
        ve = be / ae + eps
        iVe = np.diag(1 / ve)
        # Posterior update for the per-feature coefficient variances vf.
        af = af0 + 1.5
        bf = bf0 + 0.5 * fh ** 2
        vf = bf / af + eps
        iVf = np.diag(1.0 / vf)
        # Re-estimate f from the variance-weighted normal equations.
        HR = H.T @ iVe @ H + iVf
        fh, *_ = np.linalg.lstsq(HR, H.T @ iVe @ g, rcond=rcond)
        fhs.append(fh)
        if observer is not None:
            observer(fh, vf, ve)
        if _converged(fhs, tol=tol):
            break
    else:  # no break: the loop exhausted max_iter without converging
        warnings.warn(f"jmap did not converge after {max_iter} iterations.", ConvergenceWarning)
    sigma = np.linalg.inv(HR)
    return fh, vf, ve, sigma
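

# A minimal end-to-end sketch of the raw solver (the __main__ guard and the
# synthetic data are assumptions for illustration, not part of the original module):
if __name__ == "__main__":
    rng = np.random.default_rng(42)
    H_demo = rng.normal(size=(50, 4))
    f_true = np.array([2.0, 0.0, -1.0, 0.5])
    g_demo = H_demo @ f_true + 0.05 * rng.normal(size=50)
    fh_demo, vf_demo, ve_demo, sigma_demo = jmap(
        g_demo, H_demo, 1e-6, 1e-6, 1e-6, 1e-6, max_iter=500, tol=1e-6
    )
    print("estimated coefficients:", fh_demo)
    print("posterior std of coefficients:", np.sqrt(np.diag(sigma_demo)))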