
Memory issues despite lazy operations #1193

Closed
schlunma opened this issue Jun 24, 2021 · 12 comments
Labels: iris (Related to the Iris package), question (Further information is requested)

Comments

@schlunma (Contributor)

Right now, I'm trying to analyze 25 years of monthly ERA5 data (variable ta). Since this dataset is 4D (time, height, lat, lon), the file sizes are pretty large. After concatenation, the cube's shape is (300, 37, 721, 1440), which corresponds to about 86 GiB assuming float64.
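For reference, the ~86 GiB figure follows directly from the shape and 8 bytes per float64 value:

import numpy as np

nbytes = np.prod((300, 37, 721, 1440)) * 8  # 8 bytes per float64 value
print(f"{nbytes / 1024**3:.1f} GiB")        # ~85.9 GiB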

When using the following recipe to extract the zonal means

# ESMValTool
---
documentation:
 description: Test ERA5.

 authors:
   - schlund_manuel

 references:
   - acknow_project


preprocessors:

 mean:
   zonal_statistics:
     operator: mean


diagnostics:

 ta_obs:
   variables:
     ta:
       preprocessor: mean
       mip: Amon
       additional_datasets:
         - {dataset: ERA5, project: native6, type: reanaly, version: v1, tier: 3, start_year: 1990, end_year: 2014}
   scripts:
     null

on a node with 64 GiB of memory, the process hangs indefinitely after reaching

2021-06-24 11:49:23,750 UTC [24261] INFO    Starting task ta_obs/ta in process [24261]
2021-06-24 11:49:23,843 UTC [24232] INFO    Progress: 1 tasks running, 0 tasks waiting for ancestors, 0/1 done

I'm fairly sure this is due to memory issues. As the following memory samples (taken once per second) show, memory usage slowly ramps up to the maximum and then suddenly drops. I suspect the OS kills the process once the entire RAM has been filled.

            total       used       free     shared    buffers     cached
Mem:           62G       2.7G        59G        80K       6.5M       653M
Mem:           62G       9.2G        53G        80K       6.5M       1.7G
Mem:           62G       9.4G        53G        80K       6.5M       1.8G
Mem:           62G        16G        46G        80K       6.5M       3.0G
Mem:           62G        15G        47G        80K       6.5M       3.0G
Mem:           62G        22G        39G        80K       6.5M       4.2G
Mem:           62G        22G        40G        80K       6.5M       4.2G
Mem:           62G        29G        32G        80K       6.5M       5.4G
Mem:           62G        28G        34G        80K       6.5M       5.5G
Mem:           62G        36G        26G        80K       6.5M       6.6G
Mem:           62G        35G        27G        80K       6.5M       6.8G
Mem:           62G        42G        20G        80K       6.5M       7.9G
Mem:           62G        41G        20G        80K       6.5M       8.1G
Mem:           62G        48G        14G        80K       6.5M       9.2G
Mem:           62G        48G        14G        80K       6.5M       9.2G
Mem:           62G        53G       9.4G        80K       6.5M        10G
Mem:           62G        56G       6.7G        80K       6.5M        10G
Mem:           62G        59G       3.1G        80K       6.5M        11G
Mem:           62G        61G       811M        80K       1.6M       9.2G
Mem:           62G        58G       4.4G        80K       1.6M       9.4G
Mem:           62G        62G       193M        80K       1.6M       9.5G
Mem:           62G        62G       690M        80K       1.6M       8.4G
Mem:           62G        62G       165M        80K       1.5M       6.5G
Mem:           62G        61G       980M        80K       1.5M       6.2G
Mem:           62G        62G       335M        80K       1.5M       5.4G
Mem:           62G        62G       175M        80K       1.5M       3.4G
Mem:           62G        62G       171M        80K       1.5M       2.5G
Mem:           62G        62G       177M        80K       1.5M       2.3G
Mem:           62G        62G       166M        80K       1.5M       1.0G
Mem:           62G        62G       165M        80K       1.5M       701M
Mem:           62G        62G       165M        80K       1.5M       124M
Mem:           62G        62G       165M        80K       1.1M        23M
Mem:           62G        62G       165M        80K       996K        19M
Mem:           62G        62G       165M        80K       956K        18M
Mem:           62G        62G       166M        80K       856K        24M
Mem:           62G        62G       165M        80K       724K       2.2M
Mem:           62G        40G        22G        80K       920K        11M
Mem:           62G        19G        43G        80K       1.5M        13M
Mem:           62G       2.2G        60G        80K       1.5M        14M
Mem:           62G       1.8G        60G        80K       1.5M        14M
Mem:           62G       1.8G        60G        80K       1.5M        14M
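The table above looks like output of the classic free tool; a similar per-second sampling loop could also be written in Python (a sketch using psutil, which is an assumption and not necessarily what was used here):

import time
import psutil

GIB = 1024**3
for _ in range(60):  # one sample per second for a minute
    mem = psutil.virtual_memory()
    print(f"total={mem.total / GIB:.1f}G used={mem.used / GIB:.1f}G "
          f"free={mem.free / GIB:.1f}G cached={getattr(mem, 'cached', 0) / GIB:.1f}G")
    time.sleep(1)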

The zonal_statistics preprocessor that is used here should be 100% lazy:

cube = cube.collapsed('longitude', operation)
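Laziness can also be verified directly on the cube (a minimal sketch; the file name and variable constraint are hypothetical):

import iris
import iris.analysis

cube = iris.load_cube("era5_ta_1990-2014.nc", "air_temperature")  # hypothetical file
print(cube.has_lazy_data())  # True: nothing has been read into memory yet

zonal_mean = cube.collapsed("longitude", iris.analysis.MEAN)
print(zonal_mean.has_lazy_data())  # still True: collapsed() keeps the data lazy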

So I'm really not sure what's going on here. I would have expected a node with 64 GiB of memory to be able to handle 86 GiB of lazy data. I think this is somehow related to dask, as adding a cube.lazy_data().mean(axis=3).compute() call to the line above triggers the exact same behavior. At this point, the (lazy) array looks like this (which looks reasonable to me):

dask.array<concatenate, shape=(300, 37, 721, 1440), dtype=float32, chunksize=(1, 12, 721, 1440), chunktype=numpy.MaskedArray>

For comparison, setting up a random array with the same sizes in dask and doing the same operation works perfectly well:

>>> import dask.array as da
>>> x = da.ma.masked_greater(da.random.normal(size=(300, 37, 721, 1440), chunks=(1, 12, 721, 1440)), 0.0)                                                                                                                                                                     
>>> print(x)
dask.array<masked_greater, shape=(300, 37, 721, 1440), dtype=float64, chunksize=(1, 12, 721, 1440), chunktype=numpy.MaskedArray>
>>> x.mean(axis=3).compute() # works perfectly well with max. ~15 GiB memory
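One knob that might be worth experimenting with is the chunk structure of the lazy array: fewer, larger chunks mean a smaller task graph for the scheduler to juggle (a sketch; the chunk size along time is purely illustrative and may or may not help here):

import numpy as np

lazy = cube.lazy_data()
print(lazy.chunksize, np.prod(lazy.numblocks))  # chunk shape and total number of chunks

rechunked = lazy.rechunk({0: 12})               # larger chunks along the time dimension
zonal_mean = rechunked.mean(axis=3).compute()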

@ESMValGroup/esmvaltool-coreteam Has anyone had similar experiences? I know that this is not exactly an issue of ESMValTool, but maybe there is something we can configure in dask that helps here? Apart from using a larger node, there is no way to evaluate this dataset at the moment.

schlunma added the question and iris labels on Jun 24, 2021
@zklaus commented Jun 24, 2021

For possible reference: dask/distributed#2602

@schlunma (Contributor, Author) commented Jun 24, 2021

Thanks @zklaus, that sounds exactly like the problem here.

I found a (relatively) easy workaround for this: moving the concatenate preprocessor step down the preprocessing pipeline to right before save, and realizing the data in that step, removes the issue. Basically, all other preprocessing operations are then executed serially for each input file. Right now, due to the concatenation of all files at the very beginning of the chain, all of this is done in parallel, which leads to the "backpressure" described in the issue. EDIT: "backpressure" is actually what's needed here.
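In plain iris terms, the workaround amounts to something like this (a sketch, not the actual ESMValCore implementation; the file names are hypothetical):

import glob
import iris
import iris.analysis
from iris.cube import CubeList

cubes = CubeList()
for path in sorted(glob.glob("era5_ta_*.nc")):               # hypothetical per-year files
    cube = iris.load_cube(path)
    cube = cube.collapsed("longitude", iris.analysis.MEAN)   # per-file preprocessing
    cube.data                                                # realize the (small) result
    cubes.append(cube)

result = cubes.concatenate_cube()                            # concatenate only at the end

Because each per-file result is realized before the next file is opened, only one file's worth of tasks is in memory at a time.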

Is this something that might be interesting for us? It could be implemented as an optional memory_optimized mode (or a similarly named option) that can be enabled in the config-user.yml file or via a command line option.

@valeriupredoi (Contributor)

Nice find @schlunma - unfortunately we need to perform the concatenate preprocessor step at the beginning, since the data is fragmented across multiple files most of the time. Unless I am misinterpreting your workaround?

@schlunma (Contributor, Author) commented Jun 24, 2021

Ahh! Yes --- now that I think about it --- most time-related preprocessors would fail. In my case it worked just fine (I only used zonal_statistics), but e.g. calculating a mean over all time steps would fail brutally 😄

I will do some more tests tomorrow with a dataset that is not spread across different files and see how that performs - my guess is that it will work well, similar to the test I did with the raw dask array.

@Peter9192 (Contributor)

Sounds quite similar to the behaviour I've observed in #968 (comment), and that conversation in the dask repo is interesting!

Not sure if what you mention here:

Right now, due to the concatenation of all files at the very beginning of the chain, all of this is done in parallel, which leads to the "backpressure" described in the issue.

is actually the backpressure that they're talking about. The way I understand it, "backpressure" would be an analogy to a channel flow (or any kind of flow, really) where a bottleneck at one point causes a reduction of the inflow from upstream, and this feature is actually desirable. What you describe sounds more like "over-pressure" (see the reference they pointed to here), i.e. too much data flowing in from upstream causing an overload at the bottleneck, which is undesirable.

@schlunma (Contributor, Author)

Yes @Peter9192, you are 100% correct!

@bouweandela (Member)

Looks like the linked dask issue finally got fixed, so it would be interesting to see if our problem also disappears with the next release of dask.

@zklaus commented Jul 5, 2021

Yes, though note that that specific issue is in distributed. If I recall correctly, we might be using a different dask scheduler, so the implications might be weaker.
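For reference, the scheduler that dask (and thus iris) uses can be chosen via dask's configuration; by default, dask arrays use the threaded scheduler rather than distributed (a sketch):

import dask

# Default for dask arrays: the multi-threaded scheduler within a single process.
dask.config.set(scheduler="threads")

# Single-threaded scheduler, handy for debugging memory behaviour step by step.
dask.config.set(scheduler="synchronous")

Which scheduler is active determines how directly fixes in distributed apply to our runs.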

@bouweandela (Member)

@fnattino Could you check whether this is solved now, possibly with #1714?

@fnattino

Hey @bouweandela, I have run the recipe above with 4 cores and 8 GB of memory using #1714 and it completes without errors!

@zklaus commented Nov 10, 2022

That's great to hear. Note that there have recently been massive improvements in dask/distributed in this area; cf., e.g., dask/distributed#7128 and related issues and PRs in the dask and distributed changelogs.

When we get serious about it, I would suggest pinning at least dask >=2022.10.0 (which will also pin distributed correspondingly).

@schlunma (Contributor, Author)

The recipe now runs with the current version of the tool. I could also get a significant speed-up by using a distributed scheduler via #2049. Closing this now; feel free to re-open if necessary.
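For anyone trying the same, a distributed scheduler with an explicit per-worker memory limit can be started along these lines (a sketch; the worker count and memory limit are illustrative, and this is not the configuration mechanism introduced in #2049):

from dask.distributed import Client

# Starts a LocalCluster and registers it as the default dask scheduler,
# so subsequent compute() calls use it automatically.
client = Client(n_workers=4, threads_per_worker=2, memory_limit="8GB")
print(client.dashboard_link)  # dashboard for watching per-worker memory usage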
