Skip to content

Create PerSegmentBaseModel, PerSegmentPredictionIntervalModel, PerSegmentModel #537

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 177 additions & 65 deletions etna/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union

Expand Down Expand Up @@ -85,71 +86,6 @@ def _forecast_segment(model, segment: Union[str, List[str]], ts: TSDataset) -> p
return segment_predict


class PerSegmentModel(Model):
"""Class for holding specific models for per-segment prediction."""

def __init__(self, base_model):
super(PerSegmentModel, self).__init__()
self._base_model = base_model
self._segments = None

@log_decorator
def fit(self, ts: TSDataset) -> "PerSegmentModel":
"""Fit model."""
self._segments = ts.segments
self._build_models()

for segment in self._segments:
model = self._models[segment]
segment_features = ts[:, segment, :]
segment_features = segment_features.dropna()
segment_features = segment_features.droplevel("segment", axis=1)
segment_features = segment_features.reset_index()
model.fit(df=segment_features)
return self

@log_decorator
def forecast(self, ts: TSDataset) -> TSDataset:
"""Make predictions.

Parameters
----------
ts:
Dataframe with features
Returns
-------
DataFrame
Models result
"""
if self._segments is None:
raise ValueError("The model is not fitted yet, use fit() to train it")

result_list = list()
for segment in self._segments:
model = self._models[segment]

segment_predict = self._forecast_segment(model, segment, ts)
result_list.append(segment_predict)

# need real case to test
result_df = pd.concat(result_list, ignore_index=True)
result_df = result_df.set_index(["timestamp", "segment"])
df = ts.to_pandas(flatten=True)
df = df.set_index(["timestamp", "segment"])
df = df.combine_first(result_df).reset_index()

df = TSDataset.to_dataset(df)
ts.df = df
ts.inverse_transform()
return ts

def _build_models(self):
"""Create a dict with models for each segment (if required)."""
self._models = {}
for segment in self._segments:
self._models[segment] = deepcopy(self._base_model)


class FitAbstractModel(ABC):
"""Interface for model with fit method."""

