Locks and chunked storage #5083

Closed
d-v-b opened this issue Jul 16, 2021 · 29 comments

Comments

@d-v-b
Member

d-v-b commented Jul 16, 2021

dask.array.store takes an optional argument, lock, which (to my understanding) avoids write contention by forcing workers to request access to the write target before writing. But for chunked storage like zarr arrays, write contention happens at the level of individual chunks, not the entire array. So perhaps a lock for chunked writes should have the granularity of the chunk structure of the storage target, thereby allowing the scheduler to control access to individual chunks. Does this make sense?
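
To make this concrete, here is a rough sketch of the pattern in question (names and shapes are illustrative, and it assumes a distributed Client is already running):

import dask.array as da
import zarr
from distributed import Lock

# Storage target with small chunks, dask array with larger chunks.
z = zarr.open('data/example.zarr', mode='w', shape=(8192, 8192),
              chunks=(64, 64), dtype='uint8')
arr = da.zeros((8192, 8192), chunks=(512, 512), dtype='uint8')

# Passing a single Lock serializes every write to the whole array, even
# though only writes touching the same zarr chunk can actually conflict.
da.store(arr, z, lock=Lock('example-store'))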

The context for this question is my attempt to optimize storing multiple enormous dask arrays in zarr containers with a minimum amount of rechunking, which makes the locking mechanism attractive, as long as the lock happens at the right place.

cc @jakirkham in case you have ideas about this.

@mrocklin
Member

I don't know much about the store function (you're right that @jakirkham is maybe the right person there). But on the lock side you should be able to give the dask.distributed.Lock a unique identifier like the chunk ID and things should work ok. Dask locks are fairly cheap to create, even if you're making many of them.

@gjoseph92
Collaborator

Is the chunk structure of your dask array the same as the chunk structure of your zarr array? If so, shouldn't each chunk only be written exactly once? Why would you need to prevent multiple tasks writing to the same chunk if there is only ever one task per chunk?

Having the chunk structures aligned generally seems like a good idea to me. But if they're not, what about https://zarr.readthedocs.io/en/stable/tutorial.html#parallel-computing-and-synchronization? I bet it would be only a few lines to write something equivalent to a zarr.sync.ProcessSynchronizer that used distributed.Locks internally.

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

Is the chunk structure of your dask array the same as the chunk structure of your zarr array?

Ideally, yes; typically, no. My dask arrays are really big (TB), so small chunks would be very inefficient for dask. But for storage (e.g., zarr), small chunks are ideal because we want to visualize the data with a chunk-aware visualization tool (neuroglancer). So I aim for a one-to-many relationship between dask chunks and zarr chunks, e.g. 512 x 512 x 512 on the dask side and 64 x 64 x 64 on the zarr side for uint8 data. This is all fine until I do something in dask that makes my chunks shrink (downsampling for making a multiresolution pyramid). If I downsample my 512-sized chunk by half 4 times, I end up with a 32-sized chunk, and that cannot be safely written to my 64-sized zarr chunks. Rechunking is a natural solution, but this moves data between workers and I would prefer to avoid this if possible. So I'm looking at locking as another approach.
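
To make the numbers concrete, here is a small sketch (the shape is reduced for illustration) of how repeated 2x downsampling shrinks the dask chunks below the zarr chunk size:

import dask.array as da
import numpy as np

# Illustrative stand-in for one dataset: 512-sized dask chunks, destined for
# a zarr array with 64-sized chunks.
data = da.zeros((4096, 4096, 4096), chunks=(512, 512, 512), dtype='uint8')

level = data
for _ in range(4):
    # each 2x downsampling step also halves the chunk edge length
    level = da.coarsen(np.mean, level, {0: 2, 1: 2, 2: 2})

print(level.chunks[0][0])  # 32: smaller than the 64-sized zarr chunks, so
                           # several dask chunks now touch a single zarr chunk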

Regarding the synchronizers in the zarr-python library: as I understand things, the ProcessSynchronizer requires file system locking, which my storage backends (NFS and S3) don't support. So I need to do locking via the client instead. I actually think the distributed.MultiLock will work for me, but I need to test it.

@gjoseph92
Collaborator

Rechunking is a natural solution, but this moves data between workers and I would prefer to avoid this if possible

You might want to try this again with the latest distributed version now that #4967 has been released. Some of these "dask is moving data around a lot" situations should happen a lot less.

I understand things the ProcessSynchronizer requires file system locking

Sorry, I was suggesting writing your own class with the same interface as ProcessSynchronizer—I'm assuming zarr will play nicely with that?

import distributed


class DistributedSynchronizer:
    """Provides synchronization using locks from Distributed.

    Parameters
    ----------
    prefix: string
        Prefix the names of all dask.Distributed locks with this.
    """
    def __init__(self, prefix: str = ""):
        self.prefix = prefix

    def __getitem__(self, item):
        # item identifies a single chunk; a named distributed.Lock is
        # created lazily for it each time zarr asks.
        return distributed.Lock(self.prefix + str(item))

>>> import zarr
>>> synchronizer = DistributedSynchronizer()
>>> z = zarr.open_array('data/example', mode='w', shape=(10000, 10000),
...                     chunks=(1000, 1000), dtype='i4',
...                     synchronizer=synchronizer)

@mrocklin
Member

Rechunking is a natural solution, but this moves data between workers and I would prefer to avoid this if possible

You might want to try this again with the latest distributed version now that #4967 has been released. Some of these "dask is moving data around a lot" situations should happen a lot less.

Yeah, big +1 on this. I think that the recent memory scheduling things may have had a large positive impact on rechunking.

@mrocklin
Member

I actually think the distributed.MultiLock will work for me, but I need to test it.

I would have expected a vanilla dask.distributed.Lock per chunk would have done the job.

with dask.distributed.Lock(chunk_id):
    # write chunk 

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

The vanilla lock works in the special case where each worker needs to lock just one chunk. But in the more general case, when a worker attempts to write a region that spans multiple chunks, that worker must iteratively lock the chunk files spanned by the region, which introduces a potential deadlock due to the iteration, and I suspect the MultiLock addresses this (but I could be wrong).
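
For illustration, a sketch of how MultiLock might be used here, assuming it acquires the whole set of named locks as a single request (overlapping_chunk_keys, region, block, and zarr_array are hypothetical placeholders):

from distributed import MultiLock

# Hypothetical helper that returns the keys of the zarr chunks touched by
# this worker's write region, e.g. ['0.1', '0.2'].
names = overlapping_chunk_keys(region)

# Acquiring all of the chunk locks in one request should avoid the deadlock
# that lock-by-lock iteration can cause when two workers' regions overlap
# and they grab the shared locks in different orders.
lock = MultiLock(names=names)
lock.acquire()
try:
    zarr_array[region] = block
finally:
    lock.release()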

@mrocklin
Member

Ah, got it. That makes sense.

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

And I should add that I really like the idea of using the existing synchronizer API in zarr. I will look into implementing that.

@mrocklin
Member

I do agree with @gjoseph92 though that rechunking seems maybe better. I would be curious to learn more about what is pushing you away from that path.

@gjoseph92
Collaborator

worker must iteratively lock the chunk files spanned by the region, which introduces a potential deadlock due to the iteration, and I suspect the MultiLock addresses this (but I could be wrong).

I think the code I posted handles these issues, since I believe that's what the Zarr synchronization API is all about. It looks like you just have to tell Zarr how to get the lock instance to use for a particular chunk of the dataset, and it manages the rest.
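
As a rough illustration of that contract (the exact chunk-key format is a zarr implementation detail and is shown here only as an example):

# zarr indexes the synchronizer with a per-chunk key and holds the returned
# lock around its read-modify-write of that chunk, so region writes that
# span several chunks serialize per chunk rather than per array.
sync = DistributedSynchronizer(prefix='my-array-')   # prefix is illustrative
with sync['2.3']:              # a distributed.Lock named 'my-array-2.3'
    ...                        # zarr would update chunk (2, 3) here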

@jakirkham
Member

Ideally you would rechunk the Dask array to match the chunks that the Zarr array will have. This effectively lets Dask figure out how to compute and write things out efficiently. Maybe it's worth discussing why that does not work for your use case?

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

I hope this illustrates why I'm trying to avoid rechunking.

I'm working with task graphs that look like this. A big (500 MB) piece of data enters at the bottom, then gets downscaled and saved recursively. The input chunks need to be big because I have a lot of them. I'm working on datasets that exceed 10 TB, so chunks need to be large to avoid painful task graph creation times.

[image: task graph without rechunking]

Each downscaling step reduces the chunk size, which is bad when the dask chunk size dips below my zarr chunk size. Now my workers can't do chunk-aligned writes. If I solve this problem with rechunking, the above task graph transforms to this:

[image: task graph after rechunking is added]

As you can see, rechunking introduces data dependencies between the end leaves of the task graph and breaks the nice parallelism we had before. I suspect this is the cause of some memory struggles my workers are experiencing (and these issues persisted and / or got worse on the latest version of dask).

I suspect that locking zarr chunks lets me keep the first graph. Although locking zarr chunks would break parallelism, it should do so in a way that only increases runtime and not memory use. Hence my interest.

@mrocklin
Member

mrocklin commented Jul 20, 2021 via email

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

I found that my memory issues got worse, not better, after 2021.07.0

@jakirkham
Member

Have you tried main as well?

@gjoseph92
Collaborator

If it's not too much trouble, I'd love to see a before/after performance report for 2021.07.0. Did it get worse both with and without the final rechunk?

It's true that it's a bit silly to move the data between workers just for the purpose of concatenating and storing it. Having workers synchronize writing the data they already have is simpler in many ways. I think we just all want this rechunking to perform better, so we're eager to understand why it's not working 😄

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

It's not too much trouble, working on it now (however, for some reason the performance report doesn't render correctly for me on 2021.07.0)

@jakirkham
Member

however, for some reason the performance report doesn't render correctly for me on 2021.07.0

xref: #5097

@d-v-b
Member Author

d-v-b commented Jul 20, 2021

performance report for 2021.06.0
performance report for 2021.07.0 (doesn't render properly for me but you can see the summary)

In lieu of the report, I can tell you that the computation run on 2021.07.0 has a tendency to spill a massive amount of data to disk, while this behavior doesn't occur on 2021.06.0.

@gjoseph92
Collaborator

Dang. The performance report for 06 looks pretty good to me, so I'd really like to see what happened with 07. Are these both with or without a final rechunk?

Does that spilling problem happen just at the end, or throughout? A few screenshots of the dashboard might even help until we figure out the performance report issue.

@d-v-b
Member Author

d-v-b commented Jul 21, 2021

Spilling typically happens towards the middle of the computation. Also, I should note that sometimes 2021.07.0 gets lucky and completes the computation. So I strongly suspect that there's something stochastic going on with task ordering, and the changes in the latest version of distributed bias towards a sub-optimal task order.

Each layer of rechunking groups the input tasks into clusters, and the clusters get wider (i.e., depend on more input tasks) as the rechunking requires more data to converge. So the ideal ordering would schedule tasks such that these "rechunking clusters" are completed sequentially. Somehow the latest version of distributed is not finding this ordering, whereas older versions did OK.

I will try to make a reproducer that doesn't depend on my own code / data. And I will try main as per the suggestion from @jakirkham

@mrocklin
Member

mrocklin commented Jul 21, 2021 via email

@d-v-b
Member Author

d-v-b commented Jul 21, 2021

reproducer: https://gist.github.com/d-v-b/f4cf44e42f4e9d9cfa2d109d2ad26500

performance reports:

2021.06.0

2021.07.0

main

@gjoseph92
Collaborator

Brief update: @d-v-b I've run your reproducer locally and I'm seeing the same thing. I was also able to get non-broken performance reports: 2021.06.2, 2021.07.0.

Downscaling to shape = (2048, 2048, 4096), both versions run in the exact same time on my machine, but have very different scheduling patterns:

2021.06.2.mp4
2021.07.0.mp4

In the new version, you see two instances where a full vertical bar of tasks is red (in memory). This didn't happen before. This is probably what's blowing out the memory on the cluster, since the whole dataset is briefly fully in memory.

Colored by worker assignment

I hacked up the task visualization to color tasks by worker (instead of status): 641f117. Note that tasks are already vertically ordered by priority.

2021.06.2-workercolor-larger.mp4
2021.07.0-workercolor-larger.mp4

We can see that the worker assignment pattern is very different at the start (exactly as we'd expect).

I'm not sure yet why this is happening. Because there's so much data shuffling involved, this is probably not a workload that benefits from running sibling tasks on the same worker. The question is why the task ordering feels different, when the changes in 2021.07.0 don't affect scheduling order at all, just worker selection.

The old worker selection assigned tasks such that it was almost guaranteed that two root tasks with a common downstream task would be on different workers, leading to lots of transfers. My vague feeling is that maybe those transfers were acting as a sort of backpressure, which slowed down the coarsen tasks. Perhaps now, the coarsen tasks can run faster, so they outpace the data_sinks? I have a feeling this theory is wrong though. I plan on looking into this more.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Jul 23, 2021
While looking into dask#5083 I happened to notice that the dashboard felt very sluggish. I profiled with py-spy and discovered that the scheduler was spending 20% of runtime calculating `sum(map(len, group._dependencies)) < 5`! A quick print statement showed some task groups depended on 25,728 other groups (each of size 1). We can easily skip those.

I originally had this conditional in dask#4967 but we removed it for simplicity: dask#4967 (comment); turns out it was relevant after all!
@gjoseph92
Collaborator

@d-v-b as I mentioned, this seems to be completely fixed by #5113. 222 seconds with that PR vs 219 seconds on 2021.06.2 on my machine. The scheduling pattern even looks different; with 2021.07.0 it would blast through all the zeros and data_source tasks right away before seeming to start much else. Now, watching the task stream, it looks the same as it did with 2021.06.2.

I'm a little surprised by this, since #5113 should be purely an optimization; it doesn't actually change the logic. Why did the scheduler being under load from those unnecessary __len__ computations lead to workers running out of memory? If work had just slowed down and workers had been more idle, I would have understood that. But it almost feels as though workers were going off and running tasks they shouldn't have while the scheduler was too busy to keep them in line—which shouldn't be possible.

Maybe the scheduler was too busy to tell workers to release keys, because either incoming or outgoing comms (or both) were blocked up by all this CPU time on the event loop? From the dashboard, I saw that all workers on 2021.07.0 had lots of tasks assigned (~800 per worker). So even with an absentee scheduler, they never ran out of things to do and kept piling up results, but rarely got the message to release these completed keys from memory?

mrocklin pushed a commit that referenced this issue Jul 24, 2021
While looking into #5083 I happened to notice that the dashboard felt very sluggish. I profiled with py-spy and discovered that the scheduler was spending 20% of runtime calculating `sum(map(len, group._dependencies)) < 5`! A quick print statement showed some task groups depended on 25,728 other groups (each of size 1). We can easily skip those.

I originally had this conditional in #4967 but we removed it for simplicity: #4967 (comment); turns out it was relevant after all!
@d-v-b
Member Author

d-v-b commented Jul 24, 2021

Great detective work @gjoseph92, thanks for tracking this down. I still need to test the timing / cluster load with the zarr synchronizer API + distributed lock idea, so maybe we keep this open until I have a chance to do that.

@d-v-b
Member Author

d-v-b commented Aug 3, 2021

OK, this can be closed. I tested chunk locking via the zarr synchronizer API vs. rechunking in one of my typical workloads. I observed that the computation time was about the same for both, but there was much less transfer time with chunk locking, as expected.
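
For reference, a sketch of roughly what the chunk-locking version looks like, reusing the DistributedSynchronizer class sketched earlier in the thread (shapes and paths are illustrative; a distributed Client is assumed to be running):

import dask.array as da
import zarr

# zarr array with small (64-sized) chunks, with per-chunk locks provided by
# the synchronizer rather than a single dask-level lock.
z = zarr.open_array('data/pyramid/s4', mode='w', shape=(256, 256, 256),
                    chunks=(64, 64, 64), dtype='uint8',
                    synchronizer=DistributedSynchronizer(prefix='s4-'))

# downsampled dask array whose 32-sized chunks no longer align with the zarr chunks
level = da.zeros((256, 256, 256), chunks=(32, 32, 32), dtype='uint8')

# lock=False: zarr itself serializes access to each chunk through the
# synchronizer, so overlapping writes are safe without any rechunking.
da.store(level, z, lock=False)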

I do wonder if the docstring for dask.array.store should include a passage about the right kind of locking for chunked storage. If I understand things correctly, passing an instance of distributed.Lock to dask.array.store will lock the entire storage target and not just the chunks. This makes sense for hdf5 but not zarr. Or maybe there is some other natural documentation target for this stuff.

@jakirkham
Member

Thanks Davis! 😄

Maybe we should open a new Dask issue with the documentation suggestions?
