
[REVIEW] Msgpack handles extract serialize #4531

Merged: 13 commits into dask:master from msgpack_extract_serialize, Mar 8, 2021

Conversation

@madsbk (Contributor) commented Feb 22, 2021

This PR reduces the overhead of protocol.dumps() and protocol.loads() by handling the extraction of serializable objects directly in msgpack (see the sketch after the checklist below).

  • Closes Handling custom serialization with MsgPack directly #4379
  • Tests added / passed
  • Passes black distributed / flake8 distributed
  • Handle extraction of embedded bytes
  • Find and fix BUG triggered by not converting to bytes array implicitly:
    # TODO: the new protocol.dumps() implementation doesn't convert
    # memoryviews to bytes implicitly, which triggers a communication BUG.
    # My guess is that somewhere we are using `len(buffer)` instead of
    # `buffer.nbytes`. Casting to bytes here fixes the issue.
    data = memoryview(data).cast("B")
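
For readers unfamiliar with the approach, here is a minimal sketch of the general idea: use msgpack's `default=` hook to pull objects that need custom serialization out of the message while it is being packed, and `object_hook=` to put them back on the receiving side. The names and placeholder format below are illustrative only, not the code in this PR:

    import msgpack

    class SerializeMe:
        """Hypothetical stand-in for an object requiring custom serialization."""

        def __init__(self, data):
            self.data = data

    def dumps(msg):
        out_of_band = []  # objects extracted during packing, serialized separately

        def _encode(obj):
            if isinstance(obj, SerializeMe):
                out_of_band.append(obj)
                # Replace the object with a small placeholder holding its index
                return {"__serialized__": len(out_of_band) - 1}
            raise TypeError(f"cannot pack {obj!r}")

        header = msgpack.packb(msg, default=_encode, use_bin_type=True)
        return header, out_of_band

    def loads(header, out_of_band):
        def _decode(obj):
            if "__serialized__" in obj:
                return out_of_band[obj["__serialized__"]]
            return obj

        return msgpack.unpackb(header, object_hook=_decode, raw=False)

    # Usage: the placeholder round-trips back to the original object
    msg = {"op": "task", "payload": SerializeMe(b"...")}
    header, frames = dumps(msg)
    assert isinstance(loads(header, frames)["payload"], SerializeMe)

The appeal is that the message is traversed only once, by msgpack itself, rather than in a separate Python-level extract_serialize pass.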

@madsbk (Contributor, Author) commented Feb 22, 2021

@mrocklin @jakirkham @quasiben, this PR isn't ready to be reviewed yet but it should work in most cases. It would be great if one of you could run a benchmark and see how much of an impact it has.

Running the following code, I am seeing a 6-10x speedup :)

    import time
    from distributed.protocol import dumps

    n = 10**6
    msg = [{"op": "health", "status": "OK"} for _ in range(n)]
    t1 = time.time()
    dumps(msg)
    t2 = time.time()
    print(f"n: {n}, time: {t2 - t1}")

@mrocklin (Member)

I'll give it a shot :)

@mrocklin (Member)

Well, it has certainly disappeared from the summary.

[screenshot: profile summary, extract_serialize no longer present]

I haven't yet seen the runtime difference though. I'll do that next.

@mrocklin (Member)

Running the following computation on my laptop, with 20 workers, 1 thread each:

from dask.distributed import Client, performance_report, wait
client = Client("localhost:8786")

import dask
import dask.dataframe as dd
dask.config.set({"optimization.fuse.active": False})
df = dask.datasets.timeseries(
    start="2020-01-01",
    end="2020-01-10",
    partition_freq="1h",
    freq="60s"
).persist()

df2 = df.set_index("x").persist()
wait(df2)
  • master takes 8.7s
  • this branch takes 6.5s

@jakirkham (Member)

So a 25% reduction then?

How do msgpack.packb and msgpack.unpackb rank without this change? Noticing the latter comes up as the 6th item in that list.

@mrocklin (Member) commented Feb 22, 2021 via email

@jakirkham (Member)

No worries

@mrocklin (Member)

OK, I've reinstalled twice into two different environments. I'm now also running the computation ten times in a loop.

Results

  • master: 8.17 s +- 1.68
  • this branch: 7.38 s +- 0.80

Definitely still a significant improvement. Variation is still high though. It would be good to see what this looks like on a quieter system.
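
For reference, a timing loop along these lines (a sketch; the exact script is not shown in this thread) produces mean/std numbers like the ones above:

    import time
    import numpy as np

    times = []
    for _ in range(10):
        t0 = time.time()
        df2 = df.set_index("x").persist()  # df as defined in the earlier snippet
        wait(df2)
        times.append(time.time() - t0)
        del df2  # release the persisted result between iterations

    print(f"{np.mean(times):.2f} s +- {np.std(times):.2f}")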

@jakirkham (Member)

Thanks for the update!

What do you mean by a quieter system? One dedicated to the benchmark or something else?

@mrocklin (Member)

One not running Chrome, and one that is not over-saturated with 4x the number of workers as cores for example :)

I've started running with nice in order to try to protect the scheduler a bit, but there is only so much that can be done. I really do want that Raspberry Pi cluster right about now :)

@@ -85,6 +85,7 @@ def test_maybe_compress_sample():
    assert compressed == payload


@pytest.mark.xfail(reason="TODO: fix")
def test_large_bytes():
    for tp in (bytes, bytearray):
        msg = {"x": tp(b"0" * 1000000), "y": 1}
Member (review comment on the diff above)

Why does this fail? Does msgpack fail on large messages?

Contributor Author (reply)

No, it is because it doesn't support splitting of large frames yet. I am working on a PR that should make it much easier to handle frame splitting and writability: #4541
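
For context, "splitting of large frames" means breaking any frame above some maximum size into smaller pieces before sending and re-joining them on receipt. A rough sketch of the idea (not the #4541 implementation, and assuming bytes-like frames):

    def split_frames(frames, max_size=2**26):
        """Split frames larger than max_size; return (pieces, counts)."""
        pieces, counts = [], []
        for frame in frames:
            mv = memoryview(frame)
            n = max(1, -(-mv.nbytes // max_size))  # ceiling division, at least one piece
            pieces.extend(mv[i * max_size:(i + 1) * max_size] for i in range(n))
            counts.append(n)
        return pieces, counts

    def merge_frames(pieces, counts):
        """Inverse of split_frames: join each frame's pieces back together."""
        frames, it = [], iter(pieces)
        for n in counts:
            chunks = [next(it) for _ in range(n)]
            frames.append(chunks[0] if n == 1 else b"".join(chunks))
        return frames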

@mrocklin (Member)

msgpack.unpackb does start to show up more in traces, but it's less than extract_serialize used to be.

[screenshot: profile with msgpack.unpackb visible]

@jakirkham mentioned this pull request Feb 23, 2021
@madsbk mentioned this pull request Feb 23, 2021
@jakirkham (Member)

That's from_frames though. I think extract_serialize is only used in to_frames

@jakirkham (Member)

Ben and I tried to benchmark this today, but ran into some issues with hangs near the end of the shuffle. Unfortunately we weren't able to identify exactly where they were coming from. Also, these hangs did not happen every time, but they happened frequently enough that a few retries were sufficient to see a hang. That being said, as things are still being worked on here, maybe this isn't totally unexpected.

@madsbk force-pushed the msgpack_extract_serialize branch 2 times, most recently from 4ab1ac9 to 9e0e884 on February 24, 2021 11:04
@madsbk (Contributor, Author) commented Feb 24, 2021

> Ben and I tried to benchmark this today, but ran into some issues with hangs near the end of the shuffle. Unfortunately we weren't able to identify exactly where they were coming from. Also, these hangs did not happen every time, but they happened frequently enough that a few retries were sufficient to see a hang. That being said, as things are still being worked on here, maybe this isn't totally unexpected.

Yeah, this PR still needs some work. Getting #4541 merged will help a lot.

@mrocklin (Member)

> That's from_frames though. I think extract_serialize is only used in to_frames

extract_serialize was removed entirely from the trace. What I'm saying here is that there are new costs associated with this msgpack approach. This was a picture of one of them.

@jakirkham (Member)

> Yeah, this PR still needs some work. Getting #4541 merged will help a lot.

Sounds good. Reviewed yesterday. Will follow up on any comments/updates today

@jakirkham (Member)

> That's from_frames though. I think extract_serialize is only used in to_frames
>
> extract_serialize was removed entirely from the trace. What I'm saying here is that there are new costs associated with this msgpack approach. This was a picture of one of them.

Yep, I believe that. Just noting that the particular screenshot wouldn't have shown any calls to extract_serialize before this PR, but I have no problem believing that would be different in another part of the trace, or that the trace would now start to contain things like msgpack.

@madsbk force-pushed the msgpack_extract_serialize branch 7 times, most recently from 3b8bd25 to 56246cd on February 26, 2021 14:08
@madsbk (Contributor, Author) commented Feb 26, 2021

@mrocklin, @jakirkham, @quasiben, I have fixed most of the bugs and the PR should be ready for testing/benchmarking.

@madsbk force-pushed the msgpack_extract_serialize branch from 90aab1e to 4c3c46c on March 2, 2021 11:50
@madsbk (Contributor, Author) commented Mar 2, 2021

@quasiben could you try to test again? I found and fixed a bug in Tornado triggered when len(x) != x.nbytes: tornadoweb/tornado#2996
This PR now includes #4555, which also fixes the issue.
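
For context, `len()` and `nbytes` disagree whenever a buffer's items are wider than one byte (or the view is multi-dimensional), which is presumably the kind of frame involved here. A quick illustration, using NumPy only to get a multi-byte buffer:

    import numpy as np

    buf = np.arange(10, dtype="i8")  # 10 items, 8 bytes each
    view = memoryview(buf)
    print(len(view), view.nbytes)    # 10 80 -> len() undercounts the byte size

    flat = view.cast("B")            # cast to a flat view of unsigned bytes
    print(len(flat), flat.nbytes)    # 80 80 -> now they agree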

@jakirkham (Member)

Ah this makes more sense. Thanks for digging into this Mads. Do you think it would be possible to do this casting as part of MsgPack serialization?

@quasiben (Member) commented Mar 2, 2021

Thanks @madsbk. While the shuffle now completes, I do see errors like the following:

distributed.core - ERROR - Exception while handling op get_data
Traceback (most recent call last):
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/core.py", line 500, in handle_comm
    result = await result
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/worker.py", line 1359, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/comm/tcp.py", line 267, in write
    each_frame = memoryview(each_frame).cast("B")
TypeError: memoryview: cannot cast view with zeros in shape or strides
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f4983677dc0>, <Task finished name='Task-640' coro=<BaseTCPListener._handle_stream() done, defined at /gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/comm/tcp.py:472> exception=TypeError('memoryview: cannot cast view with zeros in shape or strides')>)
Traceback (most recent call last):
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/comm/tcp.py", line 489, in _handle_stream
    await self.comm_handler(comm)
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/core.py", line 500, in handle_comm
    result = await result
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/worker.py", line 1359, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/comm/tcp.py", line 267, in write
    each_frame = memoryview(each_frame).cast("B")
TypeError: memoryview: cannot cast view with zeros in shape or strides
distributed.worker - ERROR - Worker stream died during communication: tcp://10.33.12.22:36911
Traceback (most recent call last):
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210302-nightly-0.19/lib/python3.8/site-packages/distributed/comm/tcp.py", line 195, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
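
A note on the TypeError above: memoryview.cast("B") refuses any view that has a zero in its shape or strides, so an empty frame is enough to reproduce the message (whether an empty frame is the exact trigger here is an assumption on my part):

    # Raises TypeError: memoryview: cannot cast view with zeros in shape or strides
    memoryview(b"").cast("B")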

[review thread on distributed/comm/tcp.py, marked outdated and resolved]
@quasiben (Member) commented Mar 2, 2021

Woot! Working smoothly now, with better performance too! We have typically been seeing 23.11 s +/- 0.77 s; the per-shuffle times on this branch are:

start shuffle:  0
22.364916563034058
start shuffle:  1
17.257044792175293
start shuffle:  2
17.656667709350586
start shuffle:  3
17.426403522491455
start shuffle:  4
17.977993726730347
start shuffle:  5
18.042349576950073
start shuffle:  6
16.9795560836792
start shuffle:  7
21.994297981262207
start shuffle:  8
18.86323642730713
start shuffle:  9

@jakirkham (Member)

So 5-6s faster than what we currently see. Does that sound right?

Looks like the 0th iteration was slower than that though. Is that consistent? Do we know what is slow about that iteration?

@madsbk mentioned this pull request Mar 3, 2021
@jakirkham (Member)

Went ahead and merged dask/master into this PR to incorporate other recent merges and retest with those changes. Hope that is ok 🙂

@jakirkham requested a review from mrocklin March 4, 2021 02:43
@madsbk (Contributor, Author) commented Mar 4, 2021

> Went ahead and merged dask/master into this PR to incorporate other recent merges and retest with those changes. Hope that is ok 🙂

Thanks @jakirkham !

@quasiben (Member) commented Mar 4, 2021

Do we want to hold off on merging this until after the release?

@jakirkham (Member)

That seems reasonable, though I think our testing here has given me more confidence in this change.

@jakirkham merged commit 2231f90 into dask:master Mar 8, 2021
@jakirkham (Member)

Thanks Mads for working on this and everyone for the reviews! 😄
