Add objects GC in dataset iterator #34030
…project#32493)" (ray-project#33485)" This reverts commit 5c79954.
@@ -24,6 +35,8 @@ def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only):

     check_no_spill(ctx, ds.repeat())
     check_no_spill(ctx, ds.window(blocks_per_window=20))
+    check_to_torch_no_spill(ctx, ds.repeat())
Without this PR, this check fails (i.e., spilling occurs).
Nice find!
@@ -30,7 +30,7 @@ def _to_block_iterator(
         ds = self._base_dataset
         block_iterator, stats, executor = ds._plan.execute_to_iterator()
         ds._current_executor = executor
-        return block_iterator, stats
+        return block_iterator, stats, False
Shall we update the type hint at line 28 as well?
Yep!
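For readers following the thread, the updated return annotation could look roughly like the sketch below. This is only an illustration, not the code from the PR: `Block`, `Stats`, and the free function `to_block_iterator` are hypothetical stand-ins, and reading the third element as the "blocks owned by consumer" flag is an assumption based on the `blocks_owned_by_consumer` variable discussed later in this review.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical stand-in for a data block; the real block type lives in Ray Data.
Block = List[int]


class Stats:
    """Hypothetical placeholder for the stats object returned with the iterator."""


def to_block_iterator(
    blocks: List[Block],
) -> Tuple[Iterator[Block], Optional[Stats], bool]:
    """Return (block iterator, stats, blocks_owned_by_consumer).

    The third element is assumed to tell the caller whether it may free
    blocks eagerly after consuming them.
    """
    # False mirrors the literal returned in the diff above.
    return iter(blocks), Stats(), False
```

Widening the annotation together with the return statement keeps callers that unpack the tuple consistent with what the function actually returns.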
if epoch_pipeline._first_dataset is not None:
    blocks_owned_by_consumer = (
        epoch_pipeline._first_dataset._plan.execute()._owned_by_consumer
    )
else:
    blocks_owned_by_consumer = (
        epoch_pipeline._peek()._plan.execute()._owned_by_consumer
    )
Could you add a code comment explaining why we need to do this?
Done.
The pipelined_ingestion_1500_gb release test has been passing consistently: https://buildkite.com/ray-project/release-tests-pr/builds?branch=jianoaix%3Aiteratorgcblocks
Will wait for CI to pass and then merge.
There is a failure in python/ray/data/tests/test_dataset_consumption.py::test_dataset_lineage_serialization_unsupported, but it is unrelated to this PR.
* Revert "[Datasets] Revert "Enable streaming executor by default (ray-project#32493)" (ray-project#33485)"
  This reverts commit 5c79954.
* Add objects GC in dataset iterator
* test it
* more tests
* fix comment
* add a little more memory as it's close to the limit and may make test flaky
* feedback
Why are these changes needed?
Fix: #33846
The DatasetIterator doesn't eagerly garbage-collect (GC) the objects it has already consumed, which caused consumer nodes to run out of memory (OOM). The replacement consumer nodes that were brought up were out of sync with the remaining healthy consumer nodes. Because the DatasetPipeline requires all consumers to read windows in sync, the pipeline hung and then failed with a timeout.
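The idea of eager GC during iteration can be sketched in plain Python as below. This is a toy model under stated assumptions, not the implementation in this PR: `FakeObjectStore` and `iter_blocks` are hypothetical names, and the store is an in-process dictionary rather than Ray's object store. It only shows how an ownership flag such as `blocks_owned_by_consumer` could gate freeing each block as soon as it has been read.

```python
from typing import Dict, Iterator, List


class FakeObjectStore:
    """Toy in-memory store standing in for a shared object store (hypothetical)."""

    def __init__(self) -> None:
        self._objects: Dict[int, List[int]] = {}
        self._next_id = 0

    def put(self, block: List[int]) -> int:
        # Store the block and hand back an integer "reference" to it.
        ref = self._next_id
        self._next_id += 1
        self._objects[ref] = block
        return ref

    def get(self, ref: int) -> List[int]:
        return self._objects[ref]

    def free(self, ref: int) -> None:
        # Drop the store's copy; freeing twice is a harmless no-op.
        self._objects.pop(ref, None)

    def num_objects(self) -> int:
        return len(self._objects)


def iter_blocks(
    store: FakeObjectStore,
    refs: List[int],
    blocks_owned_by_consumer: bool,
) -> Iterator[List[int]]:
    """Yield blocks one by one, freeing each from the store once fetched.

    Blocks are freed only when the consumer owns them; otherwise another
    reader may still need them, so they are left in the store.
    """
    for ref in refs:
        block = store.get(ref)
        if blocks_owned_by_consumer:
            # The local variable keeps the data alive for the caller,
            # while the store entry is released right away.
            store.free(ref)
        yield block
```

With the flag set, the store holds no blocks once iteration finishes; with it unset, every block stays resident, which is the kind of build-up that leads to spilling or OOM on a long-running consumer.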