diff --git a/etna/analysis/outliers/median_outliers.py b/etna/analysis/outliers/median_outliers.py
index 023848e0c..51ba0f6c7 100644
--- a/etna/analysis/outliers/median_outliers.py
+++ b/etna/analysis/outliers/median_outliers.py
@@ -2,13 +2,14 @@
 import typing
 
 import numpy as np
+import pandas as pd
 
 from etna.datasets.tsdataset import TSDataset
 
 
 def get_anomalies_median(
     ts: TSDataset, window_size: int = 10, alpha: float = 3
-) -> typing.Dict[str, typing.List[np.datetime64]]:
+) -> typing.Dict[str, typing.List[pd.Timestamp]]:
     """
     Get point outliers in time series using median model (estimation model-based method).
     Outliers are all points deviating from the median by more than alpha * std, where std is the sample variance in the window.
diff --git a/etna/datasets/tsdataset.py b/etna/datasets/tsdataset.py
index 3dbeae04c..eb55e42c9 100644
--- a/etna/datasets/tsdataset.py
+++ b/etna/datasets/tsdataset.py
@@ -1,5 +1,6 @@
 import math
 import warnings
+from typing import TYPE_CHECKING
 from typing import Iterable
 from typing import Optional
 from typing import Sequence
@@ -10,7 +11,8 @@
 import pandas as pd
 from matplotlib import pyplot as plt
 
-from etna.transforms.base import Transform
+if TYPE_CHECKING:
+    from etna.transforms.base import Transform
 
 
 TTimestamp = Union[str, pd.Timestamp]
@@ -64,14 +66,14 @@ def __init__(self, df: pd.DataFrame, freq: str, df_exog: Optional[pd.DataFrame]
 
         self.transforms = None
 
-    def transform(self, transforms: Iterable[Transform]):
+    def transform(self, transforms: Iterable["Transform"]):
         """Apply given transform to the data."""
         self._check_endings()
         self.transforms = transforms
         for transform in self.transforms:
             self.df = transform.transform(self.df)
 
-    def fit_transform(self, transforms: Iterable[Transform]):
+    def fit_transform(self, transforms: Iterable["Transform"]):
         """Fit and apply given transforms to the data."""
         self._check_endings()
         self.transforms = transforms
diff --git a/etna/transforms/__init__.py b/etna/transforms/__init__.py
index 3c7ef5770..963dc2ae9 100644
--- a/etna/transforms/__init__.py
+++ b/etna/transforms/__init__.py
@@ -6,6 +6,8 @@
 from etna.transforms.imputation import TimeSeriesImputerTransform
 from etna.transforms.lags import LagTransform
 from etna.transforms.log import LogTransform
+from etna.transforms.outliers import DensityOutliersTransform
+from etna.transforms.outliers import MedianOutliersTransform
 from etna.transforms.power import BoxCoxTransform
 from etna.transforms.power import YeoJohnsonTransform
 from etna.transforms.scalers import MaxAbsScalerTransform
diff --git a/etna/transforms/outliers.py b/etna/transforms/outliers.py
new file mode 100644
index 000000000..8f8a1dcf3
--- /dev/null
+++ b/etna/transforms/outliers.py
@@ -0,0 +1,172 @@
+from abc import ABC
+from abc import abstractmethod
+from typing import Callable
+from typing import Dict
+from typing import List
+
+import numpy as np
+import pandas as pd
+
+from etna.analysis import get_anomalies_density
+from etna.analysis import get_anomalies_median
+from etna.datasets import TSDataset
+from etna.transforms.base import Transform
+
+
+class OutliersTransform(Transform, ABC):
+    """Finds outliers in specific columns of DataFrame and replaces them with NaNs."""
+
+    def __init__(self, in_column: str):
+        """
+        Create instance of OutliersTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of processed column
+        """
+        self.in_column = in_column
+        self.outliers_timestamps = None
+
+    def fit(self, df: pd.DataFrame) -> "OutliersTransform":
+        """
+        Find outliers using detection method.
+
+        Parameters
+        ----------
+        df:
+            dataframe with series to find outliers in
+
+        Returns
+        -------
+        result: OutliersTransform
+            instance with saved outliers
+        """
+        ts = TSDataset(df, freq=pd.infer_freq(df.index))
+        self.outliers_timestamps = self.detect_outliers(ts)
+        return self
+
+    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Replace found outliers with NaNs.
+
+        Parameters
+        ----------
+        df:
+            dataframe whose in_column series should be transformed
+
+        Returns
+        -------
+        result: pd.DataFrame
+            dataframe with outliers in in_column replaced with NaNs
+        """
+        result_df = df.copy()
+        for segment in df.columns.get_level_values("segment").unique():
+            result_df.loc[self.outliers_timestamps[segment], pd.IndexSlice[segment, self.in_column]] = np.NaN
+        return result_df
+
+    @abstractmethod
+    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
+        """Call function for detecting outliers with self parameters.
+
+        Parameters
+        ----------
+        ts:
+            dataset to process
+
+        Returns
+        -------
+        dict of outliers:
+            dict of outliers in format {segment: [outliers_timestamps]}
+        """
+        pass
+
+
+class MedianOutliersTransform(OutliersTransform):
+    """Transform that uses get_anomalies_median to find anomalies in data."""
+
+    def __init__(self, in_column: str, window_size: int = 10, alpha: float = 3):
+        """Create instance of MedianOutliersTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of processed column
+        window_size:
+            number of points in the window
+        alpha:
+            coefficient for determining the threshold
+        """
+        self.in_column = in_column
+        self.window_size = window_size
+        self.alpha = alpha
+        super().__init__(in_column=self.in_column)
+
+    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
+        """Call `get_anomalies_median` function with self parameters.
+
+        Parameters
+        ----------
+        ts:
+            dataset to process
+
+        Returns
+        -------
+        dict of outliers:
+            dict of outliers in format {segment: [outliers_timestamps]}
+        """
+        return get_anomalies_median(ts, self.window_size, self.alpha)
+
+
+class DensityOutliersTransform(OutliersTransform):
+    """Transform that uses get_anomalies_density to find anomalies in data."""
+
+    def __init__(
+        self,
+        in_column: str,
+        window_size: int = 15,
+        distance_threshold: float = 100,
+        n_neighbors: int = 3,
+        distance_func: Callable[[float, float], float] = lambda x, y: abs(x - y),
+    ):
+        """Create instance of DensityOutliersTransform.
+
+        Parameters
+        ----------
+        in_column:
+            name of processed column
+        window_size:
+            size of windows to build
+        distance_threshold:
+            distance threshold to determine whether points are close to each other
+        n_neighbors:
+            min number of close neighbors for a point not to be an outlier
+        distance_func:
+            distance function
+        """
+        self.in_column = in_column
+        self.window_size = window_size
+        self.distance_threshold = distance_threshold
+        self.n_neighbors = n_neighbors
+        self.distance_func = distance_func
+        super().__init__(in_column=self.in_column)
+
+    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
+        """Call `get_anomalies_density` function with self parameters.
+
+        Parameters
+        ----------
+        ts:
+            dataset to process
+
+        Returns
+        -------
+        dict of outliers:
+            dict of outliers in format {segment: [outliers_timestamps]}
+        """
+        return get_anomalies_density(
+            ts, self.window_size, self.distance_threshold, self.n_neighbors, self.distance_func
+        )
+
+
+__all__ = ["MedianOutliersTransform", "DensityOutliersTransform"]
diff --git a/tests/test_analysis/__init__.py b/tests/test_analysis/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_transforms/test_outliers_transform.py b/tests/test_transforms/test_outliers_transform.py
new file mode 100644
index 000000000..85d3bdf9b
--- /dev/null
+++ b/tests/test_transforms/test_outliers_transform.py
@@ -0,0 +1,43 @@
+import numpy as np
+import pandas as pd
+import pytest
+
+from etna.analysis import get_anomalies_density
+from etna.analysis import get_anomalies_median
+from etna.datasets.tsdataset import TSDataset
+from etna.transforms import DensityOutliersTransform
+from etna.transforms import MedianOutliersTransform
+
+
+@pytest.mark.parametrize(
+    "transform", [MedianOutliersTransform(in_column="target"), DensityOutliersTransform(in_column="target")]
+)
+def test_interface(transform, example_tsds: TSDataset):
+    """Checks that MedianOutliersTransform and DensityOutliersTransform don't change the structure of the dataframe."""
+    start_columns = example_tsds.columns
+    example_tsds.fit_transform(transforms=[transform])
+    assert np.all(start_columns == example_tsds.columns)
+
+
+@pytest.mark.parametrize(
+    "transform, method",
+    [
+        (MedianOutliersTransform(in_column="target"), get_anomalies_median),
+        (DensityOutliersTransform(in_column="target"), get_anomalies_density),
+    ],
+)
+def test_outliers_detection(transform, method, outliers_tsds, recwarn):
+    """Checks that the transform detects anomalies according to the given detection method."""
+    detection_method_results = method(outliers_tsds)
+
+    # save the index without existing NaNs for each segment
+    non_nan_index = {}
+    for segment in outliers_tsds.segments:
+        non_nan_index[segment] = outliers_tsds[:, segment, "target"].dropna().index
+
+    # convert to df to ignore different lengths of series
+    transformed_df = transform.fit_transform(outliers_tsds.to_pandas())
+    for segment in outliers_tsds.segments:
+        nan_timestamps = detection_method_results[segment]
+        transformed_column = transformed_df.loc[non_nan_index[segment], pd.IndexSlice[segment, "target"]]
+        assert np.all(transformed_column[transformed_column.isna()].index == nan_timestamps)
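Usage sketch (not part of the diff): a minimal example of how the new transforms are meant to be applied through TSDataset.fit_transform. Only the transform constructors, the fit_transform call, and the ts[:, segment, "target"] indexing are taken from the changes above; the toy data construction via TSDataset.to_dataset and the injected spikes are illustrative assumptions.

import numpy as np
import pandas as pd

from etna.datasets import TSDataset
from etna.transforms import MedianOutliersTransform

# Build a small single-segment dataset with a few injected spikes
# (the long-to-wide conversion via TSDataset.to_dataset is assumed here).
timestamps = pd.date_range("2021-01-01", periods=100, freq="D")
values = np.sin(np.arange(100) / 5)
values[[10, 40, 75]] += 10  # artificial outliers
raw_df = pd.DataFrame({"timestamp": timestamps, "segment": "segment_1", "target": values})
ts = TSDataset(TSDataset.to_dataset(raw_df), freq="D")

# MedianOutliersTransform detects outliers on fit and replaces them with NaN in `in_column`;
# DensityOutliersTransform is used the same way with its own parameters.
transforms = [MedianOutliersTransform(in_column="target", window_size=10, alpha=3)]
ts.fit_transform(transforms=transforms)

# Points flagged by get_anomalies_median are now NaN and can be imputed downstream,
# e.g. with TimeSeriesImputerTransform.
print(ts[:, "segment_1", "target"].isna().sum())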