
Deserialise data on demand #5900

Open
crusaderky opened this issue Mar 4, 2022 · 5 comments
Labels: discussion, feature, memory

Comments

@crusaderky
Collaborator

crusaderky commented Mar 4, 2022

Issue

As of today, data is eagerly deserialised as soon as it is un-spilled and as soon as it is received from another worker.
This has the benefit that we know that all data in Worker.data.fast is already deserialised when we need it and that Worker.data can be treated as an opaque MutableMapping.
However, this design simplicity comes at a very steep cost:

  • When data is retrieved from the spill disk just for the purpose of sending it from worker A to worker B, there's an unnecessary unpickle->pickle round trip on worker A
  • After a rebalance(), data is simply parked on a worker and there is a good chance it won't be needed on the same worker anytime soon. It's just as likely to be used by a task on that worker as it is to end up spilled or moved somewhere else again. This last point is mitigated by the fact that rebalance() uses a least-recently-inserted policy and a dead zone threshold around the cluster mean memory usage, meaning it won't pinball the same keys around.
  • Soon, the Active Memory Manager will rebalance every few seconds.

Proposed design

Whenever a Worker acquires data in any way other than task completion, meaning

  • over the network, through get_data
  • over the network, because of the AMM
  • over the network, because of the current implementation of rebalance(), replicate(), or scatter(broadcast=True) (soon to be reimplemented on top of the AMM)
  • from the disk, through unspilling

it should not unpickle the data immediately. Instead, it should keep it pickled. Worker.data.fast will contain a mix of pickled and unpickled data.
Whenever a task compute() starts, it will receive a mix of pickled and unpickled inputs. It will unpickle the pickled inputs and put them back into data.fast, replacing the pickled version.

Or if you prefer: the current two mappings data.slow and data.fast should be replaced by data.slow, data.fast_pickled and data.fast_unpickled. compute() will always read and write from data.fast_unpickled, which will internally read and delete from the other two; network receive and send will always read and write from data.fast_pickled, which will likewise read and delete from the other mappings. This also carries the benefit that all pickle/unpickle work, be it due to network activity or spilling/unspilling, can now be encapsulated in a single module.
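A minimal sketch of how the unpickled layer could sit on top of the pickled one. The class and attribute names (FastUnpickled, fast_pickled, fast_unpickled) are hypothetical, and plain pickle stands in for distributed's own serialisation machinery:

```python
import pickle
from collections.abc import MutableMapping


class FastUnpickled(MutableMapping):
    """Hypothetical facade used by compute(): always hands back live objects.

    Keys found only in the pickled layer are deserialised on first access,
    promoted into the unpickled layer, and removed from the pickled one.
    """

    def __init__(self, fast_pickled: MutableMapping, fast_unpickled: dict):
        self.fast_pickled = fast_pickled      # key -> serialised bytes
        self.fast_unpickled = fast_unpickled  # key -> live Python object

    def __getitem__(self, key):
        if key in self.fast_unpickled:
            return self.fast_unpickled[key]
        # Deserialise on demand and promote to the unpickled layer
        obj = pickle.loads(self.fast_pickled[key])
        self.fast_unpickled[key] = obj
        del self.fast_pickled[key]
        return obj

    def __setitem__(self, key, value):
        # Task output is already a live object; drop any stale pickled copy
        self.fast_unpickled[key] = value
        self.fast_pickled.pop(key, None)

    def __delitem__(self, key):
        if key in self.fast_unpickled:
            del self.fast_unpickled[key]
        else:
            del self.fast_pickled[key]

    def __iter__(self):
        return iter(self.fast_unpickled.keys() | self.fast_pickled.keys())

    def __len__(self):
        return len(self.fast_unpickled.keys() | self.fast_pickled.keys())
```

Network receive and send would bypass this facade and talk to fast_pickled directly, so a key fetched over the wire stays serialised until a task actually needs it.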

Challenges

  • Care should be taken when two tasks start at the same time on a multithreaded worker and require the same input; we should avoid unpickling it twice (see the sketch after this list).
  • We should figure out a way to isolate serialisation/deserialisation time and display it in the GUI.
  • Care should be taken for the edge case where sizeof(data) > memory_limit * target, which triggers immediate spilling to disk. If the data was acquired for the purpose of computing a task, this would likely mean going through an unnecessary spill->unspill cycle.
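A minimal sketch of the per-key guard mentioned in the first challenge, so that two threads requesting the same pickled input only deserialise it once. Names are hypothetical and plain pickle stands in for distributed's own serialisation:

```python
import pickle
import threading


class UnpickleOnce:
    """Hypothetical helper: each key is unpickled at most once, even when
    several worker threads request it concurrently."""

    def __init__(self, fast_pickled: dict, fast_unpickled: dict):
        self.fast_pickled = fast_pickled
        self.fast_unpickled = fast_unpickled
        self._locks = {}  # key -> threading.Lock

    def get(self, key):
        if key in self.fast_unpickled:
            return self.fast_unpickled[key]
        # dict.setdefault is atomic under the GIL, so all threads agree on
        # which lock guards this key
        lock = self._locks.setdefault(key, threading.Lock())
        with lock:
            # Re-check: another thread may have unpickled while we waited
            if key not in self.fast_unpickled:
                self.fast_unpickled[key] = pickle.loads(self.fast_pickled.pop(key))
        return self.fast_unpickled[key]
```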
@gjoseph92
Collaborator

Overall this seems like a good objective and valuable thing to do.

Other things that are nice about keeping data serialized longer:

  • Its memory use can be measured exactly and almost for free (compared to sizeof traversing a complex object), and can't be double-counted (when multiple objects reference the same thing)
  • Data could easily be serialized and compressed in memory. In-memory compression could be a step to take prior to spilling to disk, since it should probably be written to disk compressed anyway; see the sketch after this list.
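A quick sketch of the compression idea, where plain pickle and zlib stand in for distributed's own serialisation and compression machinery:

```python
import pickle
import zlib


def to_compressed_bytes(obj) -> bytes:
    # Held in memory instead of the live object; the same bytes can later be
    # written to disk verbatim when the key is spilled
    return zlib.compress(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def from_compressed_bytes(blob: bytes):
    return pickle.loads(zlib.decompress(blob))
```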

One concern about the proposed design:

network receive and send will always read and write from data.fast_pickled

This sounds like the on-disk bytes will be read fully into memory before sending them. I would really like to see a design where we can stream data from disk directly to the network without copying the entire thing into memory (via os.sendfile ideally for zero userspace copies, but even just having a userspace buffer would be a vast improvement over slurping the entire file, which is potentially hundreds of MBs). Without doing this, I think we'll still have the issues I mentioned in #4424 (comment): one worker asking another for data can put the sender under memory pressure, since it has to read all the data from disk into RAM before transferring it. Whether that data is pickled or not generally doesn't make that much of a difference for the amount of RAM.
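A minimal sketch of what the zero-copy sender path could look like on a blocking socket; socket.sendfile() delegates to os.sendfile() where the platform supports it, and distributed's async comm layer would need its own equivalent:

```python
import socket


def send_spilled_file(sock: socket.socket, path: str) -> None:
    """Stream a spilled key's file to the peer without reading it into RAM.

    On Linux, socket.sendfile() uses os.sendfile(), so the bytes travel
    kernel-to-kernel and never enter userspace; on other platforms it falls
    back to a chunked copy through a small buffer.
    """
    with open(path, "rb") as f:
        sock.sendfile(f)
```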

This is a big problem, because if the sender is already approaching its limit, it currently has two bad options:

  1. Load all the spilled data into memory (as pickled bytes or a Python object, doesn't make much difference). Likely disrupt things (by pausing, spilling other keys to compensate, etc.), maybe even run out of RAM and die.
  2. Refuse to send the requested data until memory goes down. At best this delays computation a lot, at worst it deadlocks.

A combination of these two is what we currently do when paused (limit to one outgoing transfer at a time, and un-spilling that key can still maybe kill the worker). But if sending spilled data didn't use any extra RAM, we wouldn't have to choose between bad options. We could just send the data, basically for free.

This is why I don't like the idea of data.slow being a mapping interface (as in Mapping[str, bytes]). We shouldn't abstract away the reading from disk, because:

  1. Sometimes you need the contents of the file (input for a computation), sometimes you need the file itself (input for sendfile over the network)
  2. __getitem__ and __setitem__ need to be async anyway (Asynchronous Disk Access in Workers #4424). Python has no async __setitem__, so we already can't technically follow the mapping interface.

Maybe Mapping[str, typing.BinaryIO] could be more appropriate? But still awkward. Figuring out the right interface for this seems like a secondary task to figuring out the right behavior #4424 (comment).
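One hypothetical shape for such an interface, exposing both a whole-payload read and a file handle for sendfile instead of forcing everything through Mapping[str, bytes]; names and signatures here are illustrative only:

```python
from typing import BinaryIO, Protocol


class SlowStorage(Protocol):
    """Hypothetical data.slow interface: two read paths instead of one."""

    async def read(self, key: str) -> bytes:
        """Load the full serialised payload, e.g. to deserialise it as a
        task input."""
        ...

    def open(self, key: str) -> BinaryIO:
        """Return a readable file handle so the comm layer can stream or
        sendfile() it without loading it into memory."""
        ...

    async def write(self, key: str, payload: bytes) -> None:
        """Persist a serialised payload to the spill directory."""
        ...
```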

I recognize that comms may also require an additional streaming/file-like interface (and/or a per-comm sendfile implementation) to support this. But I think it's a critical thing to support for spilling to actually work.

A less-common corollary to think about is whether we should be able to receive data straight to disk, without holding the full pickled bytes in data.fast_pickled. Not sure this will come up as much, but it would allow a worker that's under memory pressure to still fetch dependencies/receive rebalanced keys without increasing its RSS, since the pickled bytes would be streamed from the socket straight to disk.
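A rough sketch of that receive-to-disk path using asyncio streams; file I/O is shown blocking for brevity, and a real implementation would offload it to a thread:

```python
import asyncio

CHUNK = 1 << 20  # 1 MiB userspace buffer


async def receive_to_disk(reader: asyncio.StreamReader, nbytes: int, path: str) -> None:
    """Stream an incoming serialised payload from the socket straight to the
    spill directory, holding at most one chunk in memory at a time."""
    remaining = nbytes
    with open(path, "wb") as f:
        while remaining:
            chunk = await reader.read(min(CHUNK, remaining))
            if not chunk:
                raise ConnectionError("peer closed the connection early")
            f.write(chunk)
            remaining -= len(chunk)
```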

@crusaderky
Collaborator Author

crusaderky commented Mar 8, 2022

As discussed in #4424, I don't think an async setitem is necessary. Its only benefit would be to block a task from transitioning from state=running to state=memory when disk writes cannot keep up with the task completion rate, which is something that pause() already deals with using an approach that is, IMHO, more holistic than a simple fixed-size queue of writes, since it also considers unmanaged memory.

crusaderky added the discussion label Mar 11, 2022
@jakirkham
Member

Sorry, I don't know to what extent this is off topic given the discussion above, but I'm happy to move to a new issue if we'd like to pursue it further. I'll just say this small bit.

If we do go down the road of sending files between workers (say, as a way of sending spilled data between workers), note that as of Python 3.8 zero-copy operations are used to transfer files when available. In other words, the file data would not move through userspace but would be handled directly by the kernel, saving the worker the memory cost of the transfer.

@jakirkham
Member

cc @madsbk (who did similar work in dask-cuda with JIT unspilling)

cc @shwina (as we were discussing improved memory usage recently)

@crusaderky
Collaborator Author

New insights in #7351 (comment) suggest that this proposal may offer a poor cost/benefit ratio.

Note that the data was generated on a cluster of 5 workers; we should rerun the test at a much larger scale to confirm this.
