
Support Pickle's protocol 5 #3784

Merged 18 commits from support_pickle_protocol_5 on May 21, 2020

Conversation

@jakirkham (Member)

This ensures that objects that support Pickle's protocol 5 go through out-of-band serialization of buffers. This should avoid some of the extra memory usage and memory copies that would otherwise be incurred when going through Pickle's older protocols, which lack this feature. Requires Python 3.8+ (the version in which this protocol was introduced).

I had considered supporting the pickle5 backport as well, so that users could get this functionality with earlier versions of Python. However, that would involve supporting pickle5 in cloudpickle, which hasn't been done yet ( cloudpipe/cloudpickle#179 ), so I'm sidestepping it for now. It could be handled in a follow-up once that is addressed.
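For reference, here is a minimal sketch of what out-of-band pickling with protocol 5 looks like on Python 3.8+ (illustrative only, not the code in this PR), using a NumPy array as the buffer-exposing object:

import pickle

import numpy as np

# With buffer_callback, large buffers are handed out separately instead of
# being copied into the pickle stream, then passed back to loads via buffers.
data = np.arange(1_000_000)
buffers = []
frame = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)
roundtripped = pickle.loads(frame, buffers=buffers)
assert (roundtripped == data).all()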

@jcrist (Member) left a comment


This is great to see. I think sticking with the CPython implementation only for now (instead of also supporting pickle5) makes sense; as people upgrade Python versions, the need for pickle5 will go away.

(Review thread on distributed/protocol/pickle.py, resolved)
@jakirkham force-pushed the support_pickle_protocol_5 branch 4 times, most recently from e30341e to d8558c2 on May 7, 2020 at 20:03
@jakirkham (Member Author)

Please let me know if anything else is needed here 🙂

(Review thread on distributed/protocol/pickle.py, resolved)
Before calling the user-provided buffer callback, collect buffers in an internal list. That way, if the pickling mechanism needs to change, the internal list can be purged before handing the buffers to the user. At the end of pickling, make sure the user's buffer callback is called on each buffer in order.
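A rough sketch of that idea (illustrative only; the function name and the protocol-4 fallback here are assumptions, not the actual distributed/protocol/pickle.py code):

import pickle

def dumps_sketch(x, *, buffer_callback=None, protocol=pickle.HIGHEST_PROTOCOL):
    # Collect buffers in an internal list first.
    internal = []
    cb = internal.append if protocol >= 5 else None
    try:
        frame = pickle.dumps(x, protocol=protocol, buffer_callback=cb)
    except Exception:
        # If the pickling mechanism has to change, purge what was collected
        # so the caller never sees buffers from the abandoned attempt.
        internal.clear()
        frame = pickle.dumps(x, protocol=4)
    # Only at the end, call the user's buffer callback on each buffer in order.
    if buffer_callback is not None:
        for buf in internal:
            buffer_callback(buf)
    return frame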
@jakirkham (Member Author)

Planning on merging tomorrow if no comments.

@jcrist (Member) commented May 19, 2020

Could you wait a bit? I still want to review this, just a bit swamped right now.

@jakirkham (Member Author)

Sure. Do you have a (rough) sense of when you might be able to look at it (for my own planning)?

@jcrist (Member) commented May 19, 2020

Probably sometime tomorrow.

@jcrist (Member) commented May 29, 2020

Hmmm, that is concerning. Can you reproduce it on a local cluster?

from dask.distributed import Client
client = Client()
client.submit(lambda: "test function").result()

@jakirkham (Member Author)

You might need to use a NumPy array and force pickle for communication to create a reproducer. Alternatively, you could use the MemoryviewHolder class from the tests here. Something like that should generate a non-trivial number of buffers.
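For illustration, a class along these lines exposes its data out-of-band under protocol 5 (a hedged sketch in the spirit of that test helper, not the actual MemoryviewHolder code):

import pickle

class BufferHolder:  # illustrative stand-in, not the real MemoryviewHolder
    def __init__(self, data):
        self.data = memoryview(data)

    def __reduce_ex__(self, protocol):
        if protocol >= 5:
            # Hand the underlying buffer to pickle out-of-band.
            return (BufferHolder, (pickle.PickleBuffer(self.data),))
        return (BufferHolder, (self.data.tobytes(),))

buffers = []
frame = pickle.dumps(BufferHolder(b"some bytes"), protocol=5,
                     buffer_callback=buffers.append)
obj = pickle.loads(frame, buffers=buffers)  # one buffer collected out-of-band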

@gshimansky

@jcrist Yes, I am getting back 'test function'.
This is a very subtle bug, and for some reason it is reproducible only in my test environment. But I am able to trigger it consistently, so I will try to find the reason.

@jakirkham (Member Author)

It would be good to start debugging with something where out-of-band pickling would be used. NumPy arrays would work, for example (though we have to force them to go through pickle). Alternatively, use something that implements pickle protocol 5 (like MemoryviewHolder) that Dask doesn't otherwise know how to serialize. Here's an example with NumPy:

import numpy as np
from distributed import Client

client = Client(serializers=["pickle"])
client.submit(lambda a, b: a + b, np.arange(3), 2).result()

This may serve as a good starting point for further debugging, though it may need some adjustment: using client.run to hit all workers, or only returning (or only passing) a NumPy array instead of doing both as above. Additionally, there may be other flags we need to set to ensure pickling happens (though using MemoryviewHolder should avoid that).

Hopefully that gives some ideas on how to narrow this down further.

@mrocklin (Member) commented May 29, 2020 via email

@jcrist (Member) commented May 29, 2020

Yes. Since pickle recurses, anything with a __reduce__ implementation that returns NumPy arrays (or other objects that directly support protocol 5) will also use the new protocol. No special-casing in pandas is needed.
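For example (an illustrative sketch, not pandas code), a plain container whose __reduce__ returns a NumPy array picks up out-of-band buffers automatically:

import pickle
import numpy as np

class Wrapper:  # hypothetical container, for illustration only
    def __init__(self, arr):
        self.arr = arr

    def __reduce__(self):
        return (Wrapper, (self.arr,))

buffers = []
frame = pickle.dumps(Wrapper(np.arange(5)), protocol=5,
                     buffer_callback=buffers.append)
# The array's data is collected out-of-band even though Wrapper itself
# knows nothing about protocol 5 (one buffer for a contiguous array).
obj = pickle.loads(frame, buffers=buffers)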

@mrocklin (Member) commented May 29, 2020 via email

@jakirkham (Member Author)

Yeah, this is what @TomAugspurger and I were learning in issue ( pandas-dev/pandas#34244 ). I also brought this up in comment ( #614 (comment) ). It would be good to have some tests in pandas that cover pickling with protocol 5 as well, though based on local exploration this does seem to serialize pandas objects efficiently.

While more efficient serialization of pandas objects is an immediate, impactful win, I think the bigger win is that any object supporting this protocol is now serialized efficiently, which lowers the bar for getting efficient serialization with Dask. We've done this sort of work for RAPIDS, for example, and other libraries like spaCy may do it as well ( explosion/spaCy#5472 ). It might be nice to identify some other pain points like this one; they should now be low-hanging fruit.

Additionally, it would be nice to cover the last mile of pre-3.8 Python support through the pickle5 backport package. This would allow Dask to have efficient serialization on any supported Python version. I did some work to make cloudpickle compatible with pickle5 in PR ( cloudpipe/cloudpickle#370 ), thanks to help from @pierreglaser, but am stuck on a couple of remaining test failures (if anyone has thoughts there 😉). I expect the changes for pickle5 support in Dask to be much simpler once cloudpickle supports it.
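A rough sketch of the import fallback this would enable (assuming the pickle5 package is installed on older Pythons; it mirrors the 3.8 stdlib API):

import sys

if sys.version_info >= (3, 8):
    import pickle
else:
    import pickle5 as pickle  # backport of the protocol 5 pickle module

# Either way, protocol 5 and the buffers/buffer_callback arguments are available.
assert pickle.HIGHEST_PROTOCOL >= 5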

Once we wrap that up, it would be good to share this work more openly. I was thinking of writing a blog post to discuss this, along the lines of your "Pickle isn't slow, it's a protocol" blog post. It would be nice to point the community to this avenue for efficient serialization in distributed computing (whether or not they use Dask). In fact, it seems some people are already taking notice ( ray-project/ray#8577 ) 😉

@jakirkham mentioned this pull request on May 29, 2020
@gshimansky

Well, I am actually experiencing the problem when I run tests on the Modin project ( https://github.com/modin-project/modin ), and pandas DataFrames and Series are the objects that are passed to Dask workers.

@jcrist (Member) commented May 29, 2020

Are you able to reproduce without modin?

@gshimansky commented May 29, 2020

Are you able to reproduce without modin?

Not yet. It is quite fragile and easy to lose when making changes.

@jakirkham (Member Author)

@gshimansky, have you tried my suggestion above?

@gshimansky

I tried to debug the distributed code. So far I have found that the HIGHEST_PROTOCOL value is always 4. I also found that pickle_loads gets called with frames of length 2, which results in buffers being non-empty. The frames of length 2 come from deserialize_bytes, but there the trace is lost, because I don't know whether decompress should return such a value or not.

@jakirkham (Member Author)

Maybe Dask is splitting up the frames somehow when serializing and not joining them correctly when deserializing? That's my best guess. Though an example would really help (in addition to being a useful test case for a fix 😉)

@gshimansky

My question is: how should this work correctly when HIGHEST_PROTOCOL equals 4 and loads therefore doesn't have a buffers argument? If the frames argument arrives with length 2, buffers ends up non-empty, and loads gets called with a buffers keyword it doesn't support.

@jakirkham (Member Author)

We don't collect buffers when the protocol is 4, so it should be empty.

However, Dask sometimes splits up frames for various reasons (networking, limits on compression, etc.), independently of whether pickling was used. If Dask didn't join them back together correctly, that might present as the bug you are seeing.

Again, we would need a minimal reproducible example (MRE) to confirm that though.
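To make that concrete, here is a standalone sketch (not the actual distributed code paths) of why un-joined split frames break a protocol 4 payload:

import pickle

payload = pickle.dumps(list(range(1000)), protocol=4)

# Suppose the transport layer splits the single pickled frame into chunks.
chunks = [payload[:100], payload[100:]]

# Correct behavior: re-join the chunks before unpickling.
assert pickle.loads(b"".join(chunks)) == list(range(1000))

# Incorrect behavior: treating chunks[1:] as out-of-band buffers leaves
# chunks[0] as a truncated pickle stream, and loads() fails on it.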

@gshimansky

OK, so do you mean that if the protocol is 4, then decompress in deserialize_bytes should not return a frames list of length greater than 1? I need to know whether this behavior is allowed or not in order to investigate further.



 def pickle_loads(header, frames):
-    return pickle.loads(b"".join(frames))
+    x, buffers = frames[0], frames[1:]
@jakirkham (Member Author)


My expectation is that frames[1:] would be an empty list here. It might not be though. If it's not, that would be good to fix. Whether that has to do with deserialize_bytes behavior or not, I'm less sure. Maybe someone else knows?

@gshimansky

The frames in pickle_loads come from deserialize_bytes for me (take a look at the exception stack trace), and for me frames[1:] is not empty. That's why I am asking whether this is expected or not.

@jakirkham (Member Author)


Right. So do we have a reproducer? That should help us debug this further.

@jakirkham (Member Author)


FWIW, I've been trying to reproduce this too, but haven't had any luck. I believe you that there could be a problem here; it's just tricky to come up with fixes in the dark, so anything you can come up with would help 🙂

@gshimansky commented May 29, 2020


If you say that Dask is able to split frames, could that happen when large pandas DataFrames are serialized? My test is a benchmark that operates on such objects, about 20 million records long and 40 columns wide. Modin splits them for parallel processing, but they still remain quite large in memory.

@jakirkham (Member Author)


Maybe something like PR ( #3841 ) will help? We still lack a test case though, so I have no way to confirm whether it actually helps (or to ensure we don't accidentally regress).

(Member)


Even when sending large bytes that do get split by frame_split_size, I'm unable to reproduce an error. Things are properly reassembled on the other side. Not sure what's going on there.
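Roughly the kind of check attempted (a sketch; the payload size needed to trigger frame splitting is an assumption about the default threshold):

from dask.distributed import Client

client = Client(serializers=["pickle"], deserializers=["pickle"])
big = b"x" * (256 * 2**20)  # large bytes payload, expected to be split into frames
assert client.submit(len, big).result() == len(big)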

@jakirkham (Member Author)


Yeah me neither unfortunately. Happy to dig deeper once we identify a reproducer 🙂

@jakirkham (Member Author)


A similar issue was raised recently ( #3851 ). Not sure whether it is the same. I suspect PR ( #3639 ) fixes this. It would be good if you could try it, though.

@jakirkham (Member Author)


This was fixed by PR ( #3639 ).

@jakirkham (Member Author)

This has been extended to Python versions before 3.8 using pickle5 in PR ( #3849 ).
