DataArrays inside apply_ufunc with dask=parallelized #5010

Closed
aulemahal opened this issue Mar 8, 2021 · 3 comments · Fixed by #5011

Comments

@aulemahal
Contributor

aulemahal commented Mar 8, 2021

Is your feature request related to a problem? Please describe.
Currently, when using apply_ufunc with dask='parallelized', the wrapped function receives numpy arrays upon computation.
Some xarray operations generate an enormous number of chunks (the best example being da.groupby('time.dayofyear')), so any complex script using dask ends up with huge task graphs. Dask's scheduler becomes overloaded, sometimes even hangs, and sometimes uses far more RAM than its workers.

Describe the solution you'd like
I'd like to benefit from both the tools of xarray and the power of dask parallelization. I'd like to be able to do something like this:

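def func(da):
    """Example of an operation not (easily) possible with numpy."""
    return da.groupby('time').mean()

xr.apply_ufunc(
    func,  # apply_ufunc takes the function first, then the data
    da,
    input_core_dims=[['time']],
    pass_xr=True,  # proposed keyword, not an existing xarray argument
    dask='parallelized'
)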

I'd like the wrapped func to receive DataArrays resembling the inputs (named dims, coords and all), but containing only the subset for that dask chunk. This way the whole function gets parallelized: dask sees only one task, and I can write the code using xarray. Depending on the implementation, it might be less efficient than dask='allowed' for small datasets, but I think it could be beneficial for long and complex computations on large datasets.

Describe alternatives you've considered
The alternative is to reduce the size of the datasets (looping on other dimensions), but that defeats the purpose of dask.

Another alternative, which I am currently testing, is to add a layer between apply_ufunc and func. That layer reconstructs a DataArray from the numpy input and deconstructs the result before returning it, so xarray and dask are only passing the data through. If this works and is elegant enough, I may suggest an implementation within xarray.
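
A minimal sketch of what such an intermediate layer might look like, assuming the wrapped function preserves the length of the time axis. The names weekday_anomaly, numpy_layer and the loop_* dimension labels are hypothetical and only serve the illustration; the time coordinate is handed over through kwargs because apply_ufunc passes only raw arrays to the wrapped function.

```python
import numpy as np
import pandas as pd
import xarray as xr


def weekday_anomaly(da: xr.DataArray) -> xr.DataArray:
    """Hypothetical xarray-only function: subtract the mean over weekdays."""
    weekdays = da.where(da["time"].dt.dayofweek < 5)
    return da - weekdays.mean("time")


def numpy_layer(values: np.ndarray, time: np.ndarray) -> np.ndarray:
    """Rebuild a DataArray around one chunk, run the xarray code, unwrap."""
    # apply_ufunc moves core dimensions to the end, so "time" is the last axis.
    loop_dims = [f"loop_{i}" for i in range(values.ndim - 1)]
    block = xr.DataArray(values, dims=[*loop_dims, "time"], coords={"time": time})
    out = weekday_anomaly(block)
    # Return a plain numpy array with "time" last again.
    return out.transpose(*loop_dims, "time").values


time = pd.date_range("2021-03-01", periods=14, freq="D")
da = xr.DataArray(
    np.arange(14 * 4, dtype=float).reshape(14, 4),
    dims=("time", "x"),
    coords={"time": time, "x": np.arange(4)},
).chunk({"time": -1, "x": 2})  # the core dimension must sit in a single chunk

result = xr.apply_ufunc(
    numpy_layer,
    da,
    kwargs={"time": da["time"].values},
    input_core_dims=[["time"]],
    output_core_dims=[["time"]],
    dask="parallelized",
    output_dtypes=[da.dtype],
)
```

The result stays lazy until result.compute() is called, and its dimensions come back as (x, time) because core dimensions are moved to the end. Only the time coordinate is restored inside the layer here; other coordinates would need to be passed in the same way.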

@keewis
Collaborator

keewis commented Mar 8, 2021

would map_blocks work for you?
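
For context, a minimal sketch of how map_blocks might cover the request above: the function is called once per dask chunk and receives a DataArray with its dims and coords. The function name remove_january_mean and the sample data are made up for illustration, and template=da is passed on the assumption that the output has the same shape and coordinates as the input.

```python
import numpy as np
import pandas as pd
import xarray as xr


def remove_january_mean(block: xr.DataArray) -> xr.DataArray:
    """Hypothetical per-chunk function that relies on the time coordinate."""
    january = block.where(block["time"].dt.month == 1)
    anomaly = block - january.mean("time")
    # Keep the dimension order of the incoming block.
    return anomaly.transpose(*block.dims)


time = pd.date_range("2000-01-01", periods=60, freq="D")
da = xr.DataArray(
    np.arange(60 * 4, dtype=float).reshape(60, 4),
    dims=("time", "x"),
    coords={"time": time, "x": np.arange(4)},
    name="tas",
).chunk({"time": -1, "x": 2})  # two chunks along x, one along time

# Passing a template tells xarray what the output looks like,
# so it does not have to infer it by calling the function on empty data.
result = da.map_blocks(remove_january_mean, template=da)
computed = result.compute()
```

Each chunk here contains the full time axis, so the per-chunk January mean equals the one computed on the whole array. A function that changes the shape (for example a groupby reduction) would need a template describing the reduced output.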

@aulemahal
Contributor Author

Dang. How did I miss that?

@keewis
Collaborator

keewis commented Mar 8, 2021

we should probably link to map_blocks in the "See Also" section of apply_ufunc
