Source code for skits.preprocessing
import numpy as np
from numpy.lib.stride_tricks import as_strided
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted
from skits.feature_extraction import AutoregressiveTransformer
def expand_dim_if_needed(arr):
    if arr.ndim == 1:
        return np.expand_dims(arr, axis=1)
    return arr
[docs]class ReversibleImputer(BaseEstimator, TransformerMixin):
    needs_refit = True
    def __init__(self, y_only=False):
        super().__init__()
        self.y_only = y_only
[docs]    def fit(self, X, y=None):
        mask = np.isnan(X)
        self._missing_idxs = np.where(mask)
        self._mean = X[~mask].mean()
        return self
[docs]    def transform(self, X, y=None, refit=False):
        if refit:
            self.fit(X, y=y)
        check_is_fitted(self, "_mean")
        X[self._missing_idxs] = self._mean
        return X
[docs]class DifferenceTransformer(BaseEstimator, TransformerMixin):
    needs_refit = True
    def __init__(self, period=1):
        super().__init__()
        self.period = period
[docs]    def fit(self, X, y=None):
        missing = np.where(np.isnan(X))[0]
        if len(missing) > 0:
            self._missing_idx_start = np.max(missing) + 1
        else:
            self._missing_idx_start = 0
        self._missing_idx_end = self._missing_idx_start + self.period
        self._missing_vals = X[self._missing_idx_start : self._missing_idx_end, :]
        return self
[docs]    def transform(self, X, y=None, refit=False):
        if refit:
            self.fit(X, y=y)
        check_is_fitted(self, "_missing_idx_start")
        X_shift = np.roll(X, self.period, axis=0)
        X_trans = X - X_shift
        X_trans[: self._missing_idx_end, :] = np.nan
        return X_trans
[docs]    def inverse_transform(self, X):
        # TODO: Figure out why I have to make a copy here
        X = X.copy()
        # Fill in the missing values
        X[self._missing_idx_start : self._missing_idx_end, :] = self._missing_vals
        X_inv = X[self._missing_idx_start :, :]
        # Take stock
        period_rem = np.remainder(len(X_inv), self.period)
        stride_shape = [
            self.period,
            int(np.floor(len(X_inv) / self.period) + period_rem),
        ]
        stride = X.strides[0]
        # Clusterfuck
        inv = (
            as_strided(
                X_inv, shape=stride_shape, strides=[stride, stride * self.period]
            )
            .cumsum(axis=1)
            .reshape(stride_shape[0] * stride_shape[1], order="F")[: len(X_inv)]
        )[:, np.newaxis]
        return np.vstack((X[: self._missing_idx_start, :], inv))
[docs]class LogTransformer(BaseEstimator, TransformerMixin):
    needs_refit = False
    def __init__(self):
        super().__init__()
[docs]    def transform(self, X, y=None, refit=False):
        with np.errstate(divide="raise", invalid="raise"):
            try:
                Xt = np.log(1 + X)
            except FloatingPointError:
                raise ValueError("X cannot have negative values")
        return Xt
[docs]class HorizonTransformer(BaseEstimator, TransformerMixin):
    needs_refit = True
    y_only = True
    def __init__(self, horizon=2):
        super().__init__()
        if horizon < 2:
            raise ValueError("horizon must be greater than 1")
        self.horizon = horizon
        self.autoregressive_transformer = AutoregressiveTransformer(
            num_lags=self.horizon, pred_stride=1
        )
[docs]    def fit(self, X, y=None):
        self.autoregressive_transformer.fit(expand_dim_if_needed(X))
        return self
[docs]    def transform(self, X, y=None, refit=False):
        X = expand_dim_if_needed(X)
        if refit:
            self.autoregressive_transformer.fit(X)
        Xt = self.autoregressive_transformer.transform(X)
        # The autoregressive transformer won't build lags _with_ the last element of X.
        # So, we have to manually tack it on here.
        last_non_nan_piece = np.hstack((Xt[-1, 1:], X[-1]))
        Xt = np.vstack(
            (Xt[self.horizon :, :], last_non_nan_piece, Xt[1 : self.horizon, :])
        )
        return Xt
[docs]    def inverse_transform(self, X, y=None):
        Xt = np.vstack((X[-self.horizon :, :], X[: -self.horizon, :]))
        return self.autoregressive_transformer.inverse_transform(Xt)