
Accelerate intra-node IPC with shared memory #6267

Closed
Hoeze opened this issue May 30, 2020 · 15 comments
Labels
discussion Discussing a topic with no specific actions yet

Comments

@Hoeze

Hoeze commented May 30, 2020

When implementing e.g. a data loading pipeline for machine learning with Dask, I can choose either:

  • threaded scheduler: only fast when the GIL is released
  • forking scheduler: only fast when computing the data is very CPU-intensive compared to the size of the result

I often face the issue that the threaded scheduler effectively uses only 150% CPU, no matter how many cores it gets, because of Python code that does not parallelize.
The forking scheduler sometimes works better, but only when the data loading is very CPU-intensive.

Recently, I tried Ray and it sped up some of my prediction models roughly 5-fold.
I'm not 100% up to date with the latest developments in Dask, but AFAIK Dask serializes all data when sending it between workers. That's why I assume the huge speed difference comes from Ray's shared-memory object store, Plasma, which allows zero-copy transfers of Arrow arrays from the worker to TensorFlow.

=> I'd like to share two ideas for how Plasma or Ray could be helpful for Dask:

  1. A shared object cache between all threads/forks in dask/cachey
  2. Shared-memory communication:
    allow a producer to calculate data and a consumer to read it without (de)serialization or copying

Related issues:

@mrocklin
Member

What is the workload?

Have you tried the dask.distributed scheduler? You can set up a system with sensible defaults by running the following:

from dask.distributed import Client

client = Client()

#  then run your normal Dask code

https://docs.dask.org/en/latest/scheduling.html#dask-distributed-local

@mrocklin
Member

In general a system like Plasma will be useful when you want to do a lot of random access changes to a large data structure and you have to use many processes for some reason.

In my experience, the number of cases where this is true is very low. Unless you're doing something like a deep learning parameter server on one machine and can't use threads for some reason there is almost always a simpler solution.

When implementing e.g. a data loading pipeline for machine learning with Dask, I can choose either:

A data loading pipeline shouldn't really require any communication, and certainly not high-speed random-access modifications to a large data structure. It sounds like you just want a bunch of processes (because you have code that holds the GIL) and want to minimize data movement between those processes. The dask.distributed scheduler should have you covered there; you might want to add threads_per_worker=1 (or 2) if you have a high-core-count machine.
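
A minimal sketch of that setup (the worker and thread counts are illustrative; tune them for your machine):

from dask.distributed import Client

# One process per core, one thread per process: suits GIL-holding Python code
client = Client(n_workers=8, threads_per_worker=1)

# then run your normal Dask code as usual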

@jakirkham
Member

In addition to what Matt said, we have tended to keep Dask's dependencies pretty lightweight when possible. My guess is that if we were to implement shared memory, it would involve either multiprocessing.shared_memory (added in Python 3.8, with a backport package available) or UNIX domain sockets ( dask/distributed#3630 ), as noted above.

That said, if serialization is really a bottleneck for you, I would suggest you take a closer look at what is being serialized. If it's not something that Dask serializes efficiently (like NumPy arrays), then you might just need to implement Dask serialization for it. If you have some simple Python classes consisting of things Dask already knows how to serialize efficiently, you might be able to just register those classes with Dask. It will then recurse through them and serialize them efficiently.
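
For instance, here is a minimal sketch of registering a hypothetical container class with Dask's serialization hooks (Payload is made up for illustration; see the distributed serialization docs for details):

import numpy as np
from distributed.protocol import dask_serialize, dask_deserialize

class Payload:
    """Hypothetical user class wrapping a NumPy array plus a label."""
    def __init__(self, array, label):
        self.array = array
        self.label = label

@dask_serialize.register(Payload)
def _serialize_payload(p):
    arr = np.ascontiguousarray(p.array)
    header = {"label": p.label, "dtype": arr.dtype.str, "shape": arr.shape}
    frames = [arr.data]  # zero-copy view of the array's buffer
    return header, frames

@dask_deserialize.register(Payload)
def _deserialize_payload(header, frames):
    arr = np.frombuffer(frames[0], dtype=header["dtype"]).reshape(header["shape"])
    return Payload(arr, header["label"])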

Additionally, if you are on a Python version with pickle protocol 5 support and a recent version of Dask, you can get efficient serialization with plain pickle thanks to out-of-band pickling ( dask/distributed#3784 ), though you would have to check that you meet those requirements. This may also require some work on your end to ensure your objects use things that can be handled out-of-band, either by wrapping them in PickleBuffers (as in the docs) or by using NumPy arrays, which have built-in support.
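
For the pickle route, a stdlib-only sketch of what out-of-band pickling does (Python 3.8+; NumPy arrays hand over their buffers automatically):

import pickle
import numpy as np

data = np.arange(1_000_000)

# Large buffers go to buffer_callback instead of being copied into the pickle stream
buffers = []
payload = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)

# The buffers can travel out-of-band and are reattached on load
restored = pickle.loads(payload, buffers=buffers)
assert (restored == data).all()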

@dhirschfeld

plasma might be ideally suited for e.g. shuffling operations, #6164

@mrocklin
Member

mrocklin commented Jun 12, 2020 via email

@jakirkham
Member

plasma might be ideally suited for e.g. shuffling operations, #6164

Though if you have thoughts on how plasma would help in that issue, please feel free to suggest over there. I'm sure people would be interested to hear 😉

@dhirschfeld

dhirschfeld commented Jun 12, 2020

In the context of distributed you could have a Plasma store per node and, instead of having workers communicate data directly, have them send the data to the Plasma store on the receiving node and pass only the GUID / unique reference to the worker. All workers on that node would then have access to that data (by passing around the GUID) without having to copy or deserialize it.

I think that could have pretty big performance benefits for a number of workloads. IIUC that's basically what Ray does.

To illustrate the benefits of Plasma, we demonstrate an 11x speedup (on a machine with 20 physical cores) for sorting a large pandas DataFrame (one billion entries). The baseline is the built-in pandas sort function, which sorts the DataFrame in 477 seconds. To leverage multiple cores, we implement the following standard distributed sorting scheme...

Anyway, it might be a very big piece of work, so it's not something I could invest time in. I thought I'd mention it as an option, though, in case people are considering big changes to improve performance.
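
For reference, the put-by-reference pattern described above looked roughly like this with the Plasma client API that used to ship with pyarrow (the plasma module has since been removed from pyarrow, so treat this purely as a sketch); the store itself runs as a separate process:

import numpy as np
import pyarrow.plasma as plasma

# Producer: put the data into the node-local store and keep only the object ID
client = plasma.connect("/tmp/plasma")
object_id = client.put(np.arange(1_000_000))

# Any other process on the same node can map the buffer without copying,
# given only the object ID
reader = plasma.connect("/tmp/plasma")
array = reader.get(object_id)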

@mrocklin
Member

mrocklin commented Jun 12, 2020 via email

@mrocklin
Member

mrocklin commented Jun 12, 2020 via email

@jakirkham
Member

cc @rjzamora @madsbk (in case this is of interest)

@alexis-intellegens

Has there been any further discussion on the multiprocessing shared memory implementation? I also run Dask on single machines with high core counts and have read-only data structures that I want shared.

@Hoeze
Author

Hoeze commented Apr 2, 2021

@alexis-intellegens the Ray developers created a Dask scheduler for this, called Dask-on-Ray.
I'd recommend trying it; it dropped my memory usage by an order of magnitude.
Note that you may need to use something like this:

import dask
import ray

# don't do this (the large object gets serialized into the task graph):
dask.compute(dask_fn(large_object))

# instead do this (put it in Ray's object store once and pass the reference):
large_object_ref = ray.put(large_object)
dask.compute(dask_fn(large_object_ref))

Ray will automatically dereference the object for you.
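
For completeness, wiring up the Dask-on-Ray scheduler looks roughly like this (API as documented by Ray at the time; check your Ray version):

import dask
import ray
from ray.util.dask import ray_dask_get

ray.init()
# Run Dask graphs as Ray tasks, backed by Ray's shared-memory object store
dask.config.set(scheduler=ray_dask_get)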

@alexis-intellegens

Very interesting! I'll give it a go. Thanks @Hoeze

@alexis-intellegens

Out of curiosity, what would happen if I made a shared-memory object (via Python 3.8 multiprocessing) and tried to access it in Dask workers? I'll try it later today.

@GenevieveBuckley GenevieveBuckley added the discussion Discussing a topic with no specific actions yet label Oct 13, 2021
@jcrist
Member

jcrist commented Oct 14, 2021

Out of curiosity, what would happen if I made a shared-memory object (via Python 3.8 multiprocessing) and tried to access it in Dask workers? I'll try it later today.

That should work; they'd pickle as references to the shared-memory buffer and be remapped in the receiving process (provided all your workers are running on the same machine, otherwise you'd get an error). In general I think we're unlikely to add direct shared-memory support in Dask itself, but users are free to make use of it in custom workloads using e.g. dask.delayed. So if you have an object you want to share between workers, you can explicitly build this into your Dask computations yourself (using either multiprocessing's shared_memory or something more involved like Plasma).
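
To make that concrete, here is a minimal sketch (not an official Dask API) of sharing a read-only NumPy array between workers on one machine via multiprocessing.shared_memory; only the block's name, shape, and dtype travel through the scheduler:

from multiprocessing import shared_memory

import numpy as np
import dask
from dask.distributed import Client

def attach_and_sum(name, shape, dtype):
    # Attach to the existing shared-memory block by name and wrap it in an array view
    shm = shared_memory.SharedMemory(name=name)
    try:
        view = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
        return float(view.sum())
    finally:
        shm.close()

if __name__ == "__main__":
    client = Client(processes=True)  # local cluster; all workers on the same machine

    data = np.arange(10_000_000, dtype="float64")
    shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
    np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)[:] = data

    # Only the small (name, shape, dtype) triple is serialized, not the data itself
    tasks = [dask.delayed(attach_and_sum)(shm.name, data.shape, data.dtype.str)
             for _ in range(4)]
    print(dask.compute(*tasks))

    shm.close()
    shm.unlink()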

As stated above, shared memory would make the most sense if you have objects that can be mapped to shared memory without copying (meaning they contain large buffers, like a NumPy array) but whose processing still holds the GIL. In practice this is rare: if you're working with large buffers you're probably also doing something numeric (like NumPy), in which case the GIL is released and threads work fine.

Closing.

@jcrist jcrist closed this as completed Oct 14, 2021