import random
import re
import warnings
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.base import RegressorMixin
from sklearn.base import TransformerMixin
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import Lasso
from sklearn.linear_model import LassoLarsCV
from sklearn.pipeline import Pipeline
from sparsereg.util.net import net
operators = {
"add": np.add,
"subtract": np.subtract,
"mul": np.multiply,
"div": np.divide,
"exp": np.exp,
"log": np.log,
"sqrt": np.sqrt,
"square": np.square,
"sin": np.sin,
"cos": np.cos,
}
[docs]def size(name):
pattern = r"[\(,]"
return len(re.findall(pattern, name)) + 1
[docs]def mutate(names, importance, toursize, operators, rng=random):
f = rng.choice(list(operators))
arity = getattr(operators[f], "nin", None) or operators[f].__code__.co_argcount
parents = []
size = min(toursize, len(names))
for _ in range(arity):
candidates = rng.sample(names, size)
parent = sorted(candidates, key=lambda i: importance[names.index(i)])[0]
parents.append(parent)
args = ",".join(parents)
name = f + "(" + args + ")"
return operators[f], name, [names.index(p) for p in parents]
[docs]def get_importance(coefs, scores):
return np.array([[score if c else 0 for c in coef] for coef, score in zip(coefs, scores)]).sum(axis=0)
def _check_rng(state):
if isinstance(state, random.Random):
return state
elif isinstance(state, int):
rng = random.Random()
rng.seed(state)
return rng
else:
return random.Random()
def _transform(x, names, operators):
args = ",".join("x_{}".format(i) for i in range(x.shape[1]))
funcs = [eval("lambda {}: {}".format(args, code), {**operators}) for code in names]
data = np.array([f(*x.T) for f in funcs]).T
return data
[docs]class LibTrafo(BaseEstimator, TransformerMixin):
def __init__(self, names, operators):
self.names = names[:]
self.operators = operators
[docs] def fit(self, x, y=None):
return self
def _fit_model(x, y, names, operators, **kw):
steps = [("trafo", LibTrafo(names, operators)), ("lasso", LassoLarsCV(**kw))]
model = Pipeline(steps).fit(x, y)
return model, model.score(x, y)
[docs]class EFS(BaseEstimator, RegressorMixin, TransformerMixin):
def __init__(
self,
q=1,
mu=1,
max_size=5,
t=0.95,
toursize=5,
max_stall_iter=20,
max_iter=2000,
random_state=None,
operators=operators,
max_coarsity=2,
n_jobs=1,
):
"""Evolutionary feature synthesis."""
self.q = q
self.mu = mu
self.max_size = max_size
self.t = t
self.toursize = toursize
self.max_iter = max_iter
self.max_stall_iter = max_stall_iter
self.max_coarsity = max_coarsity
self.operators = operators
self.n_jobs = n_jobs
self.rng = _check_rng(random_state)
[docs] def fit(self, x, y):
n_samples, p = x.shape
linear_names = ["x_{}".format(i) for i in range(p)]
names = linear_names[:]
data = [x[:, i] for i in range(p)]
models = net(Lasso, x, y, max_coarsity=self.max_coarsity).values()
scores = [model.score(x, y) for model in models]
coefs = [model.coef_ for model in models]
importance = get_importance(coefs, scores)
stall_iter = 0
best_names = linear_names[:]
best_model, best_score = _fit_model(x, y, best_names, self.operators, n_jobs=self.n_jobs)
pop_size = p * (self.mu + 1 + self.q)
for _ in range(self.max_iter):
# old_names = sorted(names[:])
stall_iter += 1
new_names = []
new_data = []
for i in range(3 * pop_size):
f, new_name, parents = mutate(names, importance, self.toursize, self.operators, self.rng)
if size(new_name) <= self.max_size and new_name not in new_names and new_name not in names:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
feature = f(*[data[i] for i in parents])
if np.all(np.isfinite(feature)) and all(
abs(np.corrcoef(feature, data[i]))[1, 0] <= self.t for i in parents
):
new_names.append(new_name)
new_data.append(feature)
if len(new_names + names) < pop_size:
break
else:
warnings.warn(
"Failed to produce a new population given the tree-depth {} and correlation threshold {}.".format(
self.max_size, self.t
),
ConvergenceWarning,
)
names.extend(new_names)
data.extend(new_data)
models = net(Lasso, np.array(data).T, y, max_coarsity=self.max_coarsity).values()
scores = [model.score(np.array(data).T, y) for model in models]
coefs = [model.coef_ for model in models]
importance = list(get_importance(coefs, scores))
names_to_discard = [
n
for n in sorted(names, key=lambda x: importance[names.index(x)], reverse=True)
if n not in linear_names
]
for n in names_to_discard[-self.mu * p :]: # noqa
i = names.index(n)
names.pop(i)
data.pop(i)
importance.pop(i)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
model, score = _fit_model(x, y, names, self.operators, n_jobs=self.n_jobs)
if score > best_score:
best_model = model
best_score = score
stall_iter = 0
elif stall_iter >= self.max_stall_iter:
break
self.model = best_model
return self
[docs] def predict(self, x):
return self.model.predict(x)