Source code for skits.feature_extraction
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
[docs]class AutoregressiveTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, num_lags=5, pred_stride=1):
        super().__init__()
        self.num_lags = num_lags
        self.pred_stride = pred_stride
[docs]    def transform(self, X, y=None):
        X_trans = []
        for i in range(self.num_lags, 0, -1):
            X_trans.append([X[self.num_lags - i : -(i + self.pred_stride - 1), 0]])
        X_trans = np.vstack(X_trans).T
        X_trans = self._fill_missing(X_trans)
        return X_trans
    def _fill_missing(self, X):
        missing_vals = np.zeros(
            (self.num_lags + self.pred_stride - 1, self.num_lags), dtype=X.dtype
        )
        missing_vals[:] = np.nan
        return np.vstack((missing_vals, X))
[docs]    def inverse_transform(self, X):
        return np.concatenate(
            (
                X[self.num_lags + self.pred_stride - 1, :],  # Slice along first window
                X[
                    self.num_lags + self.pred_stride :, -1
                ],  # Then grab all single-lag up to last one
                np.atleast_1d(self._final_points),
            )
        )[:, np.newaxis]
[docs]class SeasonalTransformer(AutoregressiveTransformer):
    def __init__(self, seasonal_period=1, pred_stride=1):
        pred_stride = seasonal_period + pred_stride - 1
        super().__init__(num_lags=1, pred_stride=pred_stride)
[docs]class IntegratedTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, num_lags=1, pred_stride=1):
        super().__init__()
        self.num_lags = num_lags
        self.pred_stride = pred_stride
        self.ar1 = AutoregressiveTransformer(
            num_lags=self.num_lags, pred_stride=self.pred_stride
        )
        self.ar2 = AutoregressiveTransformer(
            num_lags=self.num_lags, pred_stride=1 + self.pred_stride
        )
[docs]    def transform(self, X, y=None):
        Xt1 = self.ar1.transform(X)
        Xt2 = self.ar2.transform(X)
        return Xt1 - Xt2
[docs]class RollingMeanTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, window=5):
        super().__init__()
        self.window = window
[docs]    def transform(self, X, y=None):
        z2 = np.cumsum(
            np.pad(X, ((self.window, 0), (0, 0)), "constant", constant_values=0), axis=0
        )
        z1 = np.cumsum(
            np.pad(X, ((0, self.window), (0, 0)), "constant", constant_values=X[-1]),
            axis=0,
        )
        zc = (z1 - z2)[(self.window - 1) : -1] / self.window
        return np.vstack(
            (
                np.repeat(np.nan, self.window).reshape(-1, 1),
                zc[0 : len(X) - self.window],
            )
        )
[docs]class TrendTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, shift=0):
        super().__init__()
        self.shift = shift
[docs]    def transform(self, X, y=None):
        return np.expand_dims(np.arange(self.shift, self.shift + X.shape[0]), axis=1)
[docs]class FourierTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, period=10, max_order=10, step_size=1):
        super().__init__()
        self.period = period
        self.max_order = max_order
        self.step_size = step_size
    def _get_trig_args(self, X):
        trig_args = (2 * np.pi / self.period) * np.arange(
            1, self.max_order + 1, self.step_size
        )
        time = np.arange(X.shape[0])
        trig_args = trig_args[np.newaxis, :] * time[:, np.newaxis]
        return trig_args
[docs]    def transform(self, X, y=None):
        trig_args = self._get_trig_args(X)
        cos = np.cos(trig_args)
        sin = np.sin(trig_args)
        fourier_terms = np.hstack((cos, sin))
        return fourier_terms