Skip to content

Commit

Permalink
Speed up multimodel statistics and fix bug in peak computation (#1301)
Browse files Browse the repository at this point in the history
  • Loading branch information
bouweandela authored Sep 6, 2021
1 parent 0f0fef2 commit 478ba30
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 9 deletions.
24 changes: 21 additions & 3 deletions esmvalcore/preprocessor/_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import iris
import numpy as np
from iris.util import equalise_attributes

from esmvalcore.preprocessor import remove_fx_variables

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -221,14 +222,31 @@ def _combine(cubes):
return merged_cube


def _compute_slices(cubes):
"""Create cube slices resulting in a combined cube of about 1 GB."""
gigabyte = 2**30
total_bytes = cubes[0].data.nbytes * len(cubes)
n_slices = int(np.ceil(total_bytes / gigabyte))

n_timesteps = cubes[0].shape[0]
slice_len = int(np.ceil(n_timesteps / n_slices))

for i in range(n_slices):
start = i * slice_len
end = (i + 1) * slice_len
if end > n_timesteps:
end = n_timesteps
yield slice(start, end)


def _compute_eager(cubes: list, *, operator: iris.analysis.Aggregator,
**kwargs):
"""Compute statistics one slice at a time."""
_ = [cube.data for cube in cubes] # make sure the cubes' data are realized

result_slices = []
for i in range(cubes[0].shape[0]):
single_model_slices = [cube[i] for cube in cubes]
for chunk in _compute_slices(cubes):
single_model_slices = [cube[chunk] for cube in cubes]
combined_slice = _combine(single_model_slices)
with warnings.catch_warnings():
warnings.filterwarnings(
Expand All @@ -250,7 +268,7 @@ def _compute_eager(cubes: list, *, operator: iris.analysis.Aggregator,
result_slices.append(collapsed_slice)

try:
result_cube = iris.cube.CubeList(result_slices).merge_cube()
result_cube = iris.cube.CubeList(result_slices).concatenate_cube()
except Exception as excinfo:
raise ValueError(
"Multi-model statistics failed to concatenate results into a"
Expand Down
30 changes: 24 additions & 6 deletions tests/unit/preprocessor/_multimodel/test_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from iris.cube import Cube

import esmvalcore.preprocessor._multimodel as mm
from esmvalcore.preprocessor._ancillary_vars import add_ancillary_variable
from esmvalcore.preprocessor import multi_model_statistics
from esmvalcore.preprocessor._ancillary_vars import add_ancillary_variable

SPAN_OPTIONS = ('overlap', 'full')

Expand Down Expand Up @@ -131,7 +131,7 @@ def get_cubes_for_validation_test(frequency, lazy=False):
('full', 'median', (5, 5, 3)),
('full', 'p50', (5, 5, 3)),
('full', 'p99.5', (8.96, 8.96, 4.98)),
('full', 'peak', ([9], [9], [5])),
('full', 'peak', (9, 9, 5)),
('overlap', 'mean', (5, 5)),
('overlap', 'std_dev', (5.656854249492381, 4)),
('overlap', 'std', (5.656854249492381, 4)),
Expand All @@ -140,13 +140,31 @@ def get_cubes_for_validation_test(frequency, lazy=False):
('overlap', 'median', (5, 5)),
('overlap', 'p50', (5, 5)),
('overlap', 'p99.5', (8.96, 8.96)),
('overlap', 'peak', ([9], [9])),
('overlap', 'peak', (9, 9)),
# test multiple statistics
('overlap', ('min', 'max'), ((1, 1), (9, 9))),
('full', ('min', 'max'), ((1, 1, 1), (9, 9, 5))),
)


@pytest.mark.parametrize(
'length,slices',
[
(1, [slice(0, 1)]),
(25000, [slice(0, 8334),
slice(8334, 16668),
slice(16668, 25000)]),
],
)
def test_compute_slices(length, slices):
"""Test cube `_compute_slices`."""
cubes = [
Cube(da.empty([length, 50, 100], dtype=np.float32)) for _ in range(5)
]
result = list(mm._compute_slices(cubes))
assert result == slices


@pytest.mark.parametrize('frequency', FREQUENCY_OPTIONS)
@pytest.mark.parametrize('span, statistics, expected', VALIDATION_DATA_SUCCESS)
def test_multimodel_statistics(frequency, span, statistics, expected):
Expand Down Expand Up @@ -494,6 +512,7 @@ def test_unify_time_coordinates():

class PreprocessorFile:
"""Mockup to test output of multimodel."""

def __init__(self, cube=None):
if cube:
self.cubes = [cube]
Expand Down Expand Up @@ -552,9 +571,8 @@ def test_ignore_tas_scalar_height_coord():
cube.add_aux_coord(
iris.coords.AuxCoord([height], var_name="height", units="m"))

result = mm.multi_model_statistics([tas_2m, tas_2m.copy(), tas_1p5m],
statistics=['mean'],
span='full')
result = mm.multi_model_statistics(
[tas_2m, tas_2m.copy(), tas_1p5m], statistics=['mean'], span='full')

# iris automatically averages the value of the scalar coordinate.
assert len(result['mean'].coords("height")) == 1
Expand Down

0 comments on commit 478ba30

Please sign in to comment.