Stream spilled data directly from disk to other workers without loading into memory (sendfile-style) #5996

Open
gjoseph92 opened this issue Mar 24, 2022 · 5 comments

@gjoseph92 (Collaborator)

This has been discussed in a few other places, but I think it's important enough (and a separate enough task) to warrant its own issue for tracking purposes. Related to:

Once #5900 is done, when one worker requests data from another, and that data is currently spilled to disk, we should not require slurping all the data into memory just to send it. Ideally, we'd stream the serialized bytes from disk directly over the network socket, without copying into userspace at all, via something like socket.sendfile/sendfile(2). If this is not deemed possible, copying into a small userspace buffer and doing incremental sends would still be an improvement (but not ideal).
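
A minimal sketch of what the sending side could look like, assuming the serialized value already sits on disk as a single file; the path and the raw socket below are illustrative stand-ins, not the actual SpillBuffer or comms objects:

```python
import socket


def send_spilled_file(sock: socket.socket, path: str) -> int:
    """Stream a spilled file straight from disk over a connected socket.

    socket.sendfile() uses the zero-copy os.sendfile() syscall where the
    platform supports it and falls back to a buffered send() loop otherwise,
    so the payload never has to be fully materialised in worker memory.
    """
    with open(path, "rb") as f:
        return sock.sendfile(f)  # number of bytes sent
```

The incremental-send fallback mentioned above is essentially what `sock.sendfile()` already does on platforms where `os.sendfile()` is unavailable.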

(As a bonus, it could be nice if workers could also receive data directly to disk. This might let us keep fetching dependencies even when workers are under memory pressure. This might make computations complete faster when workers are under memory pressure, but probably won't make the difference between them succeeding/failing like the sending side does, so it's less important for now.)

The biggest challenge is that the comms interface doesn't currently support streams, only individual messages. The get_data response can also contain multiple keys, which further complicates the interface.

From a technical perspective, doing sendfile over a socket should be easy. The challenge is just around making that feasible to do with our comms interface.

I believe this is one of the highest-impact changes we can make to improve memory performance and reduce out-of-memory failures. There are a few flaws currently with spill-to-disk, but to me this is the biggest: transfer-heavy graphs basically work against spilling to disk, because workers are currently un-spilling some/most of the data they just spilled in order to send it to their peers.

cc @fjetter @crusaderky

@crusaderky (Collaborator)

> (As a bonus, it could be nice if workers could also receive data directly to disk. This might let us keep fetching dependencies even when workers are under memory pressure. This might make computations complete faster when workers are under memory pressure, but probably won't make the difference between them succeeding/failing like the sending side does, so it's less important for now.)

This would be in violation of LRU time locality.
If workers are currently fetching data without actually expecting to use it any time soon, I think the correct approach would be to delay the copy, not to store the output to disk.

There are two notable exceptions where time locality does not apply, i.e. data is simply parked on the worker:

  1. rebalance. However, rebalance will never move data towards a worker that is above 60% memory usage (distributed.worker.rebalance.recipient-max). This will not change after it is reimplemented on top of AMM.
  2. retire_workers. The realistic use case here is:
    a. the whole cluster is under heavy memory pressure
    b. one of the workers gets stuck in the paused state due to extreme amounts of unmanaged memory (>80%)
    c. the paused worker is thus auto-retired (not yet implemented: Restart paused workers after a certain timeout #5999)
    d. spilled data from the paused worker needs to be copied to other workers, which are, however, themselves already under memory pressure.

I think it would make sense to implement spill-upon-receive as you suggest, but it would only serve this last, rather niche, use case. As for implementation, we could add a way to tell the SpillBuffer: "add this key/value pair to the LRU heap, but instead of giving it maximum time weight (i.e. spill it after everything else in data.fast), give it a time weight lower than that of stores triggered by compute (i.e. spill it after everything received from the AMM, but before anything produced by compute)". In practice this could translate into immediate eviction, much as already happens for values whose individual memory weight is larger than target * memory_limit.
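
A hypothetical sketch of that weighting idea, using a plain OrderedDict instead of the real SpillBuffer/zict machinery; the `insert_cold` method and the item-count capacity are made-up simplifications (the real buffer weighs entries in bytes against target * memory_limit):

```python
from collections import OrderedDict


class WeightedLRU:
    """Hypothetical illustration only -- not the real SpillBuffer API.

    Keys inserted via insert_cold() are evicted (i.e. spilled to disk)
    before keys stored by compute, regardless of how recently they arrived.
    """

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.hot = OrderedDict()   # outputs of local compute: spill last
        self.cold = OrderedDict()  # data parked by AMM / receives: spill first

    def insert(self, key, value):
        # Normal store from compute: maximum time weight.
        self.hot[key] = value
        self._maybe_evict()

    def insert_cold(self, key, value):
        # Store from a receive/rebalance: lower time weight, so it is the
        # first candidate for spilling (possibly immediately).
        self.cold[key] = value
        self._maybe_evict()

    def _maybe_evict(self):
        while len(self.hot) + len(self.cold) > self.capacity:
            source = self.cold if self.cold else self.hot
            key, value = source.popitem(last=False)  # evict the oldest entry
            self._spill_to_disk(key, value)

    def _spill_to_disk(self, key, value):
        ...  # placeholder: serialize `value` and write it to disk
```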

@jakirkham (Member) commented Apr 9, 2022

I'd be tempted to just use socket.makefile to convert the socket object into something file-like, and then use shutil.copyfile, passing it the file and the socket's file-like object. As noted in this Python doc on efficient copy operations, that already uses the appropriate and most efficient path on each OS (sendfile on Linux, fcopyfile on macOS, and the buffered fallback described above on Windows). The benefit is that we get the performance described above without the maintenance burden of rolling our own implementation.
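
A minimal sketch of that approach with illustrative names. One caveat: the platform-specific fast paths in that doc section belong to the path-based helpers (shutil.copyfile and friends); when copying between already-open file objects the relevant helper is shutil.copyfileobj, which does a chunked userspace copy, so socket.sendfile() remains the closest thing to a true zero-copy send in the standard library:

```python
import shutil
import socket


def send_spilled_file_stdlib(sock: socket.socket, path: str) -> None:
    # Wrap the socket in a file-like object and let the standard library drive
    # the copy loop in fixed-size chunks (a buffered copy through userspace).
    with open(path, "rb") as src, sock.makefile("wb") as dst:
        shutil.copyfileobj(src, dst)
```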

@crusaderky (Collaborator)

Note that dask currently dynamically downgrades the pickle protocol depending on the interlocutor.
Supporting this would require a special code path for when something was spilled with e.g. protocol=6 but needs to be sent to a client that only supports protocol=5. Since mixing mismatched major Python versions is an extremely bad idea to begin with, I'd suggest dropping that feature entirely as a prerequisite for this change.

@fjetter (Member) commented Nov 7, 2022

FYI, I think that with #7217 we'd get much richer information about how much this change would gain us before we implement it.

@crusaderky (Collaborator)

New insights in #7351 (comment) suggest that this proposal may offer a poor cost/benefit ratio.

Note that the data was generated on a cluster of 5 workers; we should rerun the test at a much larger scale to confirm it.
