diff --git a/CHANGELOG.md b/CHANGELOG.md index 774aac4b8..6d9af63c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add 'in_column' parameter to get_anomalies methods([#199](https://github.com/tinkoff-ai/etna-ts/pull/199)) - Clustering notebook ([#152](https://github.com/tinkoff-ai/etna-ts/pull/152)) - StackingEnsemble ([#195](https://github.com/tinkoff-ai/etna-ts/pull/195)) +- Add AutoRegressivePipeline ([#209](https://github.com/tinkoff-ai/etna-ts/pull/209)) ### Changed - Delete offset from WindowStatisticsTransform ([#111](https://github.com/tinkoff-ai/etna-ts/pull/111)) diff --git a/docs/source/api.rst b/docs/source/api.rst index e299a091e..00044db33 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -13,6 +13,7 @@ API metrics transforms ensembles + pipeline analysis model_selection loggers diff --git a/docs/source/index.rst b/docs/source/index.rst index b9aee1a89..78137d4ce 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -12,6 +12,7 @@ Welcome to ETNA's documentation! metrics transforms ensembles + pipeline analysis model_selection loggers diff --git a/docs/source/pipeline.rst b/docs/source/pipeline.rst new file mode 100644 index 000000000..07cd80dd3 --- /dev/null +++ b/docs/source/pipeline.rst @@ -0,0 +1,20 @@ +Pipelines +========== + +.. _pipeline: + +.. currentmodule:: etna + +Details and available algorithms +-------------------------------- + +See the API documentation for further details of using pipelines: + +.. currentmodule:: etna + +.. 
moduleautosummary::
+   :toctree: api/
+   :template: custom-module-template.rst
+   :recursive:
+
+   etna.pipeline
\ No newline at end of file
diff --git a/docs/source/transforms.rst b/docs/source/transforms.rst
index 0beeae36b..70c4ee9bb 100644
--- a/docs/source/transforms.rst
+++ b/docs/source/transforms.rst
@@ -8,7 +8,7 @@ Transforms
 Details and available algorithms
 --------------------------------
 
-See the API documentation for further details on available clustering algorithms:
+See the API documentation for further details on available feature extractions and transformations:
 
 .. currentmodule:: etna
 
diff --git a/etna/pipeline/__init__.py b/etna/pipeline/__init__.py
index 1f2cdc476..f84946e5e 100644
--- a/etna/pipeline/__init__.py
+++ b/etna/pipeline/__init__.py
@@ -1 +1,2 @@
+from etna.pipeline.autoregressive_pipeline import AutoRegressivePipeline
 from etna.pipeline.pipeline import Pipeline
diff --git a/etna/pipeline/autoregressive_pipeline.py b/etna/pipeline/autoregressive_pipeline.py
new file mode 100644
index 000000000..dd97a622e
--- /dev/null
+++ b/etna/pipeline/autoregressive_pipeline.py
@@ -0,0 +1,173 @@
+import warnings
+from copy import deepcopy
+from typing import Iterable
+from typing import Optional
+
+import pandas as pd
+
+from etna.datasets import TSDataset
+from etna.models.base import Model
+from etna.pipeline.pipeline import Pipeline
+from etna.transforms import Transform
+
+
+class AutoRegressivePipeline(Pipeline):
+    """Pipeline that makes regressive models autoregressive.
+
+    The forecast is built iteratively: each iteration predicts at most ``step`` points,
+    which are then added to the history used by the next iteration.
+
+    Examples
+    --------
+    >>> from etna.datasets import generate_periodic_df
+    >>> from etna.datasets import TSDataset
+    >>> from etna.models import LinearPerSegmentModel
+    >>> from etna.transforms import LagTransform
+    >>> classic_df = generate_periodic_df(
+    ...     periods=100,
+    ...     start_time="2020-01-01",
+    ...     n_segments=4,
+    ...     period=7,
+    ...     sigma=3
+    ... )
+    >>> df = TSDataset.to_dataset(df=classic_df)
+    >>> ts = TSDataset(df, freq="D")
+    >>> horizon = 7
+    >>> transforms = [
+    ...     LagTransform(in_column="target", lags=list(range(1, horizon+1)))
+    ... ]
+    >>> model = LinearPerSegmentModel()
+    >>> pipeline = AutoRegressivePipeline(model, horizon, transforms, step=1)
+    >>> _ = pipeline.fit(ts=ts)
+    >>> forecast = pipeline.forecast()
+    >>> pd.options.display.float_format = '{:,.2f}'.format
+    >>> forecast[:, :, "target"]
+    segment    segment_0 segment_1 segment_2 segment_3
+    feature       target    target    target    target
+    timestamp
+    2020-04-10      9.00      9.00      4.00      6.00
+    2020-04-11      5.00      2.00      7.00      9.00
+    2020-04-12      0.00      4.00      7.00      9.00
+    2020-04-13      0.00      5.00      9.00      7.00
+    2020-04-14      1.00      2.00      1.00      6.00
+    2020-04-15      5.00      7.00      4.00      7.00
+    2020-04-16      8.00      6.00      2.00      0.00
+    """
+
+    def __init__(self, model: Model, horizon: int, transforms: Iterable[Transform] = (), step: int = 1):
+        """
+        Create instance of AutoRegressivePipeline with given parameters.
+
+        Parameters
+        ----------
+        model:
+            Instance of the etna Model
+        horizon:
+            Number of timestamps in the future for forecasting
+        transforms:
+            Sequence of the transforms
+        step:
+            Size of prediction for one step of forecasting
+
+        Raises
+        ------
+        ValueError:
+            If horizon or step is less than 1
+        """
+        if horizon < 1:
+            raise ValueError(f"horizon should be a positive integer, but {horizon} is given")
+        if step < 1:
+            # step=0 would break range() in forecast, a negative step would silently skip the forecasting loop
+            raise ValueError(f"step should be a positive integer, but {step} is given")
+        self.model = model
+        self.horizon = horizon
+        self.transforms = transforms
+        self.step = step
+        # untransformed copy of the train dataset, set by fit
+        self.ts: Optional[TSDataset] = None
+
+    def fit(self, ts: TSDataset) -> "AutoRegressivePipeline":
+        """Fit the Pipeline.
+
+        Fit and apply given transforms to the data, then fit the model on the transformed data.
+        Note that the transforms are applied to the given ``ts`` in place;
+        the pipeline keeps its own untransformed copy to build forecasts from.
+
+        Parameters
+        ----------
+        ts:
+            Dataset with timeseries data
+
+        Returns
+        -------
+        AutoRegressivePipeline:
+            Fitted Pipeline instance
+        """
+        # copy before transforming: forecast builds its template from the untransformed data
+        self.ts = deepcopy(ts)
+        ts.fit_transform(self.transforms)
+        self.model.fit(ts)
+        return self
+
+    def _create_predictions_template(self) -> pd.DataFrame:
+        """Create dataframe to fill with forecasts.
+
+        Returns
+        -------
+        pd.DataFrame:
+            Stored data reindexed to additionally contain ``horizon`` future timestamps
+        """
+        prediction_df = self.ts.to_pandas()
+        last_timestamp = prediction_df.index.max()
+        # date_range(..., closed="right") is not available in pandas>=2.0, so the first `horizon`
+        # points strictly after the last known timestamp are selected explicitly
+        future_dates = pd.date_range(start=last_timestamp, periods=self.horizon + 1, freq=self.ts.freq)
+        future_dates = future_dates[future_dates > last_timestamp][: self.horizon]
+        prediction_df = prediction_df.reindex(prediction_df.index.append(future_dates))
+        prediction_df.index.name = "timestamp"
+        return prediction_df
+
+    def forecast(self) -> TSDataset:
+        """Make predictions.
+
+        Returns
+        -------
+        TSDataset:
+            TSDataset with forecast
+
+        Raises
+        ------
+        ValueError:
+            If the pipeline has not been fitted
+        """
+        if self.ts is None:
+            raise ValueError("AutoRegressivePipeline is not fitted! Call fit before forecast.")
+        prediction_df = self._create_predictions_template()
+        history_size = self.ts.index.shape[0]
+
+        for idx_start in range(0, self.horizon, self.step):
+            # the last iteration is shorter if horizon is not a multiple of step
+            current_step = min(self.step, self.horizon - idx_start)
+            current_idx_border = history_size + idx_start
+            current_ts = TSDataset(prediction_df.iloc[:current_idx_border], freq=self.ts.freq)
+            # manually set transforms in current_ts, otherwise make_future won't know about them
+            current_ts.transforms = self.transforms
+            with warnings.catch_warnings():
+                warnings.filterwarnings(
+                    message="TSDataset freq can't be inferred",
+                    action="ignore",
+                )
+                warnings.filterwarnings(
+                    message="You probably set wrong freq.",
+                    action="ignore",
+                )
+                current_ts_forecast = current_ts.make_future(current_step)
+                current_ts_future = self.model.forecast(current_ts_forecast)
+            # fill only the empty cells of the template with the fresh predictions
+            prediction_df = prediction_df.combine_first(current_ts_future.to_pandas()[prediction_df.columns])
+
+        prediction_ts = TSDataset(prediction_df.tail(self.horizon), freq=self.ts.freq)
+        # add all other features to forecast by making transform + inverse_transform
+        prediction_ts.transform(self.transforms)
+        prediction_ts.inverse_transform()
+        return prediction_ts
diff --git a/tests/test_pipeline/test_autoregressive_pipeline.py
b/tests/test_pipeline/test_autoregressive_pipeline.py
new file mode 100644
index 000000000..6f0f533c1
--- /dev/null
+++ b/tests/test_pipeline/test_autoregressive_pipeline.py
@@ -0,0 +1,91 @@
+from copy import deepcopy
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from etna.datasets import TSDataset
+from etna.models import LinearPerSegmentModel
+from etna.pipeline import AutoRegressivePipeline
+from etna.transforms import DateFlagsTransform
+from etna.transforms import LagTransform
+from etna.transforms import LinearTrendTransform
+
+
+def test_fit(example_tsds):
+    """Test that AutoRegressivePipeline fits without failing."""
+    model = LinearPerSegmentModel()
+    transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform()]
+    pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=5, step=1)
+    pipeline.fit(example_tsds)
+
+
+def test_forecast_columns(example_tsds):
+    """Test that the AutoRegressivePipeline forecast contains all the columns generated by the transforms."""
+    original_ts = deepcopy(example_tsds)
+    horizon = 5
+
+    # make predictions in AutoRegressivePipeline (fit transforms example_tsds in place, hence the copy above)
+    model = LinearPerSegmentModel()
+    transforms = [LagTransform(in_column="target", lags=[1]), DateFlagsTransform(is_weekend=True)]
+    pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
+    pipeline.fit(example_tsds)
+    forecast_pipeline = pipeline.forecast()
+
+    # generate all columns on the untouched copy to get the expected set of columns
+    original_ts.fit_transform(transforms)
+
+    assert set(forecast_pipeline.columns) == set(original_ts.columns)
+
+
+def test_forecast_one_step(example_tsds):
+    """Test that AutoRegressivePipeline gets predictions one by one if step is equal to 1."""
+    original_ts = deepcopy(example_tsds)
+    horizon = 5
+
+    # make predictions in AutoRegressivePipeline (fit transforms example_tsds in place, hence the copy above)
+    model = LinearPerSegmentModel()
+    transforms = [LagTransform(in_column="target", lags=[1])]
+    pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
+    pipeline.fit(example_tsds)
+    forecast_pipeline = pipeline.forecast()
+
+    # make predictions manually: forecast one point, append it to the history and repeat
+    df = original_ts.to_pandas()
+    original_ts.fit_transform(transforms)
+    model = LinearPerSegmentModel()
+    model.fit(original_ts)
+    for i in range(horizon):
+        cur_ts = TSDataset(df, freq=original_ts.freq)
+        # these transforms don't require fitting, so we can simply apply them at each step
+        cur_ts.transform(transforms)
+        cur_forecast_ts = cur_ts.make_future(1)
+        cur_future_ts = model.forecast(cur_forecast_ts)
+        to_add_df = cur_future_ts.to_pandas()
+        df = pd.concat([df, to_add_df[df.columns]])
+
+    forecast_manual = TSDataset(df.tail(horizon), freq=original_ts.freq)
+    assert np.all(forecast_pipeline[:, :, "target"] == forecast_manual[:, :, "target"])
+
+
+@pytest.mark.parametrize("horizon, step", ((1, 1), (5, 1), (5, 2), (5, 3), (5, 4), (5, 5), (20, 1), (20, 2), (20, 3)))
+def test_forecast_multi_step(example_tsds, horizon, step):
+    """Test that AutoRegressivePipeline gets the correct number of predictions for different horizon/step pairs."""
+    model = LinearPerSegmentModel()
+    transforms = [LagTransform(in_column="target", lags=[step])]
+    pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=step)
+    pipeline.fit(example_tsds)
+    forecast_pipeline = pipeline.forecast()
+
+    assert forecast_pipeline.df.shape[0] == horizon
+
+
+def test_forecast_with_fit_transforms(example_tsds):
+    """Test that AutoRegressivePipeline can work with transforms that need fitting."""
+    horizon = 5
+
+    model = LinearPerSegmentModel()
+    transforms = [LagTransform(in_column="target", lags=[1]), LinearTrendTransform(in_column="target")]
+    pipeline = AutoRegressivePipeline(model=model, transforms=transforms, horizon=horizon, step=1)
+    pipeline.fit(example_tsds)
+    pipeline.forecast()