Parallel regridding support #3

Closed
JiaweiZhuang opened this issue Aug 30, 2017 · 47 comments

@JiaweiZhuang (Owner)

"Parallel regridding" could mean two things: (see #2 for more about this two-step regridding)

  1. Generate regridding weights in parallel.
  2. Apply regridding weights in parallel.

The native parallel support in ESMF/ESMPy is based on MPI and horizontal domain decomposition. It works for both generating and applying weights. See https://github.com/nawendt/esmpy-tutorial/blob/master/esmpy_mpi_example.py for an example.

MPI-based horizontal domain decomposition makes perfect sense for Earth system model simulations, but for data analysis I would absolutely want to avoid MPI's complexity. With dask, there's a simpler way to go:

  1. Avoid horizontal domain decomposition and only parallelize over additional dimensions like time and level. Writing such code with dask.array will be trivial (see the sketch after this list).
  2. Still generate regridding weights in serial. My impression is that people tend to have a fixed pair of source and destination grids and regrid a lot of data fields between them. In this case, weight generation only needs to be done once. So we would only care about applying the weights in parallel.
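
A minimal sketch of this idea, assuming the weights are already available as a scipy.sparse matrix (the sizes, chunking, and variable names are illustrative, not xESMF code):

import numpy as np
import scipy.sparse
import dask.array as da

# Illustrative sizes: 100 time steps, 30 levels, 180x360 input grid, 90x180 output grid
n_in, n_out = 180 * 360, 90 * 180
weights = scipy.sparse.random(n_out, n_in, density=1e-4, format='csr')  # stand-in for real weights

# Chunk only over time; each chunk keeps the full horizontal grid
data = da.random.random((100, 30, n_in), chunks=(10, 30, n_in))

def apply_weights(block):
    # (time_chunk, level, n_in) -> (time_chunk, level, n_out)
    out_shape = block.shape[:-1] + (weights.shape[0],)
    return (weights @ block.reshape(-1, block.shape[-1]).T).T.reshape(out_shape)

out = data.map_blocks(apply_weights, chunks=(10, 30, n_out), dtype=data.dtype)
result = out.compute()  # each time chunk is regridded independently, in parallel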

Is there any case where we have to parallelize over horizontal dimensions?

PS: Need to profile the regridding on very large data sets and figure out the bottleneck (generating vs applying weights) before starting to implement anything.

@JiaweiZhuang changed the title from "Parallel regridding" to "Parallel regridding support" on Aug 30, 2017
@bekozi commented Aug 31, 2017

I generally agree with your summary. I added a few comments. It would also be good to get @rokuingh's thoughts.

Still generate regridding weights in serial. My impression is that people tend to have a fixed pair of source and destination grids and regrid a lot of data fields between them. In this case, weight generation only needs to be done once.

Yes, mostly. The only caveat is grid size. Very large grids can be prohibitive for serial operations. Dealing with big data is the responsibility of the user (provided the software works in the first place), but excluding the ability to generate weights in parallel using horizontal decompositions would remove important functionality. Especially since grid resolutions are for the most part increasing.

So we would only care about applying the weights in parallel. Is there any case where we have to parallelize over horizontal dimensions?

Again, large grids. There would also be the case of time-varying grid/mesh spatial structures. That is a very special case though! There is also the parallelization scheme used by ESMF's weight application (see below).

PS: Need to profile the regridding on very large data sets and figure out the bottleneck (generating vs applying weights) before starting to implement anything.

Generating the weights and creating the ESMF route handle are the major bottlenecks. Which takes longer depends on the regridding method and source/destination structures. The route handle contains the sparse matrix application optimizations. Hence, once the route handle is created, the weight application is very fast. Note SMM optimizations must use horizontal decompositions (correct me if I'm wrong @rokuingh). It is entirely possible to avoid ESMF's SMM routines and write your own that interprets a weight file. It is worth noting that ESMPy does not expose Python bindings for route handle stuff yet. @rokuingh is very close to to-weight-file/from-weight-file operations...

For large grids, memory usage is also an issue. We've developed a workflow that uses spatial subsetting (to ensure appropriate source/destination overlap) and ESMF to tile weight and SMM operations. We'd like to bring this into ESMF proper at some point.

Anyway, you definitely have a good handle on things. Let us know if you have any questions.

@rokuingh

I generally agree with @bekozi.

ESMPy does use horizontal decompositions, but we have considered bringing the customizable decomposition capability of ESMF out to the Python layer. However, this still would not allow the case mentioned by @JiaweiZhuang of distributing over non-gridded dimensions like time and levels, unless the grid was created with enough dimensions to incorporate the non-gridded dimensions, but that would probably turn into a big mess.

The ability to write weights to file and read weights from file in ESMPy is almost finished. I requested time for a development sprint on this from my supervisor yesterday to finish it off; I'll let you know how that goes.

@bekozi has done a fair amount of profiling of the ESMF SMM code, is that stuff publicly available?

@JiaweiZhuang (Owner)

The ability to write weights to file and read weights from file in ESMPy is almost finished. I requested time for a development sprint on this from my supervisor yesterday to finish it off; I'll let you know how that goes.

Great. That's currently the most important part for me.

@JiaweiZhuang (Owner)

Dealing with big data is the responsibility of the user (provided the software works in the first place), but excluding the ability to generate weights in parallel using horizontal decompositions would remove important functionality. Especially since grid resolutions are for the most part increasing.

You are right, but it also depends on actual use cases. For me, the highest priority for xESMF is usability. I want users to be able to perform regridding with one line of code, and I don't specifically target very large grids. If there's an easy and elegant way to gain speed-up (say dask) I will be happy to go with it, but I don't want users to have to call mpirun -- writing and understanding MPI code is beyond an average earth scientist's skill set...

Also, I want xESMF to provide regridding capability for my cubedsphere package and the models it serves (FV3, GEOS-Chem...). The horizontal resolution should be around 50 km, and there are always many vertical levels and variables, so parallelizing over extra dimensions with dask seems like a reasonable approach.

I would like to hear from @rabernat and @jhamman about your use cases, as you are particularly interested in regridding.

@jhamman commented Sep 1, 2017

@JiaweiZhuang - first let me say that the package you've put together looks pretty sweet.

The conundrum you're discussing here is why I stayed away from ESMF (and other existing low-level remapping libraries). I basically could not see a clear path toward integration with the xarray ecosystem (dask). As I'm thinking about it more, I have one idea for how you could make this work using a hybrid approach.

  • use ESMF to calculate the weights
  • do the transform in python/xarray/dask

The transform step is fairly straightforward (new = (weights * data) / sum(weights)) and is generalizable once you have the weights. I would like to have something that exposes this sort of API as it would lend itself to the addition of new methods to calculate mappings/weights outside of ESMF.
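
In plain numpy, that transform is just a weighted average over contributing source cells; a toy sketch with made-up sizes (the weight matrix is kept dense here only for readability):

import numpy as np

weights = np.random.rand(6, 12)   # (n_destination, n_source) mapping weights
data = np.random.rand(12)         # one flattened 2-D field on the source grid

# new = (weights * data) / sum(weights)
new = (weights @ data) / weights.sum(axis=1)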


As for your actual question, my most common use case is remapping 3d and 4d data between well-defined grids (regular, equal area, etc.) using either conservative or linear remapping schemes. So, in theory, ESMF includes everything I need.

@rabernat commented Sep 1, 2017

weights * data is a matrix multiplication, right? And weights is a relatively sparse matrix? If so, perhaps it could be executed using dask's sparse array capabilities (via sparse package).

@JiaweiZhuang (Owner) commented Sep 1, 2017

@jhamman Thanks!

I would like to have something that exposes this sort of API as it would lend itself to the addition of new methods to calculate mappings/weights outside of ESMF.

I totally agree with this kind of interoperability. The current popular remapping packages are ESMF and Tempest. They both output regridding weights in SCRIP format (SCRIP itself has been retired). I tried to write a Python interface for Tempest (written in C++) but then realized that ESMPy allows me to do everything in Python... I think the representations of regridding weights are pretty consistent among packages.

As for your actual question, my most common use case is remapping 3d and 4d data between well-defined grids (regular, equal area, etc.) using either conservative or linear remapping schemes.

Glad to see that! Just to point out that the first-order conservative scheme is also a linear mapping. A scheme is linear as long as the weight matrix is independent of the data field, which means we can pre-calculate the weights and apply them to any data. The only non-linear remapping I know of is high-order conservative remapping with monotonic constraints, used in semi-Lagrangian advection schemes.

@JiaweiZhuang (Owner)

@rabernat

weights * data is a matrix multiplication, right? And weights is a relatively sparse matrix? If so, perhaps it could be executed using dask's sparse array capabilities (via sparse package).

Yes, it is very sparse (see #2). The full dense matrix would have size N_s * N_d, where N_s and N_d are the numbers of grid points in the source and destination grids, but the number of non-zero elements is only on the order of max(N_s, N_d).

Good to know that dask already has sparse matrix support. It seems that calling sparse.tensordot on dask arrays amounts to executing scipy.sparse.csr_matrix.dot in parallel... I'll try it out.
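
A rough sketch of that experiment (the sparse package's COO array wrapped in dask; sizes are made up, and whether the result blocks come back sparse or dense depends on the library versions involved):

import sparse
import dask.array as da

n_in, n_out = 10000, 5000
coo = sparse.random((n_out, n_in), density=1e-4)        # stand-in for the regridding weights
weights = da.from_array(coo, chunks=(n_out, n_in))      # single-chunk sparse dask array

data = da.random.random((40, n_in), chunks=(10, n_in))  # 40 fields, chunked in groups of 10

# Contract over the source-grid dimension; each data chunk hits the sparse weights independently
out = da.tensordot(weights, data, axes=([1], [1]))      # shape (n_out, 40)
result = out.compute()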

@jhamman commented Nov 1, 2017

cc @mrocklin and @kmpaul

@mrocklin commented Nov 1, 2017

My first impression is that creating a large sparse matrix may not be the right approach here. Instead, we might consider how each block in the input array affects each block in the output array, sum up those effects, and build up a graph of tasks manually. My guess is that this requires a non-trivial amount of blackboard/notepad time to get right.

@mrocklin commented Nov 1, 2017

@pelson has also asked about this topic before.

@mrocklin commented Nov 1, 2017

If my intuition above is correct then my first instinct would be to consider the action of regridding on a small 10x10 array cut into 5x5 chunks, and work that out by hand.

@JiaweiZhuang (Owner)

@mrocklin Thanks for the suggestion! This block-by-block approach looks more like ESMF/ESMPy's original MPI implementation. It is very hard to replicate that in Python so I'll first try a simpler approach.

@pelson commented Nov 2, 2017

@cpelley has a huge amount of experience with chunking regridding operations (esp. huge input to moderate output)

@mrocklin commented Nov 2, 2017

@mrocklin Thanks for the suggestion! This block-by-block approach looks more like ESMF/ESMPy's original MPI implementation. It is very hard to replicate that in Python so I'll first try a simpler approach.

I'm curious, what makes this hard in Python?

@cpelley commented Nov 2, 2017

@cpelley has a huge amount of experience with chunking regridding operations (esp. huge input to moderate output)

I have developed a generalised decomposition framework for the project ANTS (a toolkit for generating ancillaries for the Unified Model here at the Met Office).

This works by fetching 'pieces' of the target and the corresponding overlapping source 'pieces' and sending them, along with an operation (whether regrid or otherwise), to any number of processes. Our method abstracts away the decomposition technology utilised and, in doing so, lets users gain the benefits of IPython parallel for multi-machine parallelism, simple single-machine parallelism from multiprocessing, or just serial decomposition, without any change to their code and with a simple one-line API.

More information can be found here:
https://code.metoffice.gov.uk/doc/ancil/ants/latest/decomposition.html

The reason for choosing horizontal decomposition is that both the overlap calculations and broadcasting over the other dimensions are incredibly fast. This gives good hardware utilisation and control of memory usage while maintaining the speed benefits of numpy broadcasting.

I hope to spend some time looking at the possibility of using Dask for us in the future. A few years ago I deemed that it wouldn't help us due to the complex runtime relationship between source and target in the decomposition. Dask seems to have moved along quite a bit since then, so I suspect the situation could easily have changed...

It works well for us and our users, and it is a Python-based framework, made possible by powerful libraries like iris, cartopy, numpy, shapely, etc.

@mrocklin commented Nov 2, 2017

The link you provide seems to require a Met Office login. Do you have anything that is publicly available?

@cpelley commented Nov 2, 2017

Ah OK, here you go:
ants_decomposition.pdf

Hope this is useful in some way.

Cheers

@mrocklin commented Nov 2, 2017 via email

@cpelley commented Nov 2, 2017

I'm afraid not :(

@JiaweiZhuang (Owner)

I'm curious, what makes this hard in Python?

Just realized that it shouldn't be that hard 😅 I was thinking about domain decomposition and MPI communication, but here, since the weight matrix is already generated, the problem is only about parallelizing a sparse matrix multiplication (with broadcasting). See #6 for the serial implementation.

The choice is either chunking over the extra/broadcasting dimensions (time and level) or chunking over the horizontal field. A typical sparse weight-matrix structure is shown below. By chunking over the "output field" dimension we should be able to avoid writing to the same output grid point at the same time (see the sketch after the figure).
[figure: sparse structure of the regridding weight matrix]
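
A toy illustration of that idea -- splitting the weight matrix into blocks of output rows so each piece writes to a disjoint slice of the output (illustrative only, not how ESMF does it internally):

import numpy as np
import scipy.sparse

n_in, n_out = 8000, 4000
W = scipy.sparse.random(n_out, n_in, density=1e-4, format='csr')  # stand-in weight matrix
field_in = np.random.rand(n_in)

# Each row block only touches its own slice of the output, so blocks can run in parallel
n_blocks = 4
edges = np.linspace(0, n_out, n_blocks + 1, dtype=int)
pieces = [W[lo:hi] @ field_in for lo, hi in zip(edges[:-1], edges[1:])]
field_out = np.concatenate(pieces)  # identical to W @ field_in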

@mrocklin commented Jan 1, 2018

Checking in here. Does the latest release manage parallelism across non-grid dimensions like time and level?

@mrocklin commented Jan 1, 2018

Assuming the answer above is "yes, this works for simple cases on dask arrays" (which may not be true) then have you tried this out on a distributed system? One might try this by setting up a single-machine dask.distributed cluster with the following lines of Python

from dask.distributed import Client
client = Client()  # starts a few single-threaded worker processes

# normal xarray with dask code

@JiaweiZhuang (Owner) commented Jan 1, 2018

Checking in here. Does the latest release manage parallelism across non-grid dimensions like time and level?

The current version (v0.1.1) only runs in serial, although extra-dimension parallelism should be quite easy to add (literally just a parallel sparse .dot()). I may add an experimental parallel method in the next release.

@JiaweiZhuang (Owner)

Assuming the answer above is "yes, this works for simple cases on dask arrays" (which may not be true) then have you tried this out on a distributed system? One might try this by setting up a single-machine dask.distributed cluster with the following lines of Python

Thanks for the suggestion. I haven't needed to regrid any data that is large enough to require a distributed cluster. If there are any specific research needs, I am willing to try.

@mrocklin commented Jan 1, 2018

The current version (v0.1.1) only runs in serial, although extra-dimension parallelism should be quite easy to add (literally just a parallel sparse .dot()). I may add an experimental parallel method in the next release.

I think that XArray will handle things for you if you use methods like apply_ufunc.

Thanks for the suggestion. I haven't needed to regrid any data that is large enough to require a distributed cluster. If there are any specific research needs, I am willing to try.

We're dealing with multi-terabyte datasets for which distributed computation isn't strictly necessary, but is certainly convenient. I'll be giving a couple of talks about distributed XArray for data analysis in the next couple of weeks. It would be nice to point to this work as an example of an active community developing functionality with serial computation in mind that "just works" without having to think much about distributed computation. I believe that XArray's apply_ufunc was designed with this kind of thing in mind.

@rabernat

I agree with Matt that we are talking about two different types of optimization here:

  1. A single regridding operation from one grid to another (given a sparse weight matrix). This is a highly "solved" problem, in the sense that it is a very common scientific computing task, with several high-performance implementations (scipy, numba, maybe also GPU?). At this stage, I am assuming that a single regridding operation fits in memory.

  2. Repeating this task many times over, say for each timestep or vertical level of a dataset. It is very unlikely that this can all fit in memory at once. But it is embarrassingly parallel. dask and xarray.apply_ufunc could be very helpful here.

@rabernat

So far @JiaweiZhuang has been focused on optimizing (1). Those of us with more xarray and dask experience could help with (2).

@JiaweiZhuang (Owner)

@JiaweiZhuang - have you tried using dask's dot product? If that can be made to work, we could use dask with xarray right out of the box to parallelize this step.

Tried at the very beginning. dask.array.dot only works with dense matrices, so cannot be used here.

@JiaweiZhuang (Owner) commented Jan 14, 2018

@mrocklin @rabernat @jhamman

I agree with Matt and Ryan's comments on out-of-core computing with dask and apply_ufunc. I've just done some experiments with xarray.apply_ufunc but could not make it work. I would appreciate it if any of you could take a look at my code.

See this notebook in a standalone repo: apply_ufunc_with_dask.ipynb

I've simplified the problem so it is only about sparse matrix multiplication. The repo already contains the regridding weight file, so you don't need to use xESMF and ESMPy at all.

It is helpful to take a look at sparse_dot_benchmark.ipynb first. It contains detailed explanations of the sparse matrix multiplication algorithm and a successful parallel implementation with Numba. It can be used to benchmark the dask method.
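
For reference, the kind of Numba kernel that notebook describes looks roughly like this (assuming the weights are stored as COO triplets; a sketch, not the notebook's exact code):

import numpy as np
import numba

@numba.njit(parallel=True)
def apply_weights_numba(rows, cols, vals, data_in, n_out):
    # data_in: (n_extra, n_in), with time/level stacked into one leading dimension
    n_extra = data_in.shape[0]
    data_out = np.zeros((n_extra, n_out))  # preallocate contiguous output once
    for k in numba.prange(n_extra):        # parallelize over the extra dimension
        for i in range(vals.shape[0]):     # loop over the non-zero weights
            data_out[k, rows[i]] += vals[i] * data_in[k, cols[i]]
    return data_out

Because each thread owns a whole output slice (one k), no atomics or chunk concatenation are needed.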

@JiaweiZhuang (Owner)

Repeating this task many times over, say for each timestep or vertical level of a dataset. It is very unlikely that this can all fit in memory at once. But it is embarrassingly parallel. dask and xarray.apply_ufunc could be very helpful here.

And I guess it can also parallelize over multiple variables in a Dataset?

@mrocklin

Tried at the very beginning. dask.array.dot only works with dense matrices, so cannot be used here.

Yeah, I agree that you don't necessarily want to invoke dask array operations here. It sounds like you have built a good numpy -> numpy function. I believe that apply_ufunc will help apply that function in parallel across many slices of many dataarrays of a distributed dataset.

@shoyer commented Jan 16, 2018

@JiaweiZhuang Try using output_core_dims and output_sizes in your last example, e.g., output_core_dims=[['out_grid']] and output_sizes={'out_grid': dr_dask.sizes['grid_dims'] // 2}
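
For context, a sketch of how those arguments slot into an apply_ufunc call (dr_dask and the dimension names follow the notebook's apparent setup and are assumptions; the regridding function here is a trivial stand-in, and newer xarray versions move output_sizes into dask_gufunc_kwargs):

import numpy as np
import xarray as xr

n_in = 1000
dr_dask = xr.DataArray(
    np.random.rand(20, n_in), dims=['time', 'grid_dims']
).chunk({'time': 5})

def regrid_numpy(arr):
    # stand-in for the real sparse multiply: (..., n_in) -> (..., n_in // 2)
    return arr[..., ::2]

out = xr.apply_ufunc(
    regrid_numpy, dr_dask,
    input_core_dims=[['grid_dims']],
    output_core_dims=[['out_grid']],
    output_sizes={'out_grid': dr_dask.sizes['grid_dims'] // 2},
    dask='parallelized',
    output_dtypes=[dr_dask.dtype],
)
result = out.compute()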

@JiaweiZhuang (Owner)

Try using output_core_dims and output_sizes in your last example, e.g., output_core_dims=[['out_grid']] and output_sizes={'out_grid': dr_dask.sizes['grid_dims'] // 2}

Thanks! Adding output_core_dims and output_sizes fixed the bug. It is faster than serial + dask but still slower than serial + numpy. I also tried manually parallelizing over chunks using dask.delayed, but the efficiency is not comparable with Numba's parallelism. See the same notebook. It seems like I should use Numba for in-memory parallelization, and apply_ufunc + dask only for out-of-core computation.

@mrocklin

Yeah, to be clear, Dask-done-well is almost never faster than Numba-done-well. In the context of XArray the main advantage of dask is larger-than-memory computing.

@mrocklin

You might want to try profiling, perhaps with %prun instead of %time, or with %snakeviz (after you pip install snakeviz and %load_ext snakeviz), to see what is taking time in the sequential case (profiling in the threaded case will be less informative).

@JiaweiZhuang (Owner)

You might want to try profiling, perhaps with %prun instead of %time, or with %snakeviz (after you pip install snakeviz and %load_ext snakeviz), to see what is taking time in the sequential case (profiling in the threaded case will be less informative).

concatenate takes a significant amount of time, for either apply_ufunc or the low-level implementation with dask.delayed. With Numba I can preallocate contiguous memory and don't need to concatenate chunks.

@mrocklin

You might want to try timing with persist rather than compute. This will skip the concatenation, leaving the results as a dask array (but now with fully computed entries rather than lazy ones). This more accurately reflects how people are likely to use this in a distributed setting or as part of a larger workflow anyway.
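
In code the difference is a single call (out here is a hypothetical dask-backed result, not an object from the thread):

result_np = out.compute()  # computes every chunk and concatenates into one in-memory numpy array
result_da = out.persist()  # computes every chunk but keeps the chunked dask-array structure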

@shoyer commented Jan 16, 2018

I would try using the Numba dot product (without Numba's parallelization) inside dask/xarray.apply_ufunc. You might even try Numba's guvectorize to build a gufunc directly.

Something seems to be going wrong with your benchmarks using dask. Maybe dot products with scipy's coo_matrix don't release the GIL?
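
A sketch of such a guvectorize gufunc (COO triplets assumed, with a dummy argument carrying the output length, since guvectorize generally cannot create an output dimension that is absent from the inputs; not code from the thread):

import numpy as np
from numba import guvectorize

@guvectorize(
    ['void(float64[:], int64[:], int64[:], float64[:], float64[:], float64[:])'],
    '(n_in),(k),(k),(k),(n_out)->(n_out)',
    nopython=True,
)
def regrid_gufunc(field_in, rows, cols, vals, dummy, field_out):
    # dummy only fixes the output length n_out; rows/cols/vals are the weight triplets
    field_out[:] = 0.0
    for i in range(vals.shape[0]):
        field_out[rows[i]] += vals[i] * field_in[cols[i]]

# Broadcasting over leading (time, level, ...) dimensions then comes for free, e.g.
# out = regrid_gufunc(data, rows, cols, vals, np.empty(n_out))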

@JiaweiZhuang (Owner)

You might want to try timing with persist rather than compute. This will skip the concatenation, leaving the results as a dask array (but now with fully computed entries rather than lazy ones). This more accurately reflects how people are likely to use this in a distributed setting or as part of a larger workflow anyway.

Using persist does remove the concatenation overhead, but the serial performance is still worse than the pure numpy version... But since we care more about out-of-core computation than about parallelization in this case, this should be fine.

I would try using the Numba dot product (without Numba's parallelization) inside dask/xarray.apply_ufunc. You might even try Numba's guvectorize to build a gufunc directly.

Something seems to be going wrong with your benchmarks using dask. Maybe dot products with scipy's coo_matrix don't release the GIL?

guvectorize gives slightly slower performance than prange. But maybe I was not using the optimal setup.

Numba on dask array shows somewhat similar performance to Scipy on dask array. There is some speed-up but the parallel efficiency is not great.

All details are in this new notebook: numba_on_dask.ipynb

@NicWayand

Just chiming in to support the push for xESMF handling out-of-memory data! I'm getting a MemoryError while regridding some GFDL model output on a local computer with 32 GB of RAM. It would be great to avoid looping over files. Thanks @JiaweiZhuang for a great package; it's been very useful so far.

@JiaweiZhuang (Owner)

Just chiming in to support the push for xESMF handling out-of-memory data!

Glad that xESMF helps! I will utilize xarray.apply_ufunc in v0.2 to enable out-of-core regridding of large data files.

@zxdawn commented Nov 25, 2018

How about multiprocessing? I've tried using multiprocessing to regrid, but the speed is the same as before.

Here's the example.

I'm not familiar with multiprocessing; maybe something is wrong with it?

@JiaweiZhuang (Owner)

v0.2 now supports parallel regridding with dask.

Distributed regridding is left to pangeo-data/pangeo#334

@JiaweiZhuang unpinned this issue on Aug 6, 2019
raphaeldussin pushed a commit to raphaeldussin/xESMF that referenced this issue Aug 26, 2020
Weights are now returned in memory by default, instead of being written to file. Backward compatibility with the `filename` and `reuse_weights` arguments is preserved.