Source code for tsaug._augmenter.time_warp

from typing import List, Optional, Tuple, Union

import numpy as np
from scipy.interpolate import PchipInterpolator, interp1d

from .base import _Augmenter, _default_seed


[docs]class TimeWarp(_Augmenter): """ Random time warping. The augmenter random changed the speed of timeline. The time warping is controlled by the number of speed changes and the maximal ratio of max/min speed. Parameters ---------- n_speed_change : int, optional The number of speed changes in each series. Default: 3. max_speed_ratio : float, tuple, or list, optional The maximal ratio of max/min speed in the warpped time line. The time line of a series is more likely to be significantly warpeped if this value is greater. - If float, all series are warpped with the same ratio. - If list, each series is warpped with a ratio that is randomly sampled from the list. - If 2-tuple, each series is warpped with a ratio that is randomly sampled from the interval. Default: 3.0. repeats : int, optional The number of times a series is augmented. If greater than one, a series will be augmented so many times independently. This parameter can also be set by operator `*`. Default: 1. prob : float, optional The probability of a series is augmented. It must be in (0.0, 1.0]. This parameter can also be set by operator `@`. Default: 1.0. seed : int, optional The random seed. Default: None. """ def __init__( self, n_speed_change: int = 3, max_speed_ratio: Union[float, Tuple[float, float], List[float]] = 3.0, repeats: int = 1, prob: float = 1.0, seed: Optional[int] = _default_seed, ): self.n_speed_change = n_speed_change self.max_speed_ratio = max_speed_ratio super().__init__(repeats=repeats, prob=prob, seed=seed) @classmethod def _get_param_name(cls) -> Tuple[str, ...]: return ("n_speed_change",) @property def n_speed_change(self) -> int: return self._n_speed_change @n_speed_change.setter def n_speed_change(self, n: int) -> None: if not isinstance(n, int): raise TypeError( "Parameter `n_speed_change` must be a positive integer." ) if n <= 0: raise ValueError( "Parameter `n_speed_change` must be a positive integer." ) self._n_speed_change = n @property def max_speed_ratio( self, ) -> Union[float, Tuple[float, float], List[float]]: return self._max_speed_ratio @max_speed_ratio.setter def max_speed_ratio( self, n: Union[float, Tuple[float, float], List[float]] ) -> None: MAX_SPEED_RATIO_ERROR_MSG = ( "Parameter `max_speed_ratio` must be a number greater than 1.0, " "a 2-tuple of such numbers representing an interval, " "or a list of such numbers." ) if not isinstance(n, (float, int)): if isinstance(n, list): if len(n) == 0: raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) if not all([isinstance(nn, (float, int)) for nn in n]): raise TypeError(MAX_SPEED_RATIO_ERROR_MSG) if not all([nn > 1.0 for nn in n]): raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) elif isinstance(n, tuple): if len(n) != 2: raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) if (not isinstance(n[0], (float, int))) or ( not isinstance(n[1], (float, int)) ): raise TypeError(MAX_SPEED_RATIO_ERROR_MSG) if n[0] > n[1]: raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) if (n[0] <= 1.0) or (n[1] <= 1.0): raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) else: raise TypeError(MAX_SPEED_RATIO_ERROR_MSG) elif n <= 1.0: raise ValueError(MAX_SPEED_RATIO_ERROR_MSG) self._max_speed_ratio = n def _augment_core( self, X: np.ndarray, Y: Optional[np.ndarray] ) -> Tuple[np.ndarray, Optional[np.ndarray]]: rand = np.random.RandomState(self.seed) N, T, C = X.shape if Y is not None: L = Y.shape[2] # type: int anchors = np.arange( 0, 1 + 1 / (self.n_speed_change + 1) / 2, 1 / (self.n_speed_change + 1), ) * (T - 1) if isinstance(self.max_speed_ratio, (float, int)): max_speed_ratio = np.ones(N) * self.max_speed_ratio elif isinstance(self.max_speed_ratio, tuple): max_speed_ratio = rand.uniform( low=self.max_speed_ratio[0], high=self.max_speed_ratio[1], size=N, ) else: max_speed_ratio = rand.choice(self.max_speed_ratio, size=N) anchor_values = rand.uniform( low=0.0, high=1.0, size=(N, self.n_speed_change + 1) ) anchor_values = anchor_values - ( anchor_values.max(axis=1, keepdims=True) - max_speed_ratio.reshape(N, 1) * anchor_values.min(axis=1, keepdims=True) ) / (1 - max_speed_ratio.reshape(N, 1)) anchor_values = ( anchor_values.cumsum(axis=1) / anchor_values.sum(axis=1, keepdims=True) * (T - 1) ) anchor_values = np.concatenate( [np.zeros((N, 1)), anchor_values], axis=1 ) warp = PchipInterpolator(x=anchors, y=anchor_values, axis=1)( np.arange(T) ) # type: np.ndarray X_aug = np.vstack( [ interp1d( np.arange(T), Xi, axis=0, fill_value="extrapolate", assume_sorted=True, )(warpi).reshape(1, T, C) for Xi, warpi in zip(X, warp) ] ) # type: np.ndarray if Y is None: Y_aug = None # type: Optional[np.ndarray] else: Y_aug = np.vstack( [ interp1d( np.arange(T), Yi, axis=0, fill_value="extrapolate", assume_sorted=True, )(warpi).reshape(1, T, L) for Yi, warpi in zip(Y, warp) ] ) Y_aug = Y_aug.round().astype(int) return X_aug, Y_aug