-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add objects GC in dataset iterator #34030
Changes from 9 commits
925a247
b33ae23
4ef5d35
e6dcd6e
482e9dc
3e2d393
cb0840c
1696afb
cec613c
bfc5f39
ae6a6b8
abb9551
017f14f
b6cb319
30eb6f5
5445241
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,11 +35,20 @@ def _to_block_iterator( | |
]: | ||
epoch_pipeline = self._get_next_dataset() | ||
|
||
if epoch_pipeline._first_dataset is not None: | ||
blocks_owned_by_consumer = ( | ||
epoch_pipeline._first_dataset._plan.execute()._owned_by_consumer | ||
) | ||
else: | ||
blocks_owned_by_consumer = ( | ||
epoch_pipeline._peek()._plan.execute()._owned_by_consumer | ||
) | ||
Comment on lines
+42
to
+49
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add a comment in the code explaining why we need to do this? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
def block_iter(): | ||
for ds in epoch_pipeline.iter_datasets(): | ||
yield from ds._plan.execute().iter_blocks_with_metadata() | ||
|
||
return block_iter(), None | ||
return block_iter(), None, blocks_owned_by_consumer | ||
|
||
def iter_batches( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,17 @@ def check_no_spill(ctx, pipe): | |
assert "Spilled" not in meminfo, meminfo | ||
|
||
|
||
def check_to_torch_no_spill(ctx, pipe): | ||
# Run up to 10 epochs of the pipeline to stress test that | ||
# no spilling will happen. | ||
max_epoch = 10 | ||
for p in pipe.iter_epochs(max_epoch): | ||
for _ in p.to_torch(batch_size=None): | ||
pass | ||
meminfo = memory_summary(ctx.address_info["address"], stats_only=True) | ||
assert "Spilled" not in meminfo, meminfo | ||
|
||
|
||
def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only): | ||
# The object store is about 300MB. | ||
ctx = ray.init(num_cpus=1, object_store_memory=300e6) | ||
|
@@ -24,6 +35,8 @@ def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only): | |
|
||
check_no_spill(ctx, ds.repeat()) | ||
check_no_spill(ctx, ds.window(blocks_per_window=20)) | ||
check_to_torch_no_spill(ctx, ds.repeat()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This check will fail (i.e., spilling will occur) without this PR. |
||
check_to_torch_no_spill(ctx, ds.window(blocks_per_window=20)) | ||
|
||
|
||
def test_iter_batches_no_spilling_upon_rewindow(shutdown_only): | ||
|
@@ -35,6 +48,9 @@ def test_iter_batches_no_spilling_upon_rewindow(shutdown_only): | |
check_no_spill( | ||
ctx, ds.window(blocks_per_window=20).repeat().rewindow(blocks_per_window=10) | ||
) | ||
check_to_torch_no_spill( | ||
ctx, ds.window(blocks_per_window=20).repeat().rewindow(blocks_per_window=10) | ||
) | ||
|
||
|
||
def test_iter_batches_no_spilling_upon_prior_transformation(shutdown_only): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we update the type hint at line 28 as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep!