-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dataset] Support push-based shuffle in groupby operations #25910
[dataset] Support push-based shuffle in groupby operations #25910
Conversation
Turns out |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall, I added something similar to this for a generic tree-reduce operation in a prototype, happy to see this land!
Just a few nits, I'd like to see if the use_push_based_shuffle
parameterized fixture would work since that should decrease the code impact on the tests by a good bit.
@@ -356,6 +356,7 @@ def aggregate_combined_blocks( | |||
blocks: List[Block[Tuple[KeyType, AggType]]], | |||
key: KeyFn, | |||
aggs: Tuple[AggregateFn], | |||
finalize: bool, | |||
) -> Tuple[Block[Tuple[KeyType, U]], BlockMetadata]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that this changes the return type to Tuple[Block[Tuple[KeyType, Union[U, AggType]]], BlockMetadata]
.count() | ||
) | ||
assert agg_ds.count() == 0 | ||
@pytest.mark.parametrize("use_push_based_shuffle", [False, True]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: You should be able to reduce the context munging and try-finally boilerplate by parameterizing over a few fixtures that does this for you, e.g.
@pytest.fixture(params=[True, False])
def use_push_baed_shuffle(request):
ctx = ray.data.context.DatasetContext.get_current()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = request.param
yield
ctx.use_push_based_shuffle = original
def test_groupby_arrow(ray_start_regular_shared, use_push_based_shuffle):
# Test empty dataset.
agg_ds = (
ray.data.range_table(10)
.filter(lambda r: r["value"] > 10)
.groupby("value")
.count()
)
assert agg_ds.count() == 0
Why are these changes needed?
Allows option for push-based shuffle in groupby operations.