# Source code for tsaug._augmenter.quantize

from typing import List, Optional, Tuple, Union

import numpy as np

from .base import _Augmenter, _default_seed


class Quantize(_Augmenter):
    """
    Quantize time series to a level set.

    Values in a time series are rounded to the nearest level in the level set.

    Parameters
    ----------
    n_levels : int, tuple, or list, optional
        The number of levels in a level set.

        - If int, all series (all channels if `per_channel` is True) are
          quantized to a level set of this size.
        - If list, a series (a channel if `per_channel` is True) is quantized
          to a level set whose size is sampled from this list randomly.
        - If 2-tuple, a series (a channel if `per_channel` is True) is
          quantized to a level set whose size is sampled randomly from the
          half-open integer interval ``[n_levels[0], n_levels[1])``.

        Default: 10.

    how : str, optional
        The method that a level set is defined.

        - If 'uniform', a level set is defined by uniformly discretizing the
          range of this channel in this series.
        - If 'quantile', a level set is defined by the quantiles of values in
          this channel in this series.
        - If 'kmeans', a level set is defined by k-means clustering of values
          in this channel in this series. Note that this method could be slow
          and requires scikit-learn.

        Default: 'uniform'.

    per_channel : bool, optional
        Whether to sample a level set size for each channel in a time series
        or to use the same size for all channels in a time series. Only used
        if the level set size is not deterministic. Default: False.

    repeats : int, optional
        The number of times a series is augmented. If greater than one, a
        series will be augmented so many times independently. This parameter
        can also be set by operator `*`. Default: 1.

    prob : float, optional
        The probability that a series is augmented. It must be in (0.0, 1.0].
        This parameter can also be set by operator `@`. Default: 1.0.

    seed : int, optional
        The random seed. Default: None.

    """

    def __init__(
        self,
        n_levels: Union[int, Tuple[int, int], List[int]] = 10,
        how: str = "uniform",
        per_channel: bool = False,
        repeats: int = 1,
        prob: float = 1.0,
        seed: Optional[int] = _default_seed,
    ):
        # The assignments below go through the validating property setters.
        self.n_levels = n_levels
        self.how = how
        self.per_channel = per_channel
        super().__init__(repeats=repeats, prob=prob, seed=seed)

    @classmethod
    def _get_param_name(cls) -> Tuple[str, ...]:
        """Return the names of the parameters specific to this augmenter."""
        return ("n_levels", "how", "per_channel")

    @property
    def n_levels(self) -> Union[int, Tuple[int, int], List[int]]:
        """Level set size: a fixed int, a list of choices, or a 2-tuple."""
        return self._n_levels

    @n_levels.setter
    def n_levels(self, n: Union[int, Tuple[int, int], List[int]]) -> None:
        """
        Validate and store `n_levels`.

        Raises
        ------
        TypeError
            If `n` is not an int, list, or tuple, or contains non-integers.
        ValueError
            If `n` is empty, has the wrong length, is not strictly
            increasing (tuple), or contains non-positive values.

        """
        N_LEVELS_ERROR_MSG = (
            "Parameter `n_levels` must be a positive integer, "
            "a 2-tuple of positive integers representing an interval, "
            "or a list of positive integers."
        )
        if isinstance(n, int):
            if n <= 0:
                raise ValueError(N_LEVELS_ERROR_MSG)
        elif isinstance(n, list):
            if len(n) == 0:
                raise ValueError(N_LEVELS_ERROR_MSG)
            if not all(isinstance(nn, int) for nn in n):
                raise TypeError(N_LEVELS_ERROR_MSG)
            if not all(nn > 0 for nn in n):
                raise ValueError(N_LEVELS_ERROR_MSG)
        elif isinstance(n, tuple):
            if len(n) != 2:
                raise ValueError(N_LEVELS_ERROR_MSG)
            if (not isinstance(n[0], int)) or (not isinstance(n[1], int)):
                raise TypeError(N_LEVELS_ERROR_MSG)
            if n[0] >= n[1]:
                raise ValueError(N_LEVELS_ERROR_MSG)
            if (n[0] <= 0) or (n[1] <= 0):
                raise ValueError(N_LEVELS_ERROR_MSG)
        else:
            raise TypeError(N_LEVELS_ERROR_MSG)
        self._n_levels = n

    @property
    def how(self) -> str:
        """Name of the method used to define the level set."""
        return self._how

    @how.setter
    def how(self, h: str) -> None:
        """
        Validate and store `how`.

        Raises
        ------
        TypeError
            If `h` is not a string.
        ValueError
            If `h` is not one of 'uniform', 'quantile', 'kmeans'.

        """
        HOW_ERROR_MSG = (
            "Parameter `how` must be one of 'uniform', 'quantile', and "
            "'kmeans'."
        )
        if not isinstance(h, str):
            raise TypeError(HOW_ERROR_MSG)
        if h not in ["uniform", "quantile", "kmeans"]:
            raise ValueError(HOW_ERROR_MSG)
        self._how = h

    @property
    def per_channel(self) -> bool:
        """Whether the level set size is sampled independently per channel."""
        return self._per_channel

    @per_channel.setter
    def per_channel(self, p: bool) -> None:
        """Validate and store `per_channel`; raise TypeError if not a bool."""
        if not isinstance(p, bool):
            raise TypeError("Parameter `per_channel` must be boolean.")
        self._per_channel = p

    def _augment_core(
        self, X: np.ndarray, Y: Optional[np.ndarray]
    ) -> Tuple[np.ndarray, Optional[np.ndarray]]:
        """
        Quantize every channel of every series in `X`.

        Parameters
        ----------
        X : numpy array
            Array whose shape unpacks as ``(N, T, C)``; quantization is done
            along axis 1 independently for each ``(n, c)`` pair.
        Y : numpy array or None
            Returned as an unmodified copy (or None).

        Returns
        -------
        tuple
            ``(X_aug, Y_aug)`` where `X_aug` has the same shape as `X`.

        """
        rand = np.random.RandomState(self.seed)
        N, _, C = X.shape
        n_levels = self._sample_n_levels(rand, N, C)

        if self.how == "uniform":
            X_aug = self._quantize_uniform(X, n_levels)
        elif self.how == "quantile":
            X_aug = self._quantize_quantile(X, n_levels)
        else:
            X_aug = self._quantize_kmeans(X, n_levels)

        Y_aug = None if Y is None else Y.copy()
        return X_aug, Y_aug

    def _sample_n_levels(
        self, rand: np.random.RandomState, N: int, C: int
    ) -> np.ndarray:
        """Return an int array of shape (N, 1, C) with the level set sizes."""
        if isinstance(self.n_levels, int):
            return np.full((N, 1, C), self.n_levels, dtype=int)

        if isinstance(self.n_levels, list):
            candidates = self.n_levels
        else:
            # 2-tuple: half-open integer interval [low, high).
            candidates = range(self.n_levels[0], self.n_levels[1])

        if self.per_channel:
            return rand.choice(candidates, size=(N, 1, C)).astype(int)
        # One size per series, shared by all of its channels.
        shared = rand.choice(candidates, size=(N, 1, 1)).astype(int)
        return np.repeat(shared, C, axis=2)

    @staticmethod
    def _to_rows(X: np.ndarray) -> np.ndarray:
        """
        Return a fresh (N * C, T) array with one row per (series, channel).

        The result never aliases `X`, so it can be overwritten in place.
        Non-floating input is cast to float because the levels written back
        are generally fractional and would otherwise be truncated.
        """
        N, T, C = X.shape
        dtype = X.dtype if np.issubdtype(X.dtype, np.floating) else float
        # astype() copies by default, which protects the caller's array.
        return X.astype(dtype).swapaxes(1, 2).reshape((N * C, T))

    @staticmethod
    def _quantize_uniform(X: np.ndarray, n_levels: np.ndarray) -> np.ndarray:
        """Map values to the centers of equal-width bins over each range."""
        series_min = X.min(axis=1, keepdims=True)
        series_max = X.max(axis=1, keepdims=True)
        series_range = series_max - series_min
        is_flat = series_range == 0
        # Avoid division by zero; flat channels are restored at the end.
        safe_range = np.where(is_flat, 1, series_range)

        scaled = (X - series_min) / safe_range * n_levels
        # floor() selects the bin that contains the value, whose center is
        # the nearest level; the maximum lands on n_levels and is clipped
        # into the last bin.
        bin_index = np.floor(scaled).clip(0, n_levels - 1)
        X_aug = (bin_index + 0.5) / n_levels * safe_range + series_min
        # A constant channel has a single level: its own value.
        return np.where(is_flat, X, X_aug)

    def _quantize_quantile(
        self, X: np.ndarray, n_levels: np.ndarray
    ) -> np.ndarray:
        """Map values to the mid-quantile of equal-population bins."""
        N, T, C = X.shape
        rows = self._to_rows(X)
        for i, n in enumerate(n_levels.flatten()):
            n = int(n)
            # np.percentile expects percentiles in [0, 100].
            edges = np.percentile(rows[i], np.linspace(0.0, 100.0, n + 1))
            centers = np.percentile(
                rows[i], (np.arange(n) + 0.5) * (100.0 / n)
            )
            # digitize() is 1-based with respect to the lowest edge; shift
            # to 0-based bin indices and fold the maximum into the last bin.
            bin_index = (np.digitize(rows[i], edges) - 1).clip(0, n - 1)
            rows[i, :] = centers[bin_index]
        return rows.reshape(N, C, T).swapaxes(1, 2)

    def _quantize_kmeans(
        self, X: np.ndarray, n_levels: np.ndarray
    ) -> np.ndarray:
        """Map values to the center of their 1-D k-means cluster."""
        try:
            from sklearn.cluster import KMeans
        except ImportError as err:
            raise ImportError(
                "To use kmeans quantization, sklearn>=0.22 must be installed."
            ) from err

        N, T, C = X.shape
        rows = self._to_rows(X)
        # `n_jobs` is intentionally not passed: it was removed from KMeans
        # in scikit-learn 1.0 and raises TypeError there.
        model = KMeans(n_clusters=2, random_state=self.seed)
        for i, n in enumerate(n_levels.flatten()):
            model.n_clusters = int(n)
            ind = model.fit_predict(rows[i].reshape(-1, 1))
            rows[i, :] = model.cluster_centers_[ind, :].flatten()
        return rows.reshape(N, C, T).swapaxes(1, 2)