
WIP: Shared plasma #6503

Draft · martindurant wants to merge 22 commits into main from martindurant/distributed:shared_plasma
Conversation

martindurant
Member

A pyarrow-plasma based shared memory model for multiple workers on a single node. Objects that produce buffers with pickle V5 will have those buffers moved to plasma if larger than a configurable size, and deserialised by viewing those buffers (no copy). This is particularly good when scattering a large object to many/all workers.
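For illustration, a minimal sketch of the mechanism described above, assuming pyarrow's plasma module; the size threshold, store socket path, and helper names are hypothetical, not the code in this PR:

```python
# Hedged sketch only: hypothetical helpers, not the implementation in this PR.
import os
import pickle
import pyarrow.plasma as plasma

THRESHOLD = 1_000_000                       # hypothetical configurable size (bytes)
client = plasma.connect("/tmp/plasma")      # assumes a plasma store runs at this socket

def dumps_shared(obj):
    """Pickle with protocol 5 and move large out-of-band buffers into plasma."""
    pickle_buffers = []
    payload = pickle.dumps(obj, protocol=5, buffer_callback=pickle_buffers.append)
    frames = []
    for pb in pickle_buffers:
        view = pb.raw()                     # contiguous memoryview of the buffer
        if view.nbytes >= THRESHOLD:
            oid = plasma.ObjectID(os.urandom(20))
            target = client.create(oid, view.nbytes)
            memoryview(target).cast("B")[:] = view   # single copy into shared memory
            client.seal(oid)
            frames.append(oid)              # ship only the ObjectID
        else:
            frames.append(view.tobytes())   # small buffers travel inline
    return payload, frames

def loads_shared(payload, frames):
    """Rebuild the object; plasma-backed buffers are viewed, not copied."""
    buffers = []
    for frame in frames:
        if isinstance(frame, plasma.ObjectID):
            [buf] = client.get_buffers([frame])      # zero-copy view into the store
            buffers.append(buf)
        else:
            buffers.append(frame)
    return pickle.loads(payload, buffers=buffers)
```

Any worker on the node connected to the same store socket can deserialise the same frames without duplicating the large buffers.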

Any thoughts, before making this more complete?

Closes #xxxx

  • Tests added / passed
  • Passes pre-commit run --all-files

@github-actions
Contributor

github-actions bot commented Jun 3, 2022

Unit Test Results

15 files ±0 · 15 suites ±0 · 6h 15m 21s ⏱️ -16m 9s
2 838 tests +2:  2 753 ✔️ -1 · 83 💤 +2 · 2 ❌ +1
21 026 runs +10: 20 074 ✔️ +3 · 950 💤 +6 · 2 ❌ +1

For more details on these failures, see this check.

Results for commit 0bdc072. ± Comparison against base commit 6d85a85.

@martindurant
Member Author

@jrbourbeau , I don't remember who you suggested would be interested in discussing this idea.

@jrbourbeau
Member

I think @crusaderky @jakirkham @gjoseph92, among others, might find this interesting

@jakirkham
Member

Yeah this is interesting.

Another option I was looking at previously was using mmap with Zict spilling ( dask/zict#51 ), which would allow for a lot of the same memory sharing between workers (as mmap'd files use memory in kernel space that is shared across processes by default).
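A toy sketch of that idea (file path and sizes are illustrative, and the two halves would normally run in different worker processes), not the dask/zict#51 implementation:

```python
# Toy sketch: mmap'd files share pages in the kernel page cache across processes.
import mmap

path = "/tmp/spilled-key.bin"              # hypothetical spill file for one key

# Worker A: spill the serialized bytes to a file on local disk or tmpfs.
with open(path, "wb") as f:
    f.write(b"x" * (16 * 1024 ** 2))

# Worker B (same node): map the file read-only; additional workers mapping the
# same file pay no extra memory for the data itself.
with open(path, "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
view = memoryview(mm)                      # zero-copy view for deserialization
assert bytes(view[:1]) == b"x"
```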

cc @quasiben

@gjoseph92
Collaborator

Thanks for trying this out @martindurant! I've wanted to do this for a long time. A couple general questions:

  1. To me, the main reason to use plasma is for the optimizations it offers when transferring buffers over the network. This would also replace a lot of our complicated, bug-prone data-fetching logic with an efficient standalone subcomponent. Are you trying to do inter-worker communication here?
  2. Plasma is deprecated: see "How to switch plasma's memory allocator to jemalloc?" apache/arrow#13195.
  3. Given all that, if you're not going to use plasma for inter-worker communication, what's the benefit to using plasma as opposed to just writing buffers to shared memory directly? I imagine the main one is ObjectID reference counting and garbage collection?

@jakirkham
Member

Had held off asking whether we should generally consider using sharded key-value stores, but given Gabe's comment above am thinking it is now appropriate. Have we considered using a sharded key-value store for spilling/inter-worker communication?

Serialization would amount to handing a key and some buffers over to the store after any operation and retrieving them from any worker as needed. This would allow the data to be requestable by any worker (including the one that stored it, though we could shortcut that case). It could also be more robust, as it would be easier to recover state when restarting or after losing a worker. For CSPs that have some builtin key-value store support, we could leverage that directly.
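A hypothetical shape for such a store, purely to make the idea concrete (nothing here is an existing dask API):

```python
# Hypothetical interface sketch; names and signatures are illustrative only.
from typing import Protocol, Sequence

class SharedKVStore(Protocol):
    def put(self, key: str, header: bytes, frames: Sequence[memoryview]) -> None:
        """Hand the serialized header and buffers to the store after a task runs."""

    def get(self, key: str) -> tuple[bytes, list[memoryview]]:
        """Retrieve the header and buffers from any worker on the node."""

    def delete(self, key: str) -> None:
        """Release the entry once the scheduler no longer tracks the key."""
```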

Anyways something to think about :)

@martindurant
Member Author

Are you trying to do inter-worker communication here?

This is solely aimed at inter-worker and client-worker comms on a single node.

Deprecated

!? It doesn't say so at the head of https://arrow.apache.org/docs/python/plasma.html !! This was supposed to be a standout feature of the arrow ecosystem.

what's the benefit to using plasma

Yes, the idea was not to have to build another reference-counter, but stick to trusted technology.

Zict spilling

I did consider where in the stack this was most appropriate. My thinking is that many results on a worker in the data/memory-manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.

@jakirkham, I would be happy to consider the idea; I don't have an immediate picture of how it looks.

@gjoseph92
Collaborator

This was supposed to be a standout feature of the arrow ecosystem

I understood it as something that the Ray folks built on top of pyarrow just for Ray, and then decided to upstream. The way Ray has used arrow and plasma has changed considerably over time, so I'm not too surprised if nobody else is maintaining it now (generic object serialization had exactly this lifecycle). I wouldn't be surprised if eventually Plasma moved into the ray project (and maybe even became an implementation detail without a public API). I agree the deprecation seems a bit unclear and undocumented, just saying it doesn't surprise me.

I never thought of plasma as a standout feature of the ecosystem, but rather the ability to implement something like plasma yourself was the point.

@jakirkham
Member

jakirkham commented Jun 6, 2022

Zict spilling

I did consider where in the stack this was most appropriate. My thinking is that many results on a worker in the data/memory-manager are only ever accessed by subsequent tasks on the same worker, so we should only move/copy to shm when we know it will be useful, and we only know this when serialisation is required.

@jakirkham, I would be happy to consider the idea; I don't have an immediate picture of how it looks.

Yeah, that's fair. Limiting this to when communication is needed or memory is overwhelmed is probably good enough. One of the (admittedly obvious) benefits of shared memory is that if multiple workers on the same node need the same object in memory, we only pay that cost once. So moving the data to a mmap'd file before sending would keep memory usage more consistent.

Have a rough sketch of this from a couple months back. Though it's likely missing things. Added to this draft PR ( #6516 ). Some things we may want to build on top of that (though need not be in that PR) include sending serialized data ( #5900 ) (maybe just adding custom serialization for mmap'd files would be enough?). Using sendfile (or similar when unavailable) to send data between workers without needing to load it in memory immediately ( #5996 ).

Also should add am not attached to this or any other approach. Just think there is some benefit to leveraging shared memory on nodes. Think you and I have discussed this in several issues over the years. So happy with any improvement in this space 🙂

@martindurant
Member Author

@jakirkham , your mmap thing certainly has the advantage of being a really small change! I would say that there is no conflict with the kind of idea here, and I wonder whether yours has any downside. Still, it's a different use case.

Here is a tangentially interesting article on how fast raw throughput in IPC pipes can be on Linux: https://mazzo.li/posts/fast-pipes.html, which is yet another part of the puzzle.

@jakirkham
Member

cc @crusaderky

@martindurant
Member Author

Perhaps some of the people in this thread can have a quick brainstorm after tomorrow's dev team meeting or later in the week. I do believe that this is an important optimisation for part of our user base, and apparently relatively simple to implement in a number of different ways. So we just need to agree on the best approach(es).

@jakirkham
Member

Feel free to send me an invite :)

@github-actions
Contributor

github-actions bot commented Jul 11, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 6h 30m 57s ⏱️ +6m 11s
3 231 tests +5:  3 142 ✔️ +3 · 84 💤 +1 · 5 ❌ +1
23 887 runs +35: 22 943 ✔️ +4 · 926 💤 +17 · 18 ❌ +14

For more details on these failures, see this check.

Results for commit 2cfb807. ± Comparison against base commit cff33d5.

♻️ This comment has been updated with latest results.

@martindurant
Member Author

Created https://github.com/orgs/dask/projects/3/views/1 with a very rough draft project plan for shm. All thoughts appreciated!

@martindurant
Member Author

Now does worker-worker too (which is the important case!).

Notes:

  • required passing the serialisations through to the workers in LocalCluster. This should probably be reverted and the choice defined only in configuration (see the hypothetical config sketch after this list)
  • current state of this PR includes a lot of prints to follow the data flow; of course these are not to be included in any final version
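A hypothetical sketch of configuration-driven selection (the option names below are invented for illustration; they are not existing dask settings):

```python
# Hypothetical option names, for illustration only.
import dask

dask.config.set({
    "distributed.comm.shared-memory.backend": "plasma",     # or "lmdb" / "vineyard"
    "distributed.comm.shared-memory.minimum-size": "1MiB",  # offload buffers above this
})
```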

@hendrikmakait
Member

hendrikmakait commented Jul 20, 2022

Thanks for diving into this, @martindurant, it looks really interesting!

To follow up on @gjoseph92's comment regarding Plasma being deprecated: According to this thread on the [email protected] mailing list, it appears as though no one is actively working on Plasma within Arrow. Just from browsing the modification dates on the apache/arrow repo, it also looks like the Plasma code has not been touched in months, if not years.

@jakirkham
Member

Another thing to consider might be LMDB. Think there is support for LMDB in Zict.

@martindurant
Member Author

Specifically this, though: https://lmdb.readthedocs.io/en/release/#buffers ; we want memoryviews, not bytes objects.

However:

In both PyPy and CPython, returned buffers must be discarded after their producing transaction has completed or been modified in any way. To preserve buffer’s contents, copy it using bytes():

but this seems to be in the context of possible writes.

@jakirkham
Member

jakirkham commented Jul 21, 2022

Right, we do that in Zarr's LMDBStore, but I don't think Zict is doing this currently. It's easy enough to add though.

We could use toreadonly() to make sure all memoryviews returned are immutable.
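A minimal sketch of what that could look like with the lmdb package (path, map size, and key are illustrative); as the docs quoted above warn, the view is only valid while the read transaction stays open:

```python
# Sketch only; path, map size and key are illustrative.
import lmdb

env = lmdb.open("/tmp/shm-lmdb", map_size=2 ** 30)

with env.begin(write=True) as txn:
    txn.put(b"key-abc", b"\x00" * 1_000_000)

with env.begin(buffers=True) as txn:           # buffers=True returns views, not bytes
    buf = txn.get(b"key-abc")
    view = memoryview(buf).toreadonly()        # guard against accidental writes
    # ... deserialize from `view` before the transaction closes ...
```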

@martindurant
Member Author

Agreed, LMDB looks like just the thing if we can get around the invalid memory thing (looks like probably yes); but the point of course was to benchmark to see which is best!

@sighingnow

Hi folks, I would like to introduce our project vineyard ( https://github.com/v6d-io/v6d, a CNCF sandbox project) to the Dask community.

Vineyard is a shared-memory engine for distributed computing. Like plasma, vineyard provides a shared memory abstraction for connected IPC clients (see https://v6d.io/notes/getting-started.html), but unlike plasma, vineyard doesn't require objects to be serialized before they are put into the store; instead, it introduces a builder/resolver framework that lets clients store metadata and blobs separately, enabling efficient zero-copy sharing between computing engines in different languages (see more details about the idea in https://v6d.io/notes/objects.html#).
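A minimal sketch of the basic client usage (socket path is illustrative; see the getting-started link above for details):

```python
# Sketch of basic vineyard client usage; the socket path is illustrative.
import numpy as np
import vineyard

client = vineyard.connect("/var/run/vineyard.sock")

arr = np.arange(1_000_000)
object_id = client.put(arr)      # builder stores metadata and blobs separately
shared = client.get(object_id)   # resolver returns a zero-copy view of the blobs
```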

We haven't compared vineyard's performance with plasma's, but it is in our plan. If possible, I would like to introduce vineyard into the shared-memory experiments in Dask and see if it could be helpful.

The integration of plasma looks quite concise in this PR compared to my previous experiment with vineyard. I will add vineyard to my own fork first and try it with @martindurant's ongoing shm benchmarks.

Actually, we worked on some experiments integrating dask with vineyard for a while, but stopped the investigation after reading some previous discussions about introducing shared memory to dask (e.g., #4497) and assumed shared memory wouldn't be considered. Thanks @martindurant for diving into such a feature; it looks really exciting!

@martindurant
Member Author

Thanks @sighingnow for getting in touch. I was aware of vineyard via discussions in Ray about using it instead of or alongside plasma, but this PR was already underway by then. You will also see that I have added an implementation based on LMDB in the most recent commits, so comparing and contrasting against other frameworks is very much in my plan, and it's great to have someone knowledgeable in this mix! This PR will not be accepted as it stands, but we can use it as a space to push code for testing/benchmarking - I'd be happy for you to make PRs to this branch rather than make a separate branch, if you prefer. I must admit I don't understand how you can share objects without some serialisation, but I'm prepared to be educated.

@sighingnow

This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking

Will submit a PR against the martindurant/distributed:shared_plasma branch.

I mist admit I don't understand how you can share objects without some serialisation

I would like to describe it as "lightweight serialization" instead, to make it clearer; "without serialization" in my previous comments is a bit misleading. It works like pickle5 in the sense of decoupling metadata and payloads (when pickle5 serializes an object such as a numpy.ndarray, you receive a stream with some meta information plus an out-of-band buffer for the binary payload).
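To make the analogy concrete, a tiny pickle-5 example of that decoupling:

```python
# Tiny illustration: the pickle stream carries metadata, the ndarray payload
# travels as an out-of-band buffer that can be placed wherever we like.
import pickle
import numpy as np

arr = np.arange(1_000_000)
out_of_band = []
meta = pickle.dumps(arr, protocol=5, buffer_callback=out_of_band.append)
restored = pickle.loads(meta, buffers=out_of_band)   # reconstructs by viewing the buffer
```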

In vineyard, we organize objects into metadata and payload buffers as well, but the metadata is organized as a JSON tree so it can be exchanged between different languages easily.

Besides, we have implemented a client-side allocator that can allocate memory directly in the shared memory. The client-side allocator can be configured as the array data allocator in numpy (see also https://numpy.org/neps/nep-0049.html), so when we decide to put a numpy.ndarray into vineyard we can save the copy, as the data has already been placed in the shared memory arena. Note that unlike create_buffer/release_buffer, the malloc/free in the client-side allocator won't trigger a request/response roundtrip over the IPC socket to the server.

The allocator API hasn't been polished for Python but has been used in other C++ engines. It could bring added value to Dask as well, I think: if the cost of copying to shared memory can be avoided, worker-to-worker communication on the same node becomes really cheap.

@jakirkham
Member

It might be worth looking at using Dask's builtin serialization directly, as it also generates metadata and out-of-band buffers (instead of relying only on pickle protocol 5). We use MessagePack for the metadata currently, but this may also work with JSON (untested), possibly with some tweaks.
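For reference, a minimal round trip through that machinery (this is the real distributed.protocol API, used in its simplest form):

```python
# Dask's builtin serialization already splits an object into a small header
# (metadata) and a list of out-of-band frames (buffers).
import numpy as np
from distributed.protocol import serialize, deserialize

arr = np.arange(1_000_000)
header, frames = serialize(arr)          # header: dict of metadata, frames: buffers
restored = deserialize(header, frames)   # frames could instead live in shared memory
```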

@martindurant
Member Author

Ah, yes indeed, it sounds like vineyard has something similar to dask's serialization system - which would be attractive if we didn't already have one.

The ability to directly allocate into the shared memory is a pretty big deal, though, and certainly not done by the plasma or lmdb implementations. It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.

@sighingnow

It might be worth looking at using Dask's builtin serialization (https://distributed.dask.org/en/stable/serialization.html#serialization) directly

I have noticed that. The data serialization protocol should be orthogonal to using vineyard as the underlying shared memory store, I think. I will give it a try.

It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.

Allocating all buffers in shared memory requires cooperation between the serializer and the deserializer, i.e., the serializer needs to record each buffer's offset from the base address of the shared memory arena, and the deserializer needs to translate that offset into an address in its own process's address space.

Actually, this "offset" is realized in both plasma and vineyard as the "object id".
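A toy illustration of the offset idea using Python's standard shared memory (names are illustrative; plasma and vineyard do this bookkeeping via object ids):

```python
# Toy illustration only; cleanup (close/unlink) is omitted for brevity.
from multiprocessing import shared_memory

arena = shared_memory.SharedMemory(name="shm-arena", create=True, size=2 ** 20)

def resolve(offset: int, size: int) -> memoryview:
    """Translate an (offset, size) pair into a view in this process's address space."""
    return arena.buf[offset:offset + size]

# Another process attaches to the same arena by name and resolves the same offsets:
# peer = shared_memory.SharedMemory(name="shm-arena")
```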

From our experience integrating vineyard with other computing engines (an online graph store engine), we use a JSON tree that includes all the blob ids of a graph data structure, pass the JSON to another computing process, and the graph data structure is restored there from the JSON (and the included blobs).

@sighingnow

It makes me wonder what are the downsides to allocating all buffers in shared memory, aside from a difficulty getting the real memory footprint of a process.

It would also introduce the constraint that a blob (allocated memory block) needs to be freed by the allocator in its creator process, as the allocator has internal state. This works in a "single writer, multiple read-only readers" setting, and if freeing objects created by another worker is not allowed in dask, it should be applicable as well.

@martindurant
Member Author

See very preliminary results at https://github.com/martindurant/shm-distributed#readme (and the code that made them in the same repo). Perhaps someone can tell me what I'm doing wrong in the choice of workflows or how I've implemented them. This is running against the current state of this branch.

@jakirkham
Member

Thanks Martin 🙏

Raised a couple ideas as issues:

@sighingnow

This PR will not be accepted as it stands, but we can use this as a space to push code for testing/benchmarking

I have posted a pull request about the vineyard backend in martindurant#2

Comment on lines +933 to +935
register_serialization_family("plasma", shared.ser_plasma, shared.deser_plasma)
register_serialization_family("lmdb", shared.ser_lmdb, shared.deser_lmdb)
register_serialization_family("vineyard", shared.ser_vineyard, shared.deser_vineyard)
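For context on what registering a family buys us, a small sketch of how one of these would be selected (assuming the registrations above are active; otherwise serialization falls back to pickle):

```python
# Sketch: selecting a registered serialization family by name.
import numpy as np
from distributed.protocol import serialize, deserialize

arr = np.arange(1_000_000)
header, frames = serialize(arr, serializers=["plasma", "pickle"])  # try plasma first
restored = deserialize(header, frames)
```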
Member
Am curious whether just setting data on the Worker (which could bypass the usual spilling mechanisms) and disabling compression would alleviate the need to hook things in this way, while still getting much of the same benefit.

Member Author

Yes, but we don't want to bother sending buffers to shm if they are not destined to go to another worker. (That is, unless we know how to allocate memory in shm first rather than normal process memory - sounds tricky). That's why I thought serialisation time would be the right time to decide.

Note that spilling might be available in the shm implementation, just haven't tried that yet.

Obviously we would need to do work on workers' memory monitoring and actions taken on that when shm is available.

Member

Gotcha, but couldn't this be useful for redundancy? IOW if a worker dies, we could still recover the data stored in shared memory.

That said, this is a good point about paying attention to what data is sent. Wonder if we should be thinking about shared memory as a comm.

Admittedly, hooking into serialization like this is pretty easy to do. Just thinking about how we might, long term, integrate some of these lessons to make shared memory easier to use for those who choose to use it.

Member Author

martindurant commented Sep 1, 2022

IOW if a worker dies, we could still recover the data stored in shared memory

Either the scheduler would need to know that data was pushed to shm, or we'd need clever hashes of the buffers of a result; also, we need to make sure that the buffers are not purged (because the worker that made them disappeared) before a new worker picks them up.

thinking about shared memory as a comm

The thought had occurred, and that's sort of what vineyard's metadata service is. In fact, locally we could probably get better comms on linux sockets rather than TCP. But it was the simplicity of hijacking pickle at the point it is called that made me head in this direction.


Note that spilling might be available in the shm implementation, just haven't tried that yet.

FYI: both plasma and vineyard support implicit spilling (triggered when there is not enough memory for a new allocation).

Member Author

Yes, I am aware of this, and we should endeavour to test it. I'm sure vineyard's is well documented, but https://arrow.apache.org/docs/python/plasma.html does not tell you how to activate plasma's.

@chrish42
Contributor

To follow up on @gjoseph92's comment regarding Plasma being deprecated: According to this thread on the [email protected] mailing list, it appears as though no one is actively working on Plasma within Arrow. Just from browsing the modification dates on the apache/arrow repo, it also looks like the Plasma code has not been touched in months, if not years.

My understanding (after contributing a small patch to Arrow's Plasma a while back) is that it's mostly deprecated because it doesn't have maintainers: the Ray folks are not maintaining it anymore and are using their own version instead. But if maintainers were to step up, it would probably get "un-deprecated".
