Skip to content

Implement predict in ensembles #972

Merged
merged 4 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Make `SeasonalMovingAverageModel` and `DeadlineMovingAverageModel` work with context ([#917](https://github.com/tinkoff-ai/etna/pull/917))
-
- Add `predict` method to models ([#935](https://github.com/tinkoff-ai/etna/pull/935))
-
- Implement `predict` in ensembles ([#972](https://github.com/tinkoff-ai/etna/pull/972))
- Implement `predict` in `Pipeline`, `AutoRegressivePipeline` ([#970](https://github.com/tinkoff-ai/etna/pull/970))
### Changed
-
Expand Down
13 changes: 13 additions & 0 deletions etna/ensembles/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import List
from typing import Optional

import pandas as pd

from etna.datasets import TSDataset
from etna.loggers import tslogger
Expand Down Expand Up @@ -37,3 +40,13 @@ def _forecast_pipeline(pipeline: BasePipeline) -> TSDataset:
forecast = pipeline.forecast()
tslogger.log(msg=f"Forecast is done with {pipeline}.")
return forecast

@staticmethod
def _predict_pipeline(
    pipeline: BasePipeline, start_timestamp: Optional[pd.Timestamp], end_timestamp: Optional[pd.Timestamp]
) -> TSDataset:
    """Call ``predict`` on a single pipeline, logging before and after the call.

    Parameters
    ----------
    pipeline:
        Pipeline whose ``predict`` method is invoked.
    start_timestamp:
        Passed through to ``pipeline.predict`` as ``start_timestamp``.
    end_timestamp:
        Passed through to ``pipeline.predict`` as ``end_timestamp``.

    Returns
    -------
    :
        The value returned by ``pipeline.predict`` for the requested range.
    """
    tslogger.log(msg=f"Start prediction with {pipeline}.")
    timestamp_range = {"start_timestamp": start_timestamp, "end_timestamp": end_timestamp}
    result = pipeline.predict(**timestamp_range)
    tslogger.log(msg=f"Prediction is done with {pipeline}.")
    return result
21 changes: 21 additions & 0 deletions etna/ensembles/direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import cast

import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed

Expand Down Expand Up @@ -133,3 +136,21 @@ def _forecast(self) -> TSDataset:
)
forecast = self._merge(forecasts=forecasts)
return forecast

def _predict(
    self,
    start_timestamp: pd.Timestamp,
    end_timestamp: pd.Timestamp,
    prediction_interval: bool,
    quantiles: Sequence[float],
) -> TSDataset:
    """Predict on the given range with the pipeline that has the smallest horizon.

    Parameters
    ----------
    start_timestamp:
        First timestamp of the prediction range, forwarded to the chosen pipeline.
    end_timestamp:
        Last timestamp of the prediction range, forwarded to the chosen pipeline.
    prediction_interval:
        Must be falsy; prediction intervals are not supported by this ensemble.
    quantiles:
        Not used by this implementation.

    Returns
    -------
    :
        Result of ``self._predict_pipeline`` for the chosen pipeline.

    Raises
    ------
    NotImplementedError:
        If ``prediction_interval`` is truthy.
    """
    if prediction_interval:
        raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

    self.ts = cast(TSDataset, self.ts)
    # Only one pipeline is used; ``np.argmin`` picks the first one among equal minimal horizons.
    shortest_idx = np.argmin([candidate.horizon for candidate in self.pipelines])
    return self._predict_pipeline(
        pipeline=self.pipelines[shortest_idx],
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
    )
58 changes: 43 additions & 15 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from typing import cast

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -211,30 +213,56 @@ def _make_features(
else:
return x, None

def _process_forecasts(self, forecasts: List[TSDataset]) -> TSDataset:
    """Combine per-pipeline forecasts into one dataset using ``final_model``.

    Features are built from ``forecasts`` with ``self._make_features(train=False)``,
    ``self.final_model.predict`` is applied to them, and the output is written into
    the ``target`` column of a new ``TSDataset``. The features themselves are attached
    to the result as ``df_exog``.

    Parameters
    ----------
    forecasts:
        Forecasts (or predictions) produced by the ensemble's pipelines; the first
        element provides the segments and the timestamp index of the result.

    Returns
    -------
    :
        Dataset whose ``target`` holds the output of ``final_model``.
    """
    x, _ = self._make_features(forecasts=forecasts, train=False)
    self.ts = cast(TSDataset, self.ts)
    y = self.final_model.predict(x)
    num_segments = len(forecasts[0].segments)
    # After reshape + transpose: one row per timestamp, one column per segment.
    # NOTE(review): assumes the rows of ``x`` are grouped segment by segment -- confirm in ``_make_features``.
    y = y.reshape(num_segments, -1).T
    num_timestamps = y.shape[0]

    # Format the forecast into TSDataset
    segment_col = [segment for segment in self.ts.segments for _ in range(num_timestamps)]
    x.loc[:, "segment"] = segment_col
    x.loc[:, "timestamp"] = x.index.values
    df_exog = TSDataset.to_dataset(x)

    df = forecasts[0][:, :, "target"].copy()
    # Blank the target first. ``np.nan`` is used because the ``np.NAN`` alias was removed in NumPy 2.0.
    df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.nan

    result = TSDataset(df=df, freq=self.ts.freq, df_exog=df_exog)
    result.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
    return result

def _forecast(self) -> TSDataset:
"""Make predictions.

Compute the combination of pipelines' forecasts using ``final_model``
"""
if self.ts is None:
raise ValueError("Something went wrong, ts is None!")

# Get forecast
forecasts = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._forecast_pipeline)(pipeline=pipeline) for pipeline in self.pipelines
)
x, _ = self._make_features(forecasts=forecasts, train=False)
y = self.final_model.predict(x).reshape(-1, self.horizon).T

# Format the forecast into TSDataset
segment_col = [segment for segment in self.ts.segments for _ in range(self.horizon)]
x.loc[:, "segment"] = segment_col
x.loc[:, "timestamp"] = x.index.values
df_exog = TSDataset.to_dataset(x)
forecast = self._process_forecasts(forecasts=forecasts)
return forecast

df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN
def _predict(
self,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
) -> TSDataset:
if prediction_interval:
raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

forecast = TSDataset(df=df, freq=self.ts.freq, df_exog=df_exog)
forecast.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return forecast
self.ts = cast(TSDataset, self.ts)
predictions = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._predict_pipeline)(
pipeline=pipeline, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
for pipeline in self.pipelines
)
prediction = self._process_forecasts(forecasts=predictions)
return prediction
22 changes: 22 additions & 0 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast

import pandas as pd
from joblib import Parallel
Expand Down Expand Up @@ -209,3 +211,23 @@ def _forecast(self) -> TSDataset:
)
forecast = self._vote(forecasts=forecasts)
return forecast

def _predict(
    self,
    start_timestamp: pd.Timestamp,
    end_timestamp: pd.Timestamp,
    prediction_interval: bool,
    quantiles: Sequence[float],
) -> TSDataset:
    """Predict with every pipeline on the given range and combine the results with ``self._vote``.

    Parameters
    ----------
    start_timestamp:
        First timestamp of the prediction range, forwarded to each pipeline.
    end_timestamp:
        Last timestamp of the prediction range, forwarded to each pipeline.
    prediction_interval:
        Must be falsy; prediction intervals are not supported by this ensemble.
    quantiles:
        Not used by this implementation.

    Returns
    -------
    :
        Output of ``self._vote`` applied to the pipelines' predictions.

    Raises
    ------
    NotImplementedError:
        If ``prediction_interval`` is truthy.
    """
    if prediction_interval:
        raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

    self.ts = cast(TSDataset, self.ts)
    # NOTE(review): backend and verbosity are hard-coded here; confirm whether they
    # should instead come from the ensemble's joblib configuration.
    runner = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)
    jobs = (
        delayed(self._predict_pipeline)(
            pipeline=member, start_timestamp=start_timestamp, end_timestamp=end_timestamp
        )
        for member in self.pipelines
    )
    per_pipeline = runner(jobs)
    return self._vote(forecasts=per_pipeline)
63 changes: 1 addition & 62 deletions etna/pipeline/autoregressive_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import warnings
from typing import Optional
from typing import Sequence
from typing import cast

Expand All @@ -15,7 +14,7 @@
from etna.transforms import Transform


class AutoRegressivePipeline(BasePipeline, ModelPipelinePredictMixin):
class AutoRegressivePipeline(ModelPipelinePredictMixin, BasePipeline):
"""Pipeline that make regressive models autoregressive.

Examples
Expand Down Expand Up @@ -161,63 +160,3 @@ def _forecast(self) -> TSDataset:
prediction_ts.df = prediction_ts.df.tail(self.horizon)
prediction_ts.raw_df = prediction_ts.raw_df.tail(self.horizon)
return prediction_ts

def predict(
    self,
    start_timestamp: Optional[pd.Timestamp] = None,
    end_timestamp: Optional[pd.Timestamp] = None,
    prediction_interval: bool = False,
    quantiles: Sequence[float] = (0.025, 0.975),
) -> TSDataset:
    """Make in-sample predictions in a given range.

    Currently, when segments start at different timestamps, this is only guaranteed
    to work with ``start_timestamp`` >= the beginning of all segments.

    Parameters
    ----------
    start_timestamp:
        First timestamp of the prediction range to return; should be >= the first timestamp in ``self.ts``.
        It is expected that the beginning of each segment <= ``start_timestamp``.
        If not set, the first timestamp at which every segment has begun is taken.
    end_timestamp:
        Last timestamp of the prediction range to return; if not set, the last timestamp of ``self.ts`` is taken.
        It is expected to be less than or equal to the last timestamp in ``self.ts``.
    prediction_interval:
        If True, returns the prediction interval for the forecast.
    quantiles:
        Levels of the prediction distribution. By default 2.5% and 97.5% are taken to form a 95% prediction interval.

    Returns
    -------
    :
        Dataset with predictions in the ``[start_timestamp, end_timestamp]`` range.

    Raises
    ------
    ValueError:
        Pipeline wasn't fitted.
    ValueError:
        Value of ``end_timestamp`` is less than ``start_timestamp``.
    ValueError:
        Value of ``start_timestamp`` goes before the point where each segment started.
    ValueError:
        Value of ``end_timestamp`` goes after the last timestamp.
    """
    if self.ts is None:
        class_name = self.__class__.__name__
        raise ValueError(f"{class_name} is not fitted! Fit the {class_name} before calling predict method.")

    # Resolve the requested range against the fitted dataset before validating the quantiles.
    start_timestamp, end_timestamp = self._make_predict_timestamps(
        ts=self.ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp
    )
    self._validate_quantiles(quantiles=quantiles)
    return self._predict(
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        prediction_interval=prediction_interval,
        quantiles=quantiles,
    )
27 changes: 26 additions & 1 deletion etna/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,15 @@ def _make_predict_timestamps(

return start_timestamp, end_timestamp

def _predict(
    self,
    start_timestamp: Optional[pd.Timestamp],
    end_timestamp: Optional[pd.Timestamp],
    prediction_interval: bool,
    quantiles: Sequence[float],
) -> TSDataset:
    """Make predictions in the given range; this implementation always raises.

    Raises
    ------
    NotImplementedError:
        Always.
    """
    message = "Predict method isn't implemented!"
    raise NotImplementedError(message)

def predict(
self,
start_timestamp: Optional[pd.Timestamp] = None,
Expand Down Expand Up @@ -383,7 +392,23 @@ def predict(
ValueError:
Value of ``end_timestamp`` goes after the last timestamp.
"""
raise NotImplementedError("Predict method isn't implemented!")
if self.ts is None:
raise ValueError(
f"{self.__class__.__name__} is not fitted! Fit the {self.__class__.__name__} "
f"before calling predict method."
)

start_timestamp, end_timestamp = self._make_predict_timestamps(
ts=self.ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
self._validate_quantiles(quantiles=quantiles)
result = self._predict(
start_timestamp=start_timestamp,
end_timestamp=end_timestamp,
prediction_interval=prediction_interval,
quantiles=quantiles,
)
return result

def _init_backtest(self):
self._folds: Optional[Dict[int, Any]] = None
Expand Down
16 changes: 9 additions & 7 deletions etna/pipeline/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,27 @@ class ModelPipelinePredictMixin:
model: ModelType
transforms: Sequence[Transform]

def _recreate_ts(self, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp) -> TSDataset:
"""Recreate ``self.ts`` on given timestamp range in state before any transformations."""
def _create_ts(self, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp) -> TSDataset:
"""Create ``TSDataset`` to make predictions on."""
self.ts = cast(TSDataset, self.ts)
df = self.ts.raw_df.copy()
# we make it through deepcopy to handle df_exog=None
df_exog = deepcopy(self.ts.df_exog)
freq = self.ts.freq
known_future = self.ts.known_future

df_to_transform = df[:end_timestamp]
cur_ts = TSDataset(df=df_to_transform, df_exog=df_exog, freq=freq, known_future=known_future)
cur_ts.transform(transforms=self.transforms)

# correct start_timestamp taking into account context size
timestamp_indices = pd.Series(np.arange(len(df.index)), index=df.index)
start_idx = timestamp_indices[start_timestamp]
start_idx = max(0, start_idx - self.model.context_size)
start_timestamp = timestamp_indices.index[start_idx]

df = df[start_timestamp:end_timestamp]
ts = TSDataset(df=df, df_exog=df_exog, freq=freq, known_future=known_future)
return ts
cur_ts.df = cur_ts.df[start_timestamp:end_timestamp]
return cur_ts

def _determine_prediction_size(self, start_timestamp: pd.Timestamp, end_timestamp: pd.Timestamp) -> int:
self.ts = cast(TSDataset, self.ts)
Expand All @@ -57,9 +60,8 @@ def _predict(
quantiles: Sequence[float],
) -> TSDataset:
self.ts = cast(TSDataset, self.ts)
ts = self._recreate_ts(start_timestamp=start_timestamp, end_timestamp=end_timestamp)
ts = self._create_ts(start_timestamp=start_timestamp, end_timestamp=end_timestamp)
prediction_size = self._determine_prediction_size(start_timestamp=start_timestamp, end_timestamp=end_timestamp)
ts.transform(transforms=self.transforms)

if prediction_interval and isinstance(self.model, get_args(NonPredictionIntervalModelType)):
raise NotImplementedError(f"Model {self.model.__class__.__name__} doesn't support prediction intervals!")
Expand Down
Loading