from collections import OrderedDict
from itertools import chain
from itertools import combinations
from itertools import product
import re

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sympy import simplify
from toolz import compose
class Base(BaseEstimator, TransformerMixin):
    """Shared base for feature objects exposing a cached, simplified name.

    Subclasses are expected to provide a ``_name`` attribute/property holding
    the raw textual expression; ``name`` runs it through ``sympy.simplify``
    once and remembers the resulting string.
    """

    def __init__(self):
        # Lazily filled by the ``name`` property on first access.
        self.name_cache = None

    @property
    def name(self):
        """Return the simplified expression string, computing it at most once."""
        cached = self.name_cache
        if cached is not None:
            return cached
        # First access: simplify the raw expression and remember the result.
        self.name_cache = str(simplify(self._name))
        return self.name_cache
class SimpleFeature(Base):
    """Base to create polynomial features.

    Describes a single input column raised to a fixed, non-zero exponent.
    Only the naming logic is visible in this part of the file.

    Parameters
    ----------
    exponent : number
        Power applied to the column; must not be zero.
    index : int, default 0
        Position of the input column, used to build the ``x_<index>`` symbol.

    Raises
    ------
    ValueError
        If ``exponent`` equals zero.
    """

    def __init__(self, exponent, index=0):
        super().__init__()
        if exponent == 0:
            # Same exception type as before, now with a diagnosable message.
            raise ValueError(
                "exponent must be non-zero (got {!r})".format(exponent)
            )
        self.exponent = exponent
        self.index = index

    @property
    def _name(self):
        """Raw (unsimplified) expression: ``x_i`` or ``x_i**e``."""
        if self.exponent == 1:
            return "x_{}".format(self.index)
        return "x_{}**{}".format(self.index, self.exponent)
class OperatorFeature(Base):
    """Feature described as an operator wrapped around another feature.

    Parameters
    ----------
    feat_cls : feature object
        Inner feature; its ``name`` is placed inside the parentheses.
    operator : object
        Stored as given. Presumably the callable applied to the inner
        feature's values -- it is not invoked in the visible code.
    operator_name : str, optional
        Label used in the expression; a falsy value falls back to
        ``str(operator)``.
    """

    def __init__(self, feat_cls, operator, operator_name=None):
        super().__init__()
        self.feat_cls = feat_cls
        self.operator = operator
        # Any falsy label (None, "") is replaced by the operator's string form.
        if not operator_name:
            operator_name = str(operator)
        self.operator_name = operator_name

    @property
    def _name(self):
        """Raw expression of the form ``<operator_name>(<inner name>)``."""
        inner_name = self.feat_cls.name
        return "{}({})".format(self.operator_name, inner_name)
class ProductFeature(Base):
    """Feature described as the product of two other features.

    Parameters
    ----------
    feat_cls1, feat_cls2 : feature objects
        The two factors; their ``name`` values are joined with ``*``.
    """

    def __init__(self, feat_cls1, feat_cls2):
        super().__init__()
        self.feat_cls1 = feat_cls1
        self.feat_cls2 = feat_cls2

    @property
    def _name(self):
        """Raw expression of the form ``<left name>*<right name>``."""
        left = self.feat_cls1.name
        right = self.feat_cls2.name
        return "{}*{}".format(left, right)
def _allfinite(tpl):
_, x = tpl
return np.all(np.isfinite(x))
def _take_finite(x):
    """Return a list with only the ``(feature, values)`` pairs that are all finite.

    ``x`` may be any iterable of pairs (including a generator); input order
    is preserved.
    """
    return [pair for pair in x if _allfinite(pair)]
def _hash(array):
return hash(str(array))
def hashed_hash_():
    """Build the hashing callable used for duplicate detection.

    Returns a function ``inner(x)`` that yields ``_hash(x)``.

    No lookup table is kept. A table keyed by the hash can only be consulted
    after the hash has been computed, so it would never save work and would
    only grow with every distinct input.
    """

    def inner(x):
        return _hash(x)

    return inner


# Module-level instance shared by the de-duplication helpers.
hashed_hash = hashed_hash_()
def _remove_id(tpl):
    """Drop duplicate candidates from an iterable of ``(feature, values)`` pairs.

    A candidate is kept only if neither its ``name`` nor the hash of its
    values has been seen on an earlier *kept* candidate. The first
    occurrence wins and input order is preserved.

    Returns
    -------
    list of (feature, values) tuples
    """
    kept = OrderedDict()
    # A set gives O(1) membership tests; a list made this loop quadratic.
    seen_hashes = set()
    for feature, values in tpl:
        name = feature.name
        if name in kept:
            # Already rejected by name, so skip the costlier value hash.
            continue
        value_hash = hashed_hash(values)
        if value_hash in seen_hashes:
            continue
        kept[name] = (feature, values)
        seen_hashes.add(value_hash)
    return list(kept.values())
