Skip to content

Make SeasonalMovingAverageModel and DeadlineMovingAverageModel work with context #917

Merged
merged 9 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
-
-
-
-
- Make `SeasonalMovingAverageModel` and `DeadlineMovingAverageModel` work with context ([#917](https://github.com/tinkoff-ai/etna/pull/917))
-
-
-
Expand Down
7 changes: 4 additions & 3 deletions etna/analysis/outliers/prediction_interval_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ def get_anomalies_prediction_interval(
deepcopy(ts_inner), prediction_interval=True, quantiles=[lower_p, upper_p]
)
for segment in ts_inner.segments:
segment_slice = prediction_interval[:, segment, :][segment]
anomalies_mask = (segment_slice["target"] > segment_slice[f"target_{upper_p:.4g}"]) | (
segment_slice["target"] < segment_slice[f"target_{lower_p:.4g}"]
predicted_segment_slice = prediction_interval[:, segment, :][segment]
alex-hse-repository marked this conversation as resolved.
Show resolved Hide resolved
actual_segment_slice = ts_inner[:, segment, :][segment]
anomalies_mask = (actual_segment_slice["target"] > predicted_segment_slice[f"target_{upper_p:.4g}"]) | (
actual_segment_slice["target"] < predicted_segment_slice[f"target_{lower_p:.4g}"]
)
outliers_per_segment[segment] = list(time_points[anomalies_mask])
return outliers_per_segment
20 changes: 18 additions & 2 deletions etna/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,12 @@ def _forecast_segment(model: Any, segment: str, ts: TSDataset, *args, **kwargs)
if isinstance(segment_predict, np.ndarray):
segment_predict = pd.DataFrame({"target": segment_predict})
segment_predict["segment"] = segment
segment_predict["timestamp"] = dates

prediction_size = kwargs.get("prediction_size")
if prediction_size is not None:
segment_predict["timestamp"] = dates[-prediction_size:].reset_index(drop=True)
else:
segment_predict["timestamp"] = dates
return segment_predict

@log_decorator
Expand All @@ -489,16 +494,26 @@ def _forecast(self, ts: TSDataset, **kwargs) -> TSDataset:
result_df = result_df.set_index(["timestamp", "segment"])
df = ts.to_pandas(flatten=True)
df = df.set_index(["timestamp", "segment"])
# clear values to be filled, otherwise during in-sample prediction new values won't be set
columns_to_clear = result_df.columns.intersection(df.columns)
df.loc[result_df.index, columns_to_clear] = np.NaN
df = df.combine_first(result_df).reset_index()

df = TSDataset.to_dataset(df)
ts.df = df
ts.inverse_transform()

prediction_size = kwargs.get("prediction_size")
if prediction_size is not None:
ts.df = ts.df.iloc[-prediction_size:]
return ts


class MultiSegmentModelMixin(ModelForecastMixin):
"""Mixin for holding methods for multi-segment prediction."""
"""Mixin for holding methods for multi-segment prediction.

It currently doesn't work with prediction intervals or context.
"""

def __init__(self, base_model: Any):
"""
Expand Down Expand Up @@ -547,6 +562,7 @@ def _forecast(self, ts: TSDataset, **kwargs) -> TSDataset:
"""
horizon = len(ts.df)
x = ts.to_pandas(flatten=True).drop(["segment"], axis=1)
# TODO: make it work with prediction intervals and context
y = self._base_model.predict(x, **kwargs).reshape(-1, horizon).T
ts.loc[:, pd.IndexSlice[:, "target"]] = y
ts.inverse_transform()
Expand Down
115 changes: 84 additions & 31 deletions etna/models/deadline_ma.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import numpy as np
import pandas as pd

from etna.models.base import NonPredictionIntervalContextIgnorantAbstractModel
from etna.models.base import NonPredictionIntervalContextIgnorantModelMixin
from etna.models.base import NonPredictionIntervalContextRequiredAbstractModel
from etna.models.base import NonPredictionIntervalContextRequiredModelMixin
from etna.models.base import PerSegmentModelMixin


Expand All @@ -31,7 +31,7 @@ def __init__(self, window: int = 3, seasonality: str = "month"):
"""
Initialize deadline moving average model.

Length of remembered tail of series is equal to the number of ``window`` months or years, depending on the ``seasonality``.
Length of the context is equal to the number of ``window`` months or years, depending on the ``seasonality``.

Parameters
----------
Expand Down Expand Up @@ -78,65 +78,116 @@ def fit(self, df: pd.DataFrame, regressors: List[str]) -> "_DeadlineMovingAverag
message=f"{type(self).__name__} does not work with any exogenous series or features. "
f"It uses only target series for predict/\n "
)
targets = df["target"]
timestamps = df["timestamp"]

if self.seasonality == SeasonalityMode.month:
first_index = timestamps.iloc[-1] - pd.DateOffset(months=self.window)
self._freq = freq

return self

@staticmethod
def _get_context_beginning(
df: pd.DataFrame, prediction_size: int, seasonality: SeasonalityMode, window: int
) -> pd.Timestamp:
"""
Get timestamp where context begins.

Parameters
----------
df:
Time series in a long format.
prediction_size:
Number of last timestamps to leave after making prediction.
Previous timestamps will be used as a context for models that require it.
seasonality:
Seasonality.
window:
Number of values taken for forecast of each point.

elif self.seasonality == SeasonalityMode.year:
first_index = timestamps.iloc[-1] - pd.DateOffset(years=self.window)
Returns
-------
:
Timestamp with beginning of the context.

Raises
------
ValueError:
if context isn't big enough
"""
df_history = df.iloc[:-prediction_size]
history_timestamps = df_history["timestamp"]
future_timestamps = df["timestamp"].iloc[-prediction_size:]

if first_index < timestamps.iloc[0]:
# if we have len(history_timestamps) == 0, then len(df) <= prediction_size
if len(history_timestamps) == 0:
raise ValueError(
"Given series is too short for chosen shift value. Try lower shift value, or give" "longer series."
"Given context isn't big enough, try to decrease context_size, prediction_size of increase length of given dataframe!"
)

self.series = targets.loc[timestamps >= first_index]
self.timestamps = timestamps.loc[timestamps >= first_index]
self.shift = len(self.series)
self._freq = freq
if seasonality is SeasonalityMode.month:
first_index = future_timestamps.iloc[0] - pd.DateOffset(months=window)

return self
elif seasonality is SeasonalityMode.year:
first_index = future_timestamps.iloc[0] - pd.DateOffset(years=window)

if first_index < history_timestamps.iloc[0]:
raise ValueError(
"Given context isn't big enough, try to decrease context_size, prediction_size of increase length of given dataframe!"
)

def predict(self, df: pd.DataFrame) -> np.ndarray:
return first_index

def predict(self, df: pd.DataFrame, prediction_size: int) -> np.ndarray:
"""
Compute predictions from a DeadlineMovingAverageModel.

Parameters
----------
df: pd.DataFrame
Context followed by the timestamps to predict: the last ``prediction_size`` rows are predicted, and the target values of the preceding rows are used as history.
prediction_size:
Number of last timestamps to leave after making prediction.
Previous timestamps will be used as a context for models that require it.

Returns
-------
:
Array with predictions.

Raises
------
ValueError:
if context isn't big enough
"""
timestamps = df["timestamp"]
index = pd.date_range(start=self.timestamps.iloc[0], end=timestamps.iloc[-1])
res = np.append(self.series.values, np.zeros(len(df)))
context_beginning = self._get_context_beginning(
df=df, prediction_size=prediction_size, seasonality=self.seasonality, window=self.window
)

df_history = df.iloc[:-prediction_size]
history_targets = df_history["target"]
history_timestamps = df_history["timestamp"]
history_targets = history_targets.loc[history_timestamps >= context_beginning]
history_timestamps = history_timestamps.loc[history_timestamps >= context_beginning]
future_timestamps = df["timestamp"].iloc[-prediction_size:]

index = pd.date_range(start=context_beginning, end=future_timestamps.iloc[-1])
res = np.append(history_targets.values, np.zeros(prediction_size))
res = pd.DataFrame(res)
res.index = index
for i in range(len(self.series), len(res)):
for i in range(len(history_targets), len(res)):
for w in range(1, self.window + 1):
if self.seasonality == SeasonalityMode.month:
prev_date = res.index[i] - pd.DateOffset(months=w)

elif self.seasonality == SeasonalityMode.year:
prev_date = res.index[i] - pd.DateOffset(years=w)
if prev_date <= self.timestamps.iloc[-1]:
res.loc[index[i]] += self.series.loc[self.timestamps == prev_date].values

if prev_date <= history_timestamps.iloc[-1]:
res.loc[index[i]] += history_targets.loc[history_timestamps == prev_date].values
else:
res.loc[index[i]] += res.loc[prev_date].values

res.loc[index[i]] = res.loc[index[i]] / self.window

res = res.values.reshape(
len(res),
)

return res[-len(df) :]
res = res.values.ravel()[-prediction_size:]
return res

@property
def context_size(self) -> int:
Expand All @@ -159,15 +210,17 @@ def context_size(self) -> int:

class DeadlineMovingAverageModel(
PerSegmentModelMixin,
NonPredictionIntervalContextIgnorantModelMixin,
NonPredictionIntervalContextIgnorantAbstractModel,
NonPredictionIntervalContextRequiredModelMixin,
NonPredictionIntervalContextRequiredAbstractModel,
):
"""Moving average model that uses exact previous dates to predict."""

def __init__(self, window: int = 3, seasonality: str = "month"):
"""
Initialize deadline moving average model.

Length of the context is equal to the number of ``window`` months or years, depending on the ``seasonality``.

Parameters
----------
window: int
Expand Down
48 changes: 26 additions & 22 deletions etna/models/seasonal_ma.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import numpy as np
import pandas as pd

from etna.models.base import NonPredictionIntervalContextIgnorantAbstractModel
from etna.models.base import NonPredictionIntervalContextIgnorantModelMixin
from etna.models.base import NonPredictionIntervalContextRequiredAbstractModel
from etna.models.base import NonPredictionIntervalContextRequiredModelMixin
from etna.models.base import PerSegmentModelMixin


Expand All @@ -24,7 +24,7 @@ def __init__(self, window: int = 5, seasonality: int = 7):
"""
Initialize seasonal moving average model.

Length of remembered tail of series is ``window * seasonality``.
Length of the context is ``window * seasonality``.

Parameters
----------
Expand All @@ -33,7 +33,6 @@ def __init__(self, window: int = 5, seasonality: int = 7):
seasonality: int
Lag between values taken for forecast.
"""
self.series = None
self.name = "target"
self.window = window
self.seasonality = seasonality
Expand All @@ -45,7 +44,7 @@ def fit(self, df: pd.DataFrame, regressors: List[str]) -> "_SeasonalMovingAverag

Parameters
----------
df: pd.DataFrame
df:
Data to fit on
regressors:
List of the columns with regressors(ignored in this model)
Expand All @@ -60,44 +59,49 @@ def fit(self, df: pd.DataFrame, regressors: List[str]) -> "_SeasonalMovingAverag
message=f"{type(self).__name__} does not work with any exogenous series or features. "
f"It uses only target series for predict/\n "
)
targets = df["target"]
if len(targets) < self.shift:
raise ValueError(
"Given series is too short for chosen shift value. Try lower shift value, or give" "longer series."
)
self.series = targets[-self.shift :].values

# ???
if targets.name is not None:
self.name = targets.name
return self

def predict(self, df: pd.DataFrame) -> np.ndarray:
def predict(self, df: pd.DataFrame, prediction_size: int) -> np.ndarray:
"""
Compute predictions from a SeasonalMovingAverage model.

Parameters
----------
df: pd.DataFrame
df:
Context followed by the rows to predict: the target values preceding the last ``prediction_size`` rows are used as history.
prediction_size:
Number of last timestamps to leave after making prediction.
Previous timestamps will be used as a context for models that require it.

Returns
-------
:
Array with predictions.

Raises
------
ValueError:
if context isn't big enough
"""
horizon = len(df)
res = np.append(self.series, np.zeros(horizon))
expected_length = prediction_size + self.shift
if len(df) < expected_length:
raise ValueError(
"Given context isn't big enough, try to decrease context_size, prediction_size of increase length of given dataframe!"
)

history = df["target"][-expected_length:-prediction_size]
res = np.append(history, np.zeros(prediction_size))
for i in range(self.shift, len(res)):
res[i] = res[i - self.shift : i : self.seasonality].mean()
y_pred = res[-horizon:]
y_pred = res[-prediction_size:]
return y_pred


class SeasonalMovingAverageModel(
PerSegmentModelMixin,
NonPredictionIntervalContextIgnorantModelMixin,
NonPredictionIntervalContextIgnorantAbstractModel,
NonPredictionIntervalContextRequiredModelMixin,
NonPredictionIntervalContextRequiredAbstractModel,
):
"""
Seasonal moving average.
Expand All @@ -112,7 +116,7 @@ def __init__(self, window: int = 5, seasonality: int = 7):
"""
Initialize seasonal moving average model.

Length of remembered tail of series is ``window * seasonality``.
Length of the context is ``window * seasonality``.

Parameters
----------
Expand Down
Loading