Expand Down Expand Up @@ -230,3 +166,179 @@ def forecast(
Dataset with predictions
"""
pass


class PerSegmentBaseModel(FitAbstractModel, BaseMixin):
"""Base class for holding specific models for per-segment prediction."""

def __init__(self, base_model: Any):
"""
Init PerSegmentBaseModel.

Parameters
----------
base_model:
Internal model which will be used to forecast segments, expected to have fit/predict interface
"""
self._base_model = base_model
self._segments: Optional[List[str]] = None
self._models: Optional[Dict[str, Any]] = None

@log_decorator
def fit(self, ts: TSDataset) -> "PerSegmentBaseModel":
"""Fit model.

Parameters
----------
ts:
Dataset with features

Returns
-------
self:
Model after fit
"""
self._segments = ts.segments
self._models = {}
for segment in ts.segments:
self._models[segment] = deepcopy(self._base_model)

for segment, model in self._models.items():
segment_features = ts[:, segment, :]
segment_features = segment_features.dropna()
segment_features = segment_features.droplevel("segment", axis=1)
segment_features = segment_features.reset_index()
model.fit(df=segment_features)
return self

def get_model(self) -> Dict[str, Any]:
"""Get internal models that are used inside etna class.

Internal model is a model that is used inside etna to forecast segments, e.g. `catboost.CatBoostRegressor`
or `sklearn.linear_model.Ridge`.

Returns
-------
result:
dictionary where key is segment and value is internal model
"""
if self._models is None:
raise ValueError("Can not get the dict with base models from not fitted model!")
return self._models

@staticmethod
def _forecast_segment(model: Any, segment: str, ts: TSDataset, *args, **kwargs) -> pd.DataFrame:
"""Make predictions for one segment."""
segment_features = ts[:, segment, :]
segment_features = segment_features.droplevel("segment", axis=1)
segment_features = segment_features.reset_index()
dates = segment_features["timestamp"]
dates.reset_index(drop=True, inplace=True)
segment_predict = model.predict(df=segment_features, *args, **kwargs)
segment_predict = pd.DataFrame({"target": segment_predict})
segment_predict["segment"] = segment
segment_predict["timestamp"] = dates
return segment_predict

def _build_models(self):
"""Create a dict with models for each segment (if required)."""
self._models = {}
for segment in self._segments: # type: ignore
self._models[segment] = deepcopy(self._base_model)


class PerSegmentModel(PerSegmentBaseModel, ForecastAbstractModel):
"""Class for holding specific models for per-segment prediction."""

def __init__(self, base_model: Any):
"""
Init PerSegmentBaseModel.

Parameters
----------
base_model:
Internal model which will be used to forecast segments, expected to have fit/predict interface
"""
super().__init__(base_model=base_model)

@log_decorator
def forecast(self, ts: TSDataset) -> TSDataset:
"""Make predictions.

Parameters
----------
ts:
Dataframe with features
Returns
-------
forecast:
Dataset with predictions
"""
result_list = list()
for segment, model in self.get_model().items():
segment_predict = self._forecast_segment(model=model, segment=segment, ts=ts)
result_list.append(segment_predict)

result_df = pd.concat(result_list, ignore_index=True)
result_df = result_df.set_index(["timestamp", "segment"])
df = ts.to_pandas(flatten=True)
df = df.set_index(["timestamp", "segment"])
df = df.combine_first(result_df).reset_index()

df = TSDataset.to_dataset(df)
ts.df = df
ts.inverse_transform()
return ts


class PerSegmentPredictionIntervalModel(PerSegmentBaseModel, PredictIntervalAbstractModel):
"""Class for holding specific models for per-segment prediction which are able to build prediction intervals."""

def __init__(self, base_model: Any):
"""
Init PerSegmentPredictionIntervalModel.

Parameters
----------
base_model:
Internal model which will be used to forecast segments, expected to have fit/predict interface
"""
super().__init__(base_model=base_model)

@abstractmethod
def forecast(
self, ts: TSDataset, prediction_interval: bool = False, quantiles: Sequence[float] = (0.025, 0.975)
) -> TSDataset:
"""Make predictions.

Parameters
----------
ts:
Dataset with features
prediction_interval:
If True returns prediction interval for forecast
quantiles:
Levels of prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval

Returns
-------
forecast:
Dataset with predictions
"""
result_list = list()
for segment, model in self.get_model().items():
segment_predict = self._forecast_segment(
model=model, segment=segment, ts=ts, prediction_interval=prediction_interval, quantiles=quantiles
)
result_list.append(segment_predict)

result_df = pd.concat(result_list, ignore_index=True)
result_df = result_df.set_index(["timestamp", "segment"])
df = ts.to_pandas(flatten=True)
df = df.set_index(["timestamp", "segment"])
df = df.combine_first(result_df).reset_index()

df = TSDataset.to_dataset(df)
ts.df = df
ts.inverse_transform()
return ts
4 changes: 2 additions & 2 deletions etna/models/prophet.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def fit(self, ts: TSDataset) -> "ProphetModel":
self._build_models()

for segment in self._segments:
model = self._models[segment]
model = self._models[segment] # type: ignore
segment_features = ts[:, segment, :]
segment_features = segment_features.dropna()
segment_features = segment_features.droplevel("segment", axis=1)
Expand Down Expand Up @@ -381,7 +381,7 @@ def forecast(

result_list = list()
for segment in self._segments:
model = self._models[segment]
model = self._models[segment] # type: ignore

segment_predict = self._forecast_one_segment(model, segment, ts, prediction_interval, quantiles)
result_list.append(segment_predict)
Expand Down
4 changes: 2 additions & 2 deletions etna/models/sarimax.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def fit(self, ts: TSDataset) -> "SARIMAXModel":
self._build_models()

for segment in self._segments:
model = self._models[segment]
model = self._models[segment] # type: ignore
segment_features = ts[:, segment, :]
segment_features = segment_features.dropna()
segment_features = segment_features.droplevel("segment", axis=1)
Expand Down Expand Up @@ -534,7 +534,7 @@ def forecast(

result_list = list()
for segment in self._segments:
model = self._models[segment]
model = self._models[segment] # type: ignore

segment_predict = self._forecast_one_segment(model, segment, ts, prediction_interval, quantiles)
iKintosh marked this conversation as resolved.
Show resolved Hide resolved
result_list.append(segment_predict)
Expand Down
2 changes: 1 addition & 1 deletion etna/models/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def fit(self, ts: TSDataset) -> "SklearnPerSegmentModel":
self._build_models()

for segment in self._segments:
model = self._models[segment]
model = self._models[segment] # type: ignore
segment_features = ts[:, segment, :]
segment_features = segment_features.dropna()
segment_features = segment_features.droplevel("segment", axis=1)
Expand Down
1 change: 1 addition & 0 deletions tests/test_models/test_linear_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def linear_segments_ts_common(random_seed):
return linear_segments_by_parameters(alpha_values, intercept_values)


@pytest.mark.xfail
@pytest.mark.parametrize("model", (LinearPerSegmentModel(), ElasticPerSegmentModel()))
def test_not_fitted(model, linear_segments_ts_unique):
"""Check exception when trying to forecast with unfitted model."""
Expand Down