
Learning how to manage data on dask workers #89

Open
consideRatio opened this issue Oct 14, 2021 · 4 comments

@consideRatio (Member)

@espg has run into memory management issues, and this is my attempt to summarize the problem we need to resolve before we can arrive at a proper solution.

This is my attempted problem description

We are having trouble with memory management on our dask workers. The workers are scheduled to run tasks, sometimes several in parallel on each worker. Each completed task has an associated result that we want to keep in worker memory and collect later, typically once a larger batch of tasks has finished.

The issue is that our workers don't retain the task results reliably: the results stored in memory are considered unmanaged, and they end up being garbage collected.

When we collect results from workers, we want that memory to be released.
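
Roughly, the pattern looks like this -- a minimal sketch with illustrative names (process_granule and the inputs are placeholders, not our actual code):

from distributed import Client

client = Client()  # placeholder; our real client points at the cluster

def process_granule(path):
    # placeholder for the real work: download a file, build a dataframe, etc.
    return f"result for {path}"

paths = ["granule-1", "granule-2"]  # placeholder inputs
futures = [client.submit(process_granule, p) for p in paths]

# ... later, once the whole batch has completed ...
results = client.gather(futures)
# after gathering, we want the workers to release the memory those results occupied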

I think these are the key questions:

  • How do we get our results to be considered managed memory?
    • Note the assumption here: if a result is considered managed memory, it won't be released prematurely, and we won't run into these various issues.
    • Also note that we are using vaex dataframes and would prefer to keep doing so, but if we are forced to use pandas dataframes then so be it. For now, the question is whether a vaex dataframe can be made to count as managed memory, retained until collection, and then released.
  • How do we release the managed memory after we gather it?

@espg (Contributor) commented Oct 15, 2021

@consideRatio some (but not all) of this may be related to cluster structure =/ Some interesting data points:

  • Running on the 'massive' 256GB instances, I'm less likely to run into the streaming error. Perhaps the networking is more stable between the workers and the notebook instance?
  • I think that these are also more likely to run to completion later in the evening. These are 'spot' instances, so we don't have hard guarantees on availability, quality, etc. I'm wondering if we're running into issues related to how they're provisioned on demand?
  • Since the behavior is different running on the x1 instances, is there anything in the configuration that could be changed? The 'massive' units are just a tad too small, otherwise I would just run on them. Is there a 32xlarge version of what we've been using that is the same type? Curious if that would work, even without having the SSD attached...

@fjetter commented Oct 27, 2021

Can you provide an example of how you create your data? Typically, the only thing you need to do for a result to be retained by the worker and considered managed memory is to return the data from the task you are running. The output type is pretty much irrelevant. To use the data, just pass the future along to another task and dask takes care of forwarding the data, e.g.

def a_lot_of_data(*args, **kwargs):
    # return ~2GB of data; whatever a task returns stays on the worker as managed memory
    return ["0" * 2**30] + ["1" * 2**30]

fut = client.submit(a_lot_of_data)

def data_on_worker(dask_worker):
    # list the keys currently held in the worker's data store
    return list(dask_worker.data.keys())

client.run(data_on_worker)

def use_the_data(data):
    return sum(map(len, data))

# passing the future means dask moves the stored data to wherever this task runs
res_f = client.submit(use_the_data, fut)
res_f.result()

del fut, res_f  # this will release the data again (not shown on the GIF)

[GIF: Screen Recording 2021-10-27 at 14 51 13]

@espg (Contributor) commented Oct 27, 2021

@fjetter Thanks for checking in on this, we appreciate it :-) I think I've started to get a better handle on how dask manages memory, at least some of the time. Initially, we were downloading files to the worker, using h5py to read the data into arrays, and then dumping the data into vaex dataframes before gathering it (i.e., some of the code in dask/distributed#5430). The vaex dataframes were showing up as unmanaged memory and had poor performance when calling gather because of object introspection, so now we serialize the vaex dataframes to arrow tables before calling gather, and arrow tables show up as managed memory like we would expect.
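
Concretely, each task now ends by converting to arrow before returning -- roughly like this (a minimal sketch; load_granule, the toy dataframe, and the path are placeholders, not our actual code):

import numpy as np
import vaex

def load_granule(path):
    # placeholder for the real work: download the file, read it with h5py, build arrays...
    df = vaex.from_arrays(x=np.arange(3))
    # returning an arrow table (rather than the vaex dataframe) is what makes the
    # result show up as managed memory on the worker
    return df.to_arrow_table()

# client is our existing distributed.Client
fut = client.submit(load_granule, "s3://geostacks/icesat_2/some_granule.h5")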

We could load the data into dask-pandas dataframes instead; the main reason we've been using vaex inside dask distributed is that the parquet export function in vaex lets us write parquet files in the apache hive hierarchy:

import vaex
from tqdm import tqdm

# lst is a list of lists, with each inner list holding futures that return an arrow table
big_list = []
for thing in tqdm(lst):
    futures = thing.tolist()
    # some returns are NoneType; filter these out
    quiver = list(filter(None, swarm.gather(futures, errors='skip', direct=True)))
    # reclaim memory on the worker -- this works fine
    for future in futures:
        future.cancel()
    for arrow in quiver:
        big_list.append(vaex.from_arrow_table(arrow))

data = vaex.concat(big_list)
# midx is a mortons index; it is a variant of a healpix-style flat spatial index
# a value might be '4112412423423141333'
data['shards'] = data.midx // 10**12
data['chunk'] = data.midx // 10**16
# this is cutting the tree at two different levels
# shard is 4112412
# chunk is 411

def write_data(arrowtable):
    vaexDF = vaex.from_arrow_table(arrowtable)
    del vaexDF['chunk']
    # this function is the reason that we use vaex
    vaexDF.export_partitioned('s3://geostacks/icesat_2/{subdir}/atl06.parquet', by=['shards', 'cycle'])

# Here's how data is written
data.select(data.chunk == 411)
# partial subset based on chunk
tmp = data.to_arrow_table(selection=True, parallel=True)
# place the table on a worker, then have that worker write it out -- this would be wrapped in a loop
example = client.scatter(tmp, direct=True)
task = client.submit(write_data, example)

# output on first run for cycle 6
# s3://geostacks/icesat_2/shards=4111111/cycle=6/atl06.parquet
# s3://geostacks/icesat_2/shards=4111112/cycle=6/atl06.parquet
# ...
# s3://geostacks/icesat_2/shards=4114444/cycle=6/atl06.parquet

# output on next run for cycle 7
# s3://geostacks/icesat_2/shards=4111111/cycle=7/atl06.parquet
# s3://geostacks/icesat_2/shards=4111112/cycle=7/atl06.parquet
# ...
# s3://geostacks/icesat_2/shards=4114444/cycle=7/atl06.parquet

In general I'm open to suggestions on how to do this better. We are casting back and forth between arrow tables and vaex dataframes a fair bit, but the operation is quick. If there's a way to load all the data into a dask-backed pandas dataframe on the workers, and then export across the cluster to the hive layout using custom-defined partitions, that would be great -- I just haven't seen anything about hierarchical export to hive in the dask documentation.

One memory management issue we have been running into is that the scheduler will try to 'fill up' a worker, and this often crashes our workers because our memory requirements don't fit the scheduler's assignment logic. To give a concrete example--

  • Workers have 128GB ram
  • 'Chunk' arrow arrays vary in size from 1GB to 40GB

If we loop through our list and scatter objects, we have a really good chance of crashing the workers. Dask seems to make a copy of any object that you scatter to it, so a 40GB object placed on a worker will spike memory usage to ~85GB during the transfer. Once the scatter operation is complete, memory usage drops back down to 40GB. If we run the above in a loop and there are two 40GB objects to scatter, the following happens:

  • Dask puts the first object on worker 1. Memory usage spikes to ~80GB, then drops to 40GB.
  • Dask gets a second scatter request for an object of 40GB. It checks the following logic:
    • Round-robin assignment by thread: worker 1 has 32 cores (threads), so the scheduler wants to assign to this worker until all threads are occupied.
    • Occupancy check: is memory usage above 80%? No, so dask wants to assign to this worker.
    • Occupancy check again: can worker 1 fit both objects? 40GB + 40GB is 80GB, which is below 128GB and below the 80% threshold.
    • Dask decides to send the object to worker 1.
  • Worker 1 already holds the first 40GB object; the second object is sent, and its memory usage spikes to ~85GB during transfer. Combined memory is now ~125GB, and dask kills the worker.

To address this, we've been trying to assign workers manually as described in dask/distributed#5451 -- but that is only able to assign tasks to workers based on the number of resident objects, not the amount of space used. From the code example that you posted above, it looks like you don't define the variable dask_worker; so I assume that this is a variable that is defined and updated on the worker automatically. Does the dask_worker object have the memory usage of that worker contained in it somewhere that we could query programmatically?
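
For reference, the kind of workaround we've been sketching looks roughly like this -- query per-worker memory out of band, then pin the scatter and the task to the emptiest worker (worker_rss and the wiring are illustrative, not our actual code):

import psutil

def worker_rss():
    # resident memory of this worker's process, in bytes
    return psutil.Process().memory_info().rss

# client.run executes the function on every worker and returns {worker_address: value}
rss_per_worker = client.run(worker_rss)
target = min(rss_per_worker, key=rss_per_worker.get)  # pick the emptiest worker

# pin the big arrow table and the task that consumes it to that worker only
big_future = client.scatter(tmp, workers=[target], direct=True)
task = client.submit(write_data, big_future, workers=[target])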

@fjetter commented Oct 28, 2021

> The vaex dataframes were showing up as unmanaged memory and had poor performance when calling gather because of object introspection, so now we serialize the vaex dataframes to arrow tables before calling gather, and arrow tables show up as managed memory like we would expect.

I believe the problem here might be how we measure memory. We're relying on all objects to either implement a __sizeof__ method or to implement a sizeof dispatch (see here). Neither might be the case for vaex; I'm not too familiar with the code.
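
To illustrate the mechanism (a sketch; the Opaque class just stands in for any type dask doesn't know how to measure):

import numpy as np
from dask.sizeof import sizeof

# types with a registered sizeof (numpy, pandas, pyarrow, ...) report their real footprint,
# so results of these types are counted as managed memory
sizeof(np.zeros(10_000_000))   # ~80_000_000 bytes

# for types without a registration or a meaningful __sizeof__, dask only sees the
# Python object overhead, so the actual data ends up looking like unmanaged memory
class Opaque:
    def __init__(self):
        self.payload = np.zeros(10_000_000)

sizeof(Opaque())               # a few dozen bytes, nowhere near 80MB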

Only what dask measures will be reflected as managed memory. For the unmanaged memory, however, we actually inspect the RSS of the process you're running in: RSS minus managed is unmanaged (roughly speaking; see http://distributed.dask.org/en/latest/worker.html#using-the-dashboard-to-monitor-memory-usage for details).

> We could load the data into dask-pandas dataframes instead; the main reason we've been using vaex inside dask distributed is that the parquet export function in vaex lets us write parquet files in the apache hive hierarchy

Are you familiar with https://docs.dask.org/en/latest/generated/dask.dataframe.to_parquet.html ? partition_on=['shards', 'cycle'] should do the same. See also https://docs.dask.org/en/latest/generated/dask.dataframe.from_delayed.html for delayed->DataFrame conversion
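
Roughly (a sketch; load_granule, paths, and the toy pandas frame are assumptions about your pipeline, not the real code):

import pandas as pd
import dask.dataframe as dd
from dask import delayed

@delayed
def load_granule(path):
    # placeholder: whatever produces one partition, returned as a pandas DataFrame
    return pd.DataFrame({"midx": [4112412423423141333], "cycle": [6], "h": [0.0]})

parts = [load_granule(p) for p in paths]   # paths: your list of inputs
ddf = dd.from_delayed(parts)
ddf["shards"] = ddf["midx"] // 10**12

# writes a hive-layout dataset: .../atl06.parquet/shards=.../cycle=.../part.*.parquet
ddf.to_parquet("s3://geostacks/icesat_2/atl06.parquet",
               partition_on=["shards", "cycle"])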

> From the code example that you posted above, it looks like you don't define the variable dask_worker;

Client.run is a special function for debugging purposes. It injects the worker as an argument so you can debug your setup if necessary. I would consider this advanced, though: the function runs out of band on the worker event loop, and I don't recommend doing this for any actual computation since that will cause instabilities. Ordinary payload computations should be run using Client.submit, and they'll run in a thread. If you need the worker object in there, for whatever reason, use get_worker instead.
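
For example (a sketch; the task body is just a placeholder):

from distributed import get_worker

def my_task(x):
    # inside a task started via Client.submit, get_worker() returns the Worker
    # object this task is running on
    worker = get_worker()
    held_keys = list(worker.data.keys())   # e.g. inspect what this worker currently holds
    return x, len(held_keys)

fut = client.submit(my_task, 42)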
