Skip to content

Commit

Permalink
Merge branch 'master' into issue-198
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-hse-repository authored Nov 3, 2021
2 parents 14fa696 + 13d7e75 commit 4c90ac4
Show file tree
Hide file tree
Showing 56 changed files with 1,412 additions and 535 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- TreeFeatureSelectionTransform ([#229](https://github.com/tinkoff-ai/etna-ts/pull/229))
- Feature relevance table calculation ([#227](https://github.com/tinkoff-ai/etna-ts/pull/227), [#249](https://github.com/tinkoff-ai/etna-ts/pull/249))
- Method flatten to TSDataset ([#241](https://github.com/tinkoff-ai/etna-ts/pull/241))
- Out_column parameter to not-inplace transforms ([#211](https://github.com/tinkoff-ai/etna-ts/pull/211))

### Changed
- Add possibility to set custom in_column for ConfidenceIntervalOutliersTransform ([#240](https://github.com/tinkoff-ai/etna-ts/pull/240))
- Make `in_column` the first argument in every transform ([#247](https://github.com/tinkoff-ai/etna-ts/pull/247))
- Update mypy checking and fix issues with it ([#248](https://github.com/tinkoff-ai/etna-ts/pull/248))
- Add histogram method in outliers notebook ([#252](https://github.com/tinkoff-ai/etna-ts/pull/252))

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lint:
black --check tests/
flake8 --exclude etna/libs etna/
flake8 --exclude etna/libs tests/ --select E,W,C,F401,N
mypy --exclude etna/libs
mypy

format:
isort --skip etna/libs --sl etna/
Expand All @@ -14,7 +14,7 @@ format:
black tests/
flake8 --exclude etna/libs etna/
flake8 --exclude etna/libs tests/ --select E,W,C,F401,N
mypy --exclude etna/libs
mypy

.PHONY: deps/release
deps/release:
Expand Down
12 changes: 10 additions & 2 deletions etna/clustering/distances/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@
import warnings
from abc import ABC
from abc import abstractmethod
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict

import numpy as np
import pandas as pd

from etna.core import BaseMixin

if TYPE_CHECKING:
from etna.datasets import TSDataset


class Distance(ABC, BaseMixin):
"""Base class for distances between series."""
Expand Down Expand Up @@ -85,21 +91,23 @@ def _get_average(self, ts: "TSDataset") -> pd.DataFrame:
"""Get series that minimizes squared distance to given ones according to the Distance."""
pass

def get_average(self, ts: "TSDataset") -> pd.DataFrame:
def get_average(self, ts: "TSDataset", **kwargs: Dict[str, Any]) -> pd.DataFrame:
"""Get series that minimizes squared distance to given ones according to the Distance.
Parameters
----------
ts:
TSDataset with series to be averaged
kwargs:
additional parameters for averaging
Returns
-------
pd.DataFrame:
dataframe with columns "timestamp" and "target" that contains the series
"""
self._validate_dataset(ts)
centroid = self._get_average(ts)
centroid = self._get_average(ts, **kwargs) # type: ignore
return centroid


Expand Down
4 changes: 4 additions & 0 deletions etna/clustering/distances/distance_matrix.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import warnings
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
Expand All @@ -10,6 +11,9 @@
from etna.core import BaseMixin
from etna.loggers import tslogger

if TYPE_CHECKING:
from etna.datasets import TSDataset


class DistanceMatrix(BaseMixin):
"""DistanceMatrix computes distance matrix from TSDataset."""
Expand Down
15 changes: 9 additions & 6 deletions etna/clustering/distances/dtw_distance.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import TYPE_CHECKING
from typing import Callable
from typing import List
from typing import Tuple
Expand All @@ -8,6 +9,9 @@

from etna.clustering.distances.base import Distance

if TYPE_CHECKING:
from etna.datasets import TSDataset


@numba.cfunc(numba.float64(numba.float64, numba.float64))
def simple_dist(x1: float, x2: float) -> float:
Expand All @@ -31,9 +35,7 @@ def simple_dist(x1: float, x2: float) -> float:
class DTWDistance(Distance):
"""DTW distance handler."""

def __init__(
self, points_distance: Callable[[np.ndarray, np.ndarray], float] = simple_dist, trim_series: bool = False
):
def __init__(self, points_distance: Callable[[float, float], float] = simple_dist, trim_series: bool = False):
"""Init DTWDistance.
Parameters
Expand Down Expand Up @@ -115,7 +117,7 @@ def _dba_iteration(self, initial_centroid: np.ndarray, series_list: List[np.ndar
@staticmethod
def _get_longest_series(ts: "TSDataset") -> pd.Series:
"""Get the longest series from the list."""
series_list = []
series_list: List[pd.Series] = []
for segment in ts.segments:
series = ts[:, segment, "target"].dropna()
series_list.append(series)
Expand All @@ -131,7 +133,7 @@ def _get_all_series(ts: "TSDataset") -> List[np.ndarray]:
series_list.append(series)
return series_list

def _get_average(self, ts: "TSDataset", n_iters: int = 10) -> np.ndarray:
def _get_average(self, ts: "TSDataset", n_iters: int = 10) -> pd.DataFrame:
"""Get series that minimizes squared distance to given ones according to the dtw distance.
Parameters
Expand All @@ -140,9 +142,10 @@ def _get_average(self, ts: "TSDataset", n_iters: int = 10) -> np.ndarray:
TSDataset with series to be averaged
n_iters:
number of DBA iterations to adjust centroid with series
Returns
-------
np.ndarray:
pd.DataFrame:
dataframe with columns "timestamp" and "target" that contains the series
"""
series_list = self._get_all_series(ts)
Expand Down
5 changes: 5 additions & 0 deletions etna/clustering/distances/euclidean_distance.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from typing import TYPE_CHECKING

import numba
import numpy as np
import pandas as pd

from etna.clustering.distances.base import Distance

if TYPE_CHECKING:
from etna.datasets import TSDataset


@numba.cfunc(numba.float64(numba.float64[:], numba.float64[:]))
def euclidean_distance(x1: np.ndarray, x2: np.ndarray) -> float:
Expand Down
11 changes: 5 additions & 6 deletions etna/clustering/hierarchical/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ClusteringLinkageMode(Enum):
class HierarchicalClustering(Clustering):
"""Base class for hierarchical clustering."""

def __init__(self):
def __init__(self, distance: Optional[Distance] = None):
"""Init HierarchicalClustering."""
super().__init__()
self.n_clusters: Optional[int] = None
Expand All @@ -35,10 +35,10 @@ def __init__(self):
self.clusters: Optional[List[int]] = None
self.ts: Optional["TSDataset"] = None
self.segment2cluster: Optional[Dict[str, int]] = None
self.distance: Optional[Distance] = None
self.distance: Optional[Distance] = distance
self.centroids_df: Optional[pd.DataFrame] = None

def build_distance_matrix(self, ts: "TSDataset", distance: Distance):
def build_distance_matrix(self, ts: "TSDataset"):
"""Compute distance matrix with given ts and distance.
Parameters
Expand All @@ -49,8 +49,7 @@ def build_distance_matrix(self, ts: "TSDataset", distance: Distance):
instance of distance to compute matrix
"""
self.ts = ts
self.distance = distance
self.distance_matrix = DistanceMatrix(distance=distance)
self.distance_matrix = DistanceMatrix(distance=self.distance)
self.distance_matrix.fit(ts=ts)
self.clusters = None
self.segment2cluster = None
Expand Down Expand Up @@ -104,7 +103,7 @@ def _get_series_in_cluster(self, cluster: int) -> TSDataset:
cluster_ts = TSDataset(df=self.ts[:, segments_in_cluster, "target"], freq=self.ts.freq)
return cluster_ts

def _get_centroid_of_cluster(self, cluster: str, **averaging_kwargs) -> pd.DataFrame:
def _get_centroid_of_cluster(self, cluster: int, **averaging_kwargs) -> pd.DataFrame:
"""Get centroid of cluster."""
cluster_ts = self._get_series_in_cluster(cluster)
centroid = self.distance.get_average(ts=cluster_ts, **averaging_kwargs)
Expand Down
11 changes: 10 additions & 1 deletion etna/clustering/hierarchical/dtw_clustering.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from typing import TYPE_CHECKING

from etna.clustering.distances.dtw_distance import DTWDistance
from etna.clustering.hierarchical.base import HierarchicalClustering

if TYPE_CHECKING:
from etna.datasets import TSDataset


class DTWClustering(HierarchicalClustering):
"""Hierarchical clustering with DTW distance.
Expand Down Expand Up @@ -29,6 +34,10 @@ class DTWClustering(HierarchicalClustering):
'segment_9': 2}
"""

def __init__(self):
"""Create instance of DTWClustering."""
super().__init__(distance=DTWDistance())

def build_distance_matrix(self, ts: "TSDataset"):
"""
Build distance matrix with DTW distance.
Expand All @@ -38,7 +47,7 @@ def build_distance_matrix(self, ts: "TSDataset"):
ts:
TSDataset with series to build distance matrix
"""
super().build_distance_matrix(ts=ts, distance=DTWDistance())
super().build_distance_matrix(ts=ts)


__all__ = ["DTWClustering"]
11 changes: 10 additions & 1 deletion etna/clustering/hierarchical/euclidean_clustering.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from typing import TYPE_CHECKING

from etna.clustering.distances.euclidean_distance import EuclideanDistance
from etna.clustering.hierarchical.base import HierarchicalClustering

if TYPE_CHECKING:
from etna.datasets import TSDataset


class EuclideanClustering(HierarchicalClustering):
"""Hierarchical clustering with euclidean distance.
Expand Down Expand Up @@ -29,6 +34,10 @@ class EuclideanClustering(HierarchicalClustering):
'segment_9': 2}
"""

def __init__(self):
"""Create instance of EuclideanClustering."""
super().__init__(distance=EuclideanDistance())

def build_distance_matrix(self, ts: "TSDataset"):
"""
Build distance matrix with euclidean distance.
Expand All @@ -38,7 +47,7 @@ def build_distance_matrix(self, ts: "TSDataset"):
ts:
TSDataset with series to build distance matrix
"""
super().build_distance_matrix(ts=ts, distance=EuclideanDistance())
super().build_distance_matrix(ts=ts)


__all__ = ["EuclideanClustering"]
8 changes: 4 additions & 4 deletions etna/commands/backtest_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def backtest(
============= =========== =============== ===============
"""
with open(config_path, "r") as f:
pipeline = yaml.safe_load(f)
pipeline_dict = yaml.safe_load(f)

with open(backtest_config_path, "r") as f:
backtest_configs = yaml.safe_load(f)
Expand All @@ -69,10 +69,10 @@ def backtest(

tsdataset = TSDataset(df=df_timeseries, freq=freq, df_exog=df_exog)

pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline)
backtest_configs: Dict[str, Any] = hydra_slayer.get_from_params(**backtest_configs)
pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline_dict)
backtest_configs_hydra_slayer: Dict[str, Any] = hydra_slayer.get_from_params(**backtest_configs)

metrics, forecast, info = pipeline.backtest(ts=tsdataset, **backtest_configs)
metrics, forecast, info = pipeline.backtest(ts=tsdataset, **backtest_configs_hydra_slayer)

(metrics.to_csv(output_path / "metrics.csv", index=False))
(forecast.to_csv(output_path / "forecast.csv", index=False))
Expand Down
4 changes: 2 additions & 2 deletions etna/commands/forecast_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def forecast(
============= =========== =============== ===============
"""
with open(config_path, "r") as f:
pipeline = yaml.safe_load(f)
pipeline_dict = yaml.safe_load(f)

df_timeseries = pd.read_csv(target_path, parse_dates=["timestamp"])

Expand All @@ -64,7 +64,7 @@ def forecast(

tsdataset = TSDataset(df=df_timeseries, freq=freq, df_exog=df_exog)

pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline)
pipeline: Pipeline = hydra_slayer.get_from_params(**pipeline_dict)
pipeline.fit(tsdataset)
forecast = pipeline.forecast()

Expand Down
27 changes: 16 additions & 11 deletions etna/ensembles/stacking_ensemble.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
from copy import deepcopy
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
Expand Down Expand Up @@ -53,12 +54,14 @@ class StackingEnsemble(Pipeline):
2021-09-15 0.36 1.56 0.30
"""

support_confidence_interval = False

def __init__(
self,
pipelines: List[Pipeline],
final_model: RegressorMixin = LinearRegression(),
cv: int = 3,
features_to_use: Union[None, Literal[all], List[str]] = None,
features_to_use: Union[None, Literal["all"], List[str]] = None,
n_jobs: int = 1,
):
"""Init StackingEnsemble.
Expand Down Expand Up @@ -114,15 +117,15 @@ def _filter_features_to_use(self, forecasts: List[TSDataset]) -> Union[None, Set
elif features_to_use == "all":
return available_features - {"target"}
elif isinstance(features_to_use, list):
features_to_use = set(features_to_use)
if len(features_to_use) == 0:
features_to_use_unique = set(features_to_use)
if len(features_to_use_unique) == 0:
return None
elif features_to_use.issubset(available_features):
return features_to_use
elif features_to_use_unique.issubset(available_features):
return features_to_use_unique
else:
unavailable_features = features_to_use - available_features
unavailable_features = features_to_use_unique - available_features
warnings.warn(f"Features {unavailable_features} are not found and will be dropped!")
return features_to_use.intersection(available_features)
return features_to_use_unique.intersection(available_features)
else:
warnings.warn(
"Feature list is passed in the wrong format."
Expand Down Expand Up @@ -177,7 +180,7 @@ def fit(self, ts: TSDataset) -> "StackingEnsemble":

def _make_features(
self, forecasts: List[TSDataset], train: bool = False
) -> Union[Tuple[pd.DataFrame, pd.Series], pd.Series]:
) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
"""Prepare features for the `final_model`."""
# Stack targets from the forecasts
targets = [
Expand Down Expand Up @@ -211,7 +214,7 @@ def _make_features(
)
return x, y
else:
return x
return x, None

@staticmethod
def _forecast_pipeline(pipeline: Pipeline) -> TSDataset:
Expand All @@ -221,19 +224,21 @@ def _forecast_pipeline(pipeline: Pipeline) -> TSDataset:
tslogger.log(msg=f"Forecast is done with {pipeline}.")
return forecast

def forecast(self) -> TSDataset:
def forecast(self, confidence_interval: bool = False) -> TSDataset:
"""Forecast with ensemble: compute the combination of pipelines' forecasts using `final_model`.
Returns
-------
TSDataset:
Dataset with forecasts.
"""
self.check_support_confidence_interval(confidence_interval)

# Get forecast
forecasts = Parallel(n_jobs=self.n_jobs, backend="multiprocessing", verbose=11)(
delayed(self._forecast_pipeline)(pipeline=pipeline) for pipeline in self.pipelines
)
x = self._make_features(forecasts=forecasts, train=False)
x, _ = self._make_features(forecasts=forecasts, train=False)
y = self.final_model.predict(x).reshape(-1, self.horizon).T

# Format the forecast into TSDataset
Expand Down
Loading

0 comments on commit 4c90ac4

Please sign in to comment.