def get_valid(candidates):
    """Filter ``(feature, values)`` pairs down to finite, non-duplicate ones.

    Applies ``_take_finite`` first and ``_remove_id`` to its result.
    """
    finite_only = _take_finite(candidates)
    return _remove_id(finite_only)
class SymbolicFeatures(Base):
    """Build a library of symbolic candidate features from input columns.

    ``fit`` constructs, in this order:

    1. simple features: one per (exponent, column) pair,
    2. operator features: every configured operator around every simple
       feature that survived step 1,
    3. optionally, product features: operator*simple pairs and all
       2-combinations of simple features.

    After each step ``get_valid`` removes candidates that are not entirely
    finite or that duplicate an earlier candidate (by name or by value hash).

    Parameters
    ----------
    exponents : list, default [1]
        Exponents used for the simple features.
    operators : dict, default {}
        Mapping ``operator_name -> operator`` used for operator features.
    consider_products : bool, default True
        Whether step 3 is performed.

    Attributes (set by ``fit``)
    ---------------------------
    feat_cls : list
        The retained feature objects, in output-column order.
    n_input_features_ : int
    n_output_features_ : int

    Notes
    -----
    The mutable defaults are shared between instances. They are stored as
    given and never mutated by the code in this class, so callers must not
    mutate them in place either.
    """

    def __init__(self, exponents=[1], operators={}, consider_products=True):
        self.exponents = exponents
        self.operators = operators
        self.consider_products = consider_products
        self._precompute_hash = None
        self._names = None

    def fit(self, x, y=None):
        """Construct the feature library for ``x``.

        Parameters
        ----------
        x : array-like, two-dimensional
            Input data; columns are the base variables ``x_0, x_1, ...``.
        y : ignored

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``x`` is not two-dimensional or no valid feature remains.
        """
        x = np.asfortranarray(x)
        _, n_features = x.shape

        # NOTE(review): every feature object is expected to provide
        # ``transform(x)``; that method is not defined in the visible part
        # of this file -- confirm it exists on the feature classes.

        # 1) Get all simple features
        simple = (
            SimpleFeature(e, index=i)
            for e, i in product(self.exponents, range(n_features))
        )
        simple = get_valid((s, s.transform(x)) for s in simple)

        # 2) Get all operator features
        operator = (
            OperatorFeature(s, op, operator_name=op_name)
            for (s, _), (op_name, op) in product(simple, self.operators.items())
        )
        operator = get_valid((o, o.transform(x)) for o in operator)

        # 3) Get all product features
        all_ = simple + operator
        if self.consider_products:
            combs = chain(product(operator, simple), combinations(simple, 2))
            prod = [ProductFeature(feat1, feat2) for (feat1, _), (feat2, _) in combs]
            all_ += get_valid((p, p.transform(x)) for p in prod)

        # Final pass removes duplicates across the three groups.
        all_ = get_valid(all_)
        if not all_:
            # Unpacking an empty result used to fail with a cryptic message;
            # the exception type is unchanged.
            raise ValueError(
                "No valid features could be constructed from the input."
            )

        self.feat_cls = [feature for feature, _ in all_]
        self._precomputed_features = np.array(
            [values for _, values in all_]
        ).T  # speed up fit_transform
        self._precompute_hash = _hash(x)
        # Names cached by a previous fit no longer describe ``feat_cls``.
        self._names = None
        self.n_input_features_ = n_features
        self.n_output_features_ = len(self.feat_cls)
        return self

    def get_feature_names(self, input_features=None):
        """Get all the feature names. Only available after fitting.

        Parameters
        ----------
        input_features : sequence of str, optional
            Replacement names for the base variables; entry ``i`` replaces
            the symbol ``x_i``. Symbols with no corresponding entry are
            left untouched.

        Returns
        -------
        list of str
            A fresh list on every call; the internal cache always keeps the
            generic ``x_i`` based names.
        """
        if self._names is None:
            self._names = [f.name for f in self.feat_cls]

        if input_features is None or len(input_features) == 0:
            return list(self._names)

        def _substitute(match):
            idx = int(match.group(1))
            if idx < len(input_features):
                return input_features[idx]
            return match.group(0)

        # One regex pass on whole symbols: "x_1" must not match inside
        # "x_10", and inserted names must not be replaced again.
        pattern = re.compile(r"\bx_(\d+)\b")
        return [pattern.sub(_substitute, name) for name in self._names]

    def __getstate__(self):
        """Return the instance state without the precomputed feature matrix.

        The precompute hash is cleared as well, so a restored object does
        not refer to a matrix it no longer holds.
        """
        state = self.__dict__.copy()
        state.pop("_precomputed_features", None)
        state["_precompute_hash"] = None
        return state