Source code for skits.preprocessing

import numpy as np
from numpy.lib.stride_tricks import as_strided
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted

from skits.feature_extraction import AutoregressiveTransformer


def expand_dim_if_needed(arr):
    if arr.ndim == 1:
        return np.expand_dims(arr, axis=1)
    return arr


[docs]class ReversibleImputer(BaseEstimator, TransformerMixin): needs_refit = True def __init__(self, y_only=False): super().__init__() self.y_only = y_only
[docs] def fit(self, X, y=None): mask = np.isnan(X) self._missing_idxs = np.where(mask) self._mean = X[~mask].mean() return self
[docs] def transform(self, X, y=None, refit=False): if refit: self.fit(X, y=y) check_is_fitted(self, "_mean") X[self._missing_idxs] = self._mean return X
[docs] def inverse_transform(self, X): X[self._missing_idxs] = np.nan return X
[docs]class DifferenceTransformer(BaseEstimator, TransformerMixin): needs_refit = True def __init__(self, period=1): super().__init__() self.period = period
[docs] def fit(self, X, y=None): missing = np.where(np.isnan(X))[0] if len(missing) > 0: self._missing_idx_start = np.max(missing) + 1 else: self._missing_idx_start = 0 self._missing_idx_end = self._missing_idx_start + self.period self._missing_vals = X[self._missing_idx_start : self._missing_idx_end, :] return self
[docs] def transform(self, X, y=None, refit=False): if refit: self.fit(X, y=y) check_is_fitted(self, "_missing_idx_start") X_shift = np.roll(X, self.period, axis=0) X_trans = X - X_shift X_trans[: self._missing_idx_end, :] = np.nan return X_trans
[docs] def inverse_transform(self, X): # TODO: Figure out why I have to make a copy here X = X.copy() # Fill in the missing values X[self._missing_idx_start : self._missing_idx_end, :] = self._missing_vals X_inv = X[self._missing_idx_start :, :] # Take stock period_rem = np.remainder(len(X_inv), self.period) stride_shape = [ self.period, int(np.floor(len(X_inv) / self.period) + period_rem), ] stride = X.strides[0] # Clusterfuck inv = ( as_strided( X_inv, shape=stride_shape, strides=[stride, stride * self.period] ) .cumsum(axis=1) .reshape(stride_shape[0] * stride_shape[1], order="F")[: len(X_inv)] )[:, np.newaxis] return np.vstack((X[: self._missing_idx_start, :], inv))
[docs]class LogTransformer(BaseEstimator, TransformerMixin): needs_refit = False def __init__(self): super().__init__()
[docs] def fit(self, X, y=None): return self
[docs] def transform(self, X, y=None, refit=False): with np.errstate(divide="raise", invalid="raise"): try: Xt = np.log(1 + X) except FloatingPointError: raise ValueError("X cannot have negative values") return Xt
[docs] def inverse_transform(self, X): return np.exp(X) - 1
[docs]class HorizonTransformer(BaseEstimator, TransformerMixin): needs_refit = True y_only = True def __init__(self, horizon=2): super().__init__() if horizon < 2: raise ValueError("horizon must be greater than 1") self.horizon = horizon self.autoregressive_transformer = AutoregressiveTransformer( num_lags=self.horizon, pred_stride=1 )
[docs] def fit(self, X, y=None): self.autoregressive_transformer.fit(expand_dim_if_needed(X)) return self
[docs] def transform(self, X, y=None, refit=False): X = expand_dim_if_needed(X) if refit: self.autoregressive_transformer.fit(X) Xt = self.autoregressive_transformer.transform(X) # The autoregressive transformer won't build lags _with_ the last element of X. # So, we have to manually tack it on here. last_non_nan_piece = np.hstack((Xt[-1, 1:], X[-1])) Xt = np.vstack( (Xt[self.horizon :, :], last_non_nan_piece, Xt[1 : self.horizon, :]) ) return Xt
[docs] def inverse_transform(self, X, y=None): Xt = np.vstack((X[-self.horizon :, :], X[: -self.horizon, :])) return self.autoregressive_transformer.inverse_transform(Xt)
[docs] def fit_transform(self, X, y=None): return self.fit(X).transform(X)