Skip to content

Merge hierarchical pipeline #1083

Merged
merged 22 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ebadcb2
`HierarchicalStructure` class (#1044)
brsnw250 Dec 21, 2022
62af639
Add `to_hierarchical_dataset` method to `TSDataset` (#1049)
alex-hse-repository Dec 22, 2022
37ab295
Enhance `TSDataset` to work with hierarchical series (#1048)
alex-hse-repository Dec 22, 2022
0d05503
Create `get_level_dataset` method (#1053)
brsnw250 Dec 26, 2022
363b013
Create `BaseReconciler` (#1054)
alex-hse-repository Dec 28, 2022
6985a21
Create `TopDownReconciliator` (#1055)
brsnw250 Dec 30, 2022
5ca0eeb
Create `BottomUpReconciliator` (#1058)
brsnw250 Jan 3, 2023
5f4c17e
Create `generate_hierarchical_df` (#1060)
brsnw250 Jan 11, 2023
edd801d
Create `HierarchicalPipeline` (#1059)
brsnw250 Jan 11, 2023
455cdbb
Hierarchical pipeline example notebook (#1070)
brsnw250 Jan 24, 2023
2a183f2
Add prediction intervals support to HierarchicalPipeline (#1079)
brsnw250 Jan 26, 2023
70154ff
Merge branch 'master' into hierarchical_pipeline
alex-hse-repository Jan 26, 2023
da06d0a
Update changelog
alex-hse-repository Jan 27, 2023
d7fc425
Add n_periods to generate_hierarchical_df
alex-hse-repository Jan 27, 2023
6d36a11
Add reises block
alex-hse-repository Jan 27, 2023
51b2715
Fix notebook
alex-hse-repository Jan 27, 2023
1c92f77
Fix notebook
alex-hse-repository Jan 27, 2023
c040785
Fix notebook
alex-hse-repository Jan 27, 2023
6c03334
Fix notebook
alex-hse-repository Jan 27, 2023
387a4ed
Add comments about random dataset generation
alex-hse-repository Jan 27, 2023
db7db80
Fix formating
alex-hse-repository Jan 27, 2023
0844bd6
Fix docs
alex-hse-repository Jan 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `RMSE` metric & `rmse` functional metric ([#1051](https://github.com/tinkoff-ai/etna/pull/1051))
- `MaxDeviation` metric & `max_deviation` functional metric ([#1061](https://github.com/tinkoff-ai/etna/pull/1061))
- Add saving/loading for transforms, models, pipelines, ensembles; tutorial for saving/loading ([#1068](https://github.com/tinkoff-ai/etna/pull/1068))
-
-
-
-
-
-
- Add hierarchical time series support([#1083](https://github.com/tinkoff-ai/etna/pull/1083))
-
-
-
-
-
### Changed
-
-
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ We have also prepared a set of tutorials for an easy introduction:
| [Exogenous data](https://github.com/tinkoff-ai/etna/tree/master/examples/exogenous_data.ipynb) | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/tinkoff-ai/etna/master?filepath=examples/exogenous_data.ipynb) |
| [Forecasting strategies](https://github.com/tinkoff-ai/etna/blob/master/examples/forecasting_strategies.ipynb) | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/tinkoff-ai/etna/master?filepath=examples/forecasting_strategies.ipynb) |
| [Classification](https://github.com/tinkoff-ai/etna/blob/master/examples/classification.ipynb) | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/tinkoff-ai/etna/master?filepath=examples/classification.ipynb) |
| [Hierarchical time series](https://github.com/tinkoff-ai/etna/blob/master/examples/hierarchical_pipeline.ipynb) | [![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/tinkoff-ai/etna/master?filepath=examples/hierarchical_pipeline.ipynb) |

## Documentation

Expand Down
1 change: 1 addition & 0 deletions docs/source/tutorials.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ Tutorials
tutorials/ensembles
tutorials/NN_examples
tutorials/classification
tutorials/hierarchical_pipeline
2 changes: 2 additions & 0 deletions etna/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from etna.datasets.datasets_generation import generate_ar_df
from etna.datasets.datasets_generation import generate_const_df
from etna.datasets.datasets_generation import generate_from_patterns_df
from etna.datasets.datasets_generation import generate_hierarchical_df
from etna.datasets.datasets_generation import generate_periodic_df
from etna.datasets.hierarchical_structure import HierarchicalStructure
from etna.datasets.tsdataset import TSDataset
from etna.datasets.utils import duplicate_data
from etna.datasets.utils import set_columns_wide
85 changes: 85 additions & 0 deletions etna/datasets/datasets_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,88 @@ def generate_from_patterns_df(
df["timestamp"] = pd.date_range(start=start_time, freq=freq, periods=periods)
df = df.melt(id_vars=["timestamp"], value_name="target", var_name="segment")
return df


def generate_hierarchical_df(
periods: int,
n_segments: List[int],
freq: str = "D",
start_time: str = "2000-01-01",
ar_coef: Optional[list] = None,
sigma: float = 1,
random_seed: int = 1,
) -> pd.DataFrame:
"""Create DataFrame with hierarchical structure and AR process data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From method name and docstring I can't understand how many levels are generated during this method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add n_levels parameter to make it more explicit


Parameters
----------
periods:
number of timestamps
n_segments:
number of segments on each level.
freq:
pandas frequency string for :py:func:`pandas.date_range` that is used to generate timestamp
start_time:
start timestamp
ar_coef:
AR coefficients
sigma:
scale of AR noise
random_seed:
random seed

Returns
Mr-Geekman marked this conversation as resolved.
Show resolved Hide resolved
-------
:
DataFrame at the bottom level of the hierarchy
"""
if len(n_segments) == 0:
raise ValueError("`n_segments` should contain at least one positive integer!")

if (np.less_equal(n_segments, 0)).any():
raise ValueError("All `n_segments` elements should be positive!")

if (np.diff(n_segments) < 0).any():
raise ValueError("`n_segments` should represent non-decreasing sequence!")

rnd = RandomState(seed=random_seed)

bottom_df = generate_ar_df(
periods=periods,
start_time=start_time,
ar_coef=ar_coef,
sigma=sigma,
n_segments=n_segments[-1],
freq=freq,
random_seed=random_seed,
)

bottom_segments = np.unique(bottom_df["segment"])

num_levels = len(n_segments)
child_to_parent = dict()
for level_id in range(1, num_levels):
prev_level_n_segments = n_segments[level_id - 1]
cur_level_n_segments = n_segments[level_id]

# ensure all parents have at least one child
seen_ids = set()
child_ids = rnd.choice(cur_level_n_segments, prev_level_n_segments, replace=False)
for parent_id, child_id in enumerate(child_ids):
seen_ids.add(child_id)
child_to_parent[f"l{level_id}s{child_id}"] = f"l{level_id - 1}s{parent_id}"

for child_id in range(cur_level_n_segments):
if child_id not in seen_ids:
parent_id = rnd.choice(prev_level_n_segments, 1).item()
child_to_parent[f"l{level_id}s{child_id}"] = f"l{level_id - 1}s{parent_id}"

bottom_segments_map = {segment: f"l{num_levels - 1}s{idx}" for idx, segment in enumerate(bottom_segments)}
bottom_df[f"level_{num_levels - 1}"] = bottom_df["segment"].map(lambda x: bottom_segments_map[x])

for level_id in range(num_levels - 2, -1, -1):
bottom_df[f"level_{level_id}"] = bottom_df[f"level_{level_id + 1}"].map(lambda x: child_to_parent[x])

bottom_df.drop(columns=["segment"], inplace=True)

return bottom_df
192 changes: 192 additions & 0 deletions etna/datasets/hierarchical_structure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
from collections import defaultdict
from itertools import chain
from queue import Queue
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from scipy.sparse import csr_matrix
from scipy.sparse import lil_matrix

from etna.core import BaseMixin


class HierarchicalStructure(BaseMixin):
"""Represents hierarchical structure of TSDataset."""

def __init__(self, level_structure: Dict[str, List[str]], level_names: Optional[List[str]] = None):
"""Init HierarchicalStructure.

Parameters
----------
level_structure:
Adjacency list describing the structure of the hierarchy tree (i.e. {"total":["X", "Y"], "X":["a", "b"], "Y":["c", "d"]}).
level_names:
Names of levels in the hierarchy in the order from top to bottom (i.e. ["total", "category", "product"]).
If None is passed, level names are generated automatically with structure "level_<level_index>".
"""
self.level_structure = level_structure
self._hierarchy_root = self._find_tree_root(self.level_structure)
self._num_nodes = self._find_num_nodes(self.level_structure)

hierarchy_levels = self._find_hierarchy_levels()
tree_depth = len(hierarchy_levels)

self.level_names = self._get_level_names(level_names, tree_depth)
self._level_series: Dict[str, Tuple[str, ...]] = {
self.level_names[i]: tuple(hierarchy_levels[i]) for i in range(tree_depth)
}
self._level_to_index: Dict[str, int] = {self.level_names[i]: i for i in range(tree_depth)}

self._segment_num_reachable_leafs: Dict[str, int] = self._get_num_reachable_leafs(hierarchy_levels)

self._segment_to_level: Dict[str, str] = {
segment: level for level in self._level_series for segment in self._level_series[level]
}

@staticmethod
def _get_level_names(level_names: Optional[List[str]], tree_depth: int) -> List[str]:
"""Assign level names if not provided."""
if level_names is None:
level_names = [f"level_{i}" for i in range(tree_depth)]

if len(level_names) != tree_depth:
raise ValueError("Length of `level_names` must be equal to hierarchy tree depth!")

return level_names

@staticmethod
def _find_tree_root(hierarchy_structure: Dict[str, List[str]]) -> str:
"""Find hierarchy top level (root of tree)."""
children = set(chain(*hierarchy_structure.values()))
parents = set(hierarchy_structure.keys())

tree_roots = parents.difference(children)
if len(tree_roots) != 1:
raise ValueError("Invalid tree definition: unable to find root!")

return tree_roots.pop()

@staticmethod
def _find_num_nodes(hierarchy_structure: Dict[str, List[str]]) -> int:
"""Count number of nodes in tree."""
children = set(chain(*hierarchy_structure.values()))
parents = set(hierarchy_structure.keys())

num_nodes = len(children | parents)

num_edges = sum(map(len, hierarchy_structure.values()))
if num_edges != num_nodes - 1:
raise ValueError("Invalid tree definition: invalid number of nodes and edges!")

return num_nodes

def _find_hierarchy_levels(self) -> Dict[int, List[str]]:
"""Traverse hierarchy tree to group segments into levels."""
leaves_levels = set()
levels = defaultdict(list)
seen_nodes = {self._hierarchy_root}
queue: Queue = Queue()
queue.put((self._hierarchy_root, 0))
while not queue.empty():
node, level = queue.get()
levels[level].append(node)
child_nodes = self.level_structure.get(node, [])

if len(child_nodes) == 0:
leaves_levels.add(level)

for adj_node in child_nodes:
queue.put((adj_node, level + 1))
seen_nodes.add(adj_node)

if len(seen_nodes) != self._num_nodes:
raise ValueError("Invalid tree definition: disconnected graph!")

if len(leaves_levels) != 1:
raise ValueError("All hierarchy tree leaves must be on the same level!")

return levels

def _get_num_reachable_leafs(self, hierarchy_levels: Dict[int, List[str]]) -> Dict[str, int]:
"""Compute subtree size for each node."""
num_reachable_leafs: Dict[str, int] = dict()
for level in sorted(hierarchy_levels.keys(), reverse=True):
for node in hierarchy_levels[level]:
if node in self.level_structure:
num_reachable_leafs[node] = sum(
num_reachable_leafs[child_node] for child_node in self.level_structure[node]
)

else:
num_reachable_leafs[node] = 1

return num_reachable_leafs

def get_summing_matrix(self, target_level: str, source_level: str) -> csr_matrix:
"""Get summing matrix for transition from source level to target level.

Generation algorithm is based on summing matrix structure. Number of 1 in such matrices equals to
number of nodes on the source level. Each row of summing matrices has ones only for source level nodes that
belongs to subtree rooted from corresponding target level node. BFS order of nodes on levels view simplifies
algorithm to calculation necessary offsets for each row.

Parameters
----------
target_level:
Name of target level.
source_level:
Name of source level.

Returns
-------
:
Summing matrix from source level to target level

"""
try:
target_idx = self._level_to_index[target_level]
source_idx = self._level_to_index[source_level]
except KeyError as e:
raise ValueError(f"Invalid level name: {e.args[0]}")

if target_idx > source_idx:
raise ValueError("Target level must be higher or equal in hierarchy than source level!")

target_level_segment = self.get_level_segments(target_level)
source_level_segment = self.get_level_segments(source_level)
summing_matrix = lil_matrix((len(target_level_segment), len(source_level_segment)), dtype="int32")

current_source_segment_id = 0
for current_target_segment_id, segment in enumerate(target_level_segment):
num_reachable_leafs_left = self._segment_num_reachable_leafs[segment]

while num_reachable_leafs_left > 0:
source_segment = source_level_segment[current_source_segment_id]
num_reachable_leafs_left -= self._segment_num_reachable_leafs[source_segment]
summing_matrix[current_target_segment_id, current_source_segment_id] = 1
current_source_segment_id += 1

return summing_matrix.tocsr()

def get_level_segments(self, level_name: str) -> List[str]:
"""Get all segments from particular level."""
try:
return list(self._level_series[level_name])
except KeyError:
raise ValueError(f"Invalid level name: {level_name}")

def get_segment_level(self, segment: str) -> str:
"""Get level name for provided segment."""
try:
return self._segment_to_level[segment]
except KeyError:
raise ValueError(f"Segment {segment} is out of the hierarchy")

def get_level_depth(self, level_name: str) -> int:
"""Get level depth in a hierarchy tree."""
try:
return self._level_to_index[level_name]
except KeyError:
raise ValueError(f"Invalid level name: {level_name}")
Loading