[Data] Progress Bar: Sort sample in "rows" and remove the duplicate Sort sample. (ray-project#47106)


## Why are these changes needed?

Currently, the sort sample progress bar does not report progress in rows, and a duplicate sort
sample progress bar is displayed.

![image](https://github.com/user-attachments/assets/30aa9fc3-8e96-473e-a794-da4fc023093a)

With this change, the sort sample progress bar also reports progress in rows, and the
duplicate progress bar is removed.

![image](https://github.com/user-attachments/assets/f0a3e5b6-3f84-4993-9f03-36a350aa47b0)
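
To make the bar tick in rows, each finished sample result needs a row count. Below is a minimal, self-contained sketch of that accounting; it mirrors the `extract_num_rows` helper this PR adds to `python/ray/data/_internal/progress_bar.py`, while `FakeBlock` and the sample inputs are illustrative only.

```python
from typing import Any


def extract_num_rows(result: Any) -> int:
    """Best-effort row count for a finished sample task result."""
    if hasattr(result, "num_rows"):
        return result.num_rows  # e.g. a block/table exposing num_rows
    elif hasattr(result, "__len__"):
        return len(result)  # e.g. a DataFrame of sampled rows
    else:
        return 1  # unknown result shape: count it as a single unit


class FakeBlock:
    """Hypothetical stand-in for a block metadata object."""

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows


# 100 rows from a block, 3 sampled rows from a list-like result, 1 fallback.
results = [FakeBlock(100), [1, 2, 3], object()]
print(sum(extract_num_rows(r) for r in results))  # 104
```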

In fact, there should be only one sort sample progress bar, which is created at
https://github.com/ray-project/ray/blob/e066289b374464f1e2692382fdea871eb34e3156/python/ray/data/_internal/planner/exchange/sort_task_spec.py#L166
while the one created in
```
sub_progress_bar_names=[
                SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME,
                ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME,
                ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME,
            ],
``` 
should be deleted.
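
As a rough, self-contained illustration of the deduplication (simplified names and signatures, not the actual Ray internals): the sampling step accepts an already-created sub progress bar from the operator and only builds its own, row-based bar as a fallback.

```python
from typing import List, Optional


class ProgressBar:
    """Tiny stand-in for Ray Data's internal ProgressBar (illustrative only)."""

    def __init__(self, name: str, total: int, unit: str) -> None:
        self.name, self.total, self.unit = name, total, unit
        self.position = 0

    def update(self, n: int) -> None:
        self.position = min(self.total, self.position + n)


def sample_boundaries(
    blocks: List[List[int]],
    n_samples: int,
    num_reducers: int,
    sample_bar: Optional[ProgressBar] = None,
) -> List[int]:
    # Reuse the operator's existing "Sort Sample" sub progress bar when given;
    # only fall back to creating a standalone, row-based bar otherwise. This is
    # the change that removes the duplicate bar.
    if sample_bar is None:
        sample_bar = ProgressBar("Sort Sample", len(blocks) * n_samples, unit="rows")
    samples: List[int] = []
    for block in blocks:
        taken = block[:n_samples]
        samples.extend(taken)
        sample_bar.update(len(taken))  # advance by sampled rows, not blocks
    samples.sort()
    # Pick (num_reducers - 1) roughly evenly spaced boundaries from the samples.
    step = max(1, len(samples) // num_reducers)
    return samples[step::step][: num_reducers - 1]


# Example: two blocks, two samples each, three reducers -> two boundaries.
print(sample_boundaries([[5, 3, 1], [9, 7, 2]], n_samples=2, num_reducers=3))
```
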
## Related issue number


## Checks

- [√] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [√] I've run `scripts/format.sh` to lint the changes in this PR.
- [√] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [√] Unit tests
   - [√] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: zhilong <[email protected]>
Bye-legumes authored and simonsays1980 committed Aug 16, 2024
1 parent 750b1ec commit 8e0fd3d
Showing 5 changed files with 42 additions and 18 deletions.
@@ -150,6 +150,7 @@ def __init__(
"Aggregate",
input_op,
sub_progress_bar_names=[
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME,
ExchangeTaskSpec.MAP_SUB_PROGRESS_BAR_NAME,
ExchangeTaskSpec.REDUCE_SUB_PROGRESS_BAR_NAME,
],
7 changes: 4 additions & 3 deletions python/ray/data/_internal/planner/aggregate.py
@@ -55,11 +55,12 @@ def fn(
else:
# Use same number of output partitions.
num_outputs = num_mappers
sample_bar = ctx.sub_progress_bar_dict[
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME
]
# Sample boundaries for aggregate key.
boundaries = SortTaskSpec.sample_boundaries(
blocks,
SortKey(key),
num_outputs,
blocks, SortKey(key), num_outputs, sample_bar
)

agg_spec = SortAggregateTaskSpec(
18 changes: 11 additions & 7 deletions python/ray/data/_internal/planner/exchange/sort_task_spec.py
@@ -147,7 +147,10 @@ def reduce(

@staticmethod
def sample_boundaries(
blocks: List[ObjectRef[Block]], sort_key: SortKey, num_reducers: int
blocks: List[ObjectRef[Block]],
sort_key: SortKey,
num_reducers: int,
sample_bar: Optional[ProgressBar] = None,
) -> List[T]:
"""
Return (num_reducers - 1) items in ascending order from the blocks that
@@ -162,13 +165,14 @@ def sample_boundaries(
sample_results = [
sample_block.remote(block, n_samples, sort_key) for block in blocks
]
sample_bar = ProgressBar(
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME,
len(sample_results),
unit="block",
)
if sample_bar is None:
sample_bar = ProgressBar(
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME,
len(blocks) * n_samples,
unit="rows",
)
# TODO(zhilong): Update sort sample bar before finished.
samples = sample_bar.fetch_until_complete(sample_results)
sample_bar.close()
del sample_results
samples = [s for s in samples if len(s) > 0]
# The dataset is empty
7 changes: 6 additions & 1 deletion python/ray/data/_internal/planner/sort.py
Original file line number Diff line number Diff line change
@@ -44,7 +44,12 @@ def fn(

# Sample boundaries for sort key.
if not sort_key.boundaries:
boundaries = SortTaskSpec.sample_boundaries(blocks, sort_key, num_outputs)
sample_bar = ctx.sub_progress_bar_dict[
SortTaskSpec.SORT_SAMPLE_SUB_PROGRESS_BAR_NAME
]
boundaries = SortTaskSpec.sample_boundaries(
blocks, sort_key, num_outputs, sample_bar
)
else:
boundaries = [(b,) for b in sort_key.boundaries]
num_outputs = len(boundaries) + 1
27 changes: 20 additions & 7 deletions python/ray/data/_internal/progress_bar.py
@@ -41,6 +41,24 @@ def set_progress_bars(enabled: bool) -> bool:
)


def extract_num_rows(result: Any) -> int:
"""Extract the number of rows from a result object.
Args:
result: The result object from which to extract the number of rows.
Returns:
The number of rows, defaulting to 1 if it cannot be determined.
"""
if hasattr(result, "num_rows"):
return result.num_rows
elif hasattr(result, "__len__"):
# For outputs that are DataFrames, e.g., sort_sample
return len(result)
else:
return 1


class ProgressBar:
"""Thin wrapper around tqdm to handle soft imports.
@@ -136,9 +154,7 @@ def block_until_complete(self, remaining: List[ObjectRef]) -> None:
)
total_rows_processed = 0
for _, result in zip(done, ray.get(done)):
num_rows = (
result.num_rows if hasattr(result, "num_rows") else 1
) # Default to 1 if no row count is available
num_rows = extract_num_rows(result)
total_rows_processed += num_rows
self.update(total_rows_processed)

@@ -167,11 +183,8 @@ def fetch_until_complete(self, refs: List[ObjectRef]) -> List[Any]:
total_rows_processed = 0
for ref, result in zip(done, ray.get(done)):
ref_to_result[ref] = result
num_rows = (
result.num_rows if hasattr(result, "num_rows") else 1
) # Default to 1 if no row count is available
num_rows = extract_num_rows(result)
total_rows_processed += num_rows
# TODO(zhilong): Change the total to total_row when init progress bar
self.update(total_rows_processed)

with _canceled_threads_lock:
