Speed up network transfer for small buffers #8318

Merged: 1 commit merged into dask:main from fast_small_buffers on Nov 6, 2023

Conversation

@crusaderky (Collaborator) commented on Nov 1, 2023

While working on #8282, I realised that sending a lot of small zero-copy pickle5 buffers over the network is substantially slower than sending bytes blobs. So I set up a benchmark, using just tcp.py, that sends back and forth a single message containing a lot of small numpy arrays (a.k.a. shards, in shuffle lingo), encoded in different ways.

In all cases, the message size is 2 MiB or the shard size, whichever is larger.
The round-trip was performed on localhost, with sender and receiver running in the same event loop.
All tests are downstream of #8308
The full benchmark suite is available here: https://gist.github.com/crusaderky/3e11fd4be8b61d06109a01781dda9c83
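The gist contains the real harness, which drives distributed's tcp.py directly. As a rough illustration of the shape of the measurement only, here is a stand-in round-trip using plain asyncio (everything below is illustrative and not taken from the gist):

import asyncio
import pickle
import time

import numpy as np

async def roundtrip(payload: bytes) -> float:
    """Time one send+receive of `payload` over a loopback TCP connection."""

    async def echo(reader, writer):
        data = await reader.readexactly(len(payload))
        writer.write(data)
        await writer.drain()

    server = await asyncio.start_server(echo, "127.0.0.1", 0)
    host, port = server.sockets[0].getsockname()[:2]
    reader, writer = await asyncio.open_connection(host, port)

    start = time.perf_counter()
    writer.write(payload)
    await writer.drain()
    await reader.readexactly(len(payload))
    elapsed = time.perf_counter() - start

    writer.close()
    server.close()
    return elapsed

# 1024 shards of 2 kiB each -> a 2 MiB message, as in the benchmark
shards = [np.random.random(256) for _ in range(1024)]
print(asyncio.run(roundtrip(pickle.dumps(shards))))  # the "bytes" encoding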

[figure: small_buffers_performance]

Note: 8 kiB shard size matches the p2p rechunk tests in coiled/benchmarks for 8 MiB chunk size;
128 kiB shard size matches those with 128 MiB chunk size.

Legend

list[ndarray]

The numpy arrays are passed to the network stack unserialized, in a plain list.
The list is traversed by dask's serialization stack, and the numpy arrays are encoded by serialize_numpy_ndarray.
Their buffers are extracted and are individually sent over the network; on the other side they are received into individual numpy.empty buffers (#8308) and are finally deserialized by deserialize_numpy_ndarray.

deep-copies: 0
distributed.protocol.serialize: heavy usage
number of buffers: 802 (for 2 kiB shards)

Note: this is what's used by gather_dep.
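As a minimal sketch of this zero-copy path (illustrative, not part of the benchmark suite): distributed.protocol.serialize hands back the array's buffer as a frame instead of copying it into a pickle blob.

import numpy as np
from distributed.protocol import serialize, deserialize

arr = np.arange(1024, dtype="u1")
header, frames = serialize(arr)         # frames hold arr's buffer, zero-copy
restored = deserialize(header, frames)
assert (restored == arr).all()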

opaque list[ndarray]

As above, but wrapped by the dummy class _List(list) defined by the shuffle module.
This opaque object is serialized/deserialized by pickle5, which extracts the buffers.

deep-copies: 0
distributed.protocol.serialize: trivial usage
number of buffers: 802 (for 2 kiB shards)

Note: this is what's used for p2p rechunk downstream of #8282.
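For reference, this is the pickle5 out-of-band mechanism at work; the _List below is a stand-in for the wrapper defined in the shuffle module.

import pickle

import numpy as np

class _List(list):  # stand-in for the shuffle module's opaque wrapper
    pass

shards = _List(np.random.random(256) for _ in range(4))
buffers = []
blob = pickle.dumps(shards, protocol=5, buffer_callback=buffers.append)
# The array data was exported out-of-band: `buffers` holds one
# pickle.PickleBuffer per shard, while `blob` only contains metadata.
restored = pickle.loads(blob, buffers=buffers)
assert all((a == b).all() for a, b in zip(shards, restored))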

bytes

The list of numpy arrays is manually serialized by pickle, without buffer_callback, into a single monolithic bytes object, ahead of being sent to the network stack.
The dask serialization stack passes on the bytes object verbatim as a buffer, without a further deep copy, thanks to serialize_bytes.

deep-copies: 2 (pickle.dumps; pickle.loads)
distributed.protocol.serialize: trivial usage
number of buffers: 2
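The two deep copies are the ones inherent in monolithic pickling, as in this trivial sketch:

import pickle

import numpy as np

shards = [np.random.random(256) for _ in range(4)]
blob = pickle.dumps(shards)    # deep copy 1: all array data lands inside blob
restored = pickle.loads(blob)  # deep copy 2: new arrays are allocated from blob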

list[bytes]

The numpy arrays are manually serialized with pickle, without buffer_callback, and are sent to the network stack as a list of individually small-ish bytes blobs.
The network stack traverses the list and extracts the individual bytes objects into buffers, which are sent individually.
Upon reception, the buffers have been transformed into memoryviews of numpy.empty buffers, and are unnecessarily deep-copied back into bytes.
After reception, we manually unpickle the list[bytes] into a list[ndarray].

deep-copies: 3 (pickle.dumps; _deserialize_bytes; pickle.loads)
distributed.protocol.serialize: heavy usage
number of buffers: 802 (for 2 kiB shards)

opaque list[bytes]

The numpy arrays are manually serialized with pickle, without buffer_callback, and are sent to the network stack wrapped into an opaque container.
The network stack calls pickle again. Since bytes serialization with vanilla pickle doesn't extract the buffers even when buffer_callback is specified, they are deep-copied into a single monolithic bytes object, which is then sent in one go over the network. On the other side, the monolithic bytes blob is deep-copied and deserialized into _List[bytes], and then we call pickle.loads again to convert that into a _List[ndarray].

deep-copies: 4 (pickle.dumps; pickle.dumps (dask_serialize of unknown object); pickle.loads (dask_deserialize of unknown object); pickle.loads)
distributed.protocol.serialize: trivial usage
number of buffers: 2

Note: this is what's used for p2p rechunk upstream of #8282, and is what's still used for dataframe shuffle (with the difference that we use parquet instead of pickle for serialization into bytes blobs).
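The claim above that vanilla pickle keeps bytes in-band even when buffer_callback is passed is easy to verify:

import pickle

payload = [bytes([i]) * 1024 for i in range(4)]
buffers = []
blob = pickle.dumps(payload, protocol=5, buffer_callback=buffers.append)
assert buffers == []         # nothing was exported out-of-band
assert len(blob) > 4 * 1024  # the payloads were deep-copied into the blob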

Observations

  • Implementing a zero-copy strategy is faster than all other methods starting from roughly 64-128 kiB per buffer
  • There is no appreciable difference between pickle with buffer_callback and distributed.protocol.serialize
  • For the use case of many small objects, a single call to pickle without buffer_callback outperforms everything else
  • This makes me believe that using an opaque list for dataframe shuffle carries a substantial performance penalty for large shards. Separate testing will follow.

This PR

This PR performs a simple tweak when sending many small buffers; it mitigates the regression introduced by #8282 but doesn't entirely fix it. A shuffle-specific hack is coming in a separate PR.
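For illustration only, here is a sketch of the kind of tweak described (not the PR's actual code): coalesce runs of small frames into fewer, larger buffers before they hit the socket, recording the split sizes so that the receiver can slice the original frames apart again. The 64 kiB threshold is made up for the example.

def coalesce_small_frames(frames, threshold=64 * 1024):
    """Concatenate runs of frames smaller than `threshold`.

    Returns (new_frames, split_sizes), where split_sizes lets the receiver
    slice each coalesced buffer back into the original frames.
    """
    new_frames, split_sizes = [], []
    batch, batch_size = [], 0

    def flush():
        nonlocal batch, batch_size
        if not batch:
            return
        if len(batch) == 1:
            new_frames.append(batch[0])
            split_sizes.append([memoryview(batch[0]).nbytes])
        else:
            # The one extra deep copy, paid only for runs of small frames
            new_frames.append(b"".join(map(bytes, batch)))
            split_sizes.append([memoryview(f).nbytes for f in batch])
        batch, batch_size = [], 0

    for frame in frames:
        nbytes = memoryview(frame).nbytes
        if nbytes >= threshold:
            flush()
            new_frames.append(frame)  # large frames stay zero-copy
            split_sizes.append([nbytes])
        else:
            batch.append(frame)
            batch_size += nbytes
            if batch_size >= threshold:
                flush()
    flush()
    return new_frames, split_sizes

# 1000 frames of 100 B collapse into a couple of ~64 kiB buffers
frames, sizes = coalesce_small_frames([b"x" * 100] * 1000)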

Benchmark of this PR:

[figure: merge_small_shards]

@crusaderky self-assigned this on Nov 1, 2023
@crusaderky marked this pull request as ready for review on November 1, 2023 at 14:33
github-actions bot (Contributor) commented on Nov 1, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       27 files  ±0         27 suites  ±0   14h 0m 0s ⏱️ -51m 6s
  3 963 tests ±0    3 841 ✔️ +3     117 💤 ±0    5 ❌ -3
49 786 runs  ±0  47 372 ✔️ +5  2 408 💤 -1    6 ❌ -4

For more details on these failures, see this check.

Results for commit 50c7390. ± Comparison against base commit c91a735.

♻️ This comment has been updated with latest results.

@fjetter (Member) commented on Nov 2, 2023

> (with the difference that we use parquet instead of pickle for serialization into bytes blobs).

FWIW, we're not using parquet but an arrow IPC format.

> This makes me believe that using an opaque list for dataframe shuffle carries a substantial performance penalty for large shards. Separate testing will follow.

I don't understand how you arrive at this conclusion based on your tests. The _List and list are treated entirely identically in your benchmarks as long as the list is not nested. As soon as you have a nested collection, you should notice a difference: e.g. (1) [(5, np.array(...)), ...] and (2) [(np.array(...), 5), ...] will have different performance profiles for a list, but they will/should behave identically for _List. However, it's not obvious when _List or list is better for a case like (2).

My above statement was wrong. _List is thrown into distributed.protocol.pickle.dumps and serialize does not traverse it itself. I still don't see how you are arriving at your conclusion.

@crusaderky (Collaborator, Author) commented on Nov 2, 2023

> I don't understand how you arrive at this conclusion based on your tests. [...] I still don't see how you are arriving at your conclusion.

  1. arrow serializes the shards into bytes objects. First deep-copy; probably unavoidable, because it also serves the purpose of severing the reference to the unsharded input chunk.
  2. distributed.protocol.pickle.dumps encounters an unknown object type (_List) and passes the ball to pickle.dumps(..., protocol=5, buffer_callback=...)
  3. _List.__reduce__ returns the internal bytes objects
  4. pickle.dumps(..., protocol=5, buffer_callback=...) does not export bytes objects as buffers. Instead, it just coagulates everything into a single bytes blob, like without buffer_callback, causing a second avoidable deep-copy. This is unlike distributed.protocol.serialize, which instead exports bytes and bytearrays as zero-copy buffers.
  5. network transfer of a single, monolithic bytes object
  6. pickle.loads unpickles the monolithic bytes object back into a _List[bytes]. Third avoidable deep-copy.
  7. MemoryBuffer only: arrow deserializes the bytes object into an arrow table. Fourth avoidable deep-copy.

For comparison, if you passed a plain list[bytes] to the network stack instead of a _List[bytes], it would be traversed by distributed.protocol.serialize, which in turn would transfer it over the network thanks to _serialize_bytes:

# Teach serialize how to handle bytes
@dask_serialize.register(bytes)
def _serialize_bytes(obj):
    header = {}  # no special metadata
    frames = [obj]
    return header, frames

(not to be confused with serialize_bytes, which is completely unrelated).

so you would get rid of the deep-copy at step 4, although not the one at step 6:

@dask_deserialize.register(bytes)
def _deserialize_bytes(header, frames):
    if len(frames) == 1 and isinstance(frames[0], bytes):
        return frames[0]
    else:
        return b"".join(frames)

(here the frame is a memoryview of a numpy.empty buffer, so it falls into the b"".join branch and gets deep-copied. This deep-copy is also avoidable, as arrow would be capable of ingesting the memoryview directly, without a conversion to bytes).

This however would actually cause a slowdown for shards smaller than ~64 kiB, as measured above.

To avoid the deep copies at steps 1 and 7, you'd need to use pickle instead of arrow as your serialization engine. This would mean the additional cost of pickling and unpickling extra times, which may outweigh the benefit for very small buffers (I have not benchmarked the speed of pickle5 vs. arrow).

@fjetter (Member) commented on Nov 2, 2023

I can't tell whether your logic is true or false, but empirically I find that if I just throw a _List into serialize, I am preserving memory views as intended.

import numpy as np
from distributed.protocol.serialize import serialize

class _List(list):
    pass

buffer = _List([
    np.random.random((3, )),
    np.random.random((4, )),
    np.random.random((5, )),
])
serialize(buffer)
({'serializer': 'pickle', 'writeable': (True, True, True)},
 [b'\x80\x05\x95\x90\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x14_make_skeleton_class\x94\x93\x94(\x8c\x08builtins\x94\x8c\x04type\x94\x93\x94\x8c\x05_List\x94h\x03\x8c\x04list\x94\x93\x94\x85\x94}\x94\x8c\n__module__\x94\x8c\x08__main__\x94s\x8c da114ba5d9554d56a5fff20b05203256\x94Nt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x0f_class_setstate\x94\x93\x94h\x0f}\x94(h\x0bh\x0c\x8c\x07__doc__\x94N\x8c\r__slotnames__\x94]\x94u}\x94\x86\x94\x86R0)\x81\x94(\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x97\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x03\x85\x94\x8c\x01C\x94t\x94R\x94h\x1c(\x97h"K\x04\x85\x94h&t\x94R\x94h\x1c(\x97h"K\x05\x85\x94h&t\x94R\x94e.',
  <memory at 0x10ee80d00>,
  <memory at 0x10ee80c40>,
  <memory at 0x10ee80dc0>])
# Memoryview is indeed pointing to the np array
for mv, arr in zip(serialize(buffer)[1][1:], buffer):
    assert mv == arr.data
    assert mv.obj is arr

The same is true for pyarrow tables, which is just a little more involved to show, since there is no data attribute/property that returns the memoryview directly.

import pyarrow as pa
import pandas as pd

tab = pa.Table.from_pandas(pd.DataFrame({"a": range(10)}))
arr = tab['a']
int_arr = arr.chunks[0]

# This indexing scheme is picking out the `pyarrow.Buffer` object that is being wrapped in a `PickleBuffer`
# here https://github.com/apache/arrow/blob/ac2d207611ce25c91fb9fc90d5eaff2933609660/python/pyarrow/io.pxi#L1315-L1326
pickle_buffer = int_arr.__reduce_ex__(5)[1][0][4][1].__reduce_ex__(5)[1][0]
buffer = _List([tab])
serialize(buffer)


({'serializer': 'pickle', 'writeable': (True,)},
 [b'\x80\x05\x95\xad\x03\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x14_make_skeleton_class\x94\x93\x94(\x8c\x08builtins\x94\x8c\x04type\x94\x93\x94\x8c\x05_List\x94h\x03\x8c\x04list\x94\x93\x94\x85\x94}\x94\x8c\n__module__\x94\x8c\x08__main__\x94s\x8c da114ba5d9554d56a5fff20b05203256\x94Nt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x0f_class_setstate\x94\x93\x94h\x0f}\x94(h\x0bh\x0c\x8c\x07__doc__\x94N\x8c\r__slotnames__\x94]\x94u}\x94\x86\x94\x86R0)\x81\x94\x8c\x0bpyarrow.lib\x94\x8c\x12_reconstruct_table\x94\x93\x94]\x94h\x1a\x8c\rchunked_array\x94\x93\x94]\x94h\x1a\x8c\x0e_restore_array\x94\x93\x94(h\x1a\x8c\x0etype_for_alias\x94\x93\x94\x8c\x05int64\x94\x85\x94R\x94K\nK\x00K\x00]\x94(Nh\x1a\x8c\tpy_buffer\x94\x93\x94\x97\x85\x94R\x94e]\x94Nt\x94\x85\x94R\x94ah$\x8c\x05int64\x94\x85\x94R\x94\x86\x94R\x94ah\x1a\x8c\x06schema\x94\x93\x94]\x94h\x1a\x8c\x05field\x94\x93\x94(\x8c\x01a\x94h$\x8c\x05int64\x94\x85\x94R\x94\x88Nt\x94R\x94a}\x94C\x06pandas\x94B\xa7\x01\x00\x00{"index_columns": [{"kind": "range", "name": null, "start": 0, "stop": 10, "step": 1}], "column_indexes": [{"name": null, "field_name": null, "pandas_type": "unicode", "numpy_type": "object", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "a", "field_name": "a", "pandas_type": "int64", "numpy_type": "int64", "metadata": null}], "creator": {"library": "pyarrow", "version": "13.0.0"}, "pandas_version": "2.1.2"}\x94s\x86\x94R\x94\x86\x94R\x94a.',
  <memory at 0x13b57fc40>])
assert serialize(buffer)[1][-1] == pickle_buffer.raw()

@crusaderky (Collaborator, Author) commented on Nov 2, 2023

> I can't tell whether your logic is true or false, but empirically I find that if I just throw a _List into serialize, I am preserving memory views as intended.

Yes, _List[ndarray] and list[ndarray] both export the buffers.
_List[bytes] doesn't though; whereas list[bytes] does.
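A sketch along the lines of the snippet above illustrates the difference (the expected counts follow from the behaviour just described, and are not verified here):

from distributed.protocol import serialize

class _List(list):
    pass

payload = [bytes([i]) * 1024 for i in range(4)]

header, frames = serialize(payload)            # traversed item by item
header2, frames2 = serialize(_List(payload))   # pickled as one opaque object
print(len(frames), len(frames2))               # expected: 4 vs. 1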

> The same is true for pyarrow tables, which is just a little more involved to show, since there is no data attribute/property that returns the memoryview directly.

To my understanding, however, you're sending plain bytes objects into the network stack, not pyarrow tables?

@hendrikmakait (Member) commented

Regardless of the debate on PyArrow (de-)serialization, this PR seems to be a general improvement. Do you see anything blocking this from getting merged, @fjetter?

> MemoryBuffer only: arrow deserializes the bytes object into an arrow table. Fourth avoidable deep-copy.

PyArrow does not perform a memory copy when deserializing an IPC stream.
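For reference, a small sketch of the IPC round-trip in question, using only standard pyarrow API (the example itself is illustrative):

import pyarrow as pa

table = pa.table({"a": list(range(10))})
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
buf = sink.getvalue()  # a pyarrow.Buffer holding the serialized IPC stream

# read_all() reconstructs the table as views onto buf's memory, without a copy
restored = pa.ipc.open_stream(buf).read_all()
assert restored.equals(table)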

@fjetter merged commit 8c787a8 into dask:main on Nov 6, 2023 (27 of 34 checks passed).
@crusaderky deleted the fast_small_buffers branch on November 6, 2023 at 19:04.