[Data] Implement limit physical operator #34705
Conversation
Signed-off-by: Hao Chen <[email protected]>
Force-pushed from e648694 to 7727fad.
```python
# If we don't know the number of rows in the input, try to
# split at the maximum number of rows we can consume
# (`self._limit - self._consumed_rows`).
blocks_splits, metadata_splits = _split_at_indices(
```
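The budget arithmetic behind this split can be illustrated with a simplified, Ray-free sketch. The helper names (`split_at_index`, `consume_block`) and the list-based "block" are hypothetical stand-ins for illustration, not Ray Data APIs:

```python
def split_at_index(rows, index):
    """Split a block (modeled here as a list) into rows[:index] and rows[index:]."""
    return rows[:index], rows[index:]


def consume_block(rows, limit, consumed):
    """Take at most (limit - consumed) rows from a block.

    Returns the taken rows, the leftover rows, and the updated consumed count.
    """
    budget = limit - consumed
    taken, rest = split_at_index(rows, budget)
    return taken, rest, consumed + len(taken)
```

With `limit=3` and one row already consumed, a 5-row block yields only 2 more rows, and the remaining 3 are left unconsumed.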
Hmm, this is a blocking call, right? I think we want to ensure all operators are streaming-compatible. Maybe there should be an op that we insert before the Limit that ensures all bundles have num_rows set, fetching it if needed.
Actually, can we assume num_rows is always available? After the read, I think this should always be populated.
```python
        return self._buffer.popleft()

    def get_stats(self) -> StatsDict:
        return {self._name: self._output_metadata}
```
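The buffering and stats pattern under discussion can be sketched in miniature. This `ToyLimitOperator` is a hypothetical, simplified stand-in (plain dicts instead of `BlockMetadata`, no Ray object refs), illustrating the popleft buffer and the name-keyed stats dict:

```python
from collections import deque


class ToyLimitOperator:
    """Simplified sketch of a physical limit operator's output buffering."""

    def __init__(self, name):
        self._name = name
        self._buffer = deque()          # finished output blocks, FIFO order
        self._output_metadata = []      # metadata of every emitted block

    def add_output(self, block, metadata):
        self._buffer.append(block)
        self._output_metadata.append(metadata)

    def has_next(self):
        return len(self._buffer) > 0

    def get_next(self):
        return self._buffer.popleft()

    def get_stats(self):
        # Stats keyed by operator name, mirroring the pattern in the snippet above.
        return {self._name: self._output_metadata}
```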
Not sure if this is the correct way to handle stats; I just followed map_operator. Are there any docs?
Seems reasonable; we should probably improve the docstring in interfaces.py.
```python
# Slice the last block.
num_rows_to_take = self._limit - self._consumed_rows
self._consumed_rows = self._limit
block = BlockAccessor.for_block(ray.get(block)).slice(
```
We should still run it in a remote task to avoid needing to fetch the data block locally. This avoids the ray.put() later on.
Is the purpose of doing this to avoid putting too much data on the driver side? Or is there another reason why we shouldn't do ray.put?
Yeah, avoiding fetching large blocks to the driver is important, and it also avoids an extra data copy. If the data is split in a remote task, then only one put() happens, instead of two.
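The transfer-count argument can be made concrete with a toy object-store model. Everything here (`ToyStore` and the two slicing functions) is a hypothetical sketch that just counts gets and puts; it is not how Ray's object store actually works:

```python
class ToyStore:
    """A toy object store that counts driver-side data transfers."""

    def __init__(self):
        self.objects = {}
        self.gets = 0   # blocks fetched to the driver
        self.puts = 0   # blocks written to the store

    def put(self, value):
        self.puts += 1
        ref = len(self.objects)
        self.objects[ref] = value
        return ref

    def get(self, ref):
        self.gets += 1
        return self.objects[ref]


def slice_on_driver(store, ref, n):
    block = store.get(ref)       # fetch the whole block to the driver
    return store.put(block[:n])  # then put the sliced copy back

def slice_in_task(store, ref, n):
    # A remote task slices next to the data: the driver never fetches the
    # full block, and only the sliced result is put.
    block = store.objects[ref]   # worker-local access, no driver get
    return store.put(block[:n])
```

Counting both paths from a freshly stored 10-row block: the driver-side path costs one get plus one extra put, while the in-task path costs no driver gets at all.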
```
@@ -115,3 +115,18 @@ def __init__(
        )
        self._key = key
        self._aggs = aggs


class Limit(AbstractAllToAll):
```
Should this extend AbstractOneToOne instead of AbstractAllToAll?
This is the logical operator. It's not used right now, but I added it for completeness of the Datastream.limit API. I think limit is also a kind of all-to-all logical operator, so this should make sense? Or would you prefer to have it directly extend LogicalOperator?
Don't have a strong opinion, but it makes sense to me that it should extend LogicalOperator rather than AllToAll, since there's technically no all-to-all data dependency between rows.
Makes sense. Fixed.
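The agreed-on shape can be sketched with a minimal stand-in base class. Both classes below are hypothetical simplifications for illustration (Ray Data's real `LogicalOperator` has more machinery), showing `Limit` extending the base directly rather than an all-to-all variant:

```python
class LogicalOperator:
    """Minimal stand-in for a logical-plan operator base class."""

    def __init__(self, name, input_dependencies):
        self.name = name
        self.input_dependencies = input_dependencies


class Limit(LogicalOperator):
    """Logical limit operator.

    Extends LogicalOperator directly, since limit has no all-to-all data
    dependency between rows: each input row is either kept or dropped.
    """

    def __init__(self, input_op, limit):
        super().__init__(f"limit={limit}", [input_op])
        self.limit = limit
```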
```python
if logical_plan is not None:
    op = Limit(logical_plan.dag, limit=limit)
    logical_plan = LogicalPlan(op)
return Datastream(plan, self._epoch, self._lazy, logical_plan)
```
Nice.
LGTM assuming you want to do the early shutdown in the second PR. Existing tests will suffice for this refactor.
Resolved (outdated) review comment on python/ray/data/_internal/execution/operators/limit_operator.py.
Yep, I plan to do that in a second PR. There are multiple ways to do this; I'll comment on the original issue, and let's continue discussing there.
Force-pushed from 196fe77 to 595d110.
Some tests seem to be failing with "ValueError: The size in bytes of the block must be known: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000056010000), BlockMetadata(num_rows=10, size_bytes=None, schema=None, input_files=[], exec_stats=None))", but I think this is a bug in the test. The new streaming executor assumes that at least ... Previously, the tests passed since we bypassed operator execution entirely with the old limit() impl.
## Why are these changes needed?

- Implemented the Limit physical operator for streaming execution.
- Added the `LimitStage` for legacy compatibility.

Note: currently, when the limit operator reaches the limit, the upstream operators still won't stop producing data. This will be optimized in a follow-up PR.

## Related issue number

ray-project#34234

Signed-off-by: Jack He <[email protected]>
## Why are these changes needed?

Fixes a bug introduced by ray-project#34705

## Related issue number

ray-project#34234

Signed-off-by: Jack He <[email protected]>
Why are these changes needed?

- Implemented the Limit physical operator for streaming execution.
- Added the `LimitStage` for legacy compatibility.

Note: currently, when the limit operator reaches the limit, the upstream operators still won't stop producing data. This will be optimized in a follow-up PR.

Related issue number

#34234

Checks

- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.