Skip to content

Commit

Permalink
Merge inference track (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Geekman authored Oct 26, 2022
1 parent 326b599 commit 43ae404
Show file tree
Hide file tree
Showing 49 changed files with 4,531 additions and 1,494 deletions.
8 changes: 4 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
-
- Add `plot_change_points_interactive` ([#988](https://github.com/tinkoff-ai/etna/pull/988))
- Add `experimental` module with `TimeSeriesBinaryClassifier` and `PredictabilityAnalyzer` ([#985](https://github.com/tinkoff-ai/etna/pull/985))
-
-
-
- Inference track results: add `predict` method to pipelines, teach some models to work with context, change hierarchy of base models, update notebook examples ([#979](https://github.com/tinkoff-ai/etna/pull/979))
-
-
### Changed
-
- Change returned model in get_model of BATSModel, TBATSModel ([#987](https://github.com/tinkoff-ai/etna/pull/987))
-
-
- Change returned model in `get_model` of `HoltWintersModel`, `HoltModel`, `SimpleExpSmoothingModel` ([#986](https://github.com/tinkoff-ai/etna/pull/986))
-
-
-
-
-
Expand Down
9 changes: 5 additions & 4 deletions etna/analysis/outliers/prediction_interval_outliers.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ def get_anomalies_prediction_interval(
model_instance = model(**model_params)
model_instance.fit(ts_inner)
lower_p, upper_p = [(1 - interval_width) / 2, (1 + interval_width) / 2]
prediction_interval = model_instance.forecast(
prediction_interval = model_instance.predict(
deepcopy(ts_inner), prediction_interval=True, quantiles=[lower_p, upper_p]
)
for segment in ts_inner.segments:
segment_slice = prediction_interval[:, segment, :][segment]
anomalies_mask = (segment_slice["target"] > segment_slice[f"target_{upper_p:.4g}"]) | (
segment_slice["target"] < segment_slice[f"target_{lower_p:.4g}"]
predicted_segment_slice = prediction_interval[:, segment, :][segment]
actual_segment_slice = ts_inner[:, segment, :][segment]
anomalies_mask = (actual_segment_slice["target"] > predicted_segment_slice[f"target_{upper_p:.4g}"]) | (
actual_segment_slice["target"] < predicted_segment_slice[f"target_{lower_p:.4g}"]
)
outliers_per_segment[segment] = list(time_points[anomalies_mask])
return outliers_per_segment
16 changes: 16 additions & 0 deletions etna/ensembles/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import List
from typing import Optional

import pandas as pd

from etna.datasets import TSDataset
from etna.loggers import tslogger
Expand Down Expand Up @@ -37,3 +40,16 @@ def _forecast_pipeline(pipeline: BasePipeline) -> TSDataset:
forecast = pipeline.forecast()
tslogger.log(msg=f"Forecast is done with {pipeline}.")
return forecast

@staticmethod
def _predict_pipeline(
ts: TSDataset,
pipeline: BasePipeline,
start_timestamp: Optional[pd.Timestamp],
end_timestamp: Optional[pd.Timestamp],
) -> TSDataset:
"""Make predict with given pipeline."""
tslogger.log(msg=f"Start prediction with {pipeline}.")
prediction = pipeline.predict(ts=ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
tslogger.log(msg=f"Prediction is done with {pipeline}.")
return prediction
20 changes: 20 additions & 0 deletions etna/ensembles/direct_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence

import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed

Expand Down Expand Up @@ -133,3 +135,21 @@ def _forecast(self) -> TSDataset:
)
forecast = self._merge(forecasts=forecasts)
return forecast

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
) -> TSDataset:
if prediction_interval:
raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

horizons = [pipeline.horizon for pipeline in self.pipelines]
pipeline = self.pipelines[np.argmin(horizons)]
prediction = self._predict_pipeline(
ts=ts, pipeline=pipeline, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
return prediction
58 changes: 43 additions & 15 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
from typing import cast

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -211,30 +213,56 @@ def _make_features(
else:
return x, None

def _process_forecasts(self, forecasts: List[TSDataset]) -> TSDataset:
x, _ = self._make_features(forecasts=forecasts, train=False)
self.ts = cast(TSDataset, self.ts)
y = self.final_model.predict(x)
num_segments = len(forecasts[0].segments)
y = y.reshape(num_segments, -1).T
num_timestamps = y.shape[0]

# Format the forecast into TSDataset
segment_col = [segment for segment in self.ts.segments for _ in range(num_timestamps)]
x.loc[:, "segment"] = segment_col
x.loc[:, "timestamp"] = x.index.values
df_exog = TSDataset.to_dataset(x)

df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN

result = TSDataset(df=df, freq=self.ts.freq, df_exog=df_exog)
result.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return result

def _forecast(self) -> TSDataset:
"""Make predictions.
Compute the combination of pipelines' forecasts using ``final_model``
"""
if self.ts is None:
raise ValueError("Something went wrong, ts is None!")

# Get forecast
forecasts = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._forecast_pipeline)(pipeline=pipeline) for pipeline in self.pipelines
)
x, _ = self._make_features(forecasts=forecasts, train=False)
y = self.final_model.predict(x).reshape(-1, self.horizon).T

# Format the forecast into TSDataset
segment_col = [segment for segment in self.ts.segments for _ in range(self.horizon)]
x.loc[:, "segment"] = segment_col
x.loc[:, "timestamp"] = x.index.values
df_exog = TSDataset.to_dataset(x)
forecast = self._process_forecasts(forecasts=forecasts)
return forecast

df = forecasts[0][:, :, "target"].copy()
df.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = np.NAN
def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
) -> TSDataset:
if prediction_interval:
raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

forecast = TSDataset(df=df, freq=self.ts.freq, df_exog=df_exog)
forecast.loc[pd.IndexSlice[:], pd.IndexSlice[:, "target"]] = y
return forecast
predictions = Parallel(n_jobs=self.n_jobs, **self.joblib_params)(
delayed(self._predict_pipeline)(
ts=ts, pipeline=pipeline, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
for pipeline in self.pipelines
)
prediction = self._process_forecasts(forecasts=predictions)
return prediction
23 changes: 23 additions & 0 deletions etna/ensembles/voting_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence
from typing import Union
from typing import cast

import pandas as pd
from joblib import Parallel
Expand Down Expand Up @@ -209,3 +211,24 @@ def _forecast(self) -> TSDataset:
)
forecast = self._vote(forecasts=forecasts)
return forecast

def _predict(
self,
ts: TSDataset,
start_timestamp: pd.Timestamp,
end_timestamp: pd.Timestamp,
prediction_interval: bool,
quantiles: Sequence[float],
) -> TSDataset:
if prediction_interval:
raise NotImplementedError(f"Ensemble {self.__class__.__name__} doesn't support prediction intervals!")

self.ts = cast(TSDataset, self.ts)
predictions = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)(
delayed(self._predict_pipeline)(
ts=ts, pipeline=pipeline, start_timestamp=start_timestamp, end_timestamp=end_timestamp
)
for pipeline in self.pipelines
)
predictions = self._vote(forecasts=predictions)
return predictions
12 changes: 9 additions & 3 deletions etna/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from etna import SETTINGS
from etna.models.autoarima import AutoARIMAModel
from etna.models.base import BaseAdapter
from etna.models.base import BaseModel
from etna.models.base import Model
from etna.models.base import PerSegmentModel
from etna.models.base import ContextIgnorantModelType
from etna.models.base import ContextRequiredModelType
from etna.models.base import ModelType
from etna.models.base import NonPredictionIntervalContextIgnorantAbstractModel
from etna.models.base import NonPredictionIntervalContextRequiredAbstractModel
from etna.models.base import NonPredictionIntervalModelType
from etna.models.base import PredictionIntervalContextIgnorantAbstractModel
from etna.models.base import PredictionIntervalContextRequiredAbstractModel
from etna.models.base import PredictionIntervalModelType
from etna.models.catboost import CatBoostModelMultiSegment
from etna.models.catboost import CatBoostModelPerSegment
from etna.models.catboost import CatBoostMultiSegmentModel
Expand Down
11 changes: 9 additions & 2 deletions etna/models/autoarima.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from statsmodels.tools.sm_exceptions import ValueWarning
from statsmodels.tsa.statespace.sarimax import SARIMAXResultsWrapper

from etna.models.base import PerSegmentPredictionIntervalModel
from etna.models.base import PredictionIntervalContextIgnorantAbstractModel
from etna.models.mixins import PerSegmentModelMixin
from etna.models.mixins import PredictionIntervalContextIgnorantModelMixin
from etna.models.sarimax import _SARIMAXBaseAdapter

warnings.filterwarnings(
Expand Down Expand Up @@ -49,10 +51,15 @@ def _get_fit_results(self, endog: pd.Series, exog: pd.DataFrame) -> SARIMAXResul
return model.arima_res_


class AutoARIMAModel(PerSegmentPredictionIntervalModel):
class AutoARIMAModel(
PerSegmentModelMixin, PredictionIntervalContextIgnorantModelMixin, PredictionIntervalContextIgnorantAbstractModel
):
"""
Class for holding auto arima model.
Method ``predict`` can use true target values only on train data on future data autoregression
forecasting will be made even if targets are known.
Notes
-----
We use :py:class:`pmdarima.arima.arima.ARIMA`.
Expand Down
Loading

1 comment on commit 43ae404

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.