Skip to content

Commit

Permalink
Add refit parameter into backtest (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mr-Geekman authored Mar 14, 2023
1 parent d9c24de commit 02bf892
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add saving/loading for transforms, models, pipelines, ensembles; tutorial for saving/loading ([#1068](https://github.com/tinkoff-ai/etna/pull/1068))
-
-
-
- Add `refit` parameter into `backtest` ([#1159](https://github.com/tinkoff-ai/etna/pull/1159))
- Add optional parameter `ts` into `forecast` method of pipelines ([#1071](https://github.com/tinkoff-ai/etna/pull/1071))
- Add tests on `transform` method of transforms on subset of segments, on new segments, on future with gap ([#1094](https://github.com/tinkoff-ai/etna/pull/1094))
- Add tests on `inverse_transform` method of transforms on subset of segments, on new segments, on future with gap ([#1127](https://github.com/tinkoff-ai/etna/pull/1127))
Expand Down
189 changes: 169 additions & 20 deletions etna/pipeline/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
from abc import abstractmethod
from copy import deepcopy
from enum import Enum
Expand All @@ -15,6 +16,7 @@
from joblib import Parallel
from joblib import delayed
from scipy.stats import norm
from typing_extensions import TypedDict

from etna.core import AbstractSaveable
from etna.core import BaseMixin
Expand Down Expand Up @@ -210,11 +212,14 @@ def backtest(
mode: str = "expand",
aggregate_metrics: bool = False,
n_jobs: int = 1,
refit: Union[bool, int] = True,
joblib_params: Optional[Dict[str, Any]] = None,
forecast_params: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Run backtest with the pipeline.
If ``refit != True`` and some component of the pipeline doesn't support forecasting with gap, this component will raise an exception.
Parameters
----------
ts:
Expand All @@ -229,6 +234,15 @@ def backtest(
If True aggregate metrics above folds, return raw metrics otherwise
n_jobs:
Number of jobs to run in parallel
refit:
Determines how often pipeline should be retrained during iteration over folds.
* If ``True``: pipeline is retrained on each fold.
* If ``False``: pipeline is trained only on the first fold.
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first.
joblib_params:
Additional parameters for :py:class:`joblib.Parallel`
forecast_params:
Expand All @@ -241,6 +255,15 @@ def backtest(
"""


# Bundle describing one backtest refit group: the pipeline is trained once on
# ``train_mask`` (fold number ``train_fold_number``) and that fitted pipeline is
# reused to forecast every fold in ``forecast_fold_numbers`` / ``forecast_masks``.
FoldParallelGroup = TypedDict(
    "FoldParallelGroup",
    {
        "train_fold_number": int,
        "train_mask": FoldMask,
        "forecast_fold_numbers": List[int],
        "forecast_masks": List[FoldMask],
    },
)


class BasePipeline(AbstractPipeline, BaseMixin):
"""Base class for pipeline."""

Expand Down Expand Up @@ -522,21 +545,39 @@ def _compute_metrics(metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset
metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore
return metrics_values

def _run_fold(
def _fit_backtest_pipeline(
    self,
    ts: TSDataset,
    fold_number: int,
) -> "BasePipeline":
    """Return a fresh copy of this pipeline fitted on ``ts``.

    The fit is wrapped into a logging experiment of type ``"training"`` so that
    each backtest training run is tracked under its fold number.

    Parameters
    ----------
    ts:
        dataset to fit the pipeline copy on
    fold_number:
        number of the backtest fold, used as the logging group

    Returns
    -------
    :
        fitted copy of this pipeline; ``self`` is left untouched
    """
    tslogger.start_experiment(job_type="training", group=str(fold_number))
    fitted_copy = deepcopy(self)
    fitted_copy.fit(ts=ts)
    tslogger.finish_experiment()
    return fitted_copy

def _forecast_backtest_pipeline(
    self, pipeline: "BasePipeline", ts: TSDataset, fold_number: int, forecast_params: Dict[str, Any]
) -> TSDataset:
    """Produce a backtest forecast from an already fitted pipeline.

    The call is wrapped into a logging experiment of type ``"forecasting"``,
    grouped by the fold number.

    Parameters
    ----------
    pipeline:
        fitted pipeline to forecast with
    ts:
        dataset to pass into the pipeline's ``forecast``
    fold_number:
        number of the backtest fold, used as the logging group
    forecast_params:
        extra keyword arguments forwarded to ``pipeline.forecast``

    Returns
    -------
    :
        dataset with the forecast
    """
    tslogger.start_experiment(job_type="forecasting", group=str(fold_number))
    result = pipeline.forecast(ts=ts, **forecast_params)
    tslogger.finish_experiment()
    return result

def _process_fold_forecast(
self,
forecast: TSDataset,
train: TSDataset,
test: TSDataset,
fold_number: int,
mask: FoldMask,
metrics: List[Metric],
forecast_params: Dict[str, Any],
) -> Dict[str, Any]:
"""Run fit-forecast pipeline of model for one fold."""
"""Process forecast made for a fold."""
tslogger.start_experiment(job_type="crossval", group=str(fold_number))

pipeline = deepcopy(self)
pipeline.fit(ts=train)
forecast = pipeline.forecast(**forecast_params)
fold: Dict[str, Any] = {}
for stage_name, stage_df in zip(("train", "test"), (train, test)):
fold[f"{stage_name}_timerange"] = {}
Expand Down Expand Up @@ -620,6 +661,108 @@ def _prepare_fold_masks(self, ts: TSDataset, masks: Union[int, List[FoldMask]],
mask.validate_on_dataset(ts=ts, horizon=self.horizon)
return masks

@staticmethod
def _make_backtest_fold_groups(masks: List[FoldMask], refit: Union[bool, int]) -> List[FoldParallelGroup]:
"""Make groups of folds for backtest."""
if not refit:
refit = len(masks)

grouped_folds = []
num_groups = math.ceil(len(masks) / refit)
for group_id in range(num_groups):
train_fold_number = group_id * refit
forecast_fold_numbers = [train_fold_number + i for i in range(refit) if train_fold_number + i < len(masks)]
cur_group: FoldParallelGroup = {
"train_fold_number": train_fold_number,
"train_mask": masks[train_fold_number],
"forecast_fold_numbers": forecast_fold_numbers,
"forecast_masks": [masks[i] for i in forecast_fold_numbers],
}
grouped_folds.append(cur_group)

return grouped_folds

def _run_all_folds(
    self,
    masks: List[FoldMask],
    ts: TSDataset,
    metrics: List[Metric],
    n_jobs: int,
    refit: Union[bool, int],
    joblib_params: Dict[str, Any],
    forecast_params: Dict[str, Any],
) -> Dict[int, Any]:
    """Run pipeline on all folds.

    Folds are bundled into groups by ``_make_backtest_fold_groups``: one
    pipeline is fitted per group (controlled by ``refit``) and reused to
    forecast every fold inside that group.  Fitting, forecasting and result
    processing each run as a separate parallel batch over the same
    ``parallel`` executor, and the flat outputs are re-mapped to fold numbers
    at the end.
    """
    fold_groups = self._make_backtest_fold_groups(masks=masks, refit=refit)

    with Parallel(n_jobs=n_jobs, **joblib_params) as parallel:
        # fitting: one fit per group, on the group's training fold
        fit_masks = [group["train_mask"] for group in fold_groups]
        fit_datasets = (
            train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
        )
        # pipelines[group_idx] is the fitted copy for fold_groups[group_idx]
        # (joblib preserves submission order in its results)
        pipelines = parallel(
            delayed(self._fit_backtest_pipeline)(ts=fit_ts, fold_number=fold_groups[group_idx]["train_fold_number"])
            for group_idx, fit_ts in enumerate(fold_datasets)
            if False  # placeholder never used
        ) if False else parallel(
            delayed(self._fit_backtest_pipeline)(ts=fit_ts, fold_number=fold_groups[group_idx]["train_fold_number"])
            for group_idx, fit_ts in enumerate(fit_datasets)
        )

        # forecasting: every fold of a group is forecast with that group's
        # fitted pipeline; the nested generators yield one train dataset per fold
        forecast_masks = [group["forecast_masks"] for group in fold_groups]
        forecast_datasets = (
            (
                train
                for train, _ in self._generate_folds_datasets(
                    ts=ts, masks=group_forecast_masks, horizon=self.horizon
                )
            )
            for group_forecast_masks in forecast_masks
        )
        # flat list in submission order: every non-last group contributes
        # exactly ``refit`` items, so fold (group_idx, idx) sits at flat
        # index ``group_idx * refit + idx``
        forecasts_flat = parallel(
            delayed(self._forecast_backtest_pipeline)(
                ts=forecast_ts,
                pipeline=pipelines[group_idx],
                fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
                forecast_params=forecast_params,
            )
            for group_idx, group_forecast_datasets in enumerate(forecast_datasets)
            for idx, forecast_ts in enumerate(group_forecast_datasets)
        )

        # processing forecasts: compute metrics and timerange metadata per fold
        fold_process_train_datasets = (
            train for train, _ in self._generate_folds_datasets(ts=ts, masks=fit_masks, horizon=self.horizon)
        )
        fold_process_test_datasets = (
            (
                test
                for _, test in self._generate_folds_datasets(
                    ts=ts, masks=group_forecast_masks, horizon=self.horizon
                )
            )
            for group_forecast_masks in forecast_masks
        )
        fold_results_flat = parallel(
            delayed(self._process_fold_forecast)(
                # ``refit`` is used as an int: True == 1; with False there is a
                # single group, so group_idx is 0 and the product is 0 anyway
                forecast=forecasts_flat[group_idx * refit + idx],
                train=train,
                test=test,
                fold_number=fold_groups[group_idx]["forecast_fold_numbers"][idx],
                mask=fold_groups[group_idx]["forecast_masks"][idx],
                metrics=metrics,
            )
            for group_idx, (train, group_fold_process_test_datasets) in enumerate(
                zip(fold_process_train_datasets, fold_process_test_datasets)
            )
            for idx, test in enumerate(group_fold_process_test_datasets)
        )

    # remap the flat results back to per-fold numbering using the same
    # ``group_idx * refit + idx`` flat-index arithmetic as above
    results = {
        fold_number: fold_results_flat[group_idx * refit + idx]
        for group_idx in range(len(fold_groups))
        for idx, fold_number in enumerate(fold_groups[group_idx]["forecast_fold_numbers"])
    }
    return results

def backtest(
self,
ts: TSDataset,
Expand All @@ -628,11 +771,14 @@ def backtest(
mode: str = "expand",
aggregate_metrics: bool = False,
n_jobs: int = 1,
refit: Union[bool, int] = True,
joblib_params: Optional[Dict[str, Any]] = None,
forecast_params: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Run backtest with the pipeline.
If ``refit != True`` and some component of the pipeline doesn't support forecasting with gap, this component will raise an exception.
Parameters
----------
ts:
Expand All @@ -647,6 +793,15 @@ def backtest(
If True aggregate metrics above folds, return raw metrics otherwise
n_jobs:
Number of jobs to run in parallel
refit:
Determines how often pipeline should be retrained during iteration over folds.
* If ``True``: pipeline is retrained on each fold.
* If ``False``: pipeline is trained only on the first fold.
* If ``value: int``: pipeline is trained every ``value`` folds starting from the first.
joblib_params:
Additional parameters for :py:class:`joblib.Parallel`
forecast_params:
Expand All @@ -666,21 +821,15 @@ def backtest(
self._init_backtest()
self._validate_backtest_metrics(metrics=metrics)
masks = self._prepare_fold_masks(ts=ts, masks=n_folds, mode=mode)

folds = Parallel(n_jobs=n_jobs, **joblib_params)(
delayed(self._run_fold)(
train=train,
test=test,
fold_number=fold_number,
mask=masks[fold_number],
metrics=metrics,
forecast_params=forecast_params,
)
for fold_number, (train, test) in enumerate(
self._generate_folds_datasets(ts=ts, masks=masks, horizon=self.horizon)
)
self._folds = self._run_all_folds(
masks=masks,
ts=ts,
metrics=metrics,
n_jobs=n_jobs,
refit=refit,
joblib_params=joblib_params,
forecast_params=forecast_params,
)
self._folds = {i: fold for i, fold in enumerate(folds)}

metrics_df = self._get_backtest_metrics(aggregate_metrics=aggregate_metrics)
forecast_df = self._get_backtest_forecasts()
Expand Down
20 changes: 12 additions & 8 deletions tests/test_loggers/test_file_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,12 @@ def test_local_file_logger_with_stacking_ensemble(example_df):
assert len(list(cur_dir.iterdir())) == 1, "we've run one experiment"

current_experiment_dir = list(cur_dir.iterdir())[0]
assert len(list(current_experiment_dir.iterdir())) == 2, "crossval and crossval_results folders"
assert len(list(current_experiment_dir.iterdir())) == 4, "training, forecasting, crossval, crossval_results"

assert (
len(list((current_experiment_dir / "crossval").iterdir())) == n_folds
), "crossval should have `n_folds` runs"
for folder in ["training", "forecasting", "crossval"]:
assert (
len(list((current_experiment_dir / folder).iterdir())) == n_folds
), f"{folder} should have `n_folds` runs"

tslogger.remove(idx)

Expand Down Expand Up @@ -281,11 +282,14 @@ def test_local_file_logger_with_empirical_prediction_interval(example_df):
assert len(list(cur_dir.iterdir())) == 1, "we've run one experiment"

current_experiment_dir = list(cur_dir.iterdir())[0]
assert len(list(current_experiment_dir.iterdir())) == 2, "crossval and crossval_results folders"

assert (
len(list((current_experiment_dir / "crossval").iterdir())) == n_folds
), "crossval should have `n_folds` runs"
len(list(current_experiment_dir.iterdir())) == 4
), "training, forecasting, crossval, crossval_results folders"

for folder in ["training", "forecasting", "crossval"]:
assert (
len(list((current_experiment_dir / folder).iterdir())) == n_folds
), f"{folder} should have `n_folds` runs"

tslogger.remove(idx)

Expand Down
13 changes: 12 additions & 1 deletion tests/test_pipeline/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from etna.datasets import TSDataset
from etna.models import CatBoostPerSegmentModel
from etna.models import NaiveModel
from etna.pipeline import Pipeline
from etna.transforms import LagTransform

Expand All @@ -25,6 +26,16 @@ def catboost_pipeline() -> Pipeline:
return pipeline


@pytest.fixture
def naive_pipeline() -> Pipeline:
    """Pipeline fixture built on a NaiveModel with a weekly lag and horizon."""
    return Pipeline(model=NaiveModel(lag=7), horizon=7)


@pytest.fixture
def catboost_pipeline_big() -> Pipeline:
"""Generate pipeline with CatBoostPerSegmentModel."""
Expand Down Expand Up @@ -218,7 +229,7 @@ def masked_ts() -> TSDataset:


@pytest.fixture
def ts_run_fold() -> TSDataset:
def ts_process_fold_forecast() -> TSDataset:
timerange = pd.date_range(start="2020-01-01", periods=11).to_list()
df = pd.DataFrame({"timestamp": timerange + timerange})
df["segment"] = ["segment_0"] * 11 + ["segment_1"] * 11
Expand Down
Loading

0 comments on commit 02bf892

Please sign in to comment.