[Datasets] [Local Shuffle - 1/N] Add local shuffling option. #26094
Conversation
with stats.iter_get_s.timer():
    block = ray.get(block)
# NOTE: Since we add one block at a time and then immediately consume
# batches, we don't check batcher.can_add() before adding the block.
`batcher.can_add()` will be used once we're adding multiple blocks before attempting to consume batches, e.g. if prefetching and shuffling were connected via a queue. We may go this route in a future PR, but with the current implementation, we're guaranteed that the block can be added.

We could add an `assert batcher.can_add(block)` here in the calling code, in order to more strongly document this guarantee in addition to the comment, but that assertion is already done within `batcher.add(block)`, so I left it out and opted for the comment.
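For illustration, here is a minimal, self-contained sketch of the add-one-block-then-drain pattern described above, with the `can_add()` guarantee asserted inside `add()` rather than by the caller. `TinyBatcher` is a hypothetical stand-in; the real Ray Datasets `Batcher` operates on `Block` objects and carries more state:

```python
from typing import List


class TinyBatcher:
    """Hypothetical stand-in for a batcher: accumulates rows, emits fixed-size batches."""

    def __init__(self, batch_size: int):
        self._batch_size = batch_size
        self._rows: List[int] = []

    def can_add(self, block: List[int]) -> bool:
        # In the one-block-at-a-time pattern, full batches are always drained
        # before the next add, so this is trivially satisfied here.
        return True

    def add(self, block: List[int]) -> None:
        # The guarantee is asserted here, inside add(), not by the caller.
        assert self.can_add(block)
        self._rows.extend(block)

    def has_batch(self) -> bool:
        return len(self._rows) >= self._batch_size

    def next_batch(self) -> List[int]:
        batch, self._rows = self._rows[: self._batch_size], self._rows[self._batch_size:]
        return batch


batcher = TinyBatcher(batch_size=4)
for block in ([1, 2, 3], [4, 5, 6], [7, 8, 9, 10]):
    batcher.add(block)          # add one block at a time...
    while batcher.has_batch():  # ...then immediately consume full batches
        print(batcher.next_batch())
```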
while batcher.has_batch():
    # While the batcher has full batches, yield batches.
    with stats.iter_next_batch_s.timer():
Added a new iterator stage timer, since `next_batch()` can potentially be a bit expensive (building the shuffle buffer, generating random indices, etc.).
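As a rough sketch of what a per-stage timer like `iter_next_batch_s` accumulates, the following hypothetical `StageTimer` is illustrative only and not Ray's actual `DatasetStats` implementation:

```python
import time
from contextlib import contextmanager


class StageTimer:
    """Hypothetical per-stage timer; Ray's DatasetStats uses its own timer internally."""

    def __init__(self) -> None:
        self.total_s = 0.0

    @contextmanager
    def timer(self):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.total_s += time.perf_counter() - start


iter_next_batch_s = StageTimer()
with iter_next_batch_s.timer():
    # Stand-in for a potentially expensive next_batch() call
    # (building the shuffle buffer, generating random indices, ...).
    batch = list(range(100_000))
print(f"next_batch time: {iter_next_batch_s.total_s:.6f}s")
```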
@krfricke, could you also take a look at this with Matt and Jian?
Force-pushed from 91bd0d6 to 04783ee.
Left a few comments
Force-pushed from 44faf67 to 3b764ef.
Thanks for the changes - this looks good to me!
Thanks for the review @krfricke! @matthewdeng @jianoaix with one approving review, I'll go ahead with the feature guide updates and benchmarking.
Force-pushed from 3b764ef to 86becc1.
from ray.data.block import Block, BlockAccessor
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder


class Batcher:
    """Chunks blocks into batches.

class BatcherInterface:
Can you add a test_batcher.py covering the code here? We've now added some complexity.
@matthewdeng @ericl @jianoaix @krfricke API has been updated to the
Looks good to me, thanks for the patience and update!
random but will be faster and less resource-intensive. This buffer size
must be greater than or equal to ``batch_size``, and therefore
``batch_size`` must also be specified when using local shuffling.
OOC is this a requirement for the implementation to work, or imposed because having a buffer size smaller than the batch size results in close to 0 randomness?
We discussed this a bit here: #26094 (comment)
It's required for the current implementation to work, but we could have a much simpler shuffling algorithm for the unbatched case. To keep the PR small and given that we're not aware of any use cases for unbatched local shuffling, we've been treating that as a P1 for a follow-up PR.
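To make the documented constraint concrete (buffer size must be at least the batch size, and a batch size is required whenever local shuffling is enabled), here is a hypothetical validation helper; `validate_local_shuffle_args` is illustrative only, not Ray's actual code:

```python
from typing import Optional


def validate_local_shuffle_args(
    batch_size: Optional[int], local_shuffle_buffer_size: Optional[int]
) -> None:
    """Hypothetical check mirroring the documented constraint."""
    if local_shuffle_buffer_size is None:
        return
    if batch_size is None:
        raise ValueError(
            "batch_size must be specified when local_shuffle_buffer_size is given."
        )
    if local_shuffle_buffer_size < batch_size:
        raise ValueError(
            f"local_shuffle_buffer_size ({local_shuffle_buffer_size}) must be "
            f">= batch_size ({batch_size})."
        )


validate_local_shuffle_args(batch_size=256, local_shuffle_buffer_size=1000)  # OK
```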
@matthewdeng @jianoaix @ericl PR is updated with a feature guide and some focused test coverage for the batcher, PTAL!
local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
    using a local in-memory shuffle buffer, and this value will serve as the
    minimum number of rows that must be in the local in-memory shuffle
    buffer in order to yield a batch. This is a light-weight alternative to
Can you mention how the last remainder rows are handled? When fewer than local_shuffle_buffer_size rows are left, we should let users know whether they should expect batches to be yielded from them.
Added a sentence, but the wording feels a bit off... let me know what you think!
assert self._shuffle_buffer is not None
buffer_size = BlockAccessor.for_block(self._shuffle_buffer).num_rows()
# Truncate the batch to the buffer size, if necessary.
batch_size = min(self._batch_size, buffer_size)
So it looks like we do continue to yield batches when the number of rows in the buffer drops below shuffle_buffer_min_size. Can you adjust the parameter documentation above to reflect this?
Same as above.
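To make the end-of-stream behavior discussed above concrete, the following is a self-contained, hypothetical sketch of a shuffle-buffer drain loop: batches are yielded while the buffer holds at least `buffer_min_size` rows, and once the input is exhausted the remaining rows are still yielded, with the final batch truncated to whatever is left. The names and algorithm are illustrative, not Ray's actual implementation:

```python
import random
from typing import Iterable, Iterator, List


def local_shuffle_batches(
    rows: Iterable[int], batch_size: int, buffer_min_size: int
) -> Iterator[List[int]]:
    buffer: List[int] = []
    for row in rows:
        buffer.append(row)
        # Yield while the buffer meets the minimum size and can fill a batch.
        while len(buffer) >= buffer_min_size and len(buffer) >= batch_size:
            yield [buffer.pop(random.randrange(len(buffer))) for _ in range(batch_size)]
    # Drain the remainder: final batches may be smaller than batch_size.
    while buffer:
        take = min(batch_size, len(buffer))  # truncate the batch to the buffer size
        yield [buffer.pop(random.randrange(len(buffer))) for _ in range(take)]


for batch in local_shuffle_batches(range(10), batch_size=4, buffer_min_size=6):
    print(batch)
```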
Test failures.
This PR adds a local shuffling option to `ds.iter_batches()`, a lightweight alternative to the global `ds.random_shuffle()` that randomly shuffles data using a local in-memory shuffle buffer and yields shuffled batches.

Not all training datasets/models benefit from high-quality global or pseudo-global (windowed) shuffles, but in these cases, users still want to cheaply decorrelate samples to a small degree. This local shuffle option (optionally coupled with block randomization via `ds.randomize_block_order()`) yields a high-throughput in-iterator shuffling option.

API Usage
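A minimal usage sketch, assuming the `iter_batches()` signature added in this PR; the dataset and sizes below are illustrative only:

```python
import ray

# Illustrative dataset; any Dataset works the same way.
ds = ray.data.range(10_000)

# Optionally randomize block order first, then shuffle rows locally while iterating.
for batch in ds.randomize_block_order().iter_batches(
    batch_size=256,
    local_shuffle_buffer_size=1_000,
):
    ...  # train on the (locally shuffled) batch
```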
TODOs
- `local_shuffle_buffer_size` API.
- `to_torch()` and `to_tf()` APIs.

Related issue number
Closes #24159, closes #18297

Checks
- I've run `scripts/format.sh` to lint the changes in this PR.