Skip to content

ETNA-759: MedianOutliersTransform and DensityOutliersTransform #30

Merged
merged 8 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion etna/analysis/outliers/median_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import typing

import numpy as np
import pandas as pd

from etna.datasets.tsdataset import TSDataset


def get_anomalies_median(
ts: TSDataset, window_size: int = 10, alpha: float = 3
) -> typing.Dict[str, typing.List[np.datetime64]]:
) -> typing.Dict[str, typing.List[pd.Timestamp]]:
"""
Get point outliers in time series using median model (estimation model-based method).
Outliers are all points deviating from the median by more than alpha * std, where std is the sample standard deviation in the window.
Expand Down
8 changes: 5 additions & 3 deletions etna/datasets/tsdataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
import warnings
from typing import TYPE_CHECKING
from typing import Iterable
from typing import Optional
from typing import Sequence
Expand All @@ -10,7 +11,8 @@
import pandas as pd
from matplotlib import pyplot as plt

from etna.transforms.base import Transform
if TYPE_CHECKING:
from etna.transforms.base import Transform

TTimestamp = Union[str, pd.Timestamp]

Expand Down Expand Up @@ -64,14 +66,14 @@ def __init__(self, df: pd.DataFrame, freq: str, df_exog: Optional[pd.DataFrame]

self.transforms = None

def transform(self, transforms: Iterable[Transform]):
def transform(self, transforms: Iterable["Transform"]):
    """Apply the given transforms to the data, one after another.

    The transforms are remembered on the dataset as ``self.transforms`` and
    ``self.df`` is replaced by the output of each transform in turn.
    """
    self._check_endings()
    self.transforms = transforms
    for step in self.transforms:
        transformed = step.transform(self.df)
        self.df = transformed

def fit_transform(self, transforms: Iterable[Transform]):
def fit_transform(self, transforms: Iterable["Transform"]):
"""Fit and apply given transforms to the data."""
self._check_endings()
self.transforms = transforms
Expand Down
2 changes: 2 additions & 0 deletions etna/transforms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from etna.transforms.imputation import TimeSeriesImputerTransform
from etna.transforms.lags import LagTransform
from etna.transforms.log import LogTransform
from etna.transforms.outliers import DensityOutliersTransform
from etna.transforms.outliers import MedianOutliersTransform
from etna.transforms.power import BoxCoxTransform
from etna.transforms.power import YeoJohnsonTransform
from etna.transforms.scalers import MaxAbsScalerTransform
Expand Down
174 changes: 174 additions & 0 deletions etna/transforms/outliers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from abc import ABC
from abc import abstractmethod
from typing import Callable
from typing import Dict
from typing import List

import numpy as np
import pandas as pd

from etna.datasets import TSDataset
from etna.transforms.base import Transform


class OutliersTransform(Transform, ABC):
    """Finds outliers in specific columns of DataFrame and replaces it with NaNs."""

    def __init__(self, in_column: str):
        """
        Create instance of OutliersTransform.

        Parameters
        ----------
        in_column:
            name of processed column
        """
        self.in_column = in_column
        # filled by fit() with {segment: [outliers_timestamps]}; None until then
        self.outliers_timestamps = None

    def fit(self, df: pd.DataFrame) -> "OutliersTransform":
        """
        Find outliers using detection method.

        Parameters
        ----------
        df:
            dataframe with series to find outliers

        Returns
        -------
        result: OutliersTransform
            instance with saved outliers
        """
        # the detection methods work on TSDataset, so wrap the raw dataframe first
        ts = TSDataset(df, freq=pd.infer_freq(df.index))
        self.outliers_timestamps = self.detect_outliers(ts)
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Replace found outliers with NaNs.

        Timestamps that were marked as outliers during fit but are absent
        from the index of ``df`` are ignored.

        Parameters
        ----------
        df:
            transform in_column series of given dataframe

        Returns
        -------
        result: pd.DataFrame
            copy of the dataframe where outliers of in_column series are replaced with NaNs

        Raises
        ------
        ValueError:
            if the transform is used before it was fitted
        """
        if self.outliers_timestamps is None:
            raise ValueError("Transform is not fitted! Fit the Transform before calling transform method.")
        result_df = df.copy()
        # unique(): the "segment" level repeats every segment once per feature column
        for segment in df.columns.get_level_values("segment").unique():
            # a boolean mask (rather than a list of labels) keeps .loc from failing
            # on timestamps that are not present in this dataframe
            is_outlier = result_df.index.isin(self.outliers_timestamps[segment])
            result_df.loc[is_outlier, pd.IndexSlice[segment, self.in_column]] = np.nan
        return result_df

    @abstractmethod
    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
        """Call function for detection outliers with self parameters.

        Parameters
        ----------
        ts:
            dataset to process

        Returns
        -------
        dict of outliers:
            dict of outliers in format {segment: [outliers_timestamps]}
        """
        pass


class MedianOutliersTransform(OutliersTransform):
    """Outliers transform that relies on `get_anomalies_median` to locate anomalies."""

    def __init__(self, in_column: str, window_size: int = 10, alpha: float = 3):
        """Create instance of MedianOutliersTransform.

        Parameters
        ----------
        in_column:
            name of the column to process
        window_size:
            number of points in the window
        alpha:
            coefficient for determining the threshold
        """
        self.in_column = in_column
        self.window_size, self.alpha = window_size, alpha
        super().__init__(in_column=self.in_column)

    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
        """Run `get_anomalies_median` with the parameters stored on the instance.

        Parameters
        ----------
        ts:
            dataset to process

        Returns
        -------
        dict of outliers:
            dict of outliers in format {segment: [outliers_timestamps]}
        """
        # NOTE(review): imported inside the method, presumably to avoid a circular
        # import with etna.analysis -- confirm whether this is still required given
        # the TYPE_CHECKING guard used in etna/datasets/tsdataset.py.
        from etna.analysis import get_anomalies_median

        return get_anomalies_median(ts, window_size=self.window_size, alpha=self.alpha)


class DensityOutliersTransform(OutliersTransform):
    """Outliers transform that relies on `get_anomalies_density` to locate anomalies."""

    def __init__(
        self,
        in_column: str,
        window_size: int = 15,
        distance_threshold: float = 100,
        n_neighbors: int = 3,
        distance_func: Callable[[float, float], float] = lambda x, y: abs(x - y),
    ):
        """Create instance of DensityOutliersTransform.

        Parameters
        ----------
        in_column:
            name of the column to process
        window_size:
            size of windows to build
        distance_threshold:
            distance threshold to determine points are close to each other
        n_neighbors:
            min number of close neighbors of point not to be outlier
        distance_func:
            distance function
        """
        self.in_column = in_column
        self.window_size, self.distance_threshold = window_size, distance_threshold
        self.n_neighbors, self.distance_func = n_neighbors, distance_func
        super().__init__(in_column=self.in_column)

    def detect_outliers(self, ts: TSDataset) -> Dict[str, List[pd.Timestamp]]:
        """Run `get_anomalies_density` with the parameters stored on the instance.

        Parameters
        ----------
        ts:
            dataset to process

        Returns
        -------
        dict of outliers:
            dict of outliers in format {segment: [outliers_timestamps]}
        """
        from etna.analysis import get_anomalies_density

        # forwarded positionally, in the order the detection function is called with
        detector_args = (self.window_size, self.distance_threshold, self.n_neighbors, self.distance_func)
        return get_anomalies_density(ts, *detector_args)


__all__ = ["MedianOutliersTransform", "DensityOutliersTransform"]
Empty file added tests/test_analysis/__init__.py
Empty file.
43 changes: 43 additions & 0 deletions tests/test_transforms/test_outliers_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import numpy as np
import pandas as pd
import pytest

from etna.analysis import get_anomalies_density
from etna.analysis import get_anomalies_median
from etna.datasets.tsdataset import TSDataset
from etna.transforms import DensityOutliersTransform
from etna.transforms import MedianOutliersTransform


@pytest.mark.parametrize(
    "transform",
    [
        MedianOutliersTransform(in_column="target"),
        DensityOutliersTransform(in_column="target"),
    ],
)
def test_interface(transform, example_tsds: TSDataset):
    """Check that the outliers transforms leave the column structure of the dataframe unchanged."""
    columns_before = example_tsds.columns
    example_tsds.fit_transform(transforms=[transform])
    columns_after = example_tsds.columns
    assert np.all(columns_before == columns_after)


@pytest.mark.parametrize(
    "transform, method",
    [
        (MedianOutliersTransform(in_column="target"), get_anomalies_median),
        (DensityOutliersTransform(in_column="target"), get_anomalies_density),
    ],
)
def test_outliers_detection(transform, method, outliers_tsds, recwarn):
    """Check that each outliers transform puts NaNs exactly where its detection method reports anomalies."""
    expected_outliers = method(outliers_tsds)

    # per segment, remember the timestamps whose target was not NaN before the transform
    observed_index = {
        segment: outliers_tsds[:, segment, "target"].dropna().index for segment in outliers_tsds.segments
    }

    # work on the plain dataframe so that series of different lengths do not matter
    transformed_df = transform.fit_transform(outliers_tsds.to_pandas())
    for segment in outliers_tsds.segments:
        column = transformed_df.loc[observed_index[segment], pd.IndexSlice[segment, "target"]]
        introduced_nans = column[column.isna()].index
        assert np.all(introduced_nans == expected_outliers[segment])