# Source code for tsaug.trend

```"""
"""
from typing import Tuple, Optional, Callable

import numpy as np
from scipy.interpolate import CubicSpline
from .dimensionalize import dimensionalize
from .augmentor import _Augmentor

[docs]@dimensionalize
def trend(
X: np.ndarray,
Y: Optional[np.ndarray] = None,
anchors: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, Optional[np.ndarray]]:

Given m anchors, the trend of a series is generated by cublic spline
interpolation over all anchors that are uniformly distributed.

Parameters
----------
X : numpy.ndarray
Time series to be augmented. Matrix with shape (n,), (N, n) or (N, n,
c), where n is the length of each series, N is the number of series,
and c is the number of channels.

Y : numpy.ndarray, optional
Binary labels of time series, where 0 represents a normal point and 1
represents an anomalous points. Matrix with shape (n,), (N, n) or (N,
n, cl), where n is the length of each series, N is the number of
series, and cl is the number of classes (i.e. types of anomaly).
Default: None.

anchors : numpy.ndarray
Anchor points from which trend is interpolated. An array with shape
(m,), (N, m), or (N, m, c), where m is the number of anchor points of
each series, N is the number of series, and c is the number of
channels. Default: None, i.e. no trend added.

Returns
-------
tuple (numpy.ndarray, numpy.ndarray)
Augmented time series and augmented labels (if argument `Y` exists).

"""
N = (
0
)  # type: int  # NOTE: this is a horrible hack to type hint for Python 3.5
n = 0  # type: int
c = 0  # type: int
N, n, c = X.shape

if anchors is None:
anchors = np.zeros((N, 2, c))

if anchors.ndim == 1:
anchors = np.tile(anchors, (N, 1, c))
elif anchors.ndim == 2:
anchors = np.tile(anchors, (1, 1, c))

if (
(anchors.ndim != 3)
or (anchors.shape != N)
or (anchors.shape != c)
):
raise ValueError("Wrong shape of anchors.")

m = anchors.shape  # type: int

interpFuncs = CubicSpline(
np.linspace(0, n, m), anchors, axis=1
)  # type: Callable

X_aug = interpFuncs(np.arange(n)) + X  # type: np.ndarray

if Y is None:
Y_aug = None  # type: Optional[np.ndarray]
else:
Y_aug = Y.copy()

return X_aug, Y_aug

[docs]@dimensionalize
def random_trend(
X: np.ndarray,
Y: Optional[np.ndarray] = None,
num_anchors: int = 5,
min_anchor: float = 0.0,
max_anchor: float = 100.0,
random_seed: Optional[int] = None,
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""Add random trend to time series.

Given m anchors, the trend of a series is generated by cublic spline
interpolation over all anchors that are uniformly distributed. Anchors are
randomly sampled between given bounds.

Parameters
----------
X : numpy.ndarray
Time series to be augmented. Matrix with shape (n,), (N, n) or (N, n,
c), where n is the length of each series, N is the number of series,
and c is the number of channels.

Y : numpy.ndarray, optional
Binary labels of time series, where 0 represents a normal point and 1
represents an anomalous points. Matrix with shape (n,), (N, n) or (N,
n, cl), where n is the length of each series, N is the number of
series, and cl is the number of classes (i.e. types of anomaly).
Default: None.

num_anchors : int, optional
Number of anchors to sample for each series. Default: 5.

min_anchor : float, optional
Minimal value of anchors. Default: 0.0.

max_anchor : float, optional
Maximal value of anchors. Default: 100.0.

random_seed : int, optional
Random seed used to initialize the pseudo-random number generator.
Default: None.

Returns
-------
tuple (numpy.ndarray, numpy.ndarray)
Augmented time series and augmented labels (if argument `Y` exists).

"""

N = (
0
)  # type: int  # NOTE: this is a horrible hack to type hint for Python 3.5
n = 0  # type: int
c = 0  # type: int
N, n, c = X.shape
rand = np.random.RandomState(random_seed)
anchors = np.cumsum(
rand.normal(size=(N, num_anchors, c)), axis=1
)  # type: np.ndarray
anchors = (anchors - anchors.min(axis=1).reshape((N, 1, c))) / (
anchors.max(axis=1) - anchors.min(axis=1)
).reshape((N, 1, c))
anchors = anchors * (max_anchor - min_anchor) + min_anchor
return trend(X, Y, anchors)

[docs]class Trend(_Augmentor):
"""
Augmentor that adds trend to time series.

Given m anchors, the trend of a series is generated by cublic spline
interpolation over all anchors that are uniformly distributed.

Parameters
----------
anchors : numpy.ndarray
Anchor points from which trend is interpolated. An array with shape
(m,), (N, m), or (N, m, c), where m is the number of anchor points of
each series, N is the number of series, and c is the number of
channels. Default: None, i.e. no trend added.

"""

def __init__(self, anchors: Optional[np.ndarray] = None) -> None:
super().__init__(
augmentor_func=trend, is_random=False, anchors=anchors
)

@property
def anchors(self) -> Optional[np.ndarray]:
return self._params["anchors"]

@anchors.setter
def anchors(self, anchors: Optional[np.ndarray]) -> None:
self._params["anchors"] = anchors

[docs]class RandomTrend(_Augmentor):
"""
Augmentor to add random trend to time series.

Given m anchors, the trend of a series is generated by cublic spline
interpolation over all anchors that are uniformly distributed. Anchors are
randomly sampled between given bounds.

Parameters
----------
num_anchors : int, optional
Number of anchors to sample for each series. Default: 5.

min_anchor : float, optional
Minimal value of anchors. Default: 0.0.

max_anchor : float, optional
Maximal value of anchors. Default: 100.0.

random_seed : int, optional
Random seed used to initialize the pseudo-random number generator.
Default: None.

"""

def __init__(
self,
num_anchors: int = 5,
min_anchor: float = 0.0,
max_anchor: float = 100.0,
random_seed: Optional[int] = None,
) -> None:
super().__init__(
augmentor_func=random_trend,
is_random=True,
num_anchors=num_anchors,
min_anchor=min_anchor,
max_anchor=max_anchor,
random_seed=random_seed,
)

@property
def num_anchors(self) -> int:
return self._params["num_anchors"]

@num_anchors.setter
def num_anchors(self, num_anchors: int) -> None:
self._params["num_anchors"] = num_anchors

@property
def min_anchor(self) -> float:
return self._params["min_anchor"]

@min_anchor.setter
def min_anchor(self, min_anchor: float) -> None:
self._params["min_anchor"] = min_anchor

@property
def max_anchor(self) -> float:
return self._params["max_anchor"]

@max_anchor.setter
def max_anchor(self, max_anchor: float) -> None:
self._params["max_anchor"] = max_anchor

@property
def random_seed(self) -> Optional[int]:
return self._params["random_seed"]

@random_seed.setter
def random_seed(self, random_seed: Optional[int]) -> None:
self._params["random_seed"] = random_seed
```