# Source code for tsaug.resample

```python
"""
Resample module
"""
from typing import Tuple, Optional, Union

import numpy as np
from .dimensionalize import dimensionalize
from .augmentor import _Augmentor

def _interpolate_along_time(
    A: np.ndarray,
    inds_0: np.ndarray,
    inds_1: np.ndarray,
    weight_0: np.ndarray,
    weight_1: np.ndarray,
) -> np.ndarray:
    """Build the resampled array: first point, blended interior, last point.

    `A` is indexed as a 3-D array along axis 1. Interior points are the
    weighted sum of the two neighbouring samples selected by `inds_0` and
    `inds_1`; the first and last samples are copied unchanged.
    """
    interior = A[:, inds_0, :] * weight_0.reshape(1, -1, 1) + A[
        :, inds_1, :
    ] * weight_1.reshape(1, -1, 1)
    return np.concatenate([A[:, 0:1, :], interior, A[:, -1:, :]], axis=1)


@dimensionalize
def resample(
    X: np.ndarray, Y: Optional[np.ndarray] = None, n_new: Optional[int] = None
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """Resample time series to a new length.

    The series is mapped onto `n_new` evenly spaced positions spanning the
    original series. The first and last points are kept as they are; every
    interior point is obtained by linear interpolation between its two
    neighbouring original samples.

    Parameters
    ----------
    X : numpy.ndarray
        Time series to be augmented. Matrix with shape (n,), (N, n) or (N, n,
        c), where n is the length of each series, N is the number of series,
        and c is the number of channels.

    Y : numpy.ndarray, optional
        Binary labels of time series, where 0 represents a normal point and 1
        represents an anomalous point. Matrix with shape (n,), (N, n) or (N,
        n, cl), where n is the length of each series, N is the number of
        series, and cl is the number of classes (i.e. types of anomaly).
        Default: None.

    n_new : int, optional
        New length of time series. Must be at least 1. Default: n.

    Returns
    -------
    tuple (numpy.ndarray, numpy.ndarray)
        Augmented time series and augmented labels (None if argument `Y` is
        not given). Interpolated labels are rounded to the nearest integer.

    Raises
    ------
    ValueError
        If `n_new` is smaller than 1.

    Notes
    -----
    The body indexes `X` and `Y` as 3-D arrays; presumably the
    `dimensionalize` decorator converts the documented input shapes to that
    layout and back (NOTE(review): confirm against the decorator).

    """
    n = X.shape[1]  # type: int

    # Nothing to do: hand back copies so callers can mutate the result.
    if (n_new is None) or (n_new == n):
        return X.copy(), (None if Y is None else Y.copy())

    if n_new < 1:
        raise ValueError(
            "n_new must be a positive integer, got {}".format(n_new)
        )

    if n_new == 1:
        # The general path always emits both end points and would return a
        # series of length 2; a single-point series keeps the first sample.
        X_first = X[:, 0:1, :].copy()  # type: np.ndarray
        if Y is None:
            return X_first, None
        return X_first, Y[:, 0:1, :].round().astype(int)

    # Fractional source positions of the interior output points. The two end
    # points are excluded here because they are copied verbatim.
    interp = (
        np.arange(1, n_new - 1) / (n_new - 1) * (n - 1)
    )  # type: np.ndarray
    interp_inds_0 = interp.astype(int)  # type: np.ndarray
    # Clip the right neighbour so that a length-1 input does not index past
    # the end of the series; its weight is 0 in that case.
    interp_inds_1 = np.minimum(interp_inds_0 + 1, n - 1)  # type: np.ndarray
    interp_weight_1 = interp - interp_inds_0  # type: np.ndarray
    interp_weight_0 = 1 - interp_weight_1  # type: np.ndarray

    X_aug = _interpolate_along_time(
        X, interp_inds_0, interp_inds_1, interp_weight_0, interp_weight_1
    )  # type: np.ndarray

    if Y is None:
        Y_aug = None  # type: Optional[np.ndarray]
    else:
        # Labels are interpolated like the values, then snapped back to
        # integer labels.
        Y_aug = (
            _interpolate_along_time(
                Y,
                interp_inds_0,
                interp_inds_1,
                interp_weight_0,
                interp_weight_1,
            )
            .round()
            .astype(int)
        )

    return X_aug, Y_aug

class Resample(_Augmentor):
    """Augmentor that resamples time series to a new length.

    Wraps the `resample` function: time series are transformed to the new
    length, and the values are determined by linear interpolation.

    Parameters
    ----------
    n_new : int, optional
        New length of time series. Default: n.

    """

    def __init__(self, n_new: Optional[int] = None) -> None:
        # Registered with the base class as a non-random augmentor.
        super().__init__(augmentor_func=resample, is_random=False, n_new=n_new)

    @property
    def prob(self) -> float:
        """Probability setting of this augmentor (always 0 or 1)."""
        return self._prob

    @prob.setter
    def prob(self, prob: float) -> None:
        # Only the values 0 and 1 are accepted; anything else is rejected.
        if prob != 1.0 and prob != 0.0:
            message = (
                "Resample augmentor may change the length of time series, "
                + "therefore probability must be either 0 or 1"
            )
            raise ValueError(message)
        self._prob = prob

    @property
    def n_new(self) -> Optional[int]:
        """New length of time series, stored in the parameter dictionary."""
        return self._params["n_new"]

    @n_new.setter
    def n_new(self, n_new: Optional[int]) -> None:
        self._params["n_new"] = n_new

    def _get_output_dim(
        self,
        input_N: Union[
            Tuple[int, Optional[int]], Tuple[Optional[int], int]
        ] = (1, None),
        input_n: Union[
            Tuple[int, Optional[int]], Tuple[Optional[int], int]
        ] = (1, None),
        input_c: Union[
            Tuple[int, Optional[int]], Tuple[Optional[int], int]
        ] = (1, None),
    ) -> Tuple[
        Union[Tuple[int, Optional[int]], Tuple[Optional[int], int]],
        Union[Tuple[int, Optional[int]], Tuple[Optional[int], int]],
        Union[Tuple[int, Optional[int]], Tuple[Optional[int], int]],
    ]:
        """Derive the output dimension descriptors from the input ones.

        Each descriptor is a pair in which one slot holds an integer and the
        other may be None. NOTE(review): the meaning of the two slots is
        defined outside this block -- confirm against `_Augmentor`.

        The N descriptor has its non-None slot multiplied by `self.M`, the n
        descriptor becomes `(None, n_new)` when `n_new` is set and is passed
        through otherwise, and the c descriptor is returned unchanged.
        """
        # Scale whichever slot of the N descriptor is populated.
        if input_N[0] is not None:
            output_N = (
                input_N[0] * self.M,
                None,
            )  # type: Union[Tuple[int, Optional[int]], Tuple[Optional[int], int]]
        else:
            output_N = (None, input_N[1] * self.M)

        # Without a target length the series length is left as it came in.
        if self.n_new is None:
            output_n = (
                input_n
            )  # type: Union[Tuple[int, Optional[int]], Tuple[Optional[int], int]]
        else:
            output_n = (None, self.n_new)

        return output_N, output_n, input_c
```