Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rolling window with as_strided #1837

Merged
merged 82 commits into from
Mar 1, 2018
Merged

Conversation

fujiisoup
Copy link
Member

@fujiisoup fujiisoup commented Jan 18, 2018

I started to work for refactoring rollings.
As suggested in #1831 comment, I implemented rolling_window methods based on as_strided.

I got more than 1,000 times speed up! yey!

In [1]: import numpy as np
   ...: import xarray as xr
   ...: 
   ...: da = xr.DataArray(np.random.randn(10000, 3), dims=['x', 'y'])

with the master

%timeit da.rolling(x=5).reduce(np.mean)
1 loop, best of 3: 9.68 s per loop

with the current implementation

%timeit da.rolling(x=5).reduce(np.mean)
100 loops, best of 3: 5.29 ms per loop

and with the bottleneck

%timeit da.rolling(x=5).mean()
100 loops, best of 3: 2.62 ms per loop

My current concerns are

  • Can we expose the new rolling_window method of DataArray and Dataset to the public?
    I think this method itself is useful for many usecases, such as short-term-FFT and convolution.
    This also gives more flexible rolling operation, such as windowed moving average, strided rolling, and ND-rolling.

  • Is there any dask's equivalence to numpy's as_strided?
    Currently, I just use a slice->concatenate path, but I don't think it is very efficient.
    (Is it already efficient, as dask utilizes out-of-core computation?)

Any thoughts are welcome.

@fujiisoup fujiisoup changed the title Rolling window with as_strided [WIP] Rolling window with as_strided Jan 18, 2018
Copy link
Member

@shoyer shoyer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice! I have some minor suggestions for organization but this looks like a nice win for performance

@@ -2132,6 +2132,50 @@ def rank(self, dim, pct=False, keep_attrs=False):
ds = self._to_temp_dataset().rank(dim, pct=pct, keep_attrs=keep_attrs)
return self._from_temp_dataset(ds)

def rolling_window(self, dim, window, window_dim, center=True):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a Rolling method instead? Maybe construct or build would be a good name for the method?

The API could look something like: da.rolling(b=3).construct('window_dim')

That helps keep the main DataArray/Dataset namespace a little more organized.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about to_dataarray and to_dataset?
Personally, construct sounds something necessary to compute rolling method.

if isinstance(self.data, dask_array_type):
array = self.data

for d, pad in pad_widths.items():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment noting dask/dask#1926 here

@@ -936,6 +936,43 @@ def shift(self, **shifts):
result = result._shift_one_dim(dim, count)
return result

def _pad(self, **pad_widths):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this _pad_with_fill_value?

@@ -815,6 +820,17 @@ def __setitem__(self, key, value):
array, key = self._indexing_array_and_key(key)
array[key] = value

def rolling_window(self, axis, window):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in duck_array_ops instead? I'd like to keep the indexing adapters simple.

"""

axis = nputils._validate_axis(self.array, axis)
rolling = nputils.rolling_window(np.swapaxes(self.array, axis, -1),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add an axis argument directly to nputils.rolling_window?

size = self.array.shape[axis] - window + 1
arrays = [self.array[(slice(None), ) * axis + (slice(w, size + w), )]
for w in range(window)]
return da.stack(arrays, axis=-1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably the most efficient way to do this would be to use dask's ghost cell support, but this is fine for now:
http://dask.pydata.org/en/latest/array-ghost.html

You would want to map the new efficient rolling window computation over each block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see here as well:

def dask_rolling_wrapper(moving_func, a, window, min_count=None, axis=-1):
'''wrapper to apply bottleneck moving window funcs on dask arrays'''
# inputs for ghost
if axis < 0:
axis = a.ndim + axis
depth = {d: 0 for d in range(a.ndim)}
depth[axis] = window - 1
boundary = {d: np.nan for d in range(a.ndim)}
# create ghosted arrays
ag = da.ghost.ghost(a, depth=depth, boundary=boundary)
# apply rolling func
out = ag.map_blocks(moving_func, window, min_count=min_count,
axis=axis, dtype=a.dtype)
# trim array
result = da.ghost.trim_internal(out, depth)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the help.
But I don't yet come up with the solution for dask...
(Do you mean that we want to do as_strided-like operation for each ghosted-chunk?)

I think this could be in another PR, as it would take for along time for me to think of the correct path.

@fujiisoup
Copy link
Member Author

During the work, I notice a somehow unexpected behavior of rolling.

I expected that with min_periods=1 option, we will get an array without nan,
It is true with center=False and also pd.rolling.

In [2]: da = xr.DataArray(np.arange(10), dims='x')
In [3]: da.rolling(x=3, min_periods=1, center=False).sum()
Out[3]: 
<xarray.DataArray (x: 10)>
array([  0.,   1.,   3.,   6.,   9.,  12.,  15.,  18.,  21.,  24.])
Dimensions without coordinates: x

In [5]: s = pd.Series(np.arange(10))
 s.rolling(3, min_periods=1, center=True).sum()
Out[7]: 
0     1.0
1     3.0
2     6.0
3     9.0
4    12.0
5    15.0
6    18.0
7    21.0
8    24.0
9    17.0
dtype: float64

But with center=True, we have a nan at the end.

In [4]: da.rolling(x=3, min_periods=1, center=True).sum()
Out[4]: 
<xarray.DataArray (x: 10)>
array([  1.,   3.,   6.,   9.,  12.,  15.,  18.,  21.,  24.,  nan])
Dimensions without coordinates: x

It is because we make shift operation after the rolling.

values = func(self.obj.data, window=self.window,
min_count=min_count, axis=axis)
result = DataArray(values, self.obj.coords)
if self.center:
result = self._center_result(result)

If we pad the array before the rolling operation instead of shift, we will not get the last nan and the result would be the same to pandas.
(rolling.to_dataarray('window_dim') does this).

I think this path is more intuitive.
Any thoughts?

@@ -34,6 +34,8 @@ def maybe_promote(dtype):
fill_value = np.datetime64('NaT')
elif np.issubdtype(dtype, np.timedelta64):
fill_value = np.timedelta64('NaT')
elif dtype.kind == 'b':
fill_value = False
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is convenient for me, but it is not very clear whether False is equivalent to nan for boolean arrays.
If anyone has objections, I will consider different approach.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, let's consider other options here. This is used for the default value when reindexing/aligning.

da_rolling['index'])

np.testing.assert_allclose(s_rolling.values, da_rolling.values)
np.testing.assert_allclose(s_rolling.index, da_rolling['index'])
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the logic for the center=True case. Now our result is equivalent to pandas's rolling, including the last position.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks!

@@ -936,6 +936,43 @@ def shift(self, **shifts):
result = result._shift_one_dim(dim, count)
return result

def pad_with_fill_value(self, **pad_widths):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to expose this to the public, which is used in my new logic in rolling.

Copy link
Member

@shoyer shoyer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great -- I have only a few smaller suggestions.

@@ -34,6 +34,8 @@ def maybe_promote(dtype):
fill_value = np.datetime64('NaT')
elif np.issubdtype(dtype, np.timedelta64):
fill_value = np.timedelta64('NaT')
elif dtype.kind == 'b':
fill_value = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, let's consider other options here. This is used for the default value when reindexing/aligning.

# Find valid windows based on count.
# We do not use `reduced.count()` because it constructs a larger array
# (notice that `windows` is just a view)
counts = (~self.obj.isnull()).rolling(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For formatting long chains of method calls, I like to add extra parentheses and break every operation at the start of the line, e.g.,

counts = ((~self.obj.isnull())
          .rolling(center=self.center, **{self.dim: self.window})
          .to_dataarray('_rolling_window_dim')
          .sum(dim='_rolling_window_dim'))

I find this makes it easier to read


return result
# restore dim order
return result.transpose(*self.obj.dims)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to restore dimension order any more. The result should already be calculated correctly.

da_rolling['index'])

np.testing.assert_allclose(s_rolling.values, da_rolling.values)
np.testing.assert_allclose(s_rolling.index, da_rolling['index'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks!


.. ipython:: python

@verbatim
for label, arr_window in r:
# arr_window is a view of x

Finally, the rolling object has ``to_dataarray`` method, which gives a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add: (to_dataset for Rolling objects from Dataset)

dimension added to the last position. This enables more flexible operation,
such as strided rolling, windowed rolling, ND-rolling, and convolution.
(:issue:`1831`, :issue:`1142`, :issue:`819`)
By `Keisuke Fujii <https://github.com/fujiisoup>`_.
- Added nodatavals attribute to DataArray when using :py:func:`~xarray.open_rasterio`. (:issue:`1736`).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a bug fix note for the aggregations of the last element with center=True?

# Find valid windows based on count.
# We do not use `reduced.count()` because it constructs a larger array
# (notice that `windows` is just a view)
counts = (~self.obj.isnull()).rolling(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should add a short-cut here that doesn't bother to compute counts if the array's dtype cannot hold NaN? I think that would solve the issue with changing maybe_promote for booleans.

You could add a utility function to determine this based on whether the result of maybe_promote() has the same dtype as the input.

@@ -133,3 +134,52 @@ def __setitem__(self, key, value):
mixed_positions, vindex_positions = _advanced_indexer_subspaces(key)
self._array[key] = np.moveaxis(value, vindex_positions,
mixed_positions)


def rolling_window(a, axis, window):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a small point, but can you swap the arguments for this function? That would let you set a default axis.

Bottleneck uses default arguments like move_sum(array, window, axis=-1) which I think is a nice convention:
https://kwgoodman.github.io/bottleneck-doc/reference.html#moving-window-functions

**pad_width: keyword arguments of the form {dim: (before, after)}
Number of values padded to the edges of each dimension.
"""
if self.dtype.kind == 'b':
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to get an appropriate fill_value for non-float arrays?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this function should take a fill value argument, which could default to dtypes.NA?

@@ -3435,7 +3448,7 @@ def test_rolling_count_correct():
result = da.rolling(time=11, min_periods=None).count()
expected = DataArray(
[np.nan, np.nan, np.nan, np.nan, np.nan, np.nan,
np.nan, np.nan, np.nan, np.nan, 8], dims='time')
np.nan, np.nan, np.nan, np.nan, np.nan], dims='time')
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the last element should be np.nan rather than 8, because 8 < min_periods=11.

Copy link
Member

@shoyer shoyer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me -- thanks for all your work on this!

@fujiisoup
Copy link
Member Author

@shoyer , thanks for the detailed review.

I noticed the benchmark test is still failing.
After fixing this, I will merge this.

Thanks :)

center=True)
# window/2 should be smaller than the smallest chunk size.
with pytest.raises(ValueError):
rw = v.rolling_window(dim='x', window=100, window_dim='x_w',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F841 local variable 'rw' is assigned to but never used

import dask.array as da
v = Variable(['x'], da.arange(100, chunks=20))
# should not raise
rw = v.rolling_window(dim='x', window=10, window_dim='x_w',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F841 local variable 'rw' is assigned to but never used

else:
start, end = window - 1, 0

drop_size = depth[axis] - offset - np.maximum(start, end)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I think of size as a positive integer, but below you use -drop_size to make it positive. I think this would be clearer as drop_size = max(start, end) - offset - depth[axis] (use max() vs np.maximum as start and end are Python integers)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I thought it becomes sometimes negative.
Fixed.

"more evenly divides the shape of your array." %
(window, depth[axis], min(a.chunks[axis])))

# We temporary use `reflect` boundary here, but the edge portion is
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer correct?

"""
if fill_value is dtypes.NA: # np.nan is passed
dtype, fill_value = dtypes.maybe_promote(self.dtype)
array = self.astype(dtype).data
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use self.data.astype(dtype, copy=False) to avoid copying for numpy arrays unless necessary.

fill_value=np.nan)


@pytest.mark.skipif(not has_dask, reason='This is for dask.')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

F811 redefinition of unused 'test_dask_rolling' from line 308

# Conflicts:
#	xarray/core/duck_array_ops.py
#	xarray/core/missing.py
#	xarray/core/nputils.py
#	xarray/core/rolling.py
#	xarray/tests/test_duck_array_ops.py
#	xarray/tests/test_nputils.py
@fujiisoup
Copy link
Member Author

@shyer, do you have further suggestions?
I think this is almost ready.

Copy link
Member

@shoyer shoyer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks ready to me!

@fujiisoup fujiisoup merged commit dc3eebf into pydata:master Mar 1, 2018
@fujiisoup fujiisoup deleted the rolling_window branch March 1, 2018 03:39
@jhamman
Copy link
Member

jhamman commented Mar 1, 2018

nice work @fujiisoup!

array = self.data

# Dask does not yet support pad. We manually implement it.
# https://github.com/dask/dask/issues/1926

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working on a pad function for Dask Arrays (akin to NumPy's pad) in PR ( dask/dask#3578 ). Would be curious to know if this will meet your needs. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice :)
Thanks for letting me know this.
I will update here after your PR is merged.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the recent 0.18.1 release. Feedback welcome :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants