Don't share host_array when receiving from network #8308

Merged
crusaderky merged 2 commits into dask:main from share_host_array on Nov 3, 2023

Conversation

@crusaderky (Collaborator) commented on Oct 27, 2023

Resolve issues with memory deallocation:

  1. Two or more numpy or pandas objects are packed into the same network message by WorkerStateMachine._select_for_gather, scatter, or Client.gather. After they are received, one of the objects is dereferenced, but its memory won't be released until all objects with a buffer in the original message have been dereferenced (see the sketch after the notes below).

  2. An object with both buffers and non-trivial amounts of pure-pickle data - such as a pandas.DataFrame with object columns - is sent over the network. For as long as the object lives, the memory holding the pickled version of the object column won't be released.

  3. In Zero-copy array shuffle #8282, when using a MemoryBuffer the shards that have already been merged into output chunks are not dereferenced until all shards on the same worker have been merged. This is because shards belonging to different output chunks were sent over within the same RPC call.

Notes

  • These issues only apply to uncompressed data.
  • Use case 2 also afflicts the SpillBuffer. It is out of scope of this PR.
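
A minimal standalone sketch of issue 1 (hypothetical sizes; plain numpy rather than distributed's actual deserialization path): two objects deserialized zero-copy out of one shared receive buffer keep the whole buffer alive until both are gone.

import numpy as np

def deserialize_shared(nbytes_a: int, nbytes_b: int) -> tuple[np.ndarray, np.ndarray]:
    # One receive buffer holding two frames back to back
    buf = bytearray(nbytes_a + nbytes_b)
    a = np.frombuffer(buf, dtype=np.uint8, count=nbytes_a)
    b = np.frombuffer(buf, dtype=np.uint8, count=nbytes_b, offset=nbytes_a)
    return a, b

a, b = deserialize_shared(100 * 2**20, 2**20)  # 100 MiB + 1 MiB in one message
del a
# The 100 MiB are not released: `b` is a zero-copy view that keeps the whole
# 101 MiB buffer alive until it, too, is dereferenced.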

@crusaderky crusaderky self-assigned this Oct 27, 2023
@crusaderky crusaderky changed the title from "[WIP] Don't share host_array when receiving from network" to "Don't share host_array when receiving from network" on Oct 27, 2023
github-actions bot (Contributor) commented on Oct 27, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    27 files  ±0      27 suites  ±0    15h 7m 53s ⏱️ +54m 42s
 3 963 tests  +20    3 841 ✔️  +18      117 💤  ±0     5 ❌  +3
49 786 runs  +260   47 371 ✔️ +217    2 409 💤 +41     6 ❌  +3

For more details on these failures, see this check.

Results for commit bf43081. ± Comparison against base commit 954e9d0.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the share_host_array branch 4 times, most recently from f4a902d to 14df449 on October 29, 2023 at 14:23
@crusaderky (Collaborator Author) commented:

A/B test results:

  • no wall time changes whatsoever
  • test_dataframe_align uses 10% less avg memory and 15% less peak memory
  • test_set_index_on_uber_lyft[tasks] uses 10% less avg memory and 15% less peak memory

[two screenshots: A/B test result plots]

@crusaderky (Collaborator Author) commented on Oct 29, 2023

This PR is blocked by and incorporates #8312
Ready for review (it's made of exactly 2 commits)

@milesgranger (Contributor) left a comment

Not sure I'd depend on my review alone, but looks good to me, with one clarifying comment. :) Thanks!

Comment on lines 369 to 370
n = await stream.read_into(chunk) # type: ignore[arg-type]
assert n == chunk_nbytes, (n, chunk_nbytes)
Contributor:

Apologies for any naive questions here; for my own understanding: this assert is a sanity check, since according to the docs for read_into it won't return until chunk (which is chunk_nbytes in length) is completely filled. Is there a potential that it won't be filled because chunk is larger than any remaining bytes, and thus sits idle?

I would suppose not, judging by how the caller sets n, but I got slightly confused by the OpenSSL sizes and whether they may make a chunk larger than any remaining stream data.

Contributor:

Or, now that I look at it again: the bytes in the stream would always be larger than the buffer, and thus the stream would always have bytes to write, given that we're accounting for OpenSSL sizing. Do I have that right?

Collaborator Author:

Yes, that assertion is just defensive programming.
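
To spell out the semantics being discussed (a hypothetical helper, not the actual distributed.comm code, relying on tornado's documented behaviour): read_into() with the default partial=False resolves only once the buffer is completely filled, and raises StreamClosedError if the connection drops first, so a short read never reaches the assertion.

from tornado.iostream import IOStream

async def read_exactly(stream: IOStream, nbytes: int) -> memoryview:
    chunk = memoryview(bytearray(nbytes))  # one dedicated, writable buffer
    n = await stream.read_into(chunk)      # waits until all nbytes have arrived
    assert n == nbytes, (n, nbytes)        # defensive programming, as noted above
    return chunk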

@fjetter (Member) left a comment

Changes overall LGTM. Most/all of my comments are for clarification, but as long as I haven't entirely misunderstood this, I'm happy to merge.

chunk_nbytes = chunk.nbytes
n = await stream.read_into(chunk)
assert n == chunk_nbytes, (n, chunk_nbytes)
# Don't store multiple numpy or parquet buffers into the same buffer, or
Member:

nit: To my knowledge, we never send parquet over the network, unless of course a user decides to do so themselves.

Member:

I guess you are referring to pyarrow Table objects, or anything else that can be directly instantiated from a buffer.

Collaborator Author:

Wrong location for the comment?
parquet was a typo; I meant arrow.

frames_nbytes = [header_nbytes, *frames_nbytes]
frames_nbytes_total += header_nbytes

if frames_nbytes_total < 2**17: # 128kiB
Member:

Given the testing you did recently, do you think this number still makes sense? Something to look into, or not worth the effort?

Collaborator Author:

It looks about right given my recent testing.
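
For context, a hypothetical sketch of the branching that this 128 kiB threshold controls (an illustration only, not the actual distributed.comm code): messages below the threshold are read into one shared buffer to save allocations and syscalls, while larger messages get one dedicated buffer per frame so that each deserialized object can release its memory independently.

from itertools import accumulate

SHARED_BUFFER_MAX = 2**17  # 128 kiB, same threshold as in the diff above

async def read_frames(stream, frames_nbytes: list[int]) -> list[memoryview]:
    total = sum(frames_nbytes)
    if total < SHARED_BUFFER_MAX:
        # Small message: one allocation, one read; frames are views into it
        buf = memoryview(bytearray(total))
        n = await stream.read_into(buf)
        assert n == total, (n, total)
        offsets = [0, *accumulate(frames_nbytes)]
        return [buf[start:stop] for start, stop in zip(offsets, offsets[1:])]
    # Large message: one allocation per frame, so dropping any single frame
    # releases its memory independently of the others
    frames = []
    for nbytes in frames_nbytes:
        chunk = memoryview(bytearray(nbytes))
        n = await stream.read_into(chunk)
        assert n == nbytes, (n, nbytes)
        frames.append(chunk)
    return frames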

(1, 0, False), # <2 kiB (including prologue and msgpack header)
(1, 1800, False), # <2 kiB
(1, 2100, True), # >2 kiB
(200_000, 9500, False), # <5% of numpy array
Member:

IIUC the 9500 extra bytes here will be written to the same memory buffer the numpy array is using, i.e. those 9500 bytes will only be released once the numpy array is released. Similarly, the numpy array will only be released once the other thing has been released. However, that other thing is a bytes object, some header information, or some other garbage that is guaranteed to be released after the message is deserialized.

So, in other words, we're accepting a memory overhead of up to 5% for numpy arrays/arrow tables/etc. (and previously this could've been a multiple, depending on how large a single fetch was)

Collaborator Author:

So, in other words, we're accepting a memory overhead of up to 5% for numpy arrays/arrow tables/etc.

This is correct. It is really only material for pandas objects with substantial pure-Python index / columns / other metadata; numpy objects tend to carry <100 bytes worth of metadata.

(and previously this could've been a multiple, depending on how large a single fetch was)

It was worse than a multiple.
There were two nightmare scenarios:

  • A pandas DataFrame heavy with object string columns, plus some numerical columns. The whole serialized data for the object columns remains alive for as long as the deserialized object is alive, because the same buffer is referenced by the numerical columns.
  • The key at the top of the WorkerStateMachine.fetch heap is a 49 MiB numpy array. The second object in the heap from the same worker is a 1 MiB numpy array (or vice versa). The two are fetched together (distributed.worker.transfer.message-bytes-limit: 50 MiB). The 49 MiB array will survive its own free-keys command, as its memory is still referenced by the 1 MiB array (rough numbers in the sketch below).
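
Back-of-the-envelope arithmetic for the second scenario (numbers taken from the comment above; the constants are illustrative, not measured):

MiB = 2**20
message_bytes_limit = 50 * MiB   # distributed.worker.transfer.message-bytes-limit
big, small = 49 * MiB, 1 * MiB   # the two keys fetched in a single message

# Before this PR: both keys are views over one shared receive buffer, so after
# free-keys releases the 49 MiB key, this message still pins:
before = big + small             # 50 MiB, held alive by the surviving 1 MiB key

# After this PR: each key owns its buffer, so only the surviving key remains:
after = small                    # 1 MiB
print(f"{before / MiB:.0f} MiB -> {after / MiB:.0f} MiB resident")  # 50 -> 1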

@crusaderky crusaderky merged commit c91a735 into dask:main Nov 3, 2023
26 of 30 checks passed
@crusaderky crusaderky deleted the share_host_array branch November 3, 2023 21:05