diff --git a/asv_bench/benchmarks/__init__.py b/asv_bench/benchmarks/__init__.py index f9bbc751284..997fdfd0db0 100644 --- a/asv_bench/benchmarks/__init__.py +++ b/asv_bench/benchmarks/__init__.py @@ -8,6 +8,14 @@ _counter = itertools.count() +def parameterized(names, params): + def decorator(func): + func.param_names = names + func.params = params + return func + return decorator + + def requires_dask(): try: import dask # noqa diff --git a/asv_bench/benchmarks/rolling.py b/asv_bench/benchmarks/rolling.py new file mode 100644 index 00000000000..79d06019c00 --- /dev/null +++ b/asv_bench/benchmarks/rolling.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np +import pandas as pd +import xarray as xr + +from . import parameterized, randn, requires_dask + +nx = 3000 +ny = 2000 +nt = 1000 +window = 20 + + +class Rolling(object): + def setup(self, *args, **kwargs): + self.ds = xr.Dataset( + {'var1': (('x', 'y'), randn((nx, ny), frac_nan=0.1)), + 'var2': (('x', 't'), randn((nx, nt))), + 'var3': (('t', ), randn(nt))}, + coords={'x': np.arange(nx), + 'y': np.linspace(0, 1, ny), + 't': pd.date_range('1970-01-01', periods=nt, freq='D'), + 'x_coords': ('x', np.linspace(1.1, 2.1, nx))}) + + @parameterized(['func', 'center'], + (['mean', 'count'], [True, False])) + def time_rolling(self, func, center): + getattr(self.ds.rolling(x=window, center=center), func)() + + @parameterized(['window_', 'min_periods'], + ([20, 40], [5, None])) + def time_rolling_np(self, window_, min_periods): + self.ds.rolling(x=window_, center=False, + min_periods=min_periods).reduce(getattr(np, 'nanmean')) + + @parameterized(['center', 'stride'], + ([True, False], [1, 200])) + def time_rolling_construct(self, center, stride): + self.ds.rolling(x=window, center=center).construct( + 'window_dim', stride=stride).mean(dim='window_dim') + + +class RollingDask(Rolling): + def setup(self, *args, **kwargs): + requires_dask() + super(RollingDask, self).setup(**kwargs) + self.ds = self.ds.chunk({'x': 100, 'y': 50, 't': 50}) diff --git a/doc/api.rst b/doc/api.rst index 10386fe3a9b..4a26298b268 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -467,6 +467,32 @@ DataArray methods DataArray.load DataArray.chunk +Rolling objects +=============== + +.. autosummary:: + :toctree: generated/ + + core.rolling.DataArrayRolling + core.rolling.DataArrayRolling.construct + core.rolling.DataArrayRolling.reduce + core.rolling.DatasetRolling + core.rolling.DatasetRolling.construct + core.rolling.DatasetRolling.reduce + +GroupByObjects +============== + +.. autosummary:: + :toctree: generated/ + + core.groupby.DataArrayGroupBy + core.groupby.DataArrayGroupBy.apply + core.groupby.DataArrayGroupBy.reduce + core.groupby.DatasetGroupBy + core.groupby.DatasetGroupBy.apply + core.groupby.DatasetGroupBy.reduce + Plotting ======== diff --git a/doc/computation.rst b/doc/computation.rst index 420b97923d7..78c645ff8c3 100644 --- a/doc/computation.rst +++ b/doc/computation.rst @@ -158,13 +158,11 @@ Aggregation and summary methods can be applied directly to the ``Rolling`` objec r.mean() r.reduce(np.std) -Note that rolling window aggregations are much faster (both asymptotically and -because they avoid a loop in Python) when bottleneck_ is installed. Otherwise, -we fall back to a slower, pure Python implementation. +Note that rolling window aggregations are faster when bottleneck_ is installed. .. _bottleneck: https://github.com/kwgoodman/bottleneck/ -Finally, we can manually iterate through ``Rolling`` objects: +We can also manually iterate through ``Rolling`` objects: .. ipython:: python @@ -172,6 +170,30 @@ Finally, we can manually iterate through ``Rolling`` objects: for label, arr_window in r: # arr_window is a view of x +Finally, the rolling object has ``construct`` method, which gives a +view of the original ``DataArray`` with the windowed dimension attached to +the last position. +You can use this for more advanced rolling operations, such as strided rolling, +windowed rolling, convolution, short-time FFT, etc. + +.. ipython:: python + + # rolling with 2-point stride + rolling_da = r.construct('window_dim', stride=2) + rolling_da + rolling_da.mean('window_dim', skipna=False) + +Because the ``DataArray`` given by ``r.construct('window_dim')`` is a view +of the original array, it is memory efficient. + +.. note:: + numpy's Nan-aggregation functions such as ``nansum`` copy the original array. + In xarray, we internally use these functions in our aggregation methods + (such as ``.sum()``) if ``skipna`` argument is not specified or set to True. + This means ``rolling_da.mean('window_dim')`` is memory inefficient. + To avoid this, use ``skipna=False`` as the above example. + + .. _compute.broadcasting: Broadcasting by dimension name diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 8b200103303..ab667ceba3f 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -38,6 +38,15 @@ Documentation Enhancements ~~~~~~~~~~~~ +- Improve :py:func:`~xarray.DataArray.rolling` logic. + :py:func:`~xarray.DataArrayRolling` object now supports + :py:func:`~xarray.DataArrayRolling.construct` method that returns a view + of the DataArray / Dataset object with the rolling-window dimension added + to the last axis. This enables more flexible operation, such as strided + rolling, windowed rolling, ND-rolling, short-time FFT and convolution. + (:issue:`1831`, :issue:`1142`, :issue:`819`) + By `Keisuke Fujii `_. + Bug fixes ~~~~~~~~~ @@ -64,7 +73,6 @@ Documentation Enhancements ~~~~~~~~~~~~ - **New functions and methods**: - Added :py:meth:`DataArray.to_iris` and @@ -140,6 +148,10 @@ Enhancements Bug fixes ~~~~~~~~~ +- Rolling aggregation with ``center=True`` option now gives the same result + with pandas including the last element (:issue:`1046`). + By `Keisuke Fujii `_. + - Support indexing with a 0d-np.ndarray (:issue:`1921`). By `Keisuke Fujii `_. - Added warning in api.py of a netCDF4 bug that occurs when diff --git a/xarray/core/common.py b/xarray/core/common.py index d521e7ae5c2..85ac0bf9364 100644 --- a/xarray/core/common.py +++ b/xarray/core/common.py @@ -424,6 +424,11 @@ def groupby(self, group, squeeze=True): grouped : GroupBy A `GroupBy` object patterned after `pandas.GroupBy` that can be iterated over in the form of `(unique_value, grouped_array)` pairs. + + See Also + -------- + core.groupby.DataArrayGroupBy + core.groupby.DatasetGroupBy """ return self._groupby_cls(self, group, squeeze=squeeze) @@ -483,9 +488,6 @@ def rolling(self, min_periods=None, center=False, **windows): """ Rolling window object. - Rolling window aggregations are much faster when bottleneck is - installed. - Parameters ---------- min_periods : int, default None @@ -503,7 +505,8 @@ def rolling(self, min_periods=None, center=False, **windows): Returns ------- - rolling : type of input argument + Rolling object (core.rolling.DataArrayRolling for DataArray, + core.rolling.DatasetRolling for Dataset.) Examples -------- @@ -531,6 +534,11 @@ def rolling(self, min_periods=None, center=False, **windows): array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]) Coordinates: * time (time) datetime64[ns] 2000-02-15 2000-03-15 2000-04-15 ... + + See Also + -------- + core.rolling.DataArrayRolling + core.rolling.DatasetRolling """ return self._rolling_cls(self, min_periods=min_periods, diff --git a/xarray/core/dask_array_ops.py b/xarray/core/dask_array_ops.py index 3aefd114517..5524efb4803 100644 --- a/xarray/core/dask_array_ops.py +++ b/xarray/core/dask_array_ops.py @@ -1,6 +1,9 @@ -"""Define core operations for xarray objects. -""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + import numpy as np +from . import nputils try: import dask.array as da @@ -24,3 +27,70 @@ def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1): # trim array result = da.ghost.trim_internal(out, depth) return result + + +def rolling_window(a, axis, window, center, fill_value): + """ Dask's equivalence to np.utils.rolling_window """ + orig_shape = a.shape + # inputs for ghost + if axis < 0: + axis = a.ndim + axis + depth = {d: 0 for d in range(a.ndim)} + depth[axis] = int(window / 2) + # For evenly sized window, we need to crop the first point of each block. + offset = 1 if window % 2 == 0 else 0 + + if depth[axis] > min(a.chunks[axis]): + raise ValueError( + "For window size %d, every chunk should be larger than %d, " + "but the smallest chunk size is %d. Rechunk your array\n" + "with a larger chunk size or a chunk size that\n" + "more evenly divides the shape of your array." % + (window, depth[axis], min(a.chunks[axis]))) + + # Although dask.ghost pads values to boundaries of the array, + # the size of the generated array is smaller than what we want + # if center == False. + if center: + start = int(window / 2) # 10 -> 5, 9 -> 4 + end = window - 1 - start + else: + start, end = window - 1, 0 + pad_size = max(start, end) + offset - depth[axis] + drop_size = 0 + # pad_size becomes more than 0 when the ghosted array is smaller than + # needed. In this case, we need to enlarge the original array by padding + # before ghosting. + if pad_size > 0: + if pad_size < depth[axis]: + # Ghosting requires each chunk larger than depth. If pad_size is + # smaller than the depth, we enlarge this and truncate it later. + drop_size = depth[axis] - pad_size + pad_size = depth[axis] + shape = list(a.shape) + shape[axis] = pad_size + chunks = list(a.chunks) + chunks[axis] = (pad_size, ) + fill_array = da.full(shape, fill_value, dtype=a.dtype, chunks=chunks) + a = da.concatenate([fill_array, a], axis=axis) + + boundary = {d: fill_value for d in range(a.ndim)} + + # create ghosted arrays + ag = da.ghost.ghost(a, depth=depth, boundary=boundary) + + # apply rolling func + def func(x, window, axis=-1): + x = np.asarray(x) + rolling = nputils._rolling_window(x, window, axis) + return rolling[(slice(None), ) * axis + (slice(offset, None), )] + + chunks = list(a.chunks) + chunks.append(window) + out = ag.map_blocks(func, dtype=a.dtype, new_axis=a.ndim, chunks=chunks, + window=window, axis=axis) + + # crop boundary. + index = (slice(None),) * axis + (slice(drop_size, + drop_size + orig_shape[axis]), ) + return out[index] diff --git a/xarray/core/duck_array_ops.py b/xarray/core/duck_array_ops.py index 1a1bcf36c56..3a5c4a124d1 100644 --- a/xarray/core/duck_array_ops.py +++ b/xarray/core/duck_array_ops.py @@ -13,7 +13,7 @@ import numpy as np import pandas as pd -from . import dtypes, npcompat +from . import dask_array_ops, dtypes, npcompat, nputils from .nputils import nanfirst, nanlast from .pycompat import dask_array_type @@ -278,6 +278,10 @@ def f(values, axis=None, skipna=None, **kwargs): dtype = kwargs.get('dtype', None) values = asarray(values) + # dask requires dtype argument for object dtype + if (values.dtype == 'object' and name in ['sum', ]): + kwargs['dtype'] = values.dtype if dtype is None else dtype + if coerce_strings and values.dtype.kind in 'SU': values = values.astype(object) @@ -369,3 +373,16 @@ def last(values, axis, skipna=None): _fail_on_dask_array_input_skipna(values) return nanlast(values, axis) return take(values, -1, axis=axis) + + +def rolling_window(array, axis, window, center, fill_value): + """ + Make an ndarray with a rolling window of axis-th dimension. + The rolling dimension will be placed at the last dimension. + """ + if isinstance(array, dask_array_type): + return dask_array_ops.rolling_window( + array, axis, window, center, fill_value) + else: # np.ndarray + return nputils.rolling_window( + array, axis, window, center, fill_value) diff --git a/xarray/core/missing.py b/xarray/core/missing.py index e58d74f4c0d..0da6750f5bc 100644 --- a/xarray/core/missing.py +++ b/xarray/core/missing.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd +from . import rolling from .computation import apply_ufunc from .npcompat import flip from .pycompat import iteritems @@ -326,4 +327,8 @@ def _get_valid_fill_mask(arr, dim, limit): '''helper function to determine values that can be filled when limit is not None''' kw = {dim: limit + 1} - return arr.isnull().rolling(min_periods=1, **kw).sum() <= limit + # we explicitly use construct method to avoid copy. + new_dim = rolling._get_new_dimname(arr.dims, '_window') + return (arr.isnull().rolling(min_periods=1, **kw) + .construct(new_dim, fill_value=False) + .sum(new_dim, skipna=False)) <= limit diff --git a/xarray/core/npcompat.py b/xarray/core/npcompat.py index df1e955518c..8f1f3821f96 100644 --- a/xarray/core/npcompat.py +++ b/xarray/core/npcompat.py @@ -1,6 +1,17 @@ from __future__ import absolute_import, division, print_function import numpy as np +from distutils.version import LooseVersion + + +if LooseVersion(np.__version__) >= LooseVersion('1.12'): + as_strided = np.lib.stride_tricks.as_strided +else: + def as_strided(x, shape=None, strides=None, subok=False, writeable=True): + array = np.lib.stride_tricks.as_strided(x, shape, strides, subok) + array.setflags(write=writeable) + return array + try: from numpy import nancumsum, nancumprod, flip diff --git a/xarray/core/nputils.py b/xarray/core/nputils.py index c781ca65a69..4ca1f9390eb 100644 --- a/xarray/core/nputils.py +++ b/xarray/core/nputils.py @@ -5,6 +5,8 @@ import numpy as np import pandas as pd +from . import npcompat + def _validate_axis(data, axis): ndim = data.ndim @@ -133,3 +135,65 @@ def __setitem__(self, key, value): mixed_positions, vindex_positions = _advanced_indexer_subspaces(key) self._array[key] = np.moveaxis(value, vindex_positions, mixed_positions) + + +def rolling_window(a, axis, window, center, fill_value): + """ rolling window with padding. """ + pads = [(0, 0) for s in a.shape] + if center: + start = int(window / 2) # 10 -> 5, 9 -> 4 + end = window - 1 - start + pads[axis] = (start, end) + else: + pads[axis] = (window - 1, 0) + a = np.pad(a, pads, mode='constant', constant_values=fill_value) + return _rolling_window(a, window, axis) + + +def _rolling_window(a, window, axis=-1): + """ + Make an ndarray with a rolling window along axis. + + Parameters + ---------- + a : array_like + Array to add rolling window to + axis: int + axis position along which rolling window will be applied. + window : int + Size of rolling window + + Returns + ------- + Array that is a view of the original array with a added dimension + of size w. + + Examples + -------- + >>> x=np.arange(10).reshape((2,5)) + >>> np.rolling_window(x, 3, axis=-1) + array([[[0, 1, 2], [1, 2, 3], [2, 3, 4]], + [[5, 6, 7], [6, 7, 8], [7, 8, 9]]]) + + Calculate rolling mean of last dimension: + >>> np.mean(np.rolling_window(x, 3, axis=-1), -1) + array([[ 1., 2., 3.], + [ 6., 7., 8.]]) + + This function is taken from https://github.com/numpy/numpy/pull/31 + but slightly modified to accept axis option. + """ + axis = _validate_axis(a, axis) + a = np.swapaxes(a, axis, -1) + + if window < 1: + raise ValueError( + "`window` must be at least 1. Given : {}".format(window)) + if window > a.shape[-1]: + raise ValueError("`window` is too long. Given : {}".format(window)) + + shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) + strides = a.strides + (a.strides[-1],) + rolling = npcompat.as_strided(a, shape=shape, strides=strides, + writeable=False) + return np.swapaxes(rolling, -2, axis) diff --git a/xarray/core/ops.py b/xarray/core/ops.py index 32b31010b5f..d9e8ceb65d5 100644 --- a/xarray/core/ops.py +++ b/xarray/core/ops.py @@ -223,20 +223,8 @@ def func(self, *args, **kwargs): def rolling_count(rolling): - not_null = rolling.obj.notnull() - instance_attr_dict = {'center': rolling.center, - 'min_periods': rolling.min_periods, - rolling.dim: rolling.window} - rolling_count = not_null.rolling(**instance_attr_dict).sum() - - if rolling.min_periods is None: - return rolling_count - - # otherwise we need to filter out points where there aren't enough periods - # but not_null is False, and so the NaNs don't flow through - # array with points where there are enough values given min_periods - enough_periods = rolling_count >= rolling.min_periods - + rolling_count = rolling._counts() + enough_periods = rolling_count >= rolling._min_periods return rolling_count.where(enough_periods) diff --git a/xarray/core/rolling.py b/xarray/core/rolling.py index 4bb020cebeb..845dcae5473 100644 --- a/xarray/core/rolling.py +++ b/xarray/core/rolling.py @@ -5,8 +5,7 @@ import numpy as np -from .combine import concat -from .common import full_like +from . import dtypes from .dask_array_ops import dask_rolling_wrapper from .ops import ( bn, has_bottleneck, inject_bottleneck_rolling_methods, @@ -14,6 +13,24 @@ from .pycompat import OrderedDict, dask_array_type, zip +def _get_new_dimname(dims, new_dim): + """ Get an new dimension name based on new_dim, that is not used in dims. + If the same name exists, we add an underscore(s) in the head. + + Example1: + dims: ['a', 'b', 'c'] + new_dim: ['_rolling'] + -> ['_rolling'] + Example2: + dims: ['a', 'b', 'c', '_rolling'] + new_dim: ['_rolling'] + -> ['__rolling'] + """ + while new_dim in dims: + new_dim = '_' + new_dim + return new_dim + + class Rolling(object): """A object that implements the moving window pattern. @@ -98,59 +115,53 @@ def __len__(self): class DataArrayRolling(Rolling): - """ - This class adds the following class methods; - + _reduce_method(cls, func) - + _bottleneck_reduce(cls, func) - - These class methods will be used to inject numpy or bottleneck function - by doing - - >>> func = cls._reduce_method(f) - >>> func.__name__ = name - >>> setattr(cls, name, func) - - in ops.inject_bottleneck_rolling_methods. - - After the injection, the Rolling object will have `name` (such as `mean` or - `median`) methods, - e.g. it enables the following call, - >>> data.rolling().mean() + def __init__(self, obj, min_periods=None, center=False, **windows): + """ + Moving window object for DataArray. + You should use DataArray.rolling() method to construct this object + instead of the class constructor. - If bottleneck is installed, some bottleneck methods will be used instdad of - the numpy method. + Parameters + ---------- + obj : DataArray + Object to window. + min_periods : int, default None + Minimum number of observations in window required to have a value + (otherwise result is NA). The default, None, is equivalent to + setting min_periods equal to the size of the window. + center : boolean, default False + Set the labels at the center of the window. + **windows : dim=window + dim : str + Name of the dimension to create the rolling iterator + along (e.g., `time`). + window : int + Size of the moving window. - see also - + rolling.DataArrayRolling - + ops.inject_bottleneck_rolling_methods - """ + Returns + ------- + rolling : type of input argument - def __init__(self, obj, min_periods=None, center=False, **windows): + See Also + -------- + DataArray.rolling + DataArray.groupby + Dataset.rolling + Dataset.groupby + """ super(DataArrayRolling, self).__init__(obj, min_periods=min_periods, center=center, **windows) - self._windows = None - self._valid_windows = None self.window_indices = None self.window_labels = None self._setup_windows() - @property - def windows(self): - if self._windows is None: - self._windows = OrderedDict(zip(self.window_labels, - self.window_indices)) - return self._windows - def __iter__(self): - for (label, indices, valid) in zip(self.window_labels, - self.window_indices, - self._valid_windows): - + for (label, indices) in zip(self.window_labels, self.window_indices): window = self.obj.isel(**{self.dim: indices}) - if not valid: - window = full_like(window, fill_value=True, dtype=bool) + counts = window.count(dim=self.dim) + window = window.where(counts >= self._min_periods) yield (label, window) @@ -158,32 +169,65 @@ def _setup_windows(self): """ Find the indices and labels for each window """ - from .dataarray import DataArray - self.window_labels = self.obj[self.dim] - window = int(self.window) - dim_size = self.obj[self.dim].size stops = np.arange(dim_size) + 1 starts = np.maximum(stops - window, 0) - if self._min_periods > 1: - valid_windows = (stops - starts) >= self._min_periods - else: - # No invalid windows - valid_windows = np.ones(dim_size, dtype=bool) - self._valid_windows = DataArray(valid_windows, dims=(self.dim, ), - coords=self.obj[self.dim].coords) - self.window_indices = [slice(start, stop) for start, stop in zip(starts, stops)] - def _center_result(self, result): - """center result""" - shift = (-self.window // 2) + 1 - return result.shift(**{self.dim: shift}) + def construct(self, window_dim, stride=1, fill_value=dtypes.NA): + """ + Convert this rolling object to xr.DataArray, + where the window dimension is stacked as a new dimension + + Parameters + ---------- + window_dim: str + New name of the window dimension. + stride: integer, optional + Size of stride for the rolling window. + fill_value: optional. Default dtypes.NA + Filling value to match the dimension size. + + Returns + ------- + DataArray that is a view of the original array. + + Note + ---- + The return array is not writeable. + + Examples + -------- + >>> da = DataArray(np.arange(8).reshape(2, 4), dims=('a', 'b')) + >>> + >>> rolling = da.rolling(a=3) + >>> rolling.to_datarray('window_dim') + + array([[[np.nan, np.nan, 0], [np.nan, 0, 1], [0, 1, 2], [1, 2, 3]], + [[np.nan, np.nan, 4], [np.nan, 4, 5], [4, 5, 6], [5, 6, 7]]]) + Dimensions without coordinates: a, b, window_dim + >>> + >>> rolling = da.rolling(a=3, center=True) + >>> rolling.to_datarray('window_dim') + + array([[[np.nan, 0, 1], [0, 1, 2], [1, 2, 3], [2, 3, np.nan]], + [[np.nan, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, np.nan]]]) + Dimensions without coordinates: a, b, window_dim + """ + + from .dataarray import DataArray + + window = self.obj.variable.rolling_window(self.dim, self.window, + window_dim, self.center, + fill_value=fill_value) + result = DataArray(window, dims=self.obj.dims + (window_dim,), + coords=self.obj.coords) + return result.isel(**{self.dim: slice(None, None, stride)}) def reduce(self, func, **kwargs): """Reduce the items in this group by applying `func` along some @@ -203,27 +247,27 @@ def reduce(self, func, **kwargs): reduced : DataArray Array with summarized data. """ - - windows = [window.reduce(func, dim=self.dim, **kwargs) - for _, window in self] - - # Find valid windows based on count - if self.dim in self.obj.coords: - concat_dim = self.window_labels - else: - concat_dim = self.dim - counts = concat([window.count(dim=self.dim) for _, window in self], - dim=concat_dim) - result = concat(windows, dim=concat_dim) - # restore dim order - result = result.transpose(*self.obj.dims) - - result = result.where(counts >= self._min_periods) - - if self.center: - result = self._center_result(result) - - return result + rolling_dim = _get_new_dimname(self.obj.dims, '_rolling_dim') + windows = self.construct(rolling_dim) + result = windows.reduce(func, dim=rolling_dim, **kwargs) + + # Find valid windows based on count. + counts = self._counts() + return result.where(counts >= self._min_periods) + + def _counts(self): + """ Number of non-nan entries in each rolling window. """ + + rolling_dim = _get_new_dimname(self.obj.dims, '_rolling_dim') + # We use False as the fill_value instead of np.nan, since boolean + # array is faster to be reduced than object array. + # The use of skipna==False is also faster since it does not need to + # copy the strided array. + counts = (self.obj.notnull() + .rolling(center=self.center, **{self.dim: self.window}) + .construct(rolling_dim, fill_value=False) + .sum(dim=rolling_dim, skipna=False)) + return counts @classmethod def _reduce_method(cls, func): @@ -255,45 +299,41 @@ def wrapped_func(self, **kwargs): axis = self.obj.get_axis_num(self.dim) - if isinstance(self.obj.data, dask_array_type): + padded = self.obj.variable + if self.center: + shift = (-self.window // 2) + 1 + + if (LooseVersion(np.__version__) < LooseVersion('1.13') and + self.obj.dtype.kind == 'b'): + # with numpy < 1.13 bottleneck cannot handle np.nan-Boolean + # mixed array correctly. We cast boolean array to float. + padded = padded.astype(float) + padded = padded.pad_with_fill_value(**{self.dim: (0, -shift)}) + valid = (slice(None), ) * axis + (slice(-shift, None), ) + + if isinstance(padded.data, dask_array_type): values = dask_rolling_wrapper(func, self.obj.data, window=self.window, min_count=min_count, axis=axis) else: - values = func(self.obj.data, window=self.window, + values = func(padded.data, window=self.window, min_count=min_count, axis=axis) - result = DataArray(values, self.obj.coords) - if self.center: - result = self._center_result(result) + values = values[valid] + result = DataArray(values, self.obj.coords) return result return wrapped_func class DatasetRolling(Rolling): - """An object that implements the moving window pattern for Dataset. - - This class has an OrderedDict named self.rollings, that is a collection of - DataArrayRollings for all the DataArrays in the Dataset, except for those - not depending on rolling dimension. - - reduce() method returns a new Dataset generated from a set of - self.rollings[key].reduce(). - - See Also - -------- - Dataset.groupby - DataArray.groupby - Dataset.rolling - DataArray.rolling - """ - def __init__(self, obj, min_periods=None, center=False, **windows): """ Moving window object for Dataset. + You should use Dataset.rolling() method to construct this object + instead of the class constructor. Parameters ---------- @@ -315,6 +355,13 @@ def __init__(self, obj, min_periods=None, center=False, **windows): Returns ------- rolling : type of input argument + + See Also + -------- + Dataset.rolling + DataArray.rolling + Dataset.groupby + DataArray.groupby """ super(DatasetRolling, self).__init__(obj, min_periods, center, **windows) @@ -355,6 +402,16 @@ def reduce(self, func, **kwargs): reduced[key] = self.obj[key] return Dataset(reduced, coords=self.obj.coords) + def _counts(self): + from .dataset import Dataset + reduced = OrderedDict() + for key, da in self.obj.data_vars.items(): + if self.dim in da.dims: + reduced[key] = self.rollings[key]._counts() + else: + reduced[key] = self.obj[key] + return Dataset(reduced, coords=self.obj.coords) + @classmethod def _reduce_method(cls, func): """ @@ -374,6 +431,37 @@ def wrapped_func(self, **kwargs): return Dataset(reduced, coords=self.obj.coords) return wrapped_func + def construct(self, window_dim, stride=1, fill_value=dtypes.NA): + """ + Convert this rolling object to xr.Dataset, + where the window dimension is stacked as a new dimension + + Parameters + ---------- + window_dim: str + New name of the window dimension. + stride: integer, optional + size of stride for the rolling window. + fill_value: optional. Default dtypes.NA + Filling value to match the dimension size. + + Returns + ------- + Dataset with variables converted from rolling object. + """ + + from .dataset import Dataset + + dataset = OrderedDict() + for key, da in self.obj.data_vars.items(): + if self.dim in da.dims: + dataset[key] = self.rollings[key].construct( + window_dim, fill_value=fill_value) + else: + dataset[key] = da + return Dataset(dataset, coords=self.obj.coords).isel( + **{self.dim: slice(None, None, stride)}) + inject_bottleneck_rolling_methods(DataArrayRolling) inject_datasetrolling_methods(DatasetRolling) diff --git a/xarray/core/variable.py b/xarray/core/variable.py index efec2806f48..bb4285fba0a 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -934,6 +934,53 @@ def shift(self, **shifts): result = result._shift_one_dim(dim, count) return result + def pad_with_fill_value(self, fill_value=dtypes.NA, **pad_widths): + """ + Return a new Variable with paddings. + + Parameters + ---------- + **pad_width: keyword arguments of the form {dim: (before, after)} + Number of values padded to the edges of each dimension. + """ + if fill_value is dtypes.NA: # np.nan is passed + dtype, fill_value = dtypes.maybe_promote(self.dtype) + else: + dtype = self.dtype + + if isinstance(self.data, dask_array_type): + array = self.data + + # Dask does not yet support pad. We manually implement it. + # https://github.com/dask/dask/issues/1926 + for d, pad in pad_widths.items(): + axis = self.get_axis_num(d) + before_shape = list(array.shape) + before_shape[axis] = pad[0] + before_chunks = list(array.chunks) + before_chunks[axis] = (pad[0], ) + after_shape = list(array.shape) + after_shape[axis] = pad[1] + after_chunks = list(array.chunks) + after_chunks[axis] = (pad[1], ) + + arrays = [] + if pad[0] > 0: + arrays.append(da.full(before_shape, fill_value, + dtype=dtype, chunks=before_chunks)) + arrays.append(array) + if pad[1] > 0: + arrays.append(da.full(after_shape, fill_value, + dtype=dtype, chunks=after_chunks)) + if len(arrays) > 1: + array = da.concatenate(arrays, axis=axis) + else: + pads = [(0, 0) if d not in pad_widths else pad_widths[d] + for d in self.dims] + array = np.pad(self.data.astype(dtype, copy=False), pads, + mode='constant', constant_values=fill_value) + return type(self)(self.dims, array) + def _roll_one_dim(self, dim, count): axis = self.get_axis_num(dim) @@ -1452,6 +1499,57 @@ def rank(self, dim, pct=False): ranked /= count return Variable(self.dims, ranked) + def rolling_window(self, dim, window, window_dim, center=False, + fill_value=dtypes.NA): + """ + Make a rolling_window along dim and add a new_dim to the last place. + + Parameters + ---------- + dim: str + Dimension over which to compute rolling_window + window: int + Window size of the rolling + window_dim: str + New name of the window dimension. + center: boolean. default False. + If True, pad fill_value for both ends. Otherwise, pad in the head + of the axis. + fill_value: + value to be filled. + + Returns + ------- + Variable that is a view of the original array with a added dimension of + size w. + The return dim: self.dims + (window_dim, ) + The return shape: self.shape + (window, ) + + Examples + -------- + >>> v=Variable(('a', 'b'), np.arange(8).reshape((2,4))) + >>> v.rolling_window(x, 'b', 3, 'window_dim') + + array([[[nan, nan, 0], [nan, 0, 1], [0, 1, 2], [1, 2, 3]], + [[nan, nan, 4], [nan, 4, 5], [4, 5, 6], [5, 6, 7]]]) + + >>> v.rolling_window(x, 'b', 3, 'window_dim', center=True) + + array([[[nan, 0, 1], [0, 1, 2], [1, 2, 3], [2, 3, nan]], + [[nan, 4, 5], [4, 5, 6], [5, 6, 7], [6, 7, nan]]]) + """ + if fill_value is dtypes.NA: # np.nan is passed + dtype, fill_value = dtypes.maybe_promote(self.dtype) + array = self.astype(dtype, copy=False).data + else: + dtype = self.dtype + array = self.data + + new_dims = self.dims + (window_dim, ) + return Variable(new_dims, duck_array_ops.rolling_window( + array, axis=self.get_axis_num(dim), window=window, + center=center, fill_value=fill_value)) + @property def real(self): return type(self)(self.dims, self.data.real, self._attrs) diff --git a/xarray/tests/test_dataarray.py b/xarray/tests/test_dataarray.py index 095ad5b793b..18fc27c96ab 100644 --- a/xarray/tests/test_dataarray.py +++ b/xarray/tests/test_dataarray.py @@ -2729,7 +2729,7 @@ def test_series_categorical_index(self): if not hasattr(pd, 'CategoricalIndex'): raise unittest.SkipTest('requires pandas with CategoricalIndex') - s = pd.Series(range(5), index=pd.CategoricalIndex(list('aabbc'))) + s = pd.Series(np.arange(5), index=pd.CategoricalIndex(list('aabbc'))) arr = DataArray(s) assert "'a'" in repr(arr) # should not error @@ -3325,9 +3325,11 @@ def da_dask(seed=123): return da +@pytest.mark.parametrize('da', (1, 2), indirect=True) def test_rolling_iter(da): rolling_obj = da.rolling(time=7) + rolling_obj_mean = rolling_obj.mean() assert len(rolling_obj.window_labels) == len(da['time']) assert_identical(rolling_obj.window_labels, da['time']) @@ -3335,6 +3337,16 @@ def test_rolling_iter(da): for i, (label, window_da) in enumerate(rolling_obj): assert label == da['time'].isel(time=i) + actual = rolling_obj_mean.isel(time=i) + expected = window_da.mean('time') + + # TODO add assert_allclose_with_nan, which compares nan position + # as well as the closeness of the values. + assert_array_equal(actual.isnull(), expected.isnull()) + if (~actual.isnull()).sum() > 0: + np.allclose(actual.values[actual.values.nonzero()], + expected.values[expected.values.nonzero()]) + def test_rolling_doc(da): rolling_obj = da.rolling(time=7) @@ -3403,8 +3415,8 @@ def test_rolling_wrapped_bottleneck_dask(da_dask, name, center, min_periods): @pytest.mark.parametrize('center', (True, False)) @pytest.mark.parametrize('min_periods', (None, 1, 2, 3)) @pytest.mark.parametrize('window', (1, 2, 3, 4)) -def test_rolling_pandas_compat(da, center, window, min_periods): - s = pd.Series(range(10)) +def test_rolling_pandas_compat(center, window, min_periods): + s = pd.Series(np.arange(10)) da = DataArray.from_series(s) if min_periods is not None and window < min_periods: @@ -3414,12 +3426,39 @@ def test_rolling_pandas_compat(da, center, window, min_periods): min_periods=min_periods).mean() da_rolling = da.rolling(index=window, center=center, min_periods=min_periods).mean() - # pandas does some fancy stuff in the last position, - # we're not going to do that yet! - np.testing.assert_allclose(s_rolling.values[:-1], - da_rolling.values[:-1]) - np.testing.assert_allclose(s_rolling.index, - da_rolling['index']) + da_rolling_np = da.rolling(index=window, center=center, + min_periods=min_periods).reduce(np.nanmean) + + np.testing.assert_allclose(s_rolling.values, da_rolling.values) + np.testing.assert_allclose(s_rolling.index, da_rolling['index']) + np.testing.assert_allclose(s_rolling.values, da_rolling_np.values) + np.testing.assert_allclose(s_rolling.index, da_rolling_np['index']) + + +@pytest.mark.parametrize('center', (True, False)) +@pytest.mark.parametrize('window', (1, 2, 3, 4)) +def test_rolling_construct(center, window): + s = pd.Series(np.arange(10)) + da = DataArray.from_series(s) + + s_rolling = s.rolling(window, center=center, min_periods=1).mean() + da_rolling = da.rolling(index=window, center=center, min_periods=1) + + da_rolling_mean = da_rolling.construct('window').mean('window') + np.testing.assert_allclose(s_rolling.values, da_rolling_mean.values) + np.testing.assert_allclose(s_rolling.index, da_rolling_mean['index']) + + # with stride + da_rolling_mean = da_rolling.construct('window', + stride=2).mean('window') + np.testing.assert_allclose(s_rolling.values[::2], da_rolling_mean.values) + np.testing.assert_allclose(s_rolling.index[::2], da_rolling_mean['index']) + + # with fill_value + da_rolling_mean = da_rolling.construct( + 'window', stride=2, fill_value=0.0).mean('window') + assert da_rolling_mean.isnull().sum() == 0 + assert (da_rolling_mean == 0.0).sum() >= 0 @pytest.mark.parametrize('da', (1, 2), indirect=True) @@ -3432,6 +3471,10 @@ def test_rolling_reduce(da, center, min_periods, window, name): if min_periods is not None and window < min_periods: min_periods = window + if da.isnull().sum() > 1 and window == 1: + # this causes all nan slices + window = 2 + rolling_obj = da.rolling(time=window, center=center, min_periods=min_periods) @@ -3442,26 +3485,52 @@ def test_rolling_reduce(da, center, min_periods, window, name): assert actual.dims == expected.dims +@pytest.mark.skipif(LooseVersion(np.__version__) < LooseVersion('1.13'), + reason='Old numpy does not support nansum / nanmax for ' + 'object typed arrays.') +@pytest.mark.parametrize('center', (True, False)) +@pytest.mark.parametrize('min_periods', (None, 1, 2, 3)) +@pytest.mark.parametrize('window', (1, 2, 3, 4)) +@pytest.mark.parametrize('name', ('sum', 'max')) +def test_rolling_reduce_nonnumeric(center, min_periods, window, name): + da = DataArray([0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], + dims='time').isnull() + + if min_periods is not None and window < min_periods: + min_periods = window + + rolling_obj = da.rolling(time=window, center=center, + min_periods=min_periods) + + # add nan prefix to numpy methods to get similar behavior as bottleneck + actual = rolling_obj.reduce(getattr(np, 'nan%s' % name)) + expected = getattr(rolling_obj, name)() + assert_allclose(actual, expected) + assert actual.dims == expected.dims + + def test_rolling_count_correct(): da = DataArray( [0, np.nan, 1, 2, np.nan, 3, 4, 5, np.nan, 6, 7], dims='time') - result = da.rolling(time=11, min_periods=1).count() - expected = DataArray( - [1, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8], dims='time') - assert_equal(result, expected) - - result = da.rolling(time=11, min_periods=None).count() - expected = DataArray( + kwargs = [{'time': 11, 'min_periods': 1}, + {'time': 11, 'min_periods': None}, + {'time': 7, 'min_periods': 2}] + expecteds = [DataArray( + [1, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8], dims='time'), + DataArray( [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, - np.nan, np.nan, np.nan, np.nan, 8], dims='time') - assert_equal(result, expected) + np.nan, np.nan, np.nan, np.nan, np.nan], dims='time'), + DataArray( + [np.nan, np.nan, 2, 3, 3, 4, 5, 5, 5, 5, 5], dims='time')] + + for kwarg, expected in zip(kwargs, expecteds): + result = da.rolling(**kwarg).count() + assert_equal(result, expected) - result = da.rolling(time=7, min_periods=2).count() - expected = DataArray( - [np.nan, np.nan, 2, 3, 3, 4, 5, 5, 5, 5, 5], dims='time') - assert_equal(result, expected) + result = da.to_dataset(name='var1').rolling(**kwarg).count()['var1'] + assert_equal(result, expected) def test_raise_no_warning_for_nan_in_binary_ops(): diff --git a/xarray/tests/test_dataset.py b/xarray/tests/test_dataset.py index 353128acd39..1b557479ec1 100644 --- a/xarray/tests/test_dataset.py +++ b/xarray/tests/test_dataset.py @@ -4134,12 +4134,36 @@ def test_rolling_pandas_compat(center, window, min_periods): min_periods=min_periods).mean() ds_rolling = ds.rolling(index=window, center=center, min_periods=min_periods).mean() - # pandas does some fancy stuff in the last position, - # we're not going to do that yet! - np.testing.assert_allclose(df_rolling['x'].values[:-1], - ds_rolling['x'].values[:-1]) - np.testing.assert_allclose(df_rolling.index, - ds_rolling['index']) + + np.testing.assert_allclose(df_rolling['x'].values, ds_rolling['x'].values) + np.testing.assert_allclose(df_rolling.index, ds_rolling['index']) + + +@pytest.mark.parametrize('center', (True, False)) +@pytest.mark.parametrize('window', (1, 2, 3, 4)) +def test_rolling_construct(center, window): + df = pd.DataFrame({'x': np.random.randn(20), 'y': np.random.randn(20), + 'time': np.linspace(0, 1, 20)}) + + ds = Dataset.from_dataframe(df) + df_rolling = df.rolling(window, center=center, min_periods=1).mean() + ds_rolling = ds.rolling(index=window, center=center) + + ds_rolling_mean = ds_rolling.construct('window').mean('window') + np.testing.assert_allclose(df_rolling['x'].values, + ds_rolling_mean['x'].values) + np.testing.assert_allclose(df_rolling.index, ds_rolling_mean['index']) + + # with stride + ds_rolling_mean = ds_rolling.construct('window', stride=2).mean('window') + np.testing.assert_allclose(df_rolling['x'][::2].values, + ds_rolling_mean['x'].values) + np.testing.assert_allclose(df_rolling.index[::2], ds_rolling_mean['index']) + # with fill_value + ds_rolling_mean = ds_rolling.construct( + 'window', stride=2, fill_value=0.0).mean('window') + assert ds_rolling_mean.isnull().sum() == 0 + assert (ds_rolling_mean['x'] == 0.0).sum() >= 0 @pytest.mark.slow diff --git a/xarray/tests/test_duck_array_ops.py b/xarray/tests/test_duck_array_ops.py index fde2a1cc726..99250796d8c 100644 --- a/xarray/tests/test_duck_array_ops.py +++ b/xarray/tests/test_duck_array_ops.py @@ -8,7 +8,8 @@ from xarray import DataArray, concat from xarray.core.duck_array_ops import ( - array_notnull_equiv, concatenate, count, first, last, mean, stack, where) + array_notnull_equiv, concatenate, count, first, last, mean, rolling_window, + stack, where) from xarray.core.pycompat import dask_array_type from xarray.testing import assert_allclose, assert_equal @@ -303,3 +304,28 @@ def test_isnull_with_dask(): da = construct_dataarray(2, np.float32, contains_nan=True, dask=True) assert isinstance(da.isnull().data, dask_array_type) assert_equal(da.isnull().load(), da.load().isnull()) + + +@pytest.mark.skipif(not has_dask, reason='This is for dask.') +@pytest.mark.parametrize('axis', [0, -1]) +@pytest.mark.parametrize('window', [3, 8, 11]) +@pytest.mark.parametrize('center', [True, False]) +def test_dask_rolling(axis, window, center): + import dask.array as da + + x = np.array(np.random.randn(100, 40), dtype=float) + dx = da.from_array(x, chunks=[(6, 30, 30, 20, 14), 8]) + + expected = rolling_window(x, axis=axis, window=window, center=center, + fill_value=np.nan) + actual = rolling_window(dx, axis=axis, window=window, center=center, + fill_value=np.nan) + assert isinstance(actual, da.Array) + assert_array_equal(actual, expected) + assert actual.shape == expected.shape + + # we need to take care of window size if chunk size is small + # window/2 should be smaller than the smallest chunk size. + with pytest.raises(ValueError): + rolling_window(dx, axis=axis, window=100, center=center, + fill_value=np.nan) diff --git a/xarray/tests/test_nputils.py b/xarray/tests/test_nputils.py index 3c9c92ae2ba..d3ef02a039c 100644 --- a/xarray/tests/test_nputils.py +++ b/xarray/tests/test_nputils.py @@ -1,7 +1,8 @@ import numpy as np from numpy.testing import assert_array_equal -from xarray.core.nputils import NumpyVIndexAdapter, _is_contiguous +from xarray.core.nputils import (NumpyVIndexAdapter, _is_contiguous, + rolling_window) def test_is_contiguous(): @@ -28,3 +29,27 @@ def test_vindex(): vindex[[0, 1], [0, 1], :] = vindex[[0, 1], [0, 1], :] vindex[[0, 1], :, [0, 1]] = vindex[[0, 1], :, [0, 1]] vindex[:, [0, 1], [0, 1]] = vindex[:, [0, 1], [0, 1]] + + +def test_rolling(): + x = np.array([1, 2, 3, 4], dtype=float) + + actual = rolling_window(x, axis=-1, window=3, center=True, + fill_value=np.nan) + expected = np.array([[np.nan, 1, 2], + [1, 2, 3], + [2, 3, 4], + [3, 4, np.nan]], dtype=float) + assert_array_equal(actual, expected) + + actual = rolling_window(x, axis=-1, window=3, center=False, fill_value=0.0) + expected = np.array([[0, 0, 1], + [0, 1, 2], + [1, 2, 3], + [2, 3, 4]], dtype=float) + assert_array_equal(actual, expected) + + x = np.stack([x, x * 1.1]) + actual = rolling_window(x, axis=-1, window=3, center=False, fill_value=0.0) + expected = np.stack([expected, expected * 1.1], axis=0) + assert_array_equal(actual, expected) diff --git a/xarray/tests/test_variable.py b/xarray/tests/test_variable.py index 5f60fc95e15..1373358c476 100644 --- a/xarray/tests/test_variable.py +++ b/xarray/tests/test_variable.py @@ -737,6 +737,52 @@ def test_getitem_error(self): with raises_regex(IndexError, 'Dimensions of indexers mis'): v[:, ind] + def test_pad(self): + data = np.arange(4 * 3 * 2).reshape(4, 3, 2) + v = self.cls(['x', 'y', 'z'], data) + + xr_args = [{'x': (2, 1)}, {'y': (0, 3)}, {'x': (3, 1), 'z': (2, 0)}] + np_args = [((2, 1), (0, 0), (0, 0)), ((0, 0), (0, 3), (0, 0)), + ((3, 1), (0, 0), (2, 0))] + for xr_arg, np_arg in zip(xr_args, np_args): + actual = v.pad_with_fill_value(**xr_arg) + expected = np.pad(np.array(v.data.astype(float)), np_arg, + mode='constant', constant_values=np.nan) + assert_array_equal(actual, expected) + assert isinstance(actual._data, type(v._data)) + + # for the boolean array, we pad False + data = np.full_like(data, False, dtype=bool).reshape(4, 3, 2) + v = self.cls(['x', 'y', 'z'], data) + for xr_arg, np_arg in zip(xr_args, np_args): + actual = v.pad_with_fill_value(fill_value=False, **xr_arg) + expected = np.pad(np.array(v.data), np_arg, + mode='constant', constant_values=False) + assert_array_equal(actual, expected) + + def test_rolling_window(self): + # Just a working test. See test_nputils for the algorithm validation + v = self.cls(['x', 'y', 'z'], + np.arange(40 * 30 * 2).reshape(40, 30, 2)) + for (d, w) in [('x', 3), ('y', 5)]: + v_rolling = v.rolling_window(d, w, d + '_window') + assert v_rolling.dims == ('x', 'y', 'z', d + '_window') + assert v_rolling.shape == v.shape + (w, ) + + v_rolling = v.rolling_window(d, w, d + '_window', center=True) + assert v_rolling.dims == ('x', 'y', 'z', d + '_window') + assert v_rolling.shape == v.shape + (w, ) + + # dask and numpy result should be the same + v_loaded = v.load().rolling_window(d, w, d + '_window', + center=True) + assert_array_equal(v_rolling, v_loaded) + + # numpy backend should not be over-written + if isinstance(v._data, np.ndarray): + with pytest.raises(ValueError): + v_loaded[0] = 1.0 + class TestVariable(TestCase, VariableSubclassTestCases): cls = staticmethod(Variable) @@ -1462,7 +1508,7 @@ def test_reduce_funcs(self): v = Variable('t', pd.date_range('2000-01-01', periods=3)) with pytest.raises(NotImplementedError): - v.max(skipna=True) + v.argmax(skipna=True) assert_identical( v.max(), Variable([], pd.Timestamp('2000-01-03'))) @@ -1741,6 +1787,14 @@ def test_getitem_fancy(self): def test_getitem_uint(self): super(TestIndexVariable, self).test_getitem_fancy() + @pytest.mark.xfail + def test_pad(self): + super(TestIndexVariable, self).test_rolling_window() + + @pytest.mark.xfail + def test_rolling_window(self): + super(TestIndexVariable, self).test_rolling_window() + class TestAsCompatibleData(TestCase): def test_unchanged_types(